#!/usr/bin/env python3 # -*- coding: utf-8 -*- """从Excel导入对账单数据""" import pandas as pd import sqlite3 import os from datetime import datetime, timezone, timedelta # 数据库路径 DB_PATH = os.path.join(os.path.dirname(__file__), 'server', 'data.db') EXCEL_FILE = '25年11月份对账单-易泰勒.xlsx' def get_beijing_time(): """获取北京时间(UTC+8)的ISO格式字符串""" beijing_tz = timezone(timedelta(hours=8)) return datetime.now(beijing_tz).isoformat() def import_from_excel(): """从Excel导入对账单数据""" # 读取Excel文件 df = pd.read_excel(EXCEL_FILE) # 打印前20行查看结构 print("Excel文件结构:") print("=" * 80) for i in range(min(20, len(df))): print(f"第{i}行: {df.iloc[i].tolist()}") print("=" * 80) # 查找表头行(包含"序号"的行) header_row = None for i in range(len(df)): row_values = df.iloc[i].tolist() if any(str(val).strip() == '序号' for val in row_values if pd.notna(val)): header_row = i print(f"\n找到表头行: 第{i}行") print(f"表头内容: {row_values}") break if header_row is None: print("❌ 未找到表头行(包含'序号'的行)") return # 重新读取,跳过前面的行,使用找到的行作为表头 df = pd.read_excel(EXCEL_FILE, skiprows=header_row) # 设置第一行为列名 df.columns = df.iloc[0] df = df[1:] # 删除第一行(已经作为列名) df = df.reset_index(drop=True) print(f"\n数据形状: {df.shape}") print(f"列名: {df.columns.tolist()}") print(f"\n前5行数据:") print(df.head()) # 连接数据库 conn = sqlite3.connect(DB_PATH) c = conn.cursor() now = get_beijing_time() imported_count = 0 # 遍历数据行 for idx, row in df.iterrows(): # 跳过空行或无效行 if pd.isna(row.get('序号')): continue try: # 提取数据 # 处理日期格式 def format_date(date_val): if pd.isna(date_val): return '' if isinstance(date_val, datetime): return date_val.strftime('%Y/%m/%d') date_str = str(date_val).strip() # 如果已经是 YYYY/MM/DD 格式,保持不变 if '/' in date_str: return date_str # 如果是 YYYY-MM-DD 格式,转换为 YYYY/MM/DD if '-' in date_str: return date_str.split()[0].replace('-', '/') return date_str order_date = format_date(row.get('下单时间')) contract_no = str(row.get('合同编号', '')).strip() if pd.notna(row.get('合同编号')) else '' material_name = str(row.get('物料名称', '')).strip() if pd.notna(row.get('物料名称')) else '' spec_model = str(row.get('规格型号', '')).strip() if pd.notna(row.get('规格型号')) else '' transport_no = str(row.get('运输单号', '')).strip() if pd.notna(row.get('运输单号')) else '' quantity = int(row.get('数量', 0)) if pd.notna(row.get('数量')) else 0 unit = str(row.get('单位', 'pcs')).strip() if pd.notna(row.get('单位')) else 'pcs' unit_price = float(row.get('含税单价', 0)) if pd.notna(row.get('含税单价')) else 0.0 total_amount = float(row.get('含税金额', 0)) if pd.notna(row.get('含税金额')) else 0.0 delivery_date = format_date(row.get('交货日期')) shipment_date = format_date(row.get('出货日期')) # 验证必填字段 if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price]): print(f"⚠️ 跳过第{idx+1}行: 缺少必填字段") continue # 插入数据库 c.execute(''' INSERT INTO reconciliations( order_date, contract_no, material_name, spec_model, transport_no, quantity, unit, unit_price, total_amount, delivery_date, shipment_date, created_by, created_at, updated_at ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''', ( order_date, contract_no, material_name, spec_model, transport_no, quantity, unit, unit_price, total_amount, delivery_date, shipment_date, 'admin', now, now )) imported_count += 1 print(f"✅ 导入第{idx+1}行: {contract_no} - {material_name}") except Exception as e: print(f"❌ 导入第{idx+1}行失败: {e}") continue conn.commit() conn.close() print(f"\n{'='*80}") print(f"✅ 成功导入 {imported_count} 条对账单数据") print(f"{'='*80}") if __name__ == '__main__': import_from_excel()