Batch_Import_xlsx/batch_import.py
2025-12-08 11:25:24 +08:00

112 lines
3.9 KiB
Python

#!/usr/bin/env python3
import pandas as pd
import redis
from tqdm import tqdm
import argparse
import os
# Parse the command-line target, then open the Redis connection.
parser = argparse.ArgumentParser()
parser.add_argument("type", choices=["pdd", "yt", "tx"], help="目标: pdd/yt/tx")
args = parser.parse_args()
# Connection settings can be overridden with the REDIS_HOST, REDIS_PORT and
# REDIS_PASSWORD environment variables; the previous literal values remain as
# fallbacks so existing invocations behave exactly as before.
# NOTE(review): the fallback password is a credential committed to source —
# rotate it and supply it through REDIS_PASSWORD only.
r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "180.163.74.83"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    password=os.environ.get("REDIS_PASSWORD", "Zzh08165511"),
    decode_responses=True,  # return str instead of bytes from Redis replies
)
# Per-target settings: (workbook path, Redis hash name, MAC column header).
_TARGETS = {
    "yt": (
        '/home/hyx/work/batch_import_xlsx/sn_test_yt.xlsx',
        'batch_sn_mapping_yt',
        'MAC',
    ),
    "pdd": (
        '/home/hyx/work/batch_import_xlsx/sn_test_pdd.xlsx',
        'batch_sn_mapping_pdd',
        'MAC',
    ),
}
# Any target not listed above (i.e. "tx") uses the default settings.
_DEFAULT_TARGET = (
    '/home/hyx/work/batch_import_xlsx/sn_test_tx.xlsx',
    'batch_sn_mapping',
    'SN_MAC',
)
excel_path, pool, mac_col = _TARGETS.get(args.type, _DEFAULT_TARGET)

# Load the workbook to import.
df = pd.read_excel(excel_path)

# Invert the hash currently stored in Redis (field -> value) into
# value -> [fields], so a MAC can be looked up to find its batch numbers.
existing = r.hgetall(pool)
mac_to_batches = {}
for batch_key, mac_value in existing.items():
    if mac_value not in mac_to_batches:
        mac_to_batches[mac_value] = []
    mac_to_batches[mac_value].append(batch_key)

# Collect every MAC that appears more than once inside the workbook itself
# (keep=False marks all occurrences, including the first one).
s = df[mac_col].astype(str).str.strip()
repeated_mask = s.duplicated(keep=False)
dup_keys = set(s[repeated_mask].unique())
# Bulk import: classify every workbook row and queue the valid ones on a
# Redis pipeline, sending the queued commands in fixed-size batches.
BATCH_SIZE = 100  # number of queued HSET commands sent per round trip
# The expected identifier length depends only on the target, so compute it
# once instead of on every row.
expected_len = 27 if args.type == 'tx' else 12

pipe = r.pipeline()
duplicates = []          # (MAC, batch number already stored in Redis) pairs
inserted_count = 0
invalids = []            # (MAC, batch number) rows whose MAC length is wrong
duplicates_current = {}  # MAC -> set of batch numbers repeated in this workbook
dup_current_count = 0
pending = 0              # commands queued on the pipeline but not yet sent

for _, row in tqdm(df.iterrows(), total=len(df)):
    batch_no = str(row['批次号']).strip()
    sn_mac = str(row[mac_col]).strip()

    # Reject identifiers of the wrong length.
    if len(sn_mac) != expected_len:
        invalids.append((sn_mac, batch_no))
        continue

    # Skip every occurrence of a MAC that is repeated within the workbook.
    if sn_mac in dup_keys:
        duplicates_current.setdefault(sn_mac, set()).add(batch_no)
        dup_current_count += 1
        continue

    # Skip MACs already present in Redis, recording each batch number that
    # currently holds them.
    if sn_mac in mac_to_batches:
        for b in mac_to_batches[sn_mac]:
            duplicates.append((sn_mac, b))
        continue

    pipe.hset(pool, batch_no, sn_mac)
    inserted_count += 1
    pending += 1

    # Flush on the number of queued commands rather than on the row index:
    # skipped rows never reach this point, so an index-based check could miss
    # its flush points and let the pipeline grow without bound.
    if pending >= BATCH_SIZE:
        pipe.execute()
        pipe = r.pipeline()
        pending = 0

# Send whatever is left over from the last partial batch.
if pending:
    pipe.execute()

print(f"成功导入 {inserted_count} 条数据,数据库重复跳过 {len(duplicates)} 条,当前批次重复跳过 {dup_current_count} 条,长度错误跳过 {len(invalids)}")
def _append_rows_to_excel(rows, columns, out_path):
    """Write *rows* to the workbook at *out_path*, appending if it exists.

    Args:
        rows: Sequence of tuples, one per output row.
        columns: Column headers matching the tuple positions.
        out_path: Destination ``.xlsx`` path. When the file already exists its
            current rows are read back and kept ahead of the new ones.
    """
    frame = pd.DataFrame(rows, columns=columns)
    if os.path.exists(out_path):
        previous = pd.read_excel(out_path)
        frame = pd.concat([previous, frame], ignore_index=True)
    frame.to_excel(out_path, index=False)


# Report and export MACs that were skipped because Redis already holds them.
if duplicates:
    for mac, b in duplicates:
        print(f"重复: {mac} 已存在于批次号 {b}")
    out_path = f"/home/hyx/work/batch_import_xlsx/duplicates_{args.type}.xlsx"
    _append_rows_to_excel(duplicates, [mac_col, '批次号'], out_path)
    print(f"重复数据已导出: {out_path}")

# Report and export MACs that are repeated inside the current workbook.
if duplicates_current:
    # Flatten MAC -> {batch numbers} into one (MAC, batch number) row each.
    cur_rows = [(mac, b) for mac, bs in duplicates_current.items() for b in bs]
    for mac, b in cur_rows:
        print(f"重复: {mac} 当前批次号 {b}")
    out_path_cur = f"/home/hyx/work/batch_import_xlsx/duplicates_current_{args.type}.xlsx"
    _append_rows_to_excel(cur_rows, [mac_col, '批次号'], out_path_cur)
    print(f"当前批次重复数据已导出: {out_path_cur}")

# Report and export rows rejected for having the wrong MAC length.
if invalids:
    for mac, b in invalids:
        print(f"长度错误: {mac} 批次号 {b}")
    out_path_inv = f"/home/hyx/work/batch_import_xlsx/invalid_{args.type}.xlsx"
    _append_rows_to_excel(invalids, [mac_col, '批次号'], out_path_inv)
    print(f"长度错误数据已导出: {out_path_inv}")