ERP/batch_import.py

133 lines
4.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import pandas as pd
import redis
from tqdm import tqdm
import argparse
import os
# 连接Redis
parser = argparse.ArgumentParser()
parser.add_argument("type", choices=["pdd", "yt", "tx"], help="目标: pdd/yt/tx")
args = parser.parse_args()
r = redis.Redis(host='180.163.74.83', port=6379, password='Zzh08165511', decode_responses=True)
# 读取Excel文件
if args.type == "yt":
excel_path = '/home/hyx/work/batch_import_xlsx/sn_test_yt.xlsx'
pool = 'batch_sn_mapping_yt'
mac_col = 'MAC'
elif args.type == "pdd":
excel_path = '/home/hyx/work/batch_import_xlsx/sn_test_pdd.xlsx'
pool = 'batch_sn_mapping_pdd'
mac_col = 'MAC'
else:
excel_path = '/home/hyx/work/batch_import_xlsx/sn_test_tx.xlsx'
pool = 'batch_sn_mapping'
mac_col = 'SN_MAC'
df = pd.read_excel(excel_path)
existing = r.hgetall(pool)
mac_to_batches = {}
for b, m in existing.items():
mac_to_batches.setdefault(m, []).append(b)
s = df[mac_col].astype(str).str.strip()
dup_keys = set(s[s.duplicated(keep=False)].unique())
# 批量导入数据
pipe = r.pipeline()
duplicates = []
inserted_count = 0
invalids = []
duplicates_current = {}
dup_current_count = 0
for index, row in tqdm(df.iterrows(), total=len(df)):
batch_no = str(row['批次号']).strip()
sn_mac = str(row[mac_col]).strip()
expected_len = 27 if args.type == 'tx' else 12
if len(sn_mac) != expected_len:
invalids.append((sn_mac, batch_no))
continue
if sn_mac in dup_keys:
s = duplicates_current.get(sn_mac, set())
s.add(batch_no)
duplicates_current[sn_mac] = s
dup_current_count += 1
continue
if sn_mac in mac_to_batches:
for b in mac_to_batches[sn_mac]:
duplicates.append((sn_mac, b))
continue
pipe.hset(pool, batch_no, sn_mac)
inserted_count += 1
if (index + 1) % 100 == 0:
pipe.execute()
pipe = r.pipeline()
pipe.execute()
print(f"成功导入 {inserted_count} 条数据,数据库重复跳过 {len(duplicates)} 条,当前批次重复跳过 {dup_current_count} 条,长度错误跳过 {len(invalids)}")
# 输出成功导入的数据JSON格式方便前端解析
if inserted_count > 0:
print("\n=== 成功导入的数据 ===")
import json
success_records = []
for index, row in df.iterrows():
batch_no = str(row['批次号']).strip()
sn_mac = str(row[mac_col]).strip()
expected_len = 27 if args.type == 'tx' else 12
# 只输出成功导入的记录
if len(sn_mac) == expected_len and sn_mac not in dup_keys and sn_mac not in mac_to_batches:
success_records.append({
'mac': sn_mac,
'batch': batch_no
})
2025-11-21 14:45:00 +00:00
# 移除数量限制,输出所有成功导入的记录
print(json.dumps(success_records, ensure_ascii=False))
print("=== 数据输出结束 ===")
if duplicates:
for mac, b in duplicates:
print(f"重复: {mac} 已存在于批次号 {b}")
dup_df = pd.DataFrame(duplicates, columns=[mac_col, '批次号'])
out_path = f"/home/hyx/work/batch_import_xlsx/duplicates_{args.type}.xlsx"
if os.path.exists(out_path):
old_df = pd.read_excel(out_path)
combined = pd.concat([old_df, dup_df], ignore_index=True)
combined.to_excel(out_path, index=False)
else:
dup_df.to_excel(out_path, index=False)
#print(f"重复数据已导出: {out_path}")
if duplicates_current:
for mac, bs in duplicates_current.items():
for b in bs:
print(f"重复: {mac} 当前批次号 {b}")
cur_rows = [(mac, b) for mac, bs in duplicates_current.items() for b in bs]
cur_dup_df = pd.DataFrame(cur_rows, columns=[mac_col, '批次号'])
out_path_cur = f"/home/hyx/work/batch_import_xlsx/duplicates_current_{args.type}.xlsx"
if os.path.exists(out_path_cur):
old_cur_df = pd.read_excel(out_path_cur)
combined_cur = pd.concat([old_cur_df, cur_dup_df], ignore_index=True)
combined_cur.to_excel(out_path_cur, index=False)
else:
cur_dup_df.to_excel(out_path_cur, index=False)
#print(f"当前批次重复数据已导出: {out_path_cur}")
if invalids:
for mac, b in invalids:
print(f"长度错误: {mac} 批次号 {b}")
inv_df = pd.DataFrame(invalids, columns=[mac_col, '批次号'])
out_path_inv = f"/home/hyx/work/batch_import_xlsx/invalid_{args.type}.xlsx"
if os.path.exists(out_path_inv):
old_inv_df = pd.read_excel(out_path_inv)
combined_inv = pd.concat([old_inv_df, inv_df], ignore_index=True)
combined_inv.to_excel(out_path_inv, index=False)
else:
inv_df.to_excel(out_path_inv, index=False)
#print(f"长度错误数据已导出: {out_path_inv}")