2025-11-21 13:27:40 +00:00
# -*- coding: utf-8 -*-
import os
import json
import sqlite3
from datetime import datetime
from functools import wraps
from flask import Flask , request , jsonify , session , send_from_directory
from werkzeug . security import generate_password_hash , check_password_hash
from werkzeug . utils import secure_filename
# redis is optional: if the import fails for any reason the module still
# loads, and get_redis() reports the missing package when it is called.
try:
    import redis
except Exception:
    redis = None

# Redis client created on first use by get_redis() and reused afterwards.
_redis_client = None

# Per-platform cache of the last audit listing; 'ts' holds the timestamp at
# which 'list' was stored (0 means "never").
_audit_cache = {platform: {'ts': 0, 'list': []} for platform in ('pdd', 'yt')}

# Filesystem layout: the SQLite file sits next to this module, the frontend
# directory is a sibling of this module's directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data.db')
FRONTEND_DIR = os.path.join(os.path.dirname(BASE_DIR), 'frontend')

# The frontend directory is served as static content from the URL root.
app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path='')
# NOTE(review): 'change-me' is a publicly known fallback secret; set
# APP_SECRET in every real deployment.
app.config['SECRET_KEY'] = os.environ.get('APP_SECRET', 'change-me')
def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as sqlite3.Row so columns can be read by name.
    The caller owns the connection and must close it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the schema, apply column migrations and seed default accounts.

    Safe to call repeatedly: tables are created with IF NOT EXISTS, columns
    are added only when missing, and accounts are inserted only when the
    username is not present yet.

    Environment variables read:
        ADMIN_PASSWORD       password for the seeded 'admin' user
                             (falls back to 'admin123').
        SUPERADMIN_USERNAME  together with SUPERADMIN_PASSWORD, seeds a
        SUPERADMIN_PASSWORD  'superadmin' account when both are set.
    """
    table_ddl = (
        '''CREATE TABLE IF NOT EXISTS users(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            role TEXT NOT NULL
        )''',
        '''CREATE TABLE IF NOT EXISTS operations_log(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            action TEXT,
            detail TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS notifications(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            username TEXT,
            action TEXT,
            detail TEXT,
            ts TEXT,
            read INTEGER DEFAULT 0
        )''',
        '''CREATE TABLE IF NOT EXISTS mac_batches(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mac TEXT,
            batch TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS stats(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            good INTEGER,
            bad INTEGER,
            fpy_good INTEGER DEFAULT 0,
            platform TEXT DEFAULT 'pdd',
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS defects(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mac TEXT,
            batch TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS shipments(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT,
            qty INTEGER,
            receiver TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS devices(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            status TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS environment(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            temp TEXT,
            hum TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS personnel(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            role TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS qa(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            date TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS production(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch TEXT,
            duration TEXT,
            ts TEXT
        )''',
        '''CREATE TABLE IF NOT EXISTS repairs(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            qty INTEGER,
            note TEXT,
            ts TEXT
        )''',
    )

    # Columns added after the tables first shipped: (table, column, declaration).
    # Defaults use single-quoted SQL string literals; the double-quoted form
    # only works through SQLite's legacy "double-quoted string" fallback.
    added_columns = (
        ('stats', 'fpy_good', 'INTEGER DEFAULT 0'),
        ('stats', 'platform', "TEXT DEFAULT 'pdd'"),
        ('users', 'avatar', 'TEXT'),
        ('mac_batches', 'platform', "TEXT DEFAULT 'pdd'"),
    )

    def ensure_column(cursor, table, column, declaration):
        # Check explicitly instead of swallowing every ALTER TABLE error, so
        # real failures (locked database, bad declaration) are not hidden.
        cursor.execute(f'PRAGMA table_info({table})')
        existing = {info[1] for info in cursor.fetchall()}
        if column not in existing:
            cursor.execute(f'ALTER TABLE {table} ADD COLUMN {column} {declaration}')

    def ensure_user(cursor, username, password, role):
        # Insert the account only when the username is not taken yet.
        cursor.execute('SELECT id FROM users WHERE username=?', (username,))
        if cursor.fetchone():
            return
        cursor.execute(
            'INSERT INTO users(username, password_hash, role) VALUES(?,?,?)',
            (username, generate_password_hash(password), role),
        )

    conn = get_db()
    try:
        c = conn.cursor()
        for ddl in table_ddl:
            c.execute(ddl)
        for table, column, declaration in added_columns:
            ensure_column(c, table, column, declaration)
        conn.commit()

        # Default admin account.
        # NOTE(review): 'admin123' is a well-known fallback password; set
        # ADMIN_PASSWORD before the first start of a real deployment.
        ensure_user(c, 'admin', os.environ.get('ADMIN_PASSWORD', 'admin123'), 'admin')
        conn.commit()

        # Optional superadmin account taken from the environment.
        su_user = os.environ.get('SUPERADMIN_USERNAME')
        su_pass = os.environ.get('SUPERADMIN_PASSWORD')
        if su_user and su_pass:
            ensure_user(c, su_user, su_pass, 'superadmin')
            conn.commit()
    finally:
        # Release the connection even when a statement fails.
        conn.close()
def log(action, detail=''):
    """Append an entry to operations_log for the current session user.

    Best-effort: any failure (no request context, database error) is
    swallowed so that logging can never break the calling request.

    Args:
        action: Short action name stored in the ``action`` column.
        detail: Free-form text stored in the ``detail`` column.
    """
    conn = None
    try:
        conn = get_db()
        conn.execute(
            'INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?)',
            (session.get('user_id'), action, detail, get_beijing_time()),
        )
        conn.commit()
    except Exception:
        # Deliberately ignored: see docstring.
        pass
    finally:
        # Close the connection on the failure path as well.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def notify_superadmin(action, detail=''):
    """Create one unread notification per superadmin for the current user's action.

    Nothing is written when no user is logged in, when the session user no
    longer exists, or when the acting user is a superadmin. Best-effort:
    every exception is swallowed so the calling request is never affected.

    Args:
        action: Text stored in the notification's ``action`` column.
        detail: Text stored in the notification's ``detail`` column.
    """
    conn = None
    try:
        user_id = session.get('user_id')
        if not user_id:
            return
        conn = get_db()
        c = conn.cursor()
        # Look up who is acting.
        c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
        user = c.fetchone()
        # Unknown user, or a superadmin acting themselves: nothing to notify.
        if not user or user['role'] == 'superadmin':
            return
        c.execute('SELECT id FROM users WHERE role=?', ('superadmin',))
        superadmins = c.fetchall()
        # Beijing time (UTC+8), same format as every other ts column.
        now = get_beijing_time()
        c.executemany(
            'INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)',
            [(admin['id'], user['username'], action, detail, now, 0) for admin in superadmins],
        )
        conn.commit()
    except Exception:
        # Deliberately ignored: see docstring.
        pass
    finally:
        # Single exit point that closes the connection on every path.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def get_redis():
    """Return the shared Redis client, creating it on first use.

    Connection settings come from REDIS_HOST, REDIS_PORT, REDIS_PASSWORD and
    REDIS_DB. Responses are decoded to str and both socket timeouts are 0.5 s.

    Raises:
        RuntimeError: if the redis package could not be imported.
    """
    global _redis_client
    if not redis:
        raise RuntimeError('redis missing')
    if _redis_client is None:
        env = os.environ
        # NOTE(review): the default host is a hard-coded public IP and the
        # password falls back to SUPERADMIN_PASSWORD - confirm both are intended.
        host = env.get('REDIS_HOST', '180.163.74.83')
        port = int(env.get('REDIS_PORT', '6379'))
        password = env.get('REDIS_PASSWORD') or env.get('SUPERADMIN_PASSWORD')
        db = int(env.get('REDIS_DB', '0'))
        _redis_client = redis.Redis(
            host=host,
            port=port,
            password=password,
            db=db,
            decode_responses=True,
            socket_timeout=0.5,
            socket_connect_timeout=0.5,
        )
    return _redis_client
2025-11-21 14:45:00 +00:00
def get_beijing_time():
    """Return the current Beijing time (UTC+8) as an ISO-8601 string."""
    from datetime import timedelta, timezone
    utc_plus_8 = timezone(timedelta(hours=8))
    current = datetime.now(utc_plus_8)
    return current.isoformat()
2025-11-21 13:27:40 +00:00
def parse_audit_line(s):
    """Parse one raw audit entry into ``{'ts_cn', 'batch', 'mac', 'note'}``.

    Two input formats are understood:

    * a JSON object - fields are read from it directly;
    * ``key=value`` pairs separated by the first of space, ``,``, ``;`` or
      ``|`` that occurs in the line. With a date-only timestamp value
      followed by an ``HH:MM:SS`` token, the two are joined with a space.

    Field aliases: batch/batch_no/lot, mac/mac_addr/mac_address,
    note/msg/message. The timestamp is the first of ts_cn, ts_local, ts,
    ts_utc, timestamp, time that looks like it carries a time of day,
    otherwise the first non-empty one, otherwise the raw line itself.
    For ``key=value`` input a missing note also falls back to the raw line.

    Args:
        s: Raw entry text. A falsy value yields a dict of four ``None``s.

    Returns:
        dict with keys ``ts_cn``, ``batch``, ``mac`` and ``note``.
    """
    import re

    if not s:
        return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None}

    date_only = re.compile(r'^\d{4}-\d{2}-\d{2}$')
    time_prefix = re.compile(r'^\d{2}:\d{2}:\d{2}')

    def has_time(v):
        return isinstance(v, str) and ('T' in v or ':' in v)

    def choose_ts(fields):
        candidates = [
            fields.get(name)
            for name in ('ts_cn', 'ts_local', 'ts', 'ts_utc', 'timestamp', 'time')
        ]
        # Prefer a value with a time-of-day part over a date-only value.
        for v in candidates:
            if has_time(v):
                return v
        for v in candidates:
            if v:
                return v
        return None

    def first_of(fields, *names):
        # Same result as ``fields.get(a) or fields.get(b) or ...``: the first
        # truthy value, else the value of the last name looked up.
        value = None
        for name in names:
            value = fields.get(name)
            if value:
                break
        return value

    def timestamp_for(fields):
        # Fall back to the raw line so the entry still shows something.
        ts = choose_ts(fields)
        if not ts:
            ts = s if isinstance(s, str) else None
        return ts if ts else None

    # --- JSON object ---
    try:
        obj = json.loads(s)
    except (TypeError, ValueError, RecursionError):
        obj = None
    # Only JSON objects carry named fields; scalars and arrays fall through
    # to the key=value parser below.
    if isinstance(obj, dict):
        return {
            'ts_cn': timestamp_for(obj),
            'batch': first_of(obj, 'batch', 'batch_no', 'lot'),
            'mac': first_of(obj, 'mac', 'mac_addr', 'mac_address'),
            'note': first_of(obj, 'note', 'msg', 'message'),
        }

    # --- key=value pairs ---
    parts = [s]
    for sep in (' ', ',', ';', '|'):
        if sep in s:
            parts = s.split(sep)
            break

    d = {}
    i = 0
    while i < len(parts):
        part = parts[i]
        if '=' in part:
            k, v = part.split('=', 1)
            kk = k.strip()
            vv = v.strip()
            # "ts=2025-01-01 10:00:00" was split in two by the separator;
            # glue the time token back on and skip it.
            if (
                kk in ('ts_cn', 'ts_local', 'ts', 'timestamp', 'time')
                and date_only.match(vv)
                and i + 1 < len(parts)
                and time_prefix.match(parts[i + 1])
            ):
                vv = vv + ' ' + parts[i + 1]
                i += 1
            d[kk] = vv
        i += 1

    return {
        'ts_cn': timestamp_for(d),
        'batch': first_of(d, 'batch', 'batch_no', 'lot'),
        'mac': first_of(d, 'mac', 'mac_addr', 'mac_address'),
        'note': first_of(d, 'note', 'msg', 'message') or s,
    }
def require_login(fn):
    """Decorator: answer 401 ``unauthorized`` unless the session has a user_id."""
    @wraps(fn)
    def guarded(*args, **kwargs):
        if session.get('user_id'):
            return fn(*args, **kwargs)
        return jsonify({'error': 'unauthorized'}), 401
    return guarded
def require_role(role):
    """Decorator factory: answer 403 ``forbidden`` unless the session role equals *role*."""
    def decorate(fn):
        @wraps(fn)
        def guarded(*args, **kwargs):
            if session.get('role') == role:
                return fn(*args, **kwargs)
            return jsonify({'error': 'forbidden'}), 403
        return guarded
    return decorate
def require_any_role(*roles):
    """Decorator factory: answer 403 ``forbidden`` unless the session role is one of *roles*."""
    def decorate(fn):
        @wraps(fn)
        def guarded(*args, **kwargs):
            if session.get('role') in roles:
                return fn(*args, **kwargs)
            return jsonify({'error': 'forbidden'}), 403
        return guarded
    return decorate
@app.route('/')
def index():
    """Serve the frontend entry page from FRONTEND_DIR."""
    entry_page = 'index.html'
    return send_from_directory(FRONTEND_DIR, entry_page)
# auth
@app.post('/api/auth/login')
def login():
    """Authenticate with a JSON body ``{"username": ..., "password": ...}``.

    On success the user's id and role are stored in a permanent session and
    ``{"ok": true}`` is returned. Any failure - unknown user, wrong
    password, or a malformed body - yields 400 ``invalid credentials``.
    """
    data = request.get_json() or {}
    # A JSON array/scalar body or non-string credentials can never match an
    # account; reject them up front instead of failing with a 500.
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'invalid credentials'}), 400

    conn = get_db()
    try:
        row = conn.execute(
            'SELECT id, password_hash, role FROM users WHERE username=?', (username,)
        ).fetchone()
    finally:
        conn.close()

    if not row or not check_password_hash(row['password_hash'], password):
        return jsonify({'error': 'invalid credentials'}), 400
    session['user_id'] = row['id']
    session['role'] = row['role']
    session.permanent = True
    log('login', username)
    return jsonify({'ok': True})
@app.get('/api/auth/me')
def me():
    """Return the current user's ``username``, ``role`` and ``avatar``.

    All three are ``null`` when nobody is logged in, or when the session
    refers to a user that no longer exists.
    """
    anonymous = {'username': None, 'role': None, 'avatar': None}
    uid = session.get('user_id')
    if not uid:
        return jsonify(anonymous)
    conn = get_db()
    try:
        row = conn.execute(
            'SELECT username, role, avatar FROM users WHERE id=?', (uid,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        # Stale session: the account was deleted after login.
        return jsonify(anonymous)
    return jsonify({
        'username': row['username'],
        'role': row['role'],
        'avatar': row['avatar'] or None,
    })
@app.post('/api/auth/logout')
def logout():
    """Record the logout, wipe the session and answer ``{"ok": true}``."""
    # Logged before the session is cleared: log() reads user_id from the session.
    log('logout')
    session.clear()
    payload = {'ok': True}
    return jsonify(payload)
@app.post('/api/user/upload-avatar')
@require_login
def upload_avatar():
    """Store an uploaded avatar image and make it the user's avatar.

    Expects a multipart file field named ``avatar`` with a png, jpg, jpeg,
    gif or webp extension. The file is saved under
    ``FRONTEND_DIR/assets/avatars`` as ``avatar_<uid>_<timestamp>.<ext>``,
    the previous avatar file (if any) is removed, and the user's row is
    updated.

    Returns:
        ``{"ok": true, "avatar_url": ...}`` on success; 400 with an
        ``error`` message for a missing or unsupported file; 401 when no
        user is logged in.
    """
    uid = session.get('user_id')
    if not uid:
        return jsonify({'error': '未登录'}), 401

    file = request.files.get('avatar')
    if file is None or not file.filename:
        return jsonify({'error': '未选择文件'}), 400

    # Validate by the extension of the client-supplied name. The name itself
    # is never used on disk; only the allow-listed extension is.
    allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
    original_filename = file.filename
    if '.' not in original_filename:
        return jsonify({'error': '无效的文件格式'}), 400
    ext = original_filename.rsplit('.', 1)[1].lower()
    if ext not in allowed_extensions:
        return jsonify({'error': '不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式'}), 400

    avatars_dir = os.path.join(FRONTEND_DIR, 'assets', 'avatars')
    os.makedirs(avatars_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    new_filename = f'avatar_{uid}_{timestamp}.{ext}'
    file.save(os.path.join(avatars_dir, new_filename))

    avatar_url = f'./assets/avatars/{new_filename}'
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
        row = c.fetchone()
        old_avatar = row['avatar'] if row else None
        # Remove the previous file - but never the one just written: two
        # uploads within the same second produce the same file name.
        if (
            old_avatar
            and old_avatar != avatar_url
            and old_avatar.startswith('./assets/avatars/')
        ):
            try:
                os.remove(os.path.join(FRONTEND_DIR, old_avatar.replace('./', '', 1)))
            except (OSError, ValueError):
                # Already gone or not removable; the upload still succeeds.
                pass
        c.execute('UPDATE users SET avatar=? WHERE id=?', (avatar_url, uid))
        conn.commit()
    finally:
        conn.close()

    log('upload_avatar', f'上传头像: {new_filename}')
    return jsonify({'ok': True, 'avatar_url': avatar_url})
@app.post('/api/user/reset-avatar')
@require_login
def reset_avatar():
    """Delete the user's stored avatar file, if any, and set the avatar column to NULL."""
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    db = get_db()
    cur = db.cursor()

    # Remove the file behind the current avatar when it lives in the avatars folder.
    cur.execute('SELECT avatar FROM users WHERE id=?', (user_id,))
    record = cur.fetchone()
    current = record['avatar'] if record else None
    if current and current.startswith('./assets/avatars/'):
        stale_path = os.path.join(FRONTEND_DIR, current.replace('./', ''))
        if os.path.exists(stale_path):
            try:
                os.remove(stale_path)
            except Exception:
                pass

    cur.execute('UPDATE users SET avatar=NULL WHERE id=?', (user_id,))
    db.commit()
    db.close()

    log('reset_avatar', '恢复默认头像')
    return jsonify({'ok': True})
# dashboard
@app.get('/api/dashboard')
@require_login
def dashboard():
    """Return headline figures: yield rates, shipment count, defect count, bad count.

    The shipment count is the size of the Redis hash ``shipment_sn_mapping``;
    if Redis fails, the sum of ``shipments.qty`` in SQLite is used instead.
    """
    db = get_db()
    cur = db.cursor()
    cur.execute('SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
    totals = cur.fetchone()
    cur.execute('SELECT COUNT(1) AS total FROM defects')
    defect_row = cur.fetchone()
    db.close()

    if totals:
        good = totals['good_total']
        bad = totals['bad_total']
        fpy_good = totals['fpy_good_total']
    else:
        good = bad = fpy_good = 0

    # Overall yield and first-pass yield (FPY) share the denominator good + bad.
    produced = good + bad
    if produced > 0:
        rate = "{}%".format(round((good / produced) * 100, 2))
        fpy_rate = "{}%".format(round((fpy_good / produced) * 100, 2))
    else:
        rate = u'—'
        fpy_rate = u'—'

    # Shipment count: number of SN records in Redis, SQLite as fallback.
    shipments_count = 0
    try:
        shipments_count = get_redis().hlen('shipment_sn_mapping')
    except Exception as e:
        log('dashboard_redis_error', str(e))
        fallback = get_db()
        fallback_cur = fallback.cursor()
        fallback_cur.execute('SELECT SUM(qty) AS total FROM shipments')
        shipped = fallback_cur.fetchone()
        fallback.close()
        shipments_count = (shipped['total'] or 0) if shipped else 0

    defect_total = (defect_row['total'] or 0) if defect_row else 0
    return jsonify({
        'fpyRate': fpy_rate,
        'goodRate': rate,
        'shipments': shipments_count,
        'defects': defect_total,
        'badCount': bad,
    })
def _audit_epoch(value):
    """Convert an audit timestamp string to epoch seconds; None if unparseable.

    Accepts ISO-8601 (with ``T``, ``Z`` or a ``+`` offset),
    ``YYYY-MM-DD HH:MM:SS`` and ``YYYY-MM-DD``.
    """
    if not value:
        return None
    try:
        if 'T' in value or 'Z' in value or '+' in value:
            return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()
        if ' ' in value and ':' in value:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').timestamp()
        return datetime.strptime(value, '%Y-%m-%d').timestamp()
    except Exception:
        return None


def _audit_fetch_items(r, platform, max_items):
    """Read up to *max_items* raw audit entries for *platform* from Redis.

    The first existing key among ``mac_batch_audit_<p>``, ``audit:<p>`` and
    ``<p>:audit`` is used, read according to its Redis type. If nothing is
    found, up to 100 pairs of the hash ``batch_sn_mapping_<p>`` are returned
    as JSON lines with the note ``mapping``.
    """
    items = []
    for key in (f'mac_batch_audit_{platform}', f'audit:{platform}', f'{platform}:audit'):
        try:
            if not r.exists(key):
                continue
            kind = r.type(key)
            if kind == 'list':
                # Newest entries are at the tail; a negative start beyond the
                # list length is clamped by Redis, so no LLEN call is needed.
                items = r.lrange(key, -max_items, -1)
            elif kind == 'zset':
                items = r.zrevrange(key, 0, max_items - 1)
            elif kind == 'stream':
                entries = r.xrevrange(key, max='+', min='-', count=max_items)
                items = [json.dumps(fields) for _id, fields in entries]
            else:
                value = r.get(key)
                items = [value] if value else []
            break
        except Exception as e:
            log(f'audit_{platform}_error', f'Redis query error: {str(e)}')
            continue

    # Diagnostic probe of the primary key, written to operations_log.
    try:
        probe_key = f'mac_batch_audit_{platform}'
        tp = r.type(probe_key)
        ln = 0
        try:
            ln = r.llen(probe_key)
        except Exception:
            pass
        log(f'audit_{platform}_probe', json.dumps({
            'host': os.environ.get('REDIS_HOST'),
            'db': os.environ.get('REDIS_DB'),
            'type': tp,
            'len': ln,
        }))
    except Exception:
        pass

    mapping_key = f'batch_sn_mapping_{platform}'
    if not items and r.exists(mapping_key) and r.type(mapping_key) == 'hash':
        try:
            pairs = []
            cursor = 0
            while True:
                cursor, chunk = r.hscan(mapping_key, cursor=cursor, count=200)
                for mac, batch in (chunk or {}).items():
                    pairs.append((mac, batch))
                    if len(pairs) >= 100:
                        break
                if cursor == 0 or len(pairs) >= 100:
                    break
            items = [
                json.dumps({'mac': mac, 'batch': batch, 'note': 'mapping'})
                for mac, batch in pairs
            ]
        except Exception:
            pass
    return items


def _audit_listing(platform):
    """Build the JSON response shared by the per-platform audit endpoints.

    Query parameters: ``start`` / ``end`` (timestamp bounds), ``limit``
    (positive int) and ``order`` (``asc``; anything else means newest
    first). A request without any of them is served from a 3-second
    in-process cache and reads at most 200 entries; otherwise up to 500
    entries are read. Any failure is logged and answered with an empty list.
    """
    started = datetime.utcnow()
    try:
        q_start = request.args.get('start')
        q_end = request.args.get('end')
        q_limit = request.args.get('limit')
        order_arg = request.args.get('order')
        q_order = order_arg or 'desc'
        # Only parameters the client actually sent count as a filter. Using
        # the defaulted order here made this always true, which disabled the
        # cache and the smaller unfiltered limit.
        has_filter = bool(q_start or q_end or q_limit or order_arg)

        cached = _audit_cache[platform]
        if (not has_filter) and ((datetime.utcnow().timestamp() - cached['ts']) < 3):
            return jsonify({'list': cached['list']})

        r = get_redis()
        # Allow up to 5 seconds per Redis call for these larger reads.
        r.connection_pool.connection_kwargs['socket_timeout'] = 5
        max_items = 500 if has_filter else 200
        res = [parse_audit_line(x) for x in _audit_fetch_items(r, platform, max_items)]

        if q_start or q_end:
            s_epoch = _audit_epoch(q_start)
            e_epoch = _audit_epoch(q_end)
            kept = []
            for record in res:
                ts = _audit_epoch(record.get('ts_cn'))
                # Entries without a parseable timestamp cannot be range-checked.
                if ts is None:
                    continue
                if s_epoch is not None and ts < s_epoch:
                    continue
                if e_epoch is not None and ts > e_epoch:
                    continue
                kept.append(record)
            res = kept

        # Unparseable timestamps sort as epoch 0.
        res.sort(key=lambda record: _audit_epoch(record.get('ts_cn')) or 0,
                 reverse=(q_order != 'asc'))

        if q_limit:
            try:
                lim = int(q_limit)
                if lim > 0:
                    res = res[:lim]
            except Exception:
                pass

        if not has_filter:
            _audit_cache[platform] = {'ts': datetime.utcnow().timestamp(), 'list': res}
        dur = (datetime.utcnow() - started).total_seconds()
        log(f'audit_{platform}_cost', f"{dur}s len={len(res)}")
        return jsonify({'list': res})
    except Exception as e:
        log(f'audit_{platform}_error', str(e))
        return jsonify({'list': []})


@app.get('/api/audit/pdd')
@require_login
def audit_pdd():
    """List audit entries for the ``pdd`` platform (see _audit_listing)."""
    return _audit_listing('pdd')


@app.get('/api/audit/yt')
@require_login
def audit_yt():
    """List audit entries for the ``yt`` platform (see _audit_listing)."""
    return _audit_listing('yt')
@app.get('/api/audit/diagnose')
@require_login
def audit_diagnose():
    """Report Redis type and length for the audit and mapping keys.

    Each key maps to ``{'type', 'len'}``, or to ``{'error'}`` if inspecting
    that key failed. A failure to obtain the Redis client yields a 500.
    """
    try:
        client = get_redis()
        # Length function per Redis type; unknown types are probed with GET.
        length_by_type = {
            'list': client.llen,
            'zset': client.zcard,
            'stream': lambda k: client.xinfo_stream(k).get('length'),
            'hash': client.hlen,
            'none': lambda k: 0,
        }
        report = {}
        for key in ('mac_batch_audit_pdd', 'mac_batch_audit_yt', 'batch_sn_mapping_pdd', 'batch_sn_mapping_yt'):
            try:
                kind = client.type(key)
                measure = length_by_type.get(kind)
                if measure is None:
                    length = 1 if client.get(key) else 0
                else:
                    length = measure(key)
                report[key] = {'type': kind, 'len': length}
            except Exception as e:
                report[key] = {'error': str(e)}
        return jsonify(report)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.get('/api/overview')
@require_login
def overview():
    """Return record counts and totals across the main tables."""
    db = get_db()
    cur = db.cursor()

    def query_one(sql):
        cur.execute(sql)
        return cur.fetchone()

    stats_row = query_one('SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
    defects_row = query_one('SELECT COUNT(1) AS cnt FROM defects')
    mac_row = query_one('SELECT COUNT(1) AS cnt FROM mac_batches')
    ship_row = query_one('SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments')
    devices_row = query_one('SELECT COUNT(1) AS cnt FROM devices')
    personnel_row = query_one('SELECT COUNT(1) AS cnt FROM personnel')
    qa_row = query_one('SELECT COUNT(1) AS cnt FROM qa')
    production_row = query_one('SELECT COUNT(1) AS cnt FROM production')
    db.close()

    def value(row, column):
        # A missing row or a NULL column both count as 0.
        return (row[column] or 0) if row else 0

    return jsonify({
        'stats': {
            'records': value(stats_row, 'cnt'),
            'goodTotal': value(stats_row, 'good_total'),
            'badTotal': value(stats_row, 'bad_total'),
            'fpyGoodTotal': value(stats_row, 'fpy_good_total'),
        },
        'defects': value(defects_row, 'cnt'),
        'mac': value(mac_row, 'cnt'),
        'shipments': {
            'records': value(ship_row, 'cnt'),
            'qtyTotal': value(ship_row, 'qty_total'),
        },
        'devices': value(devices_row, 'cnt'),
        'personnel': value(personnel_row, 'cnt'),
        'qa': value(qa_row, 'cnt'),
        'production': value(production_row, 'cnt'),
    })
# uploads
@app.post('/api/upload/mac')
@require_login
@require_any_role('admin', 'superadmin')
def upload_mac():
    """Insert MAC/batch pairs from ``{"rows": [{"mac", "batch"}, ...]}``.

    Rows missing either field are skipped. The logged and notified count is
    the number of rows submitted, including skipped ones.
    """
    payload = request.get_json() or {}
    rows = payload.get('rows') or []
    if not isinstance(rows, list):
        return jsonify({'error': 'invalid rows'}), 400

    db = get_db()
    cur = db.cursor()
    stamp = get_beijing_time()
    for entry in rows:
        entry = entry or {}
        mac = entry.get('mac')
        batch = entry.get('batch')
        if mac and batch:
            cur.execute('INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?)', (mac, batch, stamp))
    db.commit()
    db.close()

    log('upload_mac', f"count={len(rows)}")
    notify_superadmin('上传MAC与批次', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True})
@app.post('/api/upload/stats')
@require_login
@require_any_role('admin', 'superadmin')
def upload_stats():
    """Store one good/bad statistics record and optional defect details.

    JSON body: ``good``, ``bad``, ``fpy_good`` (first-pass good count),
    ``platform`` (``pdd``/``yt``/``tx``; anything else becomes ``pdd``) and
    ``details``, a list of ``{"mac", "batch"}`` items that are inserted into
    ``defects`` when both fields are present.

    Returns:
        ``{"ok": true}``, or 400 ``invalid count`` when the body is not a
        JSON object or a count is negative or not an integer.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': 'invalid count'}), 400
    try:
        good = int(data.get('good') or 0)
        bad = int(data.get('bad') or 0)
        fpy_good = int(data.get('fpy_good') or 0)
    except (TypeError, ValueError):
        # Non-numeric input is a client error, not a server error.
        return jsonify({'error': 'invalid count'}), 400
    if good < 0 or bad < 0 or fpy_good < 0:
        return jsonify({'error': 'invalid count'}), 400

    platform = data.get('platform') or 'pdd'
    if platform not in ['pdd', 'yt', 'tx']:
        platform = 'pdd'

    details = data.get('details') or []
    if not isinstance(details, list):
        details = []

    conn = get_db()
    try:
        c = conn.cursor()
        now = get_beijing_time()
        c.execute(
            'INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?)',
            (good, bad, fpy_good, platform, now),
        )
        for item in details:
            # Skip anything that is not a {"mac", "batch"} object.
            if not isinstance(item, dict):
                continue
            mac = item.get('mac')
            batch = item.get('batch')
            if mac and batch:
                c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
        conn.commit()
    finally:
        conn.close()

    platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
    log('upload_stats', json.dumps({'good': good, 'bad': bad, 'fpy_good': fpy_good, 'platform': platform, 'details_count': len(details)}))
    notify_superadmin('上传良/不良统计', f"平台: {platform_name}, 良品: {good}, 不良品: {bad}, 直通良品: {fpy_good}")
    return jsonify({'ok': True})
@app.post('/api/upload/repairs')
@require_login
@require_any_role('admin', 'superadmin')
def upload_repairs():
    """Store one repair record from ``{"qty": int, "note": str}``.

    Returns:
        ``{"ok": true}``, or 400 ``invalid quantity`` when the body is not a
        JSON object or ``qty`` is negative or not an integer.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': 'invalid quantity'}), 400
    try:
        qty = int(data.get('qty') or 0)
    except (TypeError, ValueError):
        # Non-numeric input is a client error, not a server error.
        return jsonify({'error': 'invalid quantity'}), 400
    if qty < 0:
        return jsonify({'error': 'invalid quantity'}), 400
    note = data.get('note') or ''

    conn = get_db()
    try:
        conn.execute(
            'INSERT INTO repairs(qty, note, ts) VALUES(?,?,?)',
            (qty, note, get_beijing_time()),
        )
        conn.commit()
    finally:
        conn.close()

    log('upload_repairs', json.dumps({'qty': qty, 'note': note}))
    notify_superadmin('上传返修记录', f"数量: {qty}")
    return jsonify({'ok': True})
@app.post('/api/upload/defects')
@require_login
@require_any_role('admin', 'superadmin')
def upload_defects():
    """Insert defect rows from ``{"rows": [{"mac", "batch"}, ...]}``.

    Rows that are not objects or lack either field are skipped. The logged
    and notified count is the number of rows submitted.

    Returns:
        ``{"ok": true}``, or 400 ``invalid rows`` when the body is not a
        JSON object or ``rows`` is not a list.
    """
    data = request.get_json() or {}
    rows = (data.get('rows') if isinstance(data, dict) else None) or []
    # Same validation as the MAC upload endpoint.
    if not isinstance(data, dict) or not isinstance(rows, list):
        return jsonify({'error': 'invalid rows'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        now = get_beijing_time()
        for r in rows:
            if not isinstance(r, dict):
                continue
            mac = r.get('mac')
            batch = r.get('batch')
            if not mac or not batch:
                continue
            c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
        conn.commit()
    finally:
        conn.close()

    log('upload_defects', f"count={len(rows)}")
    notify_superadmin('上传不良明细', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True})
@app.post('/api/upload/shipments')
@require_login
@require_any_role('admin', 'superadmin')
def upload_shipments():
    """Store one shipment record.

    JSON body: ``date``, ``qty`` (positive integer), ``to`` (receiver),
    ``platform`` (all required) and optional ``box_no``. The ``platform``
    and ``box_no`` columns are added to ``shipments`` when missing.

    Returns:
        ``{"ok": true}``, or 400 ``invalid payload`` when a required field
        is missing or ``qty`` is not a positive integer.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': 'invalid payload'}), 400
    date = data.get('date')
    try:
        qty = int(data.get('qty') or 0)
    except (TypeError, ValueError):
        # Non-numeric input is a client error, not a server error.
        return jsonify({'error': 'invalid payload'}), 400
    to = data.get('to')
    platform = data.get('platform', '')
    box_no = data.get('box_no', '')
    if not date or qty <= 0 or not to or not platform:
        return jsonify({'error': 'invalid payload'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        # Older databases lack these two columns; add them on demand.
        c.execute("PRAGMA table_info(shipments)")
        columns = [col[1] for col in c.fetchall()]
        if 'platform' not in columns:
            c.execute('ALTER TABLE shipments ADD COLUMN platform TEXT')
        if 'box_no' not in columns:
            c.execute('ALTER TABLE shipments ADD COLUMN box_no TEXT')
        c.execute(
            'INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?)',
            (date, qty, to, platform, box_no, get_beijing_time()),
        )
        conn.commit()
    finally:
        conn.close()

    platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
    log_data = {'date': date, 'qty': qty, 'to': to, 'platform': platform}
    if box_no:
        log_data['box_no'] = box_no
    log('upload_shipments', json.dumps(log_data))
    notify_superadmin('上传发货记录', f"机种: {platform_name}, 日期: {date}, 数量: {qty}, 接收方: {to}")
    return jsonify({'ok': True})
# collect
@app.get('/api/collect/devices')
@require_login
def devices():
    """Return the 50 most recently inserted devices as ``{'list': [{name, status}]}``."""
    db = get_db()
    found = db.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 50').fetchall()
    listing = [dict(item) for item in found]
    db.close()
    return jsonify({'list': listing})
@app.get('/api/collect/environment')
@require_login
def environment():
    """Return the environment row with the highest id, or '—' placeholders if the table is empty."""
    db = get_db()
    latest = db.execute('SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1').fetchone()
    db.close()
    if latest:
        reading = {'temp': latest['temp'], 'hum': latest['hum']}
    else:
        reading = {'temp': '—', 'hum': '—'}
    return jsonify(reading)
@app.get('/api/collect/personnel')
@require_login
def personnel():
    """Return up to 100 personnel rows (name, role), highest id first."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100')
    people = list(map(dict, cursor.fetchall()))
    db.close()
    return jsonify({'list': people})
@app.post('/api/collect/personnel')
@require_login
@require_any_role('admin', 'superadmin')
def add_personnel():
    """Insert a personnel row from the JSON body's ``name`` and ``role``.

    Both values are stripped of surrounding whitespace; an empty name is
    rejected with a 400 ``invalid payload`` response.
    """
    payload = request.get_json() or {}
    name = (payload.get('name') or '').strip()
    role = (payload.get('role') or '').strip()
    if name == '':
        return jsonify({'error': 'invalid payload'}), 400

    db = get_db()
    db.cursor().execute(
        'INSERT INTO personnel(name, role, ts) VALUES(?,?,?)',
        (name, role, get_beijing_time()),
    )
    db.commit()
    db.close()

    log('add_personnel', name)
    notify_superadmin('添加人员信息', f"姓名: {name}, 角色: {role}")
    return jsonify({'ok': True})
@app.get('/api/collect/qa')
@require_login
def qa():
    """Return up to 100 qa rows (title, date), highest id first."""
    db = get_db()
    reports = [
        dict(record)
        for record in db.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 100').fetchall()
    ]
    db.close()
    return jsonify({'list': reports})
@app.get('/api/collect/production')
@require_login
def production():
    """Return up to 100 production rows (batch, duration), highest id first."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100')
    records = cursor.fetchall()
    db.close()
    return jsonify({'list': [dict(record) for record in records]})
# export
@app.post('/api/export/excel')
@require_login
def export_excel():
    """Export one data set as an .xlsx attachment.

    The JSON body's ``type`` (default ``'stats'``) selects which table is
    exported.  Every row of that table is written beneath a styled header row.

    Returns:
        The workbook as a file download; ``{'error': 'invalid type'}`` with
        status 400 for an unknown type; ``{'error': <message>}`` with status
        500 when anything else fails.
    """
    # type -> (sheet title, header row, query)
    exports = {
        'stats': ('良不良统计', ['直通良品数', '良品数', '不良品数', '时间'],
                  'SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC'),
        'mac': ('MAC与批次', ['MAC地址', '批次号', '时间'],
                'SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC'),
        'repairs': ('返修记录', ['返修数量', '备注', '时间'],
                    'SELECT qty, note, ts FROM repairs ORDER BY id DESC'),
        'defects': ('不良明细', ['MAC地址', '批次号', '时间'],
                    'SELECT mac, batch, ts FROM defects ORDER BY id DESC'),
        'shipments': ('发货记录', ['日期', '数量', '收货方', '时间'],
                      'SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC'),
        'devices': ('设备状态', ['设备名称', '状态'],
                    'SELECT name, status FROM devices ORDER BY id DESC'),
        'personnel': ('人员信息', ['姓名', '角色'],
                      'SELECT name, role FROM personnel ORDER BY id DESC'),
        'qa': ('质检报告', ['标题', '日期'],
               'SELECT title, date FROM qa ORDER BY id DESC'),
        'production': ('时间记录', ['批次', '时长'],
                       'SELECT batch, duration FROM production ORDER BY id DESC'),
    }
    try:
        import openpyxl
        from openpyxl.styles import Font, Alignment, PatternFill
        from io import BytesIO
        from flask import send_file

        data = request.get_json() or {}
        data_type = data.get('type', 'stats')
        # Reject unknown (or non-string) types before touching the database.
        if not isinstance(data_type, str) or data_type not in exports:
            return jsonify({'error': 'invalid type'}), 400
        sheet_title, headers, query = exports[data_type]

        # Fetch the rows first so the connection is released even on failure.
        conn = get_db()
        try:
            c = conn.cursor()
            c.execute(query)
            rows = c.fetchall()
        finally:
            conn.close()

        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = sheet_title
        ws.append(headers)

        # Header row styling.
        header_fill = PatternFill(start_color='4F8CFF', end_color='4F8CFF', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')
        header_alignment = Alignment(horizontal='center', vertical='center')
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = header_alignment

        for row in rows:
            ws.append(list(row))

        # Size each column to its longest value plus padding, capped at 50.
        for column in ws.columns:
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            ws.column_dimensions[column[0].column_letter].width = min(max_length + 2, 50)

        # Serialize the workbook to memory.
        output = BytesIO()
        wb.save(output)
        output.seek(0)
        log('export_excel', data_type)

        filename = f'{ws.title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        log('export_excel_error', str(e))
        return jsonify({'error': str(e)}), 500
@app.post('/api/export/pdf')
@require_login
def export_pdf():
    """Export one data set as a landscape-A4 PDF attachment.

    The JSON body's ``type`` (default ``'stats'``) selects the table; at most
    200 rows (highest id first) are included.  A CJK font is registered from
    the first usable path in a fixed candidate list; if none loads, the report
    falls back to Helvetica.

    Returns:
        The PDF as a file download; ``{'error': 'invalid type'}`` with status
        400 for an unknown type; an error payload with status 500 when
        anything else fails.
    """
    # type -> (report title, table header, query)
    reports = {
        'stats': ('良/不良统计报表', ['直通良品数', '良品数', '不良品数', '时间'],
                  'SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200'),
        'mac': ('MAC与批次报表', ['MAC地址', '批次号', '时间'],
                'SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200'),
        'repairs': ('返修记录报表', ['返修数量', '备注', '时间'],
                    'SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200'),
        'defects': ('不良明细报表', ['MAC地址', '批次号', '时间'],
                    'SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200'),
        'shipments': ('发货记录报表', ['日期', '数量', '收货方', '时间'],
                      'SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200'),
        'devices': ('设备状态报表', ['设备名称', '状态'],
                    'SELECT name, status FROM devices ORDER BY id DESC LIMIT 200'),
        'personnel': ('人员信息报表', ['姓名', '角色'],
                      'SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200'),
        'qa': ('质检报告', ['标题', '日期'],
               'SELECT title, date FROM qa ORDER BY id DESC LIMIT 200'),
        'production': ('生产时间记录', ['批次', '时长'],
                       'SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200'),
    }
    try:
        from reportlab.lib import colors
        from reportlab.lib.pagesizes import A4, landscape
        from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.pdfbase import pdfmetrics
        from reportlab.pdfbase.ttfonts import TTFont
        from reportlab.lib.enums import TA_CENTER
        from io import BytesIO
        from flask import send_file

        data = request.get_json() or {}
        data_type = data.get('type', 'stats')
        # Reject unknown (or non-string) types before doing any work.
        if not isinstance(data_type, str) or data_type not in reports:
            return jsonify({'error': 'invalid type'}), 400
        title, headers, query = reports[data_type]

        # Register a Chinese-capable font.  Each candidate is tried on its
        # own so one unreadable font file does not abort the search.
        font_name = 'Helvetica'
        font_paths = [
            '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
            '/usr/share/fonts/truetype/arphic/uming.ttc',
            '/System/Library/Fonts/PingFang.ttc',
            'C:\\Windows\\Fonts\\simhei.ttf'
        ]
        for font_path in font_paths:
            if not os.path.exists(font_path):
                continue
            try:
                pdfmetrics.registerFont(TTFont('ChineseFont', font_path))
            except Exception as e:
                log('pdf_font_warning', f'无法加载中文字体: {str(e)}')
                continue
            font_name = 'ChineseFont'
            break

        # Fetch the rows; the connection is released even if the query fails.
        conn = get_db()
        try:
            c = conn.cursor()
            c.execute(query)
            rows = c.fetchall()
        finally:
            conn.close()

        # Document skeleton.
        buffer = BytesIO()
        doc = SimpleDocTemplate(
            buffer,
            pagesize=landscape(A4),
            topMargin=1.5 * cm,
            bottomMargin=1.5 * cm,
            leftMargin=1.5 * cm,
            rightMargin=1.5 * cm
        )
        elements = []
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontName=font_name,
            fontSize=18,
            textColor=colors.HexColor('#4F8CFF'),
            spaceAfter=20,
            alignment=TA_CENTER,
            leading=24
        )
        elements.append(Paragraph(title, title_style))
        elements.append(Spacer(1, 0.5 * cm))

        if not rows:
            # Placeholder paragraph shown when the table has no rows.
            no_data_style = ParagraphStyle(
                'NoData',
                parent=styles['Normal'],
                fontName=font_name,
                fontSize=12,
                textColor=colors.grey,
                alignment=TA_CENTER
            )
            elements.append(Paragraph('暂无数据', no_data_style))
        else:
            # Header row followed by stringified data rows (None -> '').
            table_data = [headers]
            for row in rows:
                table_data.append([str(val) if val is not None else '' for val in row])
            table = Table(table_data, repeatRows=1)
            table.setStyle(TableStyle([
                # Header row style
                ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F8CFF')),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), font_name),
                ('FONTSIZE', (0, 0), (-1, 0), 10),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
                ('TOPPADDING', (0, 0), (-1, 0), 8),
                # Data row style
                ('BACKGROUND', (0, 1), (-1, -1), colors.white),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                ('FONTNAME', (0, 1), (-1, -1), font_name),
                ('FONTSIZE', (0, 1), (-1, -1), 8),
                ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F5F7FA')]),
                ('TOPPADDING', (0, 1), (-1, -1), 5),
                ('BOTTOMPADDING', (0, 1), (-1, -1), 5),
            ]))
            elements.append(table)

        # Footer: export time and record count.
        elements.append(Spacer(1, 0.5 * cm))
        footer_style = ParagraphStyle(
            'Footer',
            parent=styles['Normal'],
            fontName=font_name,
            fontSize=8,
            textColor=colors.grey,
            alignment=TA_CENTER
        )
        footer_text = f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | 共 {len(rows)} 条记录'
        elements.append(Paragraph(footer_text, footer_style))

        # Render the PDF into the in-memory buffer.
        doc.build(elements)
        buffer.seek(0)
        log('export_pdf', data_type)

        filename = f'{title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf'
        return send_file(
            buffer,
            mimetype='application/pdf',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        log('export_pdf_error', str(e))
        return jsonify({'error': f'PDF导出失败: {str(e)}'}), 500
# lists
@app.get('/api/list/mac')
@require_login
def list_mac():
    """Return up to 200 mac_batches rows (mac, batch, platform, ts), highest id first."""
    db = get_db()
    found = db.execute(
        'SELECT mac, batch, platform, ts FROM mac_batches ORDER BY id DESC LIMIT 200'
    ).fetchall()
    entries = [dict(record) for record in found]
    db.close()
    return jsonify({'list': entries})
@app.get('/api/list/stats')
@require_login
def list_stats():
    """Return up to 200 stats rows (good, bad, fpy_good, platform, ts), highest id first."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200')
    entries = list(map(dict, cursor.fetchall()))
    db.close()
    return jsonify({'list': entries})
@app.get('/api/list/repairs')
@require_login
def list_repairs():
    """Return up to 200 repairs rows (qty, note, ts), highest id first."""
    db = get_db()
    entries = []
    for record in db.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200').fetchall():
        entries.append(dict(record))
    db.close()
    return jsonify({'list': entries})
@app.get('/api/list/defects')
@require_login
def list_defects():
    """Return up to 200 defects rows (mac, batch, ts), highest id first."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200')
    found = cursor.fetchall()
    db.close()
    return jsonify({'list': [dict(record) for record in found]})
@app.get('/api/list/shipments')
@require_login
def list_shipments():
    """Return up to 200 shipments rows (date, qty, receiver, ts), highest id first."""
    db = get_db()
    found = db.execute(
        'SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200'
    ).fetchall()
    entries = list(map(dict, found))
    db.close()
    return jsonify({'list': entries})
# admin management
@app.get('/api/admin/users')
@require_login
@require_any_role('superadmin')
def list_users():
    """Return every user's username and role, ordered by ascending id."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT username, role FROM users ORDER BY id ASC')
    accounts = []
    for record in cursor.fetchall():
        accounts.append(dict(record))
    db.close()
    return jsonify({'list': accounts})
@app.post('/api/admin/reset-password')
@require_login
@require_any_role('superadmin')
def reset_password():
    """Replace the stored password hash of the user named in the JSON body.

    Expects ``username`` and ``new_password``.  Responds 400 when either is
    missing and 404 when no such user exists.
    """
    payload = request.get_json() or {}
    username = payload.get('username')
    new_password = payload.get('new_password')
    if not (username and new_password):
        return jsonify({'error': 'invalid payload'}), 400

    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT id FROM users WHERE username=?', (username,))
    account = cursor.fetchone()
    if not account:
        db.close()
        return jsonify({'error': 'user not found'}), 404

    hashed = generate_password_hash(new_password)
    cursor.execute('UPDATE users SET password_hash=? WHERE id=?', (hashed, account['id']))
    db.commit()
    db.close()
    log('reset_password', username)
    return jsonify({'ok': True})
@app.post('/api/admin/change-password')
@require_login
@require_any_role('superadmin')
def change_password():
    """Set a new password hash for the user named in the JSON body.

    Expects ``username`` and ``new_password``.  Responds 400 when either is
    missing and 404 when the username is unknown.
    """
    body = request.get_json() or {}
    username = body.get('username')
    new_password = body.get('new_password')
    if not username or not new_password:
        return jsonify({'error': 'invalid payload'}), 400

    db = get_db()
    match = db.execute('SELECT id FROM users WHERE username=?', (username,)).fetchone()
    if match:
        db.execute(
            'UPDATE users SET password_hash=? WHERE id=?',
            (generate_password_hash(new_password), match['id']),
        )
        db.commit()
        db.close()
        log('change_password', username)
        return jsonify({'ok': True})

    db.close()
    return jsonify({'error': 'user not found'}), 404
@app.post('/api/admin/add-user')
@require_login
@require_any_role('superadmin')
def add_user():
    """Create a new user account.

    Reads ``username``, ``password`` and ``role`` (default ``'admin'``) from
    the JSON body.

    Returns:
        ``{'ok': True, 'message': ...}`` on success; status 400 when a field
        is missing, the role is not ``admin``/``superadmin``, or the username
        is already taken; status 500 when the insert fails for another reason.
    """
    data = request.get_json() or {}
    username = (data.get('username') or '').strip()
    password = data.get('password')
    role = (data.get('role') or 'admin').strip()
    if not username or not password:
        return jsonify({'error': '用户名和密码不能为空'}), 400
    if role not in ['admin', 'superadmin']:
        return jsonify({'error': '角色必须是 admin 或 superadmin'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        # Friendly early check; the UNIQUE constraint below is the real guard.
        c.execute('SELECT id FROM users WHERE username=?', (username,))
        if c.fetchone():
            return jsonify({'error': '用户名已存在'}), 400
        c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)',
                  (username, generate_password_hash(password), role))
        conn.commit()
    except sqlite3.IntegrityError:
        # A concurrent request created the same username between the check
        # and the insert; report it as a conflict, not a server error.
        return jsonify({'error': '用户名已存在'}), 400
    except Exception as e:
        return jsonify({'error': f'创建用户失败: {str(e)}'}), 500
    finally:
        conn.close()

    log('add_user', f'username={username}, role={role}')
    return jsonify({'ok': True, 'message': f'用户 {username} 创建成功'})
@app.post('/api/admin/clear')
@require_login
@require_any_role('superadmin')
def clear_module():
    """Delete all rows of the table backing the module named in the JSON body.

    Unknown modules get a 400 ``invalid module`` response.  Clearing
    ``shipments`` also deletes the ``shipment_sn_mapping`` Redis key; Redis
    failures are logged and do not fail the request.
    """
    payload = request.get_json() or {}
    module = payload.get('module')
    module_tables = {
        'mac': 'mac_batches',
        'stats': 'stats',
        'defects': 'defects',
        'shipments': 'shipments',
        'devices': 'devices',
        'environment': 'environment',
        'personnel': 'personnel',
        'qa': 'qa',
        'production': 'production'
    }
    table = module_tables.get(module)
    if not table:
        return jsonify({'error': 'invalid module'}), 400

    # Empty the SQLite table.  The name comes from the fixed mapping above,
    # never from the request, so interpolating it is safe.
    db = get_db()
    db.cursor().execute(f'DELETE FROM {table}')
    db.commit()
    db.close()

    # Clearing shipment records also clears their Redis mapping.
    if module == 'shipments':
        try:
            client = get_redis()
            redis_key = 'shipment_sn_mapping'
            redis_count = client.hlen(redis_key)
            client.delete(redis_key)
            log('clear_module_redis', f'shipments: cleared {redis_count} records from redis')
        except Exception as e:
            log('clear_module_redis_error', str(e))

    log('clear_module', module)
    return jsonify({'ok': True})
# notifications
@app.get('/api/notifications')
@require_login
@require_any_role('superadmin')
def get_notifications():
    """Return up to 100 notifications of the session user, highest id first."""
    db = get_db()
    found = db.execute(
        'SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100',
        (session.get('user_id'),),
    ).fetchall()
    notifications = [dict(record) for record in found]
    db.close()
    return jsonify({'list': notifications})
@app.get('/api/notifications/unread-count')
@require_login
@require_any_role('superadmin')
def get_unread_count():
    """Return the number of unread notifications of the session user."""
    current_user = session.get('user_id')
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0', (current_user,))
    result = cursor.fetchone()
    db.close()
    unread = result['count'] if result else 0
    return jsonify({'count': unread})
@app.post('/api/notifications/mark-read')
@require_login
@require_any_role('superadmin')
def mark_notification_read():
    """Mark one notification of the session user as read.

    The notification is selected by the JSON body's ``id``; a missing or
    falsy id yields a 400 ``invalid id`` response.
    """
    payload = request.get_json() or {}
    notification_id = payload.get('id')
    if not notification_id:
        return jsonify({'error': 'invalid id'}), 400

    owner = session.get('user_id')
    db = get_db()
    db.execute('UPDATE notifications SET read=1 WHERE id=? AND user_id=?', (notification_id, owner))
    db.commit()
    db.close()
    return jsonify({'ok': True})
@app.post('/api/notifications/mark-all-read')
@require_login
@require_any_role('superadmin')
def mark_all_notifications_read():
    """Mark every notification of the session user as read."""
    owner = session.get('user_id')
    db = get_db()
    db.execute('UPDATE notifications SET read=1 WHERE user_id=?', (owner,))
    db.commit()
    db.close()
    return jsonify({'ok': True})
@app.post('/api/notifications/delete-read')
@require_login
@require_any_role('superadmin')
def delete_read_notifications():
    """Delete the session user's read notifications and report how many were removed."""
    owner = session.get('user_id')
    db = get_db()
    cursor = db.execute('DELETE FROM notifications WHERE user_id=? AND read=1', (owner,))
    removed = cursor.rowcount
    db.commit()
    db.close()
    return jsonify({'ok': True, 'count': removed})
@app.errorhandler(404)
def not_found(_):
    """Return a JSON error body for 404 responses."""
    body = {'error': 'not found'}
    return jsonify(body), 404
def _mac_file_header_verdict(header, data_rows):
    """Build the validation payload for the header row of a MAC/batch file."""
    # Case-insensitive, substring-based header matching.
    header_lower = [h.lower() for h in header]
    has_mac = any('mac' in h and 'sn' not in h for h in header_lower)
    has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower)
    has_batch = any('批次' in h or 'batch' in h for h in header_lower)
    if not (has_mac or has_sn_mac):
        return {'valid': False, 'message': f'缺少必需的列: MAC 或 SN_MAC( 当前列: {", ".join(header)})'}
    if not has_batch:
        return {'valid': False, 'message': f'缺少必需的列:批次号(当前列: {", ".join(header)})'}
    mac_col = 'MAC' if has_mac else 'SN_MAC'
    return {'valid': True, 'message': f'文件格式正确,包含列: {mac_col} 和 批次号,共 {data_rows} 行数据'}


@app.route('/api/validate/mac-file', methods=['POST'])
@require_login
@require_any_role('admin', 'superadmin')
def validate_mac_file():
    """Check that an uploaded CSV/Excel file looks like a MAC/batch list.

    The file must have exactly two columns whose headers contain a MAC (or
    SN_MAC) column and a batch column.

    Returns:
        ``{'valid': bool, 'message': str}`` with status 200 for every
        validation outcome (including unreadable files), or
        ``{'error': 'no file'}`` with status 400 when no file was uploaded.
    """
    f = request.files.get('file')
    if not f:
        return jsonify({'error': 'no file'}), 400
    name = secure_filename(f.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'valid': False, 'message': '文件格式不支持, 请上传CSV或Excel文件'}), 200
    try:
        if ext == 'csv':
            # utf-8-sig drops a leading BOM so it cannot end up in the first header.
            text = f.stream.read().decode('utf-8-sig', errors='ignore')
            lines = [l.strip() for l in text.splitlines() if l.strip()]
            if not lines:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            # First line is the header row.
            header = [h.strip() for h in lines[0].split(',')]
            if len(header) != 2:
                return jsonify({'valid': False, 'message': f'文件应该只包含2列数据, 当前有 {len(header)} 列'}), 200
            # Record the headers for debugging.
            log('validate_mac_file_csv', f'headers: {header}')
            return jsonify(_mac_file_header_verdict(header, len(lines) - 1)), 200

        import openpyxl
        wb = openpyxl.load_workbook(f)
        try:
            ws = wb.active
            if ws.max_row < 2:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            if ws.max_column != 2:
                return jsonify({'valid': False, 'message': f'文件应该只包含2列数据, 当前有 {ws.max_column} 列'}), 200
            header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True))
            header = [str(h).strip() if h else '' for h in header_row]
            # Record the headers for debugging.
            log('validate_mac_file', f'headers: {header}')
            return jsonify(_mac_file_header_verdict(header, ws.max_row - 1)), 200
        finally:
            # Close the workbook on every path, including exceptions.
            wb.close()
    except Exception as e:
        return jsonify({'valid': False, 'message': f'读取文件失败: {str(e)}'}), 200
@app.post('/api/upload/mac-file')
@require_login
@require_any_role('admin', 'superadmin')
def upload_mac_file():
    """Save an uploaded MAC file and run the external batch-import script on it.

    The form field ``type`` (``pdd``, ``yt`` or ``tx``; default ``pdd``)
    selects the target file name and is passed to the script.  When the script
    succeeds and its output contains a JSON list between the
    ``=== 成功导入的数据 ===`` / ``=== 数据输出结束 ===`` markers, the records
    are also inserted into ``mac_batches``.

    Returns:
        ``{'ok', 'output', 'returncode'}`` describing the script run; status
        400 for a missing file or invalid type; status 500 on timeout or any
        other failure.
    """
    import re
    import subprocess
    from datetime import timezone, timedelta

    f = request.files.get('file')
    upload_type = request.form.get('type', 'pdd')  # pdd, yt, or tx
    if not f:
        return jsonify({'error': 'no file'}), 400
    if upload_type not in ['pdd', 'yt', 'tx']:
        return jsonify({'error': 'invalid type'}), 400

    # Save the upload under a fixed per-type file name.
    # NOTE(review): presumably the import script reads this exact path;
    # concurrent uploads of the same type would overwrite each other.
    temp_dir = '/home/hyx/work/batch_import_xlsx'
    os.makedirs(temp_dir, exist_ok=True)
    temp_path = os.path.join(temp_dir, f'sn_test_{upload_type}.xlsx')
    f.save(temp_path)

    # Run the batch_import.py script.
    script_path = '/home/hyx/work/生产管理系统/batch_import.py'
    python_path = '/home/hyx/work/.venv/bin/python'
    try:
        result = subprocess.run(
            [python_path, script_path, upload_type],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute timeout
        )
        output = result.stdout + result.stderr
        success = result.returncode == 0

        # Mirror the successfully imported records into SQLite.
        if success:
            json_match = re.search(r'=== 成功导入的数据 ===\n([\s\S]*?)\n=== 数据输出结束 ===', output)
            if json_match:
                conn = None
                try:
                    records = json.loads(json_match.group(1).strip())
                    if records and isinstance(records, list):
                        conn = get_db()
                        c = conn.cursor()
                        # Timestamp in Beijing time (UTC+8).
                        beijing_tz = timezone(timedelta(hours=8))
                        now = datetime.now(beijing_tz).isoformat()
                        saved = 0
                        for record in records:
                            mac = record.get('mac')
                            batch = record.get('batch')
                            if mac and batch:
                                c.execute('INSERT INTO mac_batches(mac, batch, platform, ts) VALUES(?,?,?,?)', (mac, batch, upload_type, now))
                                saved += 1
                        conn.commit()
                        # Log the rows actually inserted, not the raw record count.
                        log('upload_mac_file_db', f"saved {saved} records to database")
                except Exception as e:
                    # Best effort: a mirroring failure must not fail the upload.
                    log('upload_mac_file_db_error', str(e))
                finally:
                    if conn is not None:
                        conn.close()

        log('upload_mac_file', f"type={upload_type}, success={success}")
        if success:
            notify_superadmin('批量上传MAC文件', f"类型: {upload_type}")
        return jsonify({
            'ok': success,
            'output': output,
            'returncode': result.returncode
        })
    except subprocess.TimeoutExpired:
        return jsonify({'error': '上传超时', 'output': '处理时间超过5分钟'}), 500
    except Exception as e:
        return jsonify({'error': str(e), 'output': ''}), 500
@app.post('/api/upload/defects-file')
@require_login
@require_any_role('admin', 'superadmin')
def upload_defects_file():
    """Bulk-insert defect records (mac, batch) from an uploaded CSV or Excel file.

    The first two columns of each row are taken as ``mac`` and ``batch``;
    rows where either value is empty are skipped.  Files whose extension is
    not ``csv`` are parsed with openpyxl.

    Returns:
        ``{'ok': True, 'count': <rows inserted>}``; ``{'error': 'no file'}``
        with status 400 when nothing was uploaded; ``{'error': 'parse error'}``
        with status 400 when the workbook cannot be read.
    """
    f = request.files.get('file')
    if not f:
        return jsonify({'error': 'no file'}), 400
    name = secure_filename(f.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    rows = []
    if ext == 'csv':
        text = f.stream.read().decode('utf-8', errors='ignore')
        for line in text.splitlines():
            parts = [p.strip() for p in line.split(',')]
            # Skip rows with an empty mac or batch, as the Excel branch does.
            if len(parts) >= 2 and parts[0] and parts[1]:
                rows.append({'mac': parts[0], 'batch': parts[1]})
    else:
        try:
            import openpyxl
            wb = openpyxl.load_workbook(f)
            try:
                ws = wb.active
                for r in ws.iter_rows(values_only=True):
                    mac = str(r[0]).strip() if r and r[0] else None
                    batch = str(r[1]).strip() if r and len(r) > 1 and r[1] else None
                    if mac and batch:
                        rows.append({'mac': mac, 'batch': batch})
            finally:
                wb.close()
        except Exception:
            return jsonify({'error': 'parse error'}), 400

    conn = get_db()
    try:
        now = get_beijing_time()
        conn.cursor().executemany(
            'INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)',
            [(r['mac'], r['batch'], now) for r in rows],
        )
        conn.commit()
    finally:
        # Release the connection even when an insert fails.
        conn.close()

    log('upload_defects_file', f"count={len(rows)}")
    notify_superadmin('批量上传不良明细文件', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True, 'count': len(rows)})
def _shipments_file_header_verdict(header, data_rows):
    """Build the validation payload for the header row of a shipments file."""
    header_lower = [h.lower() for h in header]
    # Required columns: a shipping date and a box number.
    has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower))
    has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower))
    if not has_date:
        return {'valid': False, 'message': '缺少必需的列:出货日期'}
    if not has_box:
        return {'valid': False, 'message': '缺少必需的列:箱号'}
    # SN columns are named "SN" followed by digits (SN1, SN2, ...).
    sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
    if not sn_cols:
        return {'valid': False, 'message': '缺少SN列( SN1, SN2, ... SN20)'}
    return {'valid': True, 'message': f'文件格式正确,包含 {len(sn_cols)} 个SN列, 共 {data_rows} 行数据'}


@app.route('/api/validate/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin', 'superadmin')
def validate_shipments_file():
    """Check that an uploaded CSV/Excel file has the shipment-record layout.

    The header row must contain a shipping-date column, a box-number column
    and at least one ``SN<digits>`` column.

    Returns:
        ``{'valid': bool, 'message': str}`` with status 200 for every
        validation outcome (including unreadable files), or
        ``{'error': 'no file'}`` with status 400 when no file was uploaded.
    """
    f = request.files.get('file')
    if not f:
        return jsonify({'error': 'no file'}), 400
    name = secure_filename(f.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'valid': False, 'message': '文件格式不支持, 请上传CSV或Excel文件'}), 200
    try:
        if ext == 'csv':
            # utf-8-sig drops a leading BOM so the first header name stays clean.
            text = f.stream.read().decode('utf-8-sig', errors='ignore')
            lines = [l.strip() for l in text.splitlines() if l.strip()]
            if not lines:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            header = [h.strip() for h in lines[0].split(',')]
            return jsonify(_shipments_file_header_verdict(header, len(lines) - 1)), 200

        import openpyxl
        wb = openpyxl.load_workbook(f)
        try:
            ws = wb.active
            if ws.max_row < 2:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True))
            header = [str(h).strip() if h else '' for h in header_row]
            return jsonify(_shipments_file_header_verdict(header, ws.max_row - 1)), 200
        finally:
            # Close the workbook on every path, including exceptions.
            wb.close()
    except Exception as e:
        return jsonify({'valid': False, 'message': f'读取文件失败: {str(e)}'}), 200
@app.get('/api/shipments/query-by-sn')
@require_login
def query_shipment_by_sn():
    """Look up shipment information for an SN/MAC in the Redis hash.

    Reads the ``sn`` query parameter.  Responds 400 when it is empty, 500
    when the lookup fails, and otherwise a payload whose ``found`` flag
    tells whether a record exists.
    """
    sn = request.args.get('sn', '').strip()
    if sn == '':
        return jsonify({'error': '请提供 SN/MAC 号'}), 400
    try:
        client = get_redis()
        # Shipment records live in a Redis hash keyed by SN.
        raw = client.hget('shipment_sn_mapping', sn)
        if not raw:
            return jsonify({
                'found': False,
                'sn': sn,
                'message': '未找到该 SN 的出货记录'
            })

        # The stored value is a JSON document.
        record = json.loads(raw)
        platform = record.get('platform', 'pdd')  # defaults to pdd
        platform_labels = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}
        return jsonify({
            'found': True,
            'sn': sn,
            'date': record.get('date'),
            'box': record.get('box'),
            'platform': platform,
            'platform_name': platform_labels.get(platform, platform),
            'ts': record.get('ts')
        })
    except Exception as e:
        log('query_shipment_error', str(e))
        return jsonify({'error': f'查询失败: {str(e)}'}), 500
@app.post('/api/shipments/update-platform')
@require_login
@require_any_role('superadmin')
def update_shipments_platform():
    """Add ``platform='pdd'`` to every Redis shipment record that lacks the field.

    Records that cannot be processed are skipped.  Responds with the number
    of records updated, or a 500 error payload on failure.
    """
    try:
        client = get_redis()
        hash_key = 'shipment_sn_mapping'
        # Load every record, then queue the rewrites on one pipeline.
        stored = client.hgetall(hash_key)
        touched = 0
        batch = client.pipeline()
        for serial, raw in stored.items():
            try:
                record = json.loads(raw)
                if 'platform' in record:
                    continue
                # Records without a platform field are tagged as pdd.
                record['platform'] = 'pdd'
                batch.hset(hash_key, serial, json.dumps(record, ensure_ascii=False))
                touched += 1
            except Exception:
                continue
        batch.execute()
        log('update_shipments_platform', f'updated {touched} records')
        return jsonify({
            'ok': True,
            'message': f'已更新 {touched} 条记录为拼多多',
            'updated': touched
        })
    except Exception as e:
        log('update_shipments_platform_error', str(e))
        return jsonify({'error': f'更新失败: {str(e)}'}), 500
@app.get('/api/shipments/redis-stats')
@require_login
def shipments_redis_stats():
    """Report basic statistics for the Redis hash of shipment records.

    Returns:
        A JSON body with the hash ``key``, its field ``count`` and whether
        the key ``exists``; 500 with an ``error`` message when a Redis call
        raises (the exception text is also passed to ``log``).
    """
    hash_key = 'shipment_sn_mapping'
    try:
        client = get_redis()
        entry_count = client.hlen(hash_key)
        payload = {
            'key': hash_key,
            'count': entry_count,
            'exists': client.exists(hash_key) > 0
        }
        return jsonify(payload)
    except Exception as e:
        log('shipments_redis_stats_error', str(e))
        return jsonify({'error': f'获取统计失败: {str(e)}'}), 500
2025-11-21 14:04:43 +00:00
@app.get('/api/shipments/summary')
@require_login
def shipments_summary():
    """Return the shipment rows whose date falls inside a date range.

    Query parameters:
        start: inclusive lower bound for the ``date`` column.
        end: inclusive upper bound for the ``date`` column.

    Returns:
        400 with an ``error`` message when either bound is missing.
        A JSON body with ``ok``, ``records`` (date, qty, receiver, ts per
        row, newest date first) and ``count``.
        500 with an ``error`` message when the query raises.
    """
    try:
        start_date = request.args.get('start')
        end_date = request.args.get('end')
        if not start_date or not end_date:
            return jsonify({'error': '请提供开始和结束日期'}), 400

        conn = get_db()
        try:
            c = conn.cursor()
            # The bounds are bound parameters and are compared with the
            # stored date values as-is.
            c.execute('''
                SELECT date, qty, receiver, ts
                FROM shipments
                WHERE date >= ? AND date <= ?
                ORDER BY date DESC
            ''', (start_date, end_date))
            rows = [dict(r) for r in c.fetchall()]
        finally:
            # Close the connection even when the query fails.
            conn.close()

        log('shipments_summary', f'start={start_date}, end={end_date}, count={len(rows)}')
        return jsonify({
            'ok': True,
            'records': rows,
            'count': len(rows)
        })
    except Exception as e:
        log('shipments_summary_error', str(e))
        return jsonify({'error': f'查询失败: {str(e)}'}), 500
2025-11-21 13:27:40 +00:00
@app.post('/api/shipments/clear-redis')
@require_login
@require_any_role('admin', 'superadmin')
def clear_shipments_redis():
    """Delete every shipment record from both Redis and SQLite.

    The ``shipment_sn_mapping`` hash is removed from Redis first, then all
    rows of the ``shipments`` table are deleted. The two stores are not
    cleared atomically: if the SQLite step fails, the Redis hash has already
    been removed and the request answers with 500.

    Returns:
        A JSON body with ``ok``, ``message``, ``redis_count`` and
        ``sqlite_count`` (the sizes observed just before deletion), or 500
        with an ``error`` message when either store raises.
    """
    try:
        # Clear Redis.
        r = get_redis()
        redis_key = 'shipment_sn_mapping'
        redis_count = r.hlen(redis_key)
        r.delete(redis_key)

        # Clear the SQLite shipments table as well.
        conn = get_db()
        try:
            c = conn.cursor()
            c.execute('SELECT COUNT(*) as cnt FROM shipments')
            sqlite_count = c.fetchone()['cnt']
            c.execute('DELETE FROM shipments')
            conn.commit()
        finally:
            # Close the connection even when a statement fails.
            conn.close()

        log('clear_shipments_all', f'cleared redis={redis_count}, sqlite={sqlite_count}')
        return jsonify({
            'ok': True,
            'message': f'已清空 Redis {redis_count} 条记录和 SQLite {sqlite_count} 条记录',
            'redis_count': redis_count,
            'sqlite_count': sqlite_count
        })
    except Exception as e:
        log('clear_shipments_redis_error', str(e))
        return jsonify({'error': f'清空失败: {str(e)}'}), 500
class _ShipmentFileError(Exception):
    """Problem in an uploaded shipment file that the client has to fix (HTTP 400)."""


def _locate_shipment_columns(header):
    """Find the date, box and SN columns in a header row.

    Args:
        header: list of already-stripped header strings.

    Returns:
        None when the date or the box column is missing; otherwise a tuple
        ``(date_idx, box_idx, sn_indices)`` where ``sn_indices`` lists the
        positions of the ``SN<number>`` columns ordered by that number.
    """
    header_lower = [h.lower() for h in header]
    date_idx = next(
        (i for i, h in enumerate(header)
         if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]),
        None,
    )
    box_idx = next(
        (i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]),
        None,
    )
    if date_idx is None or box_idx is None:
        return None
    sn_cols = [(i, h) for i, h in enumerate(header)
               if h.startswith('SN') and h[2:].isdigit()]
    sn_cols.sort(key=lambda col: int(col[1][2:]))  # SN1, SN2, ... SN10, not lexical
    return date_idx, box_idx, [i for i, _ in sn_cols]


def _parse_shipments_csv(stream):
    """Parse an uploaded CSV stream into one dict per box.

    Each dict has ``date``, ``box``, ``sns`` (non-empty SN cells of the row)
    and ``qty`` (``len(sns)``). Rows with an empty date cell inherit the last
    non-empty date above them; rows without a box or without any SN are
    dropped.

    Raises:
        _ShipmentFileError: fewer than two non-blank lines, or the date/box
            column is missing.
    """
    import csv

    # utf-8-sig drops a leading BOM, which would otherwise stay glued to the
    # first header cell and defeat the startswith('SN') column match.
    text = stream.read().decode('utf-8-sig', errors='ignore')
    lines = [l.strip() for l in text.splitlines() if l.strip()]
    if len(lines) < 2:
        raise _ShipmentFileError('文件为空')

    # csv.reader honours quoted fields, so a comma inside a cell no longer
    # shifts every following column.
    records = csv.reader(lines)
    header = [h.strip() for h in next(records)]
    columns = _locate_shipment_columns(header)
    if columns is None:
        raise _ShipmentFileError('缺少必需的列')
    date_idx, box_idx, sn_indices = columns

    rows = []
    last_valid_date = None  # carried down to rows whose date cell is empty
    for record in records:
        parts = [p.strip() for p in record]
        if len(parts) <= max(date_idx, box_idx):
            continue
        current_date = parts[date_idx]
        if current_date:
            last_valid_date = current_date
        date = current_date or last_valid_date
        box = parts[box_idx]
        if not date or not box:
            continue
        sns = [parts[i] for i in sn_indices if i < len(parts) and parts[i]]
        if sns:
            rows.append({'date': date, 'box': box, 'sns': sns, 'qty': len(sns)})
    return rows


def _excel_date_to_text(value):
    """Turn a worksheet date cell into text; a falsy result means "no date here".

    Objects with ``strftime`` are formatted as ``YYYY-MM-DD``; numbers are
    treated as Excel serial days; anything else is stringified.
    """
    from datetime import datetime as dt, timedelta

    if not value:
        return None
    if hasattr(value, 'strftime'):
        return value.strftime('%Y-%m-%d')
    if isinstance(value, (int, float)):
        try:
            # Serial days are counted from 1899-12-30: Excel treats 1900 as a
            # leap year, which shifts the nominal 1900-01-01 origin by one day.
            return (dt(1899, 12, 30) + timedelta(days=float(value))).strftime('%Y-%m-%d')
        except (OverflowError, ValueError):
            # Out-of-range or NaN serial: keep the raw number as text.
            return str(value).strip()
    text = str(value).strip()
    return None if text == 'None' else text


def _parse_shipments_xlsx(fileobj):
    """Parse the active sheet of an uploaded workbook into one dict per box.

    Produces the same row dicts as ``_parse_shipments_csv``. The workbook is
    closed on every exit path.

    Raises:
        _ShipmentFileError: the sheet has fewer than two rows, or the
            date/box column is missing.
    """
    import openpyxl

    # NOTE(review): '.xls' uploads are routed here too; openpyxl documents
    # support for the OOXML formats only, so they presumably fail in
    # load_workbook and surface as a 500 - confirm and reject earlier if so.
    wb = openpyxl.load_workbook(fileobj)
    try:
        ws = wb.active
        if ws.max_row < 2:
            raise _ShipmentFileError('文件为空')

        header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True))
        header = [str(h).strip() if h else '' for h in header_row]
        columns = _locate_shipment_columns(header)
        if columns is None:
            raise _ShipmentFileError('缺少必需的列')
        date_idx, box_idx, sn_indices = columns

        rows = []
        last_valid_date = None  # merged date cells read as None below the first row
        for row in ws.iter_rows(min_row=2, values_only=True):
            current_date = _excel_date_to_text(row[date_idx] if date_idx < len(row) else None)
            if current_date:
                last_valid_date = current_date
            date = current_date or last_valid_date

            box = str(row[box_idx]).strip() if box_idx < len(row) and row[box_idx] else ''
            if not date or not box or box == 'None':
                continue

            sns = []
            for idx in sn_indices:
                if idx < len(row) and row[idx]:
                    sn_value = str(row[idx]).strip()
                    if sn_value and sn_value != 'None':
                        sns.append(sn_value)
            if sns:
                rows.append({'date': date, 'box': box, 'sns': sns, 'qty': len(sns)})
        return rows
    finally:
        wb.close()


@app.route('/api/upload/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin', 'superadmin')
def upload_shipments_file():
    """Import a shipment-record file (CSV or Excel) into SQLite and Redis.

    Form fields:
        file: the uploaded file; needs a date column (出货日期/发货日期/date),
            a box column (箱号/box) and ``SN<number>`` columns.
        platform: one of ``pdd``, ``yt``, ``tx``; stored with every SN.

    One ``shipments`` row is inserted per box, and every SN is written to the
    ``shipment_sn_mapping`` Redis hash as JSON (date, box, platform, ts). A
    Redis failure is logged but does not fail the request.

    Returns:
        400 with an ``error`` message for a missing file, bad platform,
        unsupported extension, empty file or missing required columns.
        A JSON body with ``ok``, ``count`` (boxes) and ``total_qty`` (SNs).
        500 with an ``error`` message for any other failure.
    """
    f = request.files.get('file')
    platform = request.form.get('platform')
    if not f:
        return jsonify({'error': '请选择文件'}), 400
    if not platform or platform not in ['pdd', 'yt', 'tx']:
        return jsonify({'error': '请选择机种(拼多多/圆通/兔喜)'}), 400

    name = secure_filename(f.filename or '')
    # Deliberately "text after the last dot" rather than splitext: when
    # secure_filename reduces the name to just the extension, the upload is
    # still recognised.
    ext = (name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'error': '文件格式不支持'}), 400

    try:
        if ext == 'csv':
            rows = _parse_shipments_csv(f.stream)
        else:
            rows = _parse_shipments_xlsx(f)

        # Timestamps are recorded in Beijing time (UTC+8).
        from datetime import timezone, timedelta
        beijing_tz = timezone(timedelta(hours=8))
        now = datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S')

        conn = get_db()
        try:
            conn.cursor().executemany(
                'INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?)',
                [(rec['date'], rec['qty'], f"箱号: {rec['box']}", now) for rec in rows],
            )
            conn.commit()
        finally:
            # Close the connection even when an insert fails.
            conn.close()
        total_qty = sum(rec['qty'] for rec in rows)

        # Mirror SN -> shipment info into a Redis hash for per-SN lookups.
        try:
            redis_client = get_redis()
            redis_key = 'shipment_sn_mapping'
            pipe = redis_client.pipeline()  # one round trip for the whole batch
            redis_count = 0
            for rec in rows:
                for sn in rec['sns']:
                    if sn:
                        shipment_info = json.dumps({
                            'date': rec['date'],
                            'box': rec['box'],
                            'platform': platform,
                            'ts': now
                        }, ensure_ascii=False)
                        pipe.hset(redis_key, sn, shipment_info)
                        redis_count += 1
            pipe.execute()
            log('upload_shipments_redis', f"redis_key={redis_key}, sn_count={redis_count}")
        except Exception as redis_error:
            # Best effort: the SQLite rows are already committed, so only log.
            log('upload_shipments_redis_error', str(redis_error))

        log('upload_shipments_file', f"boxes={len(rows)}, total_qty={total_qty}")
        notify_superadmin('批量上传发货记录文件', f"箱数: {len(rows)}, 总数量: {total_qty}")
        return jsonify({'ok': True, 'count': len(rows), 'total_qty': total_qty})
    except _ShipmentFileError as file_error:
        return jsonify({'error': str(file_error)}), 400
    except Exception as e:
        log('upload_shipments_file_error', str(e))
        return jsonify({'error': f'处理文件失败: {str(e)}'}), 500
# Runs at import time as well, not only when the file is executed as a script.
init_db()

if __name__ == '__main__':
    # Listening port comes from the PORT environment variable (default 5000).
    listen_port = int(os.environ.get('PORT', '5000'))
    app.run(host='0.0.0.0', port=listen_port)