145 lines
5.2 KiB
Python
145 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""从Excel导入对账单数据"""
|
||
|
||
import pandas as pd
|
||
import sqlite3
|
||
import os
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# 数据库路径
|
||
DB_PATH = os.path.join(os.path.dirname(__file__), 'server', 'data.db')
|
||
EXCEL_FILE = '25年11月份对账单-易泰勒.xlsx'
|
||
|
||
def get_beijing_time():
|
||
"""获取北京时间(UTC+8)的ISO格式字符串"""
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
return datetime.now(beijing_tz).isoformat()
|
||
|
||
def import_from_excel():
|
||
"""从Excel导入对账单数据"""
|
||
# 读取Excel文件
|
||
df = pd.read_excel(EXCEL_FILE)
|
||
|
||
# 打印前20行查看结构
|
||
print("Excel文件结构:")
|
||
print("=" * 80)
|
||
for i in range(min(20, len(df))):
|
||
print(f"第{i}行: {df.iloc[i].tolist()}")
|
||
print("=" * 80)
|
||
|
||
# 查找表头行(包含"序号"的行)
|
||
header_row = None
|
||
for i in range(len(df)):
|
||
row_values = df.iloc[i].tolist()
|
||
if any(str(val).strip() == '序号' for val in row_values if pd.notna(val)):
|
||
header_row = i
|
||
print(f"\n找到表头行: 第{i}行")
|
||
print(f"表头内容: {row_values}")
|
||
break
|
||
|
||
if header_row is None:
|
||
print("❌ 未找到表头行(包含'序号'的行)")
|
||
return
|
||
|
||
# 重新读取,跳过前面的行,使用找到的行作为表头
|
||
df = pd.read_excel(EXCEL_FILE, skiprows=header_row)
|
||
|
||
# 设置第一行为列名
|
||
df.columns = df.iloc[0]
|
||
df = df[1:] # 删除第一行(已经作为列名)
|
||
df = df.reset_index(drop=True)
|
||
|
||
print(f"\n数据形状: {df.shape}")
|
||
print(f"列名: {df.columns.tolist()}")
|
||
print(f"\n前5行数据:")
|
||
print(df.head())
|
||
|
||
# 连接数据库
|
||
conn = sqlite3.connect(DB_PATH)
|
||
c = conn.cursor()
|
||
|
||
now = get_beijing_time()
|
||
imported_count = 0
|
||
|
||
# 遍历数据行
|
||
for idx, row in df.iterrows():
|
||
# 跳过空行或无效行
|
||
if pd.isna(row.get('序号')):
|
||
continue
|
||
|
||
try:
|
||
# 提取数据
|
||
# 处理日期格式
|
||
def format_date(date_val):
|
||
if pd.isna(date_val):
|
||
return ''
|
||
if isinstance(date_val, datetime):
|
||
return date_val.strftime('%Y/%m/%d')
|
||
date_str = str(date_val).strip()
|
||
# 如果已经是 YYYY/MM/DD 格式,保持不变
|
||
if '/' in date_str:
|
||
return date_str
|
||
# 如果是 YYYY-MM-DD 格式,转换为 YYYY/MM/DD
|
||
if '-' in date_str:
|
||
return date_str.split()[0].replace('-', '/')
|
||
return date_str
|
||
|
||
order_date = format_date(row.get('下单时间'))
|
||
contract_no = str(row.get('合同编号', '')).strip() if pd.notna(row.get('合同编号')) else ''
|
||
material_name = str(row.get('物料名称', '')).strip() if pd.notna(row.get('物料名称')) else ''
|
||
spec_model = str(row.get('规格型号', '')).strip() if pd.notna(row.get('规格型号')) else ''
|
||
transport_no = str(row.get('运输单号', '')).strip() if pd.notna(row.get('运输单号')) else ''
|
||
quantity = int(row.get('数量', 0)) if pd.notna(row.get('数量')) else 0
|
||
unit = str(row.get('单位', 'pcs')).strip() if pd.notna(row.get('单位')) else 'pcs'
|
||
unit_price = float(row.get('含税单价', 0)) if pd.notna(row.get('含税单价')) else 0.0
|
||
total_amount = float(row.get('含税金额', 0)) if pd.notna(row.get('含税金额')) else 0.0
|
||
delivery_date = format_date(row.get('交货日期'))
|
||
shipment_date = format_date(row.get('出货日期'))
|
||
|
||
# 验证必填字段
|
||
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price]):
|
||
print(f"⚠️ 跳过第{idx+1}行: 缺少必填字段")
|
||
continue
|
||
|
||
# 插入数据库
|
||
c.execute('''
|
||
INSERT INTO reconciliations(
|
||
order_date, contract_no, material_name, spec_model, transport_no,
|
||
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
|
||
created_by, created_at, updated_at
|
||
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
||
''', (
|
||
order_date,
|
||
contract_no,
|
||
material_name,
|
||
spec_model,
|
||
transport_no,
|
||
quantity,
|
||
unit,
|
||
unit_price,
|
||
total_amount,
|
||
delivery_date,
|
||
shipment_date,
|
||
'admin',
|
||
now,
|
||
now
|
||
))
|
||
|
||
imported_count += 1
|
||
print(f"✅ 导入第{idx+1}行: {contract_no} - {material_name}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 导入第{idx+1}行失败: {e}")
|
||
continue
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
print(f"\n{'='*80}")
|
||
print(f"✅ 成功导入 {imported_count} 条对账单数据")
|
||
print(f"{'='*80}")
|
||
|
||
if __name__ == '__main__':
|
||
import_from_excel()
|