#!/usr/bin/env python3
"""Batch-import SN/MAC mappings from an Excel sheet into a Redis hash.

Usage: script.py {pdd|yt|tx}

Each row is validated for length, de-duplicated against both the existing
Redis hash and other rows in the same file, then written via a Redis
pipeline (batch_no -> mac).  Skipped rows (already in Redis, duplicated
inside the file, or wrong length) are printed and appended to per-type
Excel report files under base_dir.
"""
import argparse
import json
import os
import sys

import pandas as pd
import redis
from tqdm import tqdm

parser = argparse.ArgumentParser()
parser.add_argument("type", choices=["pdd", "yt", "tx"], help="目标: pdd/yt/tx")
args = parser.parse_args()

# SECURITY NOTE(review): Redis host/password are hard-coded in source.
# Move them to environment variables or a config file before distributing.
r = redis.Redis(host='180.163.74.83', port=6379, password='Zzh08165511',
                decode_responses=True)

base_dir = '/home/hyx/work/batch_import_xlsx'

# Per-type settings: input file stem, target Redis hash, MAC column name.
if args.type == "yt":
    base_name, pool, mac_col = 'sn_test_yt', 'batch_sn_mapping_yt', 'MAC'
elif args.type == "pdd":
    base_name, pool, mac_col = 'sn_test_pdd', 'batch_sn_mapping_pdd', 'MAC'
else:  # tx
    base_name, pool, mac_col = 'sn_test_tx', 'batch_sn_mapping', 'SN_MAC'

# Auto-detect the file extension (.xlsx preferred, then .xls).
excel_path = None
for ext in ('.xlsx', '.xls'):
    candidate = os.path.join(base_dir, f'{base_name}{ext}')
    if os.path.exists(candidate):
        excel_path = candidate
        break
if not excel_path:
    print(f"错误: 找不到文件 {base_name}.xlsx 或 {base_name}.xls")
    sys.exit(1)

# Legacy .xls needs the xlrd engine; .xlsx uses pandas' default (openpyxl).
if excel_path.endswith('.xls'):
    df = pd.read_excel(excel_path, engine='xlrd')
else:
    df = pd.read_excel(excel_path)

# The Redis hash maps batch_no -> mac; invert it so we can detect a MAC
# that already exists under any batch number.
existing = r.hgetall(pool)
mac_to_batches = {}
for batch, mac in existing.items():
    mac_to_batches.setdefault(mac, []).append(batch)

# MACs occurring more than once inside the current file.  Every occurrence
# of such a MAC is skipped (keep=False marks all of them), by design.
mac_series = df[mac_col].astype(str).str.strip()
dup_keys = set(mac_series[mac_series.duplicated(keep=False)].unique())

# tx uses 27-char SN strings; the others are 12-char MACs.
# Hoisted out of the loop — it is constant for the whole run.
expected_len = 27 if args.type == 'tx' else 12

pipe = r.pipeline()
duplicates = []          # (mac, existing_batch) already present in Redis
invalids = []            # (mac, batch) rows with the wrong length
duplicates_current = {}  # mac -> set of batch numbers seen in this file
dup_current_count = 0
inserted_count = 0
success_records = []     # collected inline (previously recomputed by a
                         # second, duplicated pass over the DataFrame)

for index, row in tqdm(df.iterrows(), total=len(df)):
    batch_no = str(row['批次号']).strip()
    sn_mac = str(row[mac_col]).strip()

    if len(sn_mac) != expected_len:
        invalids.append((sn_mac, batch_no))
        continue
    if sn_mac in dup_keys:
        duplicates_current.setdefault(sn_mac, set()).add(batch_no)
        dup_current_count += 1
        continue
    if sn_mac in mac_to_batches:
        for b in mac_to_batches[sn_mac]:
            duplicates.append((sn_mac, b))
        continue

    pipe.hset(pool, batch_no, sn_mac)
    inserted_count += 1
    success_records.append({'mac': sn_mac, 'batch': batch_no})

    # Flush every 100 rows to bound the pipeline's buffered command count.
    if (index + 1) % 100 == 0:
        pipe.execute()
        pipe = r.pipeline()

pipe.execute()  # flush the final partial batch

print(f"成功导入 {inserted_count} 条数据,数据库重复跳过 {len(duplicates)} 条,当前批次重复跳过 {dup_current_count} 条,长度错误跳过 {len(invalids)} 条")

# Emit the successfully imported rows as JSON for the front-end to parse.
if inserted_count > 0:
    print("\n=== 成功导入的数据 ===")
    print(json.dumps(success_records, ensure_ascii=False))
    print("=== 数据输出结束 ===")


def _append_to_excel(rows, columns, out_path):
    """Append *rows* to the Excel report at *out_path*.

    If a previous report exists it is read back and the new rows are
    concatenated, so reports accumulate across runs.
    """
    report_df = pd.DataFrame(rows, columns=columns)
    if os.path.exists(out_path):
        old_df = pd.read_excel(out_path)
        report_df = pd.concat([old_df, report_df], ignore_index=True)
    report_df.to_excel(out_path, index=False)


if duplicates:
    for mac, b in duplicates:
        print(f"重复: {mac} 已存在于批次号 {b}")
    _append_to_excel(duplicates, [mac_col, '批次号'],
                     f"{base_dir}/duplicates_{args.type}.xlsx")

if duplicates_current:
    cur_rows = []
    for mac, batches in duplicates_current.items():
        for b in batches:
            print(f"重复: {mac} 当前批次号 {b}")
            cur_rows.append((mac, b))
    _append_to_excel(cur_rows, [mac_col, '批次号'],
                     f"{base_dir}/duplicates_current_{args.type}.xlsx")

if invalids:
    for mac, b in invalids:
        print(f"长度错误: {mac} 批次号 {b}")
    _append_to_excel(invalids, [mac_col, '批次号'],
                     f"{base_dir}/invalid_{args.type}.xlsx")