2025-11-21 13:27:40 +00:00
# -*- coding: utf-8 -*-
import os
import json
import sqlite3
from datetime import datetime
from functools import wraps
from flask import Flask , request , jsonify , session , send_from_directory
from werkzeug . security import generate_password_hash , check_password_hash
from werkzeug . utils import secure_filename
try :
import redis
except Exception :
redis = None
_redis_client = None
_audit_cache = { ' pdd ' : { ' ts ' : 0 , ' list ' : [ ] } , ' yt ' : { ' ts ' : 0 , ' list ' : [ ] } }
BASE_DIR = os . path . dirname ( os . path . abspath ( __file__ ) )
DB_PATH = os . path . join ( BASE_DIR , ' data.db ' )
FRONTEND_DIR = os . path . join ( os . path . dirname ( BASE_DIR ) , ' frontend ' )
app = Flask ( __name__ , static_folder = FRONTEND_DIR , static_url_path = ' ' )
app . config [ ' SECRET_KEY ' ] = os . environ . get ( ' APP_SECRET ' , ' change-me ' )
2025-11-22 12:40:46 +00:00
app . config [ ' MAX_CONTENT_LENGTH ' ] = 50 * 1024 * 1024 # 限制上传文件大小为50MB
2025-11-21 13:27:40 +00:00
def get_db ( ) :
conn = sqlite3 . connect ( DB_PATH )
conn . row_factory = sqlite3 . Row
return conn
def init_db ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ''' CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
username TEXT UNIQUE NOT NULL ,
password_hash TEXT NOT NULL ,
role TEXT NOT NULL
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS operations_log(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
user_id INTEGER ,
action TEXT ,
detail TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS notifications(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
user_id INTEGER ,
username TEXT ,
action TEXT ,
detail TEXT ,
ts TEXT ,
read INTEGER DEFAULT 0
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS mac_batches(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
mac TEXT ,
batch TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS stats(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
good INTEGER ,
bad INTEGER ,
fpy_good INTEGER DEFAULT 0 ,
platform TEXT DEFAULT ' pdd ' ,
ts TEXT
) ''' )
# 为已存在的表添加列(如果不存在)
try :
c . execute ( ' ALTER TABLE stats ADD COLUMN fpy_good INTEGER DEFAULT 0 ' )
except Exception :
pass # 列已存在
try :
c . execute ( ' ALTER TABLE stats ADD COLUMN platform TEXT DEFAULT " pdd " ' )
except Exception :
pass # 列已存在
try :
c . execute ( ' ALTER TABLE users ADD COLUMN avatar TEXT ' )
except Exception :
pass # 列已存在
2025-11-22 01:53:18 +00:00
try :
c . execute ( ' ALTER TABLE mac_batches ADD COLUMN platform TEXT DEFAULT " pdd " ' )
except Exception :
pass # 列已存在
2025-11-21 13:27:40 +00:00
c . execute ( ''' CREATE TABLE IF NOT EXISTS defects(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
mac TEXT ,
batch TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS shipments(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
date TEXT ,
qty INTEGER ,
receiver TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS devices(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
name TEXT ,
status TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS environment(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
temp TEXT ,
hum TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS personnel(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
name TEXT ,
role TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS qa(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
title TEXT ,
date TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS production(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
batch TEXT ,
duration TEXT ,
ts TEXT
) ''' )
c . execute ( ''' CREATE TABLE IF NOT EXISTS repairs(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
qty INTEGER ,
note TEXT ,
ts TEXT
) ''' )
2025-11-22 12:40:46 +00:00
c . execute ( ''' CREATE TABLE IF NOT EXISTS sop_files(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
filename TEXT NOT NULL ,
original_name TEXT NOT NULL ,
description TEXT ,
uploader TEXT ,
ts TEXT
) ''' )
2025-11-23 05:28:53 +00:00
c . execute ( ''' CREATE TABLE IF NOT EXISTS work_orders(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
factory TEXT NOT NULL ,
order_no TEXT NOT NULL ,
product_model TEXT ,
order_qty INTEGER NOT NULL ,
production_start_time TEXT ,
production_end_time TEXT ,
status TEXT DEFAULT ' issued ' ,
status_text TEXT DEFAULT ' 已下发 ' ,
remark TEXT ,
created_by TEXT ,
created_at TEXT ,
updated_at TEXT
) ''' )
2025-11-24 01:30:08 +00:00
c . execute ( ''' CREATE TABLE IF NOT EXISTS material_purchase(
id INTEGER PRIMARY KEY AUTOINCREMENT ,
title TEXT NOT NULL ,
list_no TEXT NOT NULL ,
plan_no TEXT NOT NULL ,
bom_result TEXT ,
status TEXT NOT NULL ,
demand_status TEXT NOT NULL ,
complete_rate REAL ,
material_code TEXT NOT NULL ,
material_name TEXT NOT NULL ,
batch_no TEXT ,
level INTEGER ,
required_qty INTEGER NOT NULL ,
stock_qty INTEGER ,
shortage INTEGER ,
acquire_method TEXT ,
realtime_stock INTEGER ,
pending_qty INTEGER ,
dispatched_qty INTEGER ,
received_qty INTEGER ,
submitter TEXT ,
submit_time TEXT ,
update_time TEXT ,
deleted INTEGER DEFAULT 0 ,
deleted_at TEXT
) ''' )
2025-11-23 05:28:53 +00:00
# 为已存在的表添加列(如果不存在)
try :
c . execute ( ' ALTER TABLE work_orders ADD COLUMN product_model TEXT ' )
except Exception :
pass # 列已存在
2025-11-21 13:27:40 +00:00
conn . commit ( )
# create default admin
c . execute ( ' SELECT id FROM users WHERE username=? ' , ( ' admin ' , ) )
if not c . fetchone ( ) :
pwd = os . environ . get ( ' ADMIN_PASSWORD ' , ' admin123 ' )
c . execute ( ' INSERT INTO users(username, password_hash, role) VALUES(?,?,?) ' , (
' admin ' , generate_password_hash ( pwd ) , ' admin '
) )
conn . commit ( )
# create superadmin from env
su_user = os . environ . get ( ' SUPERADMIN_USERNAME ' )
su_pass = os . environ . get ( ' SUPERADMIN_PASSWORD ' )
if su_user and su_pass :
c . execute ( ' SELECT id FROM users WHERE username=? ' , ( su_user , ) )
if not c . fetchone ( ) :
c . execute ( ' INSERT INTO users(username, password_hash, role) VALUES(?,?,?) ' , (
su_user , generate_password_hash ( su_pass ) , ' superadmin '
) )
conn . commit ( )
conn . close ( )
def log ( action , detail = ' ' ) :
try :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?) ' , (
2025-11-21 14:45:00 +00:00
session . get ( ' user_id ' ) , action , detail , get_beijing_time ( )
2025-11-21 13:27:40 +00:00
) )
conn . commit ( )
conn . close ( )
except Exception :
pass
def notify_superadmin ( action , detail = ' ' ) :
""" 为超级管理员创建通知 """
try :
user_id = session . get ( ' user_id ' )
if not user_id :
return
conn = get_db ( )
c = conn . cursor ( )
# 获取当前用户信息
c . execute ( ' SELECT username, role FROM users WHERE id=? ' , ( user_id , ) )
user = c . fetchone ( )
if not user :
conn . close ( )
return
# 如果是超级管理员自己的操作,不创建通知
if user [ ' role ' ] == ' superadmin ' :
conn . close ( )
return
# 为所有超级管理员创建通知
c . execute ( ' SELECT id FROM users WHERE role=? ' , ( ' superadmin ' , ) )
superadmins = c . fetchall ( )
# 使用北京时间( UTC+8)
from datetime import timezone , timedelta
beijing_tz = timezone ( timedelta ( hours = 8 ) )
now = datetime . now ( beijing_tz ) . isoformat ( )
for admin in superadmins :
c . execute ( ' INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?) ' , (
admin [ ' id ' ] , user [ ' username ' ] , action , detail , now , 0
) )
conn . commit ( )
conn . close ( )
except Exception :
pass
2025-11-23 05:28:53 +00:00
def notify_admins ( action , detail = ' ' ) :
""" 为管理员创建通知(超级管理员操作时使用) """
try :
user_id = session . get ( ' user_id ' )
if not user_id :
return
conn = get_db ( )
c = conn . cursor ( )
# 获取当前用户信息
c . execute ( ' SELECT username, role FROM users WHERE id=? ' , ( user_id , ) )
user = c . fetchone ( )
if not user :
conn . close ( )
return
# 只有超级管理员的操作才通知管理员
if user [ ' role ' ] != ' superadmin ' :
conn . close ( )
return
# 为所有管理员创建通知
c . execute ( ' SELECT id FROM users WHERE role=? ' , ( ' admin ' , ) )
admins = c . fetchall ( )
# 使用北京时间( UTC+8)
from datetime import timezone , timedelta
beijing_tz = timezone ( timedelta ( hours = 8 ) )
now = datetime . now ( beijing_tz ) . isoformat ( )
for admin in admins :
c . execute ( ' INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?) ' , (
admin [ ' id ' ] , user [ ' username ' ] , action , detail , now , 0
) )
conn . commit ( )
conn . close ( )
except Exception :
pass
2025-11-21 13:27:40 +00:00
def get_redis ( ) :
global _redis_client
if not redis :
raise RuntimeError ( ' redis missing ' )
if _redis_client is not None :
return _redis_client
host = os . environ . get ( ' REDIS_HOST ' , ' 180.163.74.83 ' )
port = int ( os . environ . get ( ' REDIS_PORT ' , ' 6379 ' ) )
password = os . environ . get ( ' REDIS_PASSWORD ' ) or os . environ . get ( ' SUPERADMIN_PASSWORD ' )
db = int ( os . environ . get ( ' REDIS_DB ' , ' 0 ' ) )
_redis_client = redis . Redis ( host = host , port = port , password = password , db = db , decode_responses = True , socket_timeout = 0.5 , socket_connect_timeout = 0.5 )
return _redis_client
2025-11-21 14:45:00 +00:00
def get_beijing_time ( ) :
""" 获取北京时间( UTC+8) 的ISO格式字符串 """
from datetime import timezone , timedelta
beijing_tz = timezone ( timedelta ( hours = 8 ) )
return datetime . now ( beijing_tz ) . isoformat ( )
2025-11-21 13:27:40 +00:00
def parse_audit_line ( s ) :
if not s :
return { ' ts_cn ' : None , ' batch ' : None , ' mac ' : None , ' note ' : None }
def normalize_ts ( ts ) :
try :
from datetime import datetime , timezone , timedelta
# Attempt ISO parsing
# Support "Z" UTC suffix and offsets like +08:00
if ts . endswith ( ' Z ' ) :
dt = datetime . fromisoformat ( ts . replace ( ' Z ' , ' +00:00 ' ) )
else :
dt = datetime . fromisoformat ( ts )
# Convert to Beijing time (UTC+8)
bj = dt . astimezone ( timezone ( timedelta ( hours = 8 ) ) )
return bj . strftime ( ' % Y- % m- %d % H: % M: % S ' )
except Exception :
return ts
def has_time ( v ) :
return isinstance ( v , str ) and ( ( ' T ' in v ) or ( ' : ' in v ) )
def choose_ts ( d ) :
candidates = [ d . get ( ' ts_cn ' ) , d . get ( ' ts_local ' ) , d . get ( ' ts ' ) , d . get ( ' ts_utc ' ) , d . get ( ' timestamp ' ) , d . get ( ' time ' ) ]
for v in candidates :
if has_time ( v ) :
return v
for v in candidates :
if v :
return v
return None
try :
obj = json . loads ( s )
ts = choose_ts ( obj )
if not ts :
ts = s if isinstance ( s , str ) else None
return {
' ts_cn ' : ts if ts else None ,
' batch ' : obj . get ( ' batch ' ) or obj . get ( ' batch_no ' ) or obj . get ( ' lot ' ) ,
' mac ' : obj . get ( ' mac ' ) or obj . get ( ' mac_addr ' ) or obj . get ( ' mac_address ' ) ,
' note ' : obj . get ( ' note ' ) or obj . get ( ' msg ' ) or obj . get ( ' message ' )
}
except Exception :
pass
d = { }
parts = [ ]
for sep in [ ' ' , ' , ' , ' ; ' , ' | ' ] :
if sep in s :
parts = s . split ( sep )
break
if not parts :
parts = [ s ]
i = 0
while i < len ( parts ) :
part = parts [ i ]
if ' = ' in part :
k , v = part . split ( ' = ' , 1 )
kk = k . strip ( )
vv = v . strip ( )
try :
import re
if kk in ( ' ts_cn ' , ' ts_local ' , ' ts ' , ' timestamp ' , ' time ' ) :
if re . match ( r ' ^ \ d {4} - \ d {2} - \ d {2} $ ' , vv ) and i + 1 < len ( parts ) and re . match ( r ' ^ \ d {2} : \ d {2} : \ d {2} ' , parts [ i + 1 ] ) :
vv = vv + ' ' + parts [ i + 1 ]
i + = 1
except Exception :
pass
d [ kk ] = vv
i + = 1
ts = choose_ts ( d )
if not ts :
ts = s if isinstance ( s , str ) else None
return {
' ts_cn ' : ts if ts else None ,
' batch ' : d . get ( ' batch ' ) or d . get ( ' batch_no ' ) or d . get ( ' lot ' ) ,
' mac ' : d . get ( ' mac ' ) or d . get ( ' mac_addr ' ) or d . get ( ' mac_address ' ) ,
' note ' : d . get ( ' note ' ) or d . get ( ' msg ' ) or d . get ( ' message ' ) or s
}
def require_login ( fn ) :
@wraps ( fn )
def wrapper ( * args , * * kwargs ) :
if not session . get ( ' user_id ' ) :
return jsonify ( { ' error ' : ' unauthorized ' } ) , 401
return fn ( * args , * * kwargs )
return wrapper
def require_role ( role ) :
def deco ( fn ) :
@wraps ( fn )
def wrapper ( * args , * * kwargs ) :
if session . get ( ' role ' ) != role :
return jsonify ( { ' error ' : ' forbidden ' } ) , 403
return fn ( * args , * * kwargs )
return wrapper
return deco
def require_any_role ( * roles ) :
def deco ( fn ) :
@wraps ( fn )
def wrapper ( * args , * * kwargs ) :
if session . get ( ' role ' ) not in roles :
return jsonify ( { ' error ' : ' forbidden ' } ) , 403
return fn ( * args , * * kwargs )
return wrapper
return deco
@app.route ( ' / ' )
def index ( ) :
return send_from_directory ( FRONTEND_DIR , ' index.html ' )
2025-11-24 01:30:08 +00:00
@app.route ( ' /index.html ' )
def index_html ( ) :
return send_from_directory ( FRONTEND_DIR , ' index.html ' )
2025-11-21 13:27:40 +00:00
# auth
2025-11-24 01:30:08 +00:00
@app.get ( ' /api/auth/captcha ' )
def captcha ( ) :
""" 生成验证码图片 """
try :
from PIL import Image , ImageDraw , ImageFont
import random
import io
import base64
# 生成4位随机数字
code = ' ' . join ( [ str ( random . randint ( 0 , 9 ) ) for _ in range ( 4 ) ] )
# 将验证码存储到session
session [ ' captcha ' ] = code
# 创建图片
width , height = 120 , 40
image = Image . new ( ' RGB ' , ( width , height ) , color = ' #f0f4f8 ' )
draw = ImageDraw . Draw ( image )
# 尝试使用系统字体,如果失败则使用默认字体
try :
font = ImageFont . truetype ( ' /usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf ' , 28 )
except :
try :
font = ImageFont . truetype ( ' /System/Library/Fonts/Helvetica.ttc ' , 28 )
except :
font = ImageFont . load_default ( )
# 绘制干扰线
for _ in range ( 3 ) :
x1 = random . randint ( 0 , width )
y1 = random . randint ( 0 , height )
x2 = random . randint ( 0 , width )
y2 = random . randint ( 0 , height )
draw . line ( [ ( x1 , y1 ) , ( x2 , y2 ) ] , fill = ' #cbd5e1 ' , width = 1 )
# 绘制验证码文字
colors = [ ' #3b82f6 ' , ' #2563eb ' , ' #1e40af ' , ' #1e3a8a ' ]
for i , char in enumerate ( code ) :
x = 20 + i * 25 + random . randint ( - 3 , 3 )
y = 5 + random . randint ( - 3 , 3 )
color = random . choice ( colors )
draw . text ( ( x , y ) , char , font = font , fill = color )
# 绘制干扰点
for _ in range ( 50 ) :
x = random . randint ( 0 , width )
y = random . randint ( 0 , height )
draw . point ( ( x , y ) , fill = ' #94a3b8 ' )
# 转换为base64
buffer = io . BytesIO ( )
image . save ( buffer , format = ' PNG ' )
buffer . seek ( 0 )
img_base64 = base64 . b64encode ( buffer . getvalue ( ) ) . decode ( )
return jsonify ( { ' image ' : f ' data:image/png;base64, { img_base64 } ' } )
except Exception as e :
log ( ' captcha_error ' , str ( e ) )
# 如果生成失败,返回简单的验证码
code = ' ' . join ( [ str ( random . randint ( 0 , 9 ) ) for _ in range ( 4 ) ] )
session [ ' captcha ' ] = code
return jsonify ( { ' image ' : ' ' , ' code ' : code } )
2025-11-21 13:27:40 +00:00
@app.post ( ' /api/auth/login ' )
def login ( ) :
data = request . get_json ( ) or { }
username = data . get ( ' username ' )
password = data . get ( ' password ' )
2025-11-24 01:30:08 +00:00
captcha = data . get ( ' captcha ' )
# 验证验证码
session_captcha = session . get ( ' captcha ' , ' ' ) . lower ( )
if not captcha or captcha . lower ( ) != session_captcha :
# 清除验证码,防止重复使用
session . pop ( ' captcha ' , None )
return jsonify ( { ' error ' : ' 验证码错误 ' } ) , 400
# 清除验证码,防止重复使用
session . pop ( ' captcha ' , None )
2025-11-21 13:27:40 +00:00
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT id, password_hash, role FROM users WHERE username=? ' , ( username , ) )
row = c . fetchone ( )
conn . close ( )
if not row or not check_password_hash ( row [ ' password_hash ' ] , password or ' ' ) :
2025-11-24 01:30:08 +00:00
return jsonify ( { ' error ' : ' 用户名或密码错误 ' } ) , 400
2025-11-21 13:27:40 +00:00
session [ ' user_id ' ] = row [ ' id ' ]
session [ ' role ' ] = row [ ' role ' ]
2025-11-23 05:28:53 +00:00
session [ ' username ' ] = username
2025-11-21 13:27:40 +00:00
session . permanent = True
log ( ' login ' , username )
return jsonify ( { ' ok ' : True } )
@app.get ( ' /api/auth/me ' )
def me ( ) :
uid = session . get ( ' user_id ' )
if not uid :
return jsonify ( { ' username ' : None , ' role ' : None , ' avatar ' : None } )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT username, role, avatar FROM users WHERE id=? ' , ( uid , ) )
row = c . fetchone ( )
conn . close ( )
return jsonify ( { ' username ' : row [ ' username ' ] , ' role ' : row [ ' role ' ] , ' avatar ' : row [ ' avatar ' ] if row [ ' avatar ' ] else None } )
@app.post ( ' /api/auth/logout ' )
def logout ( ) :
log ( ' logout ' )
session . clear ( )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/user/upload-avatar ' )
@require_login
def upload_avatar ( ) :
uid = session . get ( ' user_id ' )
if not uid :
return jsonify ( { ' error ' : ' 未登录 ' } ) , 401
if ' avatar ' not in request . files :
return jsonify ( { ' error ' : ' 未选择文件 ' } ) , 400
file = request . files [ ' avatar ' ]
if file . filename == ' ' :
return jsonify ( { ' error ' : ' 未选择文件 ' } ) , 400
# 验证文件类型
allowed_extensions = { ' png ' , ' jpg ' , ' jpeg ' , ' gif ' , ' webp ' }
2025-11-21 13:44:28 +00:00
# 先从原始文件名获取扩展名
original_filename = file . filename
if ' . ' not in original_filename :
2025-11-21 13:27:40 +00:00
return jsonify ( { ' error ' : ' 无效的文件格式 ' } ) , 400
2025-11-21 13:44:28 +00:00
ext = original_filename . rsplit ( ' . ' , 1 ) [ 1 ] . lower ( )
2025-11-21 13:27:40 +00:00
if ext not in allowed_extensions :
return jsonify ( { ' error ' : ' 不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式 ' } ) , 400
# 创建avatars目录
avatars_dir = os . path . join ( FRONTEND_DIR , ' assets ' , ' avatars ' )
os . makedirs ( avatars_dir , exist_ok = True )
# 生成唯一文件名
timestamp = datetime . now ( ) . strftime ( ' % Y % m %d % H % M % S ' )
new_filename = f ' avatar_ { uid } _ { timestamp } . { ext } '
filepath = os . path . join ( avatars_dir , new_filename )
# 保存文件
file . save ( filepath )
# 更新数据库
avatar_url = f ' ./assets/avatars/ { new_filename } '
conn = get_db ( )
c = conn . cursor ( )
# 删除旧头像文件(如果存在)
c . execute ( ' SELECT avatar FROM users WHERE id=? ' , ( uid , ) )
row = c . fetchone ( )
if row and row [ ' avatar ' ] and row [ ' avatar ' ] . startswith ( ' ./assets/avatars/ ' ) :
old_file = os . path . join ( FRONTEND_DIR , row [ ' avatar ' ] . replace ( ' ./ ' , ' ' ) )
if os . path . exists ( old_file ) :
try :
os . remove ( old_file )
except Exception :
pass
c . execute ( ' UPDATE users SET avatar=? WHERE id=? ' , ( avatar_url , uid ) )
conn . commit ( )
conn . close ( )
log ( ' upload_avatar ' , f ' 上传头像: { new_filename } ' )
return jsonify ( { ' ok ' : True , ' avatar_url ' : avatar_url } )
@app.post ( ' /api/user/reset-avatar ' )
@require_login
def reset_avatar ( ) :
uid = session . get ( ' user_id ' )
if not uid :
return jsonify ( { ' error ' : ' 未登录 ' } ) , 401
conn = get_db ( )
c = conn . cursor ( )
# 删除旧头像文件(如果存在)
c . execute ( ' SELECT avatar FROM users WHERE id=? ' , ( uid , ) )
row = c . fetchone ( )
if row and row [ ' avatar ' ] and row [ ' avatar ' ] . startswith ( ' ./assets/avatars/ ' ) :
old_file = os . path . join ( FRONTEND_DIR , row [ ' avatar ' ] . replace ( ' ./ ' , ' ' ) )
if os . path . exists ( old_file ) :
try :
os . remove ( old_file )
except Exception :
pass
c . execute ( ' UPDATE users SET avatar=NULL WHERE id=? ' , ( uid , ) )
conn . commit ( )
conn . close ( )
log ( ' reset_avatar ' , ' 恢复默认头像 ' )
return jsonify ( { ' ok ' : True } )
# dashboard
@app.get ( ' /api/dashboard ' )
@require_login
def dashboard ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats ' )
s = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS total FROM defects ' )
defects = c . fetchone ( )
conn . close ( )
good = s [ ' good_total ' ] if s else 0
bad = s [ ' bad_total ' ] if s else 0
fpy_good = s [ ' fpy_good_total ' ] if s else 0
# 计算总良品率
rate = " {} % " . format ( round ( ( good / ( good + bad ) ) * 100 , 2 ) ) if ( good + bad ) > 0 else u ' — '
# 计算直通良品率( FPY = First Pass Yield)
total_produced = good + bad
fpy_rate = " {} % " . format ( round ( ( fpy_good / total_produced ) * 100 , 2 ) ) if total_produced > 0 else u ' — '
# 从 Redis 获取发货数量( SN 记录数)
shipments_count = 0
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
shipments_count = r . hlen ( redis_key )
except Exception as e :
log ( ' dashboard_redis_error ' , str ( e ) )
# Redis 失败时回退到 SQLite
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT SUM(qty) AS total FROM shipments ' )
ship = c . fetchone ( )
conn . close ( )
shipments_count = ( ship [ ' total ' ] or 0 ) if ship else 0
return jsonify ( {
' fpyRate ' : fpy_rate ,
' goodRate ' : rate ,
' shipments ' : shipments_count ,
' defects ' : ( defects [ ' total ' ] or 0 ) if defects else 0 ,
' badCount ' : bad
} )
@app.get ( ' /api/audit/pdd ' )
@require_login
def audit_pdd ( ) :
start = datetime . utcnow ( )
try :
q_start = request . args . get ( ' start ' )
q_end = request . args . get ( ' end ' )
q_limit = request . args . get ( ' limit ' )
q_order = request . args . get ( ' order ' , ' desc ' )
has_filter = bool ( q_start or q_end or q_limit or q_order )
# 缓存优化: 3秒内不重复查询
if ( not has_filter ) and ( ( datetime . utcnow ( ) . timestamp ( ) - _audit_cache [ ' pdd ' ] [ ' ts ' ] ) < 3 ) :
return jsonify ( { ' list ' : _audit_cache [ ' pdd ' ] [ ' list ' ] } )
r = get_redis ( )
# 设置Redis超时为5秒
r . connection_pool . connection_kwargs [ ' socket_timeout ' ] = 5
items = [ ]
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in [ ' mac_batch_audit_pdd ' , ' audit:pdd ' , ' pdd:audit ' ] :
try :
if r . exists ( key ) :
t = r . type ( key )
if t == ' list ' :
# 限制最大查询数量
total = r . llen ( key )
if has_filter :
items = r . lrange ( key , max ( 0 , total - max_items ) , - 1 )
else :
items = r . lrange ( key , - 200 , - 1 )
elif t == ' zset ' :
items = r . zrevrange ( key , 0 , max_items - 1 )
elif t == ' stream ' :
entries = r . xrevrange ( key , max = ' + ' , min = ' - ' , count = max_items )
items = [ json . dumps ( v ) for _id , v in entries ]
else :
v = r . get ( key )
items = [ v ] if v else [ ]
break
except Exception as e :
log ( ' audit_pdd_error ' , f ' Redis query error: { str ( e ) } ' )
continue
try :
host = os . environ . get ( ' REDIS_HOST ' )
db = os . environ . get ( ' REDIS_DB ' )
tp = r . type ( ' mac_batch_audit_pdd ' )
ln = 0
try :
ln = r . llen ( ' mac_batch_audit_pdd ' )
except Exception :
pass
log ( ' audit_pdd_probe ' , json . dumps ( { ' host ' : host , ' db ' : db , ' type ' : tp , ' len ' : ln } ) )
except Exception :
pass
if not items and r . exists ( ' batch_sn_mapping_pdd ' ) and r . type ( ' batch_sn_mapping_pdd ' ) == ' hash ' :
try :
pairs = [ ]
cursor = 0
while True :
cursor , res = r . hscan ( ' batch_sn_mapping_pdd ' , cursor = cursor , count = 200 )
for k , v in ( res or { } ) . items ( ) :
pairs . append ( { ' mac ' : k , ' batch ' : v } )
if len ( pairs ) > = 100 :
break
if cursor == 0 or len ( pairs ) > = 100 :
break
items = [ json . dumps ( { ' mac ' : p [ ' mac ' ] , ' batch ' : p [ ' batch ' ] , ' note ' : ' mapping ' } ) for p in pairs ]
except Exception :
pass
res = [ parse_audit_line ( x ) for x in items ]
if q_start or q_end :
def to_epoch ( s ) :
try :
if not s :
return None
if ' T ' in s or ' Z ' in s or ' + ' in s :
return datetime . fromisoformat ( s . replace ( ' Z ' , ' +00:00 ' ) ) . timestamp ( )
if ' ' in s and ' : ' in s :
return datetime . strptime ( s , ' % Y- % m- %d % H: % M: % S ' ) . timestamp ( )
return datetime . strptime ( s , ' % Y- % m- %d ' ) . timestamp ( )
except Exception :
return None
s_epoch = to_epoch ( q_start ) if q_start else None
e_epoch = to_epoch ( q_end ) if q_end else None
tmp = [ ]
for r0 in res :
ts = to_epoch ( r0 . get ( ' ts_cn ' ) )
if ts is None :
continue
if s_epoch is not None and ts < s_epoch :
continue
if e_epoch is not None and ts > e_epoch :
continue
tmp . append ( r0 )
res = tmp
try :
def to_key ( r ) :
s = r . get ( ' ts_cn ' ) or ' '
try :
if ' T ' in s or ' Z ' in s or ' + ' in s :
return datetime . fromisoformat ( s . replace ( ' Z ' , ' +00:00 ' ) ) . timestamp ( )
if ' ' in s and ' : ' in s :
return datetime . strptime ( s , ' % Y- % m- %d % H: % M: % S ' ) . timestamp ( )
return datetime . strptime ( s , ' % Y- % m- %d ' ) . timestamp ( )
except Exception :
return 0
res . sort ( key = to_key , reverse = ( q_order != ' asc ' ) )
except Exception :
res . reverse ( )
if q_limit :
try :
lim = int ( q_limit )
if lim > 0 :
res = res [ : lim ]
except Exception :
pass
if not has_filter :
_audit_cache [ ' pdd ' ] = { ' ts ' : datetime . utcnow ( ) . timestamp ( ) , ' list ' : res }
dur = ( datetime . utcnow ( ) - start ) . total_seconds ( )
log ( ' audit_pdd_cost ' , f " { dur } s len= { len ( res ) } " )
return jsonify ( { ' list ' : res } )
except Exception as e :
log ( ' audit_pdd_error ' , str ( e ) )
return jsonify ( { ' list ' : [ ] } )
@app.get ( ' /api/audit/yt ' )
@require_login
def audit_yt ( ) :
start = datetime . utcnow ( )
try :
q_start = request . args . get ( ' start ' )
q_end = request . args . get ( ' end ' )
q_limit = request . args . get ( ' limit ' )
q_order = request . args . get ( ' order ' , ' desc ' )
has_filter = bool ( q_start or q_end or q_limit or q_order )
# 缓存优化: 3秒内不重复查询
if ( not has_filter ) and ( ( datetime . utcnow ( ) . timestamp ( ) - _audit_cache [ ' yt ' ] [ ' ts ' ] ) < 3 ) :
return jsonify ( { ' list ' : _audit_cache [ ' yt ' ] [ ' list ' ] } )
r = get_redis ( )
# 设置Redis超时为5秒
r . connection_pool . connection_kwargs [ ' socket_timeout ' ] = 5
items = [ ]
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in [ ' mac_batch_audit_yt ' , ' audit:yt ' , ' yt:audit ' ] :
try :
if r . exists ( key ) :
t = r . type ( key )
if t == ' list ' :
# 限制最大查询数量
total = r . llen ( key )
if has_filter :
items = r . lrange ( key , max ( 0 , total - max_items ) , - 1 )
else :
items = r . lrange ( key , - 200 , - 1 )
elif t == ' zset ' :
items = r . zrevrange ( key , 0 , max_items - 1 )
elif t == ' stream ' :
entries = r . xrevrange ( key , max = ' + ' , min = ' - ' , count = max_items )
items = [ json . dumps ( v ) for _id , v in entries ]
else :
v = r . get ( key )
items = [ v ] if v else [ ]
break
except Exception as e :
log ( ' audit_yt_error ' , f ' Redis query error: { str ( e ) } ' )
continue
try :
host = os . environ . get ( ' REDIS_HOST ' )
db = os . environ . get ( ' REDIS_DB ' )
tp = r . type ( ' mac_batch_audit_yt ' )
ln = 0
try :
ln = r . llen ( ' mac_batch_audit_yt ' )
except Exception :
pass
log ( ' audit_yt_probe ' , json . dumps ( { ' host ' : host , ' db ' : db , ' type ' : tp , ' len ' : ln } ) )
except Exception :
pass
if not items and r . exists ( ' batch_sn_mapping_yt ' ) and r . type ( ' batch_sn_mapping_yt ' ) == ' hash ' :
try :
pairs = [ ]
cursor = 0
while True :
cursor , res = r . hscan ( ' batch_sn_mapping_yt ' , cursor = cursor , count = 200 )
for k , v in ( res or { } ) . items ( ) :
pairs . append ( { ' mac ' : k , ' batch ' : v } )
if len ( pairs ) > = 100 :
break
if cursor == 0 or len ( pairs ) > = 100 :
break
items = [ json . dumps ( { ' mac ' : p [ ' mac ' ] , ' batch ' : p [ ' batch ' ] , ' note ' : ' mapping ' } ) for p in pairs ]
except Exception :
pass
res = [ parse_audit_line ( x ) for x in items ]
if q_start or q_end :
def to_epoch ( s ) :
try :
if not s :
return None
if ' T ' in s or ' Z ' in s or ' + ' in s :
return datetime . fromisoformat ( s . replace ( ' Z ' , ' +00:00 ' ) ) . timestamp ( )
if ' ' in s and ' : ' in s :
return datetime . strptime ( s , ' % Y- % m- %d % H: % M: % S ' ) . timestamp ( )
return datetime . strptime ( s , ' % Y- % m- %d ' ) . timestamp ( )
except Exception :
return None
s_epoch = to_epoch ( q_start ) if q_start else None
e_epoch = to_epoch ( q_end ) if q_end else None
tmp = [ ]
for r0 in res :
ts = to_epoch ( r0 . get ( ' ts_cn ' ) )
if ts is None :
continue
if s_epoch is not None and ts < s_epoch :
continue
if e_epoch is not None and ts > e_epoch :
continue
tmp . append ( r0 )
res = tmp
try :
def to_key ( r ) :
s = r . get ( ' ts_cn ' ) or ' '
try :
if ' T ' in s or ' Z ' in s or ' + ' in s :
return datetime . fromisoformat ( s . replace ( ' Z ' , ' +00:00 ' ) ) . timestamp ( )
if ' ' in s and ' : ' in s :
return datetime . strptime ( s , ' % Y- % m- %d % H: % M: % S ' ) . timestamp ( )
return datetime . strptime ( s , ' % Y- % m- %d ' ) . timestamp ( )
except Exception :
return 0
res . sort ( key = to_key , reverse = ( q_order != ' asc ' ) )
except Exception :
res . reverse ( )
if q_limit :
try :
lim = int ( q_limit )
if lim > 0 :
res = res [ : lim ]
except Exception :
pass
if not has_filter :
_audit_cache [ ' yt ' ] = { ' ts ' : datetime . utcnow ( ) . timestamp ( ) , ' list ' : res }
dur = ( datetime . utcnow ( ) - start ) . total_seconds ( )
log ( ' audit_yt_cost ' , f " { dur } s len= { len ( res ) } " )
return jsonify ( { ' list ' : res } )
except Exception as e :
log ( ' audit_yt_error ' , str ( e ) )
return jsonify ( { ' list ' : [ ] } )
@app.get ( ' /api/audit/diagnose ' )
@require_login
def audit_diagnose ( ) :
try :
r = get_redis ( )
result = { }
for key in [ ' mac_batch_audit_pdd ' , ' mac_batch_audit_yt ' , ' batch_sn_mapping_pdd ' , ' batch_sn_mapping_yt ' ] :
try :
t = r . type ( key )
if t == ' list ' :
result [ key ] = { ' type ' : t , ' len ' : r . llen ( key ) }
elif t == ' zset ' :
result [ key ] = { ' type ' : t , ' len ' : r . zcard ( key ) }
elif t == ' stream ' :
info = r . xinfo_stream ( key )
result [ key ] = { ' type ' : t , ' len ' : info . get ( ' length ' ) }
elif t == ' hash ' :
result [ key ] = { ' type ' : t , ' len ' : r . hlen ( key ) }
elif t == ' none ' :
result [ key ] = { ' type ' : t , ' len ' : 0 }
else :
v = r . get ( key )
result [ key ] = { ' type ' : t , ' len ' : 1 if v else 0 }
except Exception as e :
result [ key ] = { ' error ' : str ( e ) }
return jsonify ( result )
except Exception as e :
return jsonify ( { ' error ' : str ( e ) } ) , 500
@app.get ( ' /api/overview ' )
@require_login
def overview ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats ' )
stats_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM defects ' )
defects_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM mac_batches ' )
mac_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments ' )
ship_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM devices ' )
devices_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM personnel ' )
personnel_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM qa ' )
qa_row = c . fetchone ( )
c . execute ( ' SELECT COUNT(1) AS cnt FROM production ' )
production_row = c . fetchone ( )
conn . close ( )
return jsonify ( {
' stats ' : {
' records ' : ( stats_row [ ' cnt ' ] or 0 ) if stats_row else 0 ,
' goodTotal ' : ( stats_row [ ' good_total ' ] or 0 ) if stats_row else 0 ,
' badTotal ' : ( stats_row [ ' bad_total ' ] or 0 ) if stats_row else 0 ,
' fpyGoodTotal ' : ( stats_row [ ' fpy_good_total ' ] or 0 ) if stats_row else 0
} ,
' defects ' : ( defects_row [ ' cnt ' ] or 0 ) if defects_row else 0 ,
' mac ' : ( mac_row [ ' cnt ' ] or 0 ) if mac_row else 0 ,
' shipments ' : {
' records ' : ( ship_row [ ' cnt ' ] or 0 ) if ship_row else 0 ,
' qtyTotal ' : ( ship_row [ ' qty_total ' ] or 0 ) if ship_row else 0
} ,
' devices ' : ( devices_row [ ' cnt ' ] or 0 ) if devices_row else 0 ,
' personnel ' : ( personnel_row [ ' cnt ' ] or 0 ) if personnel_row else 0 ,
' qa ' : ( qa_row [ ' cnt ' ] or 0 ) if qa_row else 0 ,
' production ' : ( production_row [ ' cnt ' ] or 0 ) if production_row else 0
} )
# uploads
@app.post ( ' /api/upload/mac ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_mac ( ) :
data = request . get_json ( ) or { }
rows = data . get ( ' rows ' ) or [ ]
if not isinstance ( rows , list ) :
return jsonify ( { ' error ' : ' invalid rows ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
now = get_beijing_time ( )
2025-11-21 13:27:40 +00:00
for r in rows :
mac = ( r or { } ) . get ( ' mac ' )
batch = ( r or { } ) . get ( ' batch ' )
if not mac or not batch :
continue
c . execute ( ' INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?) ' , ( mac , batch , now ) )
conn . commit ( )
conn . close ( )
log ( ' upload_mac ' , f " count= { len ( rows ) } " )
notify_superadmin ( ' 上传MAC与批次 ' , f " 上传了 { len ( rows ) } 条记录 " )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/upload/stats ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_stats ( ) :
data = request . get_json ( ) or { }
good = int ( data . get ( ' good ' ) or 0 )
bad = int ( data . get ( ' bad ' ) or 0 )
fpy_good = int ( data . get ( ' fpy_good ' ) or 0 ) # 直通良品数
platform = data . get ( ' platform ' ) or ' pdd ' # 平台: pdd/yt/tx
details = data . get ( ' details ' ) or [ ]
if good < 0 or bad < 0 or fpy_good < 0 :
return jsonify ( { ' error ' : ' invalid count ' } ) , 400
if platform not in [ ' pdd ' , ' yt ' , ' tx ' ] :
platform = ' pdd '
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
now = get_beijing_time ( )
2025-11-21 13:27:40 +00:00
# 保存统计数据
c . execute ( ' INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?) ' , ( good , bad , fpy_good , platform , now ) )
# 如果有不良明细, 保存到defects表
if details and isinstance ( details , list ) :
for item in details :
mac = ( item or { } ) . get ( ' mac ' )
batch = ( item or { } ) . get ( ' batch ' )
if mac and batch :
c . execute ( ' INSERT INTO defects(mac, batch, ts) VALUES(?,?,?) ' , ( mac , batch , now ) )
conn . commit ( )
conn . close ( )
platform_name = { ' pdd ' : ' 拼多多 ' , ' yt ' : ' 圆通 ' , ' tx ' : ' 兔喜 ' } . get ( platform , platform )
log ( ' upload_stats ' , json . dumps ( { ' good ' : good , ' bad ' : bad , ' fpy_good ' : fpy_good , ' platform ' : platform , ' details_count ' : len ( details ) } ) )
notify_superadmin ( ' 上传良/不良统计 ' , f " 平台: { platform_name } , 良品: { good } , 不良品: { bad } , 直通良品: { fpy_good } " )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/upload/repairs ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_repairs ( ) :
data = request . get_json ( ) or { }
qty = int ( data . get ( ' qty ' ) or 0 )
note = data . get ( ' note ' ) or ' '
if qty < 0 :
return jsonify ( { ' error ' : ' invalid quantity ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
now = get_beijing_time ( )
2025-11-21 13:27:40 +00:00
c . execute ( ' INSERT INTO repairs(qty, note, ts) VALUES(?,?,?) ' , ( qty , note , now ) )
conn . commit ( )
conn . close ( )
log ( ' upload_repairs ' , json . dumps ( { ' qty ' : qty , ' note ' : note } ) )
notify_superadmin ( ' 上传返修记录 ' , f " 数量: { qty } " )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/upload/defects ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_defects ( ) :
data = request . get_json ( ) or { }
rows = data . get ( ' rows ' ) or [ ]
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
now = get_beijing_time ( )
2025-11-21 13:27:40 +00:00
for r in rows :
mac = ( r or { } ) . get ( ' mac ' )
batch = ( r or { } ) . get ( ' batch ' )
if not mac or not batch :
continue
c . execute ( ' INSERT INTO defects(mac, batch, ts) VALUES(?,?,?) ' , ( mac , batch , now ) )
conn . commit ( )
conn . close ( )
log ( ' upload_defects ' , f " count= { len ( rows ) } " )
notify_superadmin ( ' 上传不良明细 ' , f " 上传了 { len ( rows ) } 条记录 " )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/upload/shipments ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_shipments ( ) :
data = request . get_json ( ) or { }
date = data . get ( ' date ' )
qty = int ( data . get ( ' qty ' ) or 0 )
to = data . get ( ' to ' )
platform = data . get ( ' platform ' , ' ' )
box_no = data . get ( ' box_no ' , ' ' )
if not date or qty < = 0 or not to or not platform :
return jsonify ( { ' error ' : ' invalid payload ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
# 检查shipments表是否有platform和box_no列, 如果没有则添加
c . execute ( " PRAGMA table_info(shipments) " )
columns = [ col [ 1 ] for col in c . fetchall ( ) ]
if ' platform ' not in columns :
c . execute ( ' ALTER TABLE shipments ADD COLUMN platform TEXT ' )
if ' box_no ' not in columns :
c . execute ( ' ALTER TABLE shipments ADD COLUMN box_no TEXT ' )
c . execute (
' INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?) ' ,
2025-11-21 14:45:00 +00:00
( date , qty , to , platform , box_no , get_beijing_time ( ) )
2025-11-21 13:27:40 +00:00
)
conn . commit ( )
conn . close ( )
platform_name = { ' pdd ' : ' 拼多多 ' , ' yt ' : ' 圆通 ' , ' tx ' : ' 兔喜 ' } . get ( platform , platform )
log_data = { ' date ' : date , ' qty ' : qty , ' to ' : to , ' platform ' : platform }
if box_no :
log_data [ ' box_no ' ] = box_no
log ( ' upload_shipments ' , json . dumps ( log_data ) )
notify_superadmin ( ' 上传发货记录 ' , f " 机种: { platform_name } , 日期: { date } , 数量: { qty } , 接收方: { to } " )
return jsonify ( { ' ok ' : True } )
# collect
@app.get ( ' /api/collect/devices ' )
@require_login
def devices ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT name, status FROM devices ORDER BY id DESC LIMIT 50 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/collect/environment ' )
@require_login
def environment ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1 ' )
r = c . fetchone ( )
conn . close ( )
return jsonify ( { ' temp ' : ( r [ ' temp ' ] if r else ' — ' ) , ' hum ' : ( r [ ' hum ' ] if r else ' — ' ) } )
@app.get ( ' /api/collect/personnel ' )
@require_login
def personnel ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.post ( ' /api/collect/personnel ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def add_personnel ( ) :
data = request . get_json ( ) or { }
name = ( data . get ( ' name ' ) or ' ' ) . strip ( )
role = ( data . get ( ' role ' ) or ' ' ) . strip ( )
if not name :
return jsonify ( { ' error ' : ' invalid payload ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
c . execute ( ' INSERT INTO personnel(name, role, ts) VALUES(?,?,?) ' , ( name , role , get_beijing_time ( ) ) )
2025-11-21 13:27:40 +00:00
conn . commit ( )
conn . close ( )
log ( ' add_personnel ' , name )
notify_superadmin ( ' 添加人员信息 ' , f " 姓名: { name } , 角色: { role } " )
return jsonify ( { ' ok ' : True } )
@app.get ( ' /api/collect/qa ' )
@require_login
def qa ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT title, date FROM qa ORDER BY id DESC LIMIT 100 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/collect/production ' )
@require_login
def production ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
# export
@app.post ( ' /api/export/excel ' )
@require_login
def export_excel ( ) :
try :
import openpyxl
from openpyxl . styles import Font , Alignment , PatternFill
from io import BytesIO
data = request . get_json ( ) or { }
data_type = data . get ( ' type ' , ' stats ' )
# 创建工作簿
wb = openpyxl . Workbook ( )
ws = wb . active
# 设置标题样式
header_fill = PatternFill ( start_color = ' 4F8CFF ' , end_color = ' 4F8CFF ' , fill_type = ' solid ' )
header_font = Font ( bold = True , color = ' FFFFFF ' )
header_alignment = Alignment ( horizontal = ' center ' , vertical = ' center ' )
conn = get_db ( )
c = conn . cursor ( )
# 根据类型导出不同的数据
if data_type == ' stats ' :
ws . title = ' 良不良统计 '
ws . append ( [ ' 直通良品数 ' , ' 良品数 ' , ' 不良品数 ' , ' 时间 ' ] )
c . execute ( ' SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC ' )
elif data_type == ' mac ' :
ws . title = ' MAC与批次 '
ws . append ( [ ' MAC地址 ' , ' 批次号 ' , ' 时间 ' ] )
c . execute ( ' SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC ' )
elif data_type == ' repairs ' :
ws . title = ' 返修记录 '
ws . append ( [ ' 返修数量 ' , ' 备注 ' , ' 时间 ' ] )
c . execute ( ' SELECT qty, note, ts FROM repairs ORDER BY id DESC ' )
elif data_type == ' defects ' :
ws . title = ' 不良明细 '
ws . append ( [ ' MAC地址 ' , ' 批次号 ' , ' 时间 ' ] )
c . execute ( ' SELECT mac, batch, ts FROM defects ORDER BY id DESC ' )
elif data_type == ' shipments ' :
ws . title = ' 发货记录 '
ws . append ( [ ' 日期 ' , ' 数量 ' , ' 收货方 ' , ' 时间 ' ] )
c . execute ( ' SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC ' )
elif data_type == ' devices ' :
ws . title = ' 设备状态 '
ws . append ( [ ' 设备名称 ' , ' 状态 ' ] )
c . execute ( ' SELECT name, status FROM devices ORDER BY id DESC ' )
elif data_type == ' personnel ' :
ws . title = ' 人员信息 '
ws . append ( [ ' 姓名 ' , ' 角色 ' ] )
c . execute ( ' SELECT name, role FROM personnel ORDER BY id DESC ' )
elif data_type == ' qa ' :
ws . title = ' 质检报告 '
ws . append ( [ ' 标题 ' , ' 日期 ' ] )
c . execute ( ' SELECT title, date FROM qa ORDER BY id DESC ' )
elif data_type == ' production ' :
ws . title = ' 时间记录 '
ws . append ( [ ' 批次 ' , ' 时长 ' ] )
c . execute ( ' SELECT batch, duration FROM production ORDER BY id DESC ' )
else :
conn . close ( )
return jsonify ( { ' error ' : ' invalid type ' } ) , 400
# 应用标题样式
for cell in ws [ 1 ] :
cell . fill = header_fill
cell . font = header_font
cell . alignment = header_alignment
# 写入数据
rows = c . fetchall ( )
for row in rows :
ws . append ( list ( row ) )
conn . close ( )
# 自动调整列宽
for column in ws . columns :
max_length = 0
column_letter = column [ 0 ] . column_letter
for cell in column :
try :
if len ( str ( cell . value ) ) > max_length :
max_length = len ( str ( cell . value ) )
except :
pass
adjusted_width = min ( max_length + 2 , 50 )
ws . column_dimensions [ column_letter ] . width = adjusted_width
# 保存到内存
output = BytesIO ( )
wb . save ( output )
output . seek ( 0 )
log ( ' export_excel ' , data_type )
# 返回文件
from flask import send_file
filename = f ' { ws . title } _ { datetime . now ( ) . strftime ( " % Y % m %d _ % H % M % S " ) } .xlsx '
return send_file (
output ,
mimetype = ' application/vnd.openxmlformats-officedocument.spreadsheetml.sheet ' ,
as_attachment = True ,
download_name = filename
)
except Exception as e :
log ( ' export_excel_error ' , str ( e ) )
return jsonify ( { ' error ' : str ( e ) } ) , 500
@app.post ( ' /api/export/pdf ' )
@require_login
def export_pdf ( ) :
try :
from reportlab . lib import colors
from reportlab . lib . pagesizes import A4 , landscape
from reportlab . platypus import SimpleDocTemplate , Table , TableStyle , Paragraph , Spacer
from reportlab . lib . styles import getSampleStyleSheet , ParagraphStyle
from reportlab . lib . units import cm
from reportlab . pdfbase import pdfmetrics
from reportlab . pdfbase . ttfonts import TTFont
from reportlab . lib . enums import TA_CENTER
from io import BytesIO
data = request . get_json ( ) or { }
data_type = data . get ( ' type ' , ' stats ' )
# 注册中文字体
font_name = ' Helvetica '
try :
# 尝试常见的中文字体路径
font_paths = [
' /usr/share/fonts/truetype/wqy/wqy-zenhei.ttc ' ,
' /usr/share/fonts/truetype/arphic/uming.ttc ' ,
' /System/Library/Fonts/PingFang.ttc ' ,
' C: \\ Windows \\ Fonts \\ simhei.ttf '
]
for font_path in font_paths :
if os . path . exists ( font_path ) :
pdfmetrics . registerFont ( TTFont ( ' ChineseFont ' , font_path ) )
font_name = ' ChineseFont '
break
except Exception as e :
log ( ' pdf_font_warning ' , f ' 无法加载中文字体: { str ( e ) } ' )
# 创建PDF
buffer = BytesIO ( )
doc = SimpleDocTemplate (
buffer ,
pagesize = landscape ( A4 ) ,
topMargin = 1.5 * cm ,
bottomMargin = 1.5 * cm ,
leftMargin = 1.5 * cm ,
rightMargin = 1.5 * cm
)
elements = [ ]
# 样式
styles = getSampleStyleSheet ( )
title_style = ParagraphStyle (
' CustomTitle ' ,
parent = styles [ ' Heading1 ' ] ,
fontName = font_name ,
fontSize = 18 ,
textColor = colors . HexColor ( ' #4F8CFF ' ) ,
spaceAfter = 20 ,
alignment = TA_CENTER ,
leading = 24
)
conn = get_db ( )
c = conn . cursor ( )
# 根据类型导出不同的数据
if data_type == ' stats ' :
title = ' 良/不良统计报表 '
headers = [ ' 直通良品数 ' , ' 良品数 ' , ' 不良品数 ' , ' 时间 ' ]
c . execute ( ' SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' mac ' :
title = ' MAC与批次报表 '
headers = [ ' MAC地址 ' , ' 批次号 ' , ' 时间 ' ]
c . execute ( ' SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' repairs ' :
title = ' 返修记录报表 '
headers = [ ' 返修数量 ' , ' 备注 ' , ' 时间 ' ]
c . execute ( ' SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' defects ' :
title = ' 不良明细报表 '
headers = [ ' MAC地址 ' , ' 批次号 ' , ' 时间 ' ]
c . execute ( ' SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' shipments ' :
title = ' 发货记录报表 '
headers = [ ' 日期 ' , ' 数量 ' , ' 收货方 ' , ' 时间 ' ]
c . execute ( ' SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' devices ' :
title = ' 设备状态报表 '
headers = [ ' 设备名称 ' , ' 状态 ' ]
c . execute ( ' SELECT name, status FROM devices ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' personnel ' :
title = ' 人员信息报表 '
headers = [ ' 姓名 ' , ' 角色 ' ]
c . execute ( ' SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' qa ' :
title = ' 质检报告 '
headers = [ ' 标题 ' , ' 日期 ' ]
c . execute ( ' SELECT title, date FROM qa ORDER BY id DESC LIMIT 200 ' )
elif data_type == ' production ' :
title = ' 生产时间记录 '
headers = [ ' 批次 ' , ' 时长 ' ]
c . execute ( ' SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200 ' )
else :
conn . close ( )
return jsonify ( { ' error ' : ' invalid type ' } ) , 400
# 添加标题
elements . append ( Paragraph ( title , title_style ) )
elements . append ( Spacer ( 1 , 0.5 * cm ) )
# 获取数据
rows = c . fetchall ( )
conn . close ( )
if len ( rows ) == 0 :
# 没有数据时的提示
no_data_style = ParagraphStyle (
' NoData ' ,
parent = styles [ ' Normal ' ] ,
fontName = font_name ,
fontSize = 12 ,
textColor = colors . grey ,
alignment = TA_CENTER
)
elements . append ( Paragraph ( ' 暂无数据 ' , no_data_style ) )
else :
# 构建表格数据
table_data = [ headers ]
for row in rows :
table_data . append ( [ str ( val ) if val is not None else ' ' for val in row ] )
# 创建表格
table = Table ( table_data , repeatRows = 1 )
table . setStyle ( TableStyle ( [
# 表头样式
( ' BACKGROUND ' , ( 0 , 0 ) , ( - 1 , 0 ) , colors . HexColor ( ' #4F8CFF ' ) ) ,
( ' TEXTCOLOR ' , ( 0 , 0 ) , ( - 1 , 0 ) , colors . whitesmoke ) ,
( ' ALIGN ' , ( 0 , 0 ) , ( - 1 , - 1 ) , ' CENTER ' ) ,
( ' VALIGN ' , ( 0 , 0 ) , ( - 1 , - 1 ) , ' MIDDLE ' ) ,
( ' FONTNAME ' , ( 0 , 0 ) , ( - 1 , 0 ) , font_name ) ,
( ' FONTSIZE ' , ( 0 , 0 ) , ( - 1 , 0 ) , 10 ) ,
( ' BOTTOMPADDING ' , ( 0 , 0 ) , ( - 1 , 0 ) , 8 ) ,
( ' TOPPADDING ' , ( 0 , 0 ) , ( - 1 , 0 ) , 8 ) ,
# 数据行样式
( ' BACKGROUND ' , ( 0 , 1 ) , ( - 1 , - 1 ) , colors . white ) ,
( ' GRID ' , ( 0 , 0 ) , ( - 1 , - 1 ) , 0.5 , colors . grey ) ,
( ' FONTNAME ' , ( 0 , 1 ) , ( - 1 , - 1 ) , font_name ) ,
( ' FONTSIZE ' , ( 0 , 1 ) , ( - 1 , - 1 ) , 8 ) ,
( ' ROWBACKGROUNDS ' , ( 0 , 1 ) , ( - 1 , - 1 ) , [ colors . white , colors . HexColor ( ' #F5F7FA ' ) ] ) ,
( ' TOPPADDING ' , ( 0 , 1 ) , ( - 1 , - 1 ) , 5 ) ,
( ' BOTTOMPADDING ' , ( 0 , 1 ) , ( - 1 , - 1 ) , 5 ) ,
] ) )
elements . append ( table )
# 添加页脚信息
elements . append ( Spacer ( 1 , 0.5 * cm ) )
footer_style = ParagraphStyle (
' Footer ' ,
parent = styles [ ' Normal ' ] ,
fontName = font_name ,
fontSize = 8 ,
textColor = colors . grey ,
alignment = TA_CENTER
)
footer_text = f ' 导出时间: { datetime . now ( ) . strftime ( " % Y- % m- %d % H: % M: % S " ) } | 共 { len ( rows ) } 条记录 '
elements . append ( Paragraph ( footer_text , footer_style ) )
# 生成PDF
doc . build ( elements )
buffer . seek ( 0 )
log ( ' export_pdf ' , data_type )
# 返回文件
from flask import send_file
filename = f ' { title } _ { datetime . now ( ) . strftime ( " % Y % m %d _ % H % M % S " ) } .pdf '
return send_file (
buffer ,
mimetype = ' application/pdf ' ,
as_attachment = True ,
download_name = filename
)
except Exception as e :
log ( ' export_pdf_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' PDF导出失败: { str ( e ) } ' } ) , 500
# lists
@app.get ( ' /api/list/mac ' )
@require_login
def list_mac ( ) :
conn = get_db ( )
c = conn . cursor ( )
2025-11-22 01:53:18 +00:00
c . execute ( ' SELECT mac, batch, platform, ts FROM mac_batches ORDER BY id DESC LIMIT 200 ' )
2025-11-21 13:27:40 +00:00
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/list/stats ' )
@require_login
def list_stats ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/list/repairs ' )
@require_login
def list_repairs ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/list/defects ' )
@require_login
def list_defects ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/list/shipments ' )
@require_login
def list_shipments ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200 ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
# admin management
@app.get ( ' /api/admin/users ' )
@require_login
@require_any_role ( ' superadmin ' )
def list_users ( ) :
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT username, role FROM users ORDER BY id ASC ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.post ( ' /api/admin/reset-password ' )
@require_login
@require_any_role ( ' superadmin ' )
def reset_password ( ) :
data = request . get_json ( ) or { }
username = data . get ( ' username ' )
new_password = data . get ( ' new_password ' )
if not username or not new_password :
return jsonify ( { ' error ' : ' invalid payload ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT id FROM users WHERE username=? ' , ( username , ) )
row = c . fetchone ( )
if not row :
conn . close ( )
return jsonify ( { ' error ' : ' user not found ' } ) , 404
c . execute ( ' UPDATE users SET password_hash=? WHERE id=? ' , ( generate_password_hash ( new_password ) , row [ ' id ' ] ) )
conn . commit ( )
conn . close ( )
log ( ' reset_password ' , username )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/admin/change-password ' )
@require_login
@require_any_role ( ' superadmin ' )
def change_password ( ) :
data = request . get_json ( ) or { }
username = data . get ( ' username ' )
new_password = data . get ( ' new_password ' )
if not username or not new_password :
return jsonify ( { ' error ' : ' invalid payload ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT id FROM users WHERE username=? ' , ( username , ) )
row = c . fetchone ( )
if not row :
conn . close ( )
return jsonify ( { ' error ' : ' user not found ' } ) , 404
c . execute ( ' UPDATE users SET password_hash=? WHERE id=? ' , ( generate_password_hash ( new_password ) , row [ ' id ' ] ) )
conn . commit ( )
conn . close ( )
log ( ' change_password ' , username )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/admin/add-user ' )
@require_login
@require_any_role ( ' superadmin ' )
def add_user ( ) :
""" 添加新用户 """
data = request . get_json ( ) or { }
username = ( data . get ( ' username ' ) or ' ' ) . strip ( )
password = data . get ( ' password ' )
role = ( data . get ( ' role ' ) or ' admin ' ) . strip ( )
if not username or not password :
return jsonify ( { ' error ' : ' 用户名和密码不能为空 ' } ) , 400
if role not in [ ' admin ' , ' superadmin ' ] :
return jsonify ( { ' error ' : ' 角色必须是 admin 或 superadmin ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
# 检查用户名是否已存在
c . execute ( ' SELECT id FROM users WHERE username=? ' , ( username , ) )
if c . fetchone ( ) :
conn . close ( )
return jsonify ( { ' error ' : ' 用户名已存在 ' } ) , 400
# 创建新用户
try :
c . execute ( ' INSERT INTO users(username, password_hash, role) VALUES(?,?,?) ' ,
( username , generate_password_hash ( password ) , role ) )
conn . commit ( )
conn . close ( )
log ( ' add_user ' , f ' username= { username } , role= { role } ' )
return jsonify ( { ' ok ' : True , ' message ' : f ' 用户 { username } 创建成功 ' } )
except Exception as e :
conn . close ( )
return jsonify ( { ' error ' : f ' 创建用户失败: { str ( e ) } ' } ) , 500
2025-11-23 01:15:37 +00:00
@app.post ( ' /api/admin/delete-user ' )
@require_login
@require_any_role ( ' superadmin ' )
def delete_user ( ) :
""" 删除用户 """
data = request . get_json ( ) or { }
username = ( data . get ( ' username ' ) or ' ' ) . strip ( )
if not username :
return jsonify ( { ' error ' : ' 用户名不能为空 ' } ) , 400
# 获取当前登录用户
current_user = session . get ( ' user ' )
if current_user and current_user . get ( ' username ' ) == username :
return jsonify ( { ' error ' : ' 不能删除当前登录的用户 ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
# 检查用户是否存在
c . execute ( ' SELECT id, role FROM users WHERE username=? ' , ( username , ) )
user = c . fetchone ( )
if not user :
conn . close ( )
return jsonify ( { ' error ' : ' 用户不存在 ' } ) , 404
# 检查是否是最后一个超级管理员
if user [ ' role ' ] == ' superadmin ' :
c . execute ( ' SELECT COUNT(*) as cnt FROM users WHERE role=? ' , ( ' superadmin ' , ) )
count = c . fetchone ( ) [ ' cnt ' ]
if count < = 1 :
conn . close ( )
return jsonify ( { ' error ' : ' 不能删除最后一个超级管理员 ' } ) , 400
# 删除用户
try :
c . execute ( ' DELETE FROM users WHERE username=? ' , ( username , ) )
conn . commit ( )
conn . close ( )
log ( ' delete_user ' , f ' username= { username } ' )
return jsonify ( { ' ok ' : True , ' message ' : f ' 用户 { username } 已删除 ' } )
except Exception as e :
conn . close ( )
return jsonify ( { ' error ' : f ' 删除用户失败: { str ( e ) } ' } ) , 500
2025-11-21 13:27:40 +00:00
@app.post ( ' /api/admin/clear ' )
@require_login
@require_any_role ( ' superadmin ' )
def clear_module ( ) :
data = request . get_json ( ) or { }
module = data . get ( ' module ' )
tables = {
' mac ' : ' mac_batches ' ,
' stats ' : ' stats ' ,
' defects ' : ' defects ' ,
' shipments ' : ' shipments ' ,
' devices ' : ' devices ' ,
' environment ' : ' environment ' ,
' personnel ' : ' personnel ' ,
' qa ' : ' qa ' ,
' production ' : ' production '
}
table = tables . get ( module )
if not table :
return jsonify ( { ' error ' : ' invalid module ' } ) , 400
2025-11-21 14:04:43 +00:00
# 清空 SQLite 表
2025-11-21 13:27:40 +00:00
conn = get_db ( )
c = conn . cursor ( )
c . execute ( f ' DELETE FROM { table } ' )
conn . commit ( )
conn . close ( )
2025-11-21 14:04:43 +00:00
# 如果是清空发货记录,同时清空 Redis
if module == ' shipments ' :
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
redis_count = r . hlen ( redis_key )
r . delete ( redis_key )
log ( ' clear_module_redis ' , f ' shipments: cleared { redis_count } records from redis ' )
except Exception as e :
log ( ' clear_module_redis_error ' , str ( e ) )
2025-11-21 13:27:40 +00:00
log ( ' clear_module ' , module )
return jsonify ( { ' ok ' : True } )
# notifications
@app.get ( ' /api/notifications ' )
@require_login
2025-11-23 05:28:53 +00:00
@require_any_role ( ' superadmin ' , ' admin ' )
2025-11-21 13:27:40 +00:00
def get_notifications ( ) :
""" 获取当前用户的通知列表 """
user_id = session . get ( ' user_id ' )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100 ' , ( user_id , ) )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.get ( ' /api/notifications/unread-count ' )
@require_login
2025-11-23 05:28:53 +00:00
@require_any_role ( ' superadmin ' , ' admin ' )
2025-11-21 13:27:40 +00:00
def get_unread_count ( ) :
""" 获取未读通知数量 """
user_id = session . get ( ' user_id ' )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0 ' , ( user_id , ) )
row = c . fetchone ( )
conn . close ( )
return jsonify ( { ' count ' : row [ ' count ' ] if row else 0 } )
@app.post ( ' /api/notifications/mark-read ' )
@require_login
2025-11-23 05:28:53 +00:00
@require_any_role ( ' superadmin ' , ' admin ' )
2025-11-21 13:27:40 +00:00
def mark_notification_read ( ) :
""" 标记通知为已读 """
data = request . get_json ( ) or { }
notification_id = data . get ( ' id ' )
if not notification_id :
return jsonify ( { ' error ' : ' invalid id ' } ) , 400
user_id = session . get ( ' user_id ' )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' UPDATE notifications SET read=1 WHERE id=? AND user_id=? ' , ( notification_id , user_id ) )
conn . commit ( )
conn . close ( )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/notifications/mark-all-read ' )
@require_login
2025-11-23 05:28:53 +00:00
@require_any_role ( ' superadmin ' , ' admin ' )
2025-11-21 13:27:40 +00:00
def mark_all_notifications_read ( ) :
""" 标记所有通知为已读 """
user_id = session . get ( ' user_id ' )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' UPDATE notifications SET read=1 WHERE user_id=? ' , ( user_id , ) )
conn . commit ( )
conn . close ( )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/notifications/delete-read ' )
@require_login
2025-11-23 05:28:53 +00:00
@require_any_role ( ' superadmin ' , ' admin ' )
2025-11-21 13:27:40 +00:00
def delete_read_notifications ( ) :
""" 删除所有已读通知 """
user_id = session . get ( ' user_id ' )
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' DELETE FROM notifications WHERE user_id=? AND read=1 ' , ( user_id , ) )
deleted_count = c . rowcount
conn . commit ( )
conn . close ( )
return jsonify ( { ' ok ' : True , ' count ' : deleted_count } )
@app.errorhandler ( 404 )
def not_found ( _ ) :
return jsonify ( { ' error ' : ' not found ' } ) , 404
@app.route ( ' /api/validate/mac-file ' , methods = [ ' POST ' ] )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def validate_mac_file ( ) :
""" 验证Excel文件格式是否符合要求 """
f = request . files . get ( ' file ' )
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
name = secure_filename ( f . filename or ' ' )
ext = ( name . split ( ' . ' ) [ - 1 ] or ' ' ) . lower ( )
if ext not in [ ' csv ' , ' xlsx ' , ' xls ' ] :
return jsonify ( { ' valid ' : False , ' message ' : ' 文件格式不支持, 请上传CSV或Excel文件 ' } ) , 200
try :
if ext == ' csv ' :
text = f . stream . read ( ) . decode ( ' utf-8 ' , errors = ' ignore ' )
lines = [ l . strip ( ) for l in text . splitlines ( ) if l . strip ( ) ]
if not lines :
return jsonify ( { ' valid ' : False , ' message ' : ' 文件为空,没有数据 ' } ) , 200
# 检查第一行(表头)
header = [ h . strip ( ) for h in lines [ 0 ] . split ( ' , ' ) ]
if len ( header ) != 2 :
return jsonify ( { ' valid ' : False , ' message ' : f ' 文件应该只包含2列数据, 当前有 { len ( header ) } 列 ' } ) , 200
# 记录表头用于调试
log ( ' validate_mac_file_csv ' , f ' headers: { header } ' )
# 更灵活的表头检查(不区分大小写)
header_lower = [ h . lower ( ) for h in header ]
has_mac = any ( ' mac ' in h and ' sn ' not in h for h in header_lower )
has_sn_mac = any ( ' sn_mac ' in h or ' sn-mac ' in h for h in header_lower )
has_batch = any ( ' 批次 ' in h or ' batch ' in h for h in header_lower )
if not ( has_mac or has_sn_mac ) :
return jsonify ( { ' valid ' : False , ' message ' : f ' 缺少必需的列: MAC 或 SN_MAC( 当前列: { " , " . join ( header ) } ) ' } ) , 200
if not has_batch :
return jsonify ( { ' valid ' : False , ' message ' : f ' 缺少必需的列:批次号(当前列: { " , " . join ( header ) } ) ' } ) , 200
data_rows = len ( lines ) - 1
mac_col = ' MAC ' if has_mac else ' SN_MAC '
return jsonify ( { ' valid ' : True , ' message ' : f ' 文件格式正确,包含列: { mac_col } 和 批次号,共 { data_rows } 行数据 ' } ) , 200
else :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
if ws . max_row < 2 :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 文件为空,没有数据 ' } ) , 200
if ws . max_column != 2 :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : f ' 文件应该只包含2列数据, 当前有 { ws . max_column } 列 ' } ) , 200
# 检查表头
header_row = list ( ws . iter_rows ( min_row = 1 , max_row = 1 , values_only = True ) ) [ 0 ]
header = [ str ( h ) . strip ( ) if h else ' ' for h in header_row ]
# 记录表头用于调试
log ( ' validate_mac_file ' , f ' headers: { header } ' )
# 更灵活的表头检查(不区分大小写)
header_lower = [ h . lower ( ) for h in header ]
has_mac = any ( ' mac ' in h and ' sn ' not in h for h in header_lower )
has_sn_mac = any ( ' sn_mac ' in h or ' sn-mac ' in h for h in header_lower )
has_batch = any ( ' 批次 ' in h or ' batch ' in h for h in header_lower )
if not ( has_mac or has_sn_mac ) :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : f ' 缺少必需的列: MAC 或 SN_MAC( 当前列: { " , " . join ( header ) } ) ' } ) , 200
if not has_batch :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : f ' 缺少必需的列:批次号(当前列: { " , " . join ( header ) } ) ' } ) , 200
data_rows = ws . max_row - 1
mac_col = ' MAC ' if has_mac else ' SN_MAC '
wb . close ( )
return jsonify ( { ' valid ' : True , ' message ' : f ' 文件格式正确,包含列: { mac_col } 和 批次号,共 { data_rows } 行数据 ' } ) , 200
except Exception as e :
return jsonify ( { ' valid ' : False , ' message ' : f ' 读取文件失败: { str ( e ) } ' } ) , 200
@app.post ( ' /api/upload/mac-file ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_mac_file ( ) :
import subprocess
import tempfile
f = request . files . get ( ' file ' )
upload_type = request . form . get ( ' type ' , ' pdd ' ) # pdd, yt, or tx
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
if upload_type not in [ ' pdd ' , ' yt ' , ' tx ' ] :
return jsonify ( { ' error ' : ' invalid type ' } ) , 400
# 保存上传的文件到临时位置
name = secure_filename ( f . filename or ' upload.xlsx ' )
temp_dir = ' /home/hyx/work/batch_import_xlsx '
os . makedirs ( temp_dir , exist_ok = True )
# 根据类型确定文件名
if upload_type == ' yt ' :
temp_path = os . path . join ( temp_dir , ' sn_test_yt.xlsx ' )
elif upload_type == ' pdd ' :
temp_path = os . path . join ( temp_dir , ' sn_test_pdd.xlsx ' )
else :
temp_path = os . path . join ( temp_dir , ' sn_test_tx.xlsx ' )
f . save ( temp_path )
# 调用batch_import.py脚本
script_path = ' /home/hyx/work/生产管理系统/batch_import.py '
python_path = ' /home/hyx/work/.venv/bin/python '
try :
result = subprocess . run (
[ python_path , script_path , upload_type ] ,
capture_output = True ,
text = True ,
timeout = 300 # 5分钟超时
)
output = result . stdout + result . stderr
success = result . returncode == 0
2025-11-21 14:45:00 +00:00
# 解析输出中的成功导入数据, 同时保存到SQLite数据库
if success :
import re
json_match = re . search ( r ' === 成功导入的数据 === \ n([ \ s \ S]*?) \ n=== 数据输出结束 === ' , output )
if json_match :
try :
import json as json_lib
records = json_lib . loads ( json_match . group ( 1 ) . strip ( ) )
if records and isinstance ( records , list ) :
# 保存到SQLite数据库, 使用北京时间( UTC+8)
conn = get_db ( )
c = conn . cursor ( )
from datetime import timezone , timedelta
beijing_tz = timezone ( timedelta ( hours = 8 ) )
now = datetime . now ( beijing_tz ) . isoformat ( )
for record in records :
mac = record . get ( ' mac ' )
batch = record . get ( ' batch ' )
if mac and batch :
2025-11-22 01:53:18 +00:00
c . execute ( ' INSERT INTO mac_batches(mac, batch, platform, ts) VALUES(?,?,?,?) ' , ( mac , batch , upload_type , now ) )
2025-11-21 14:45:00 +00:00
conn . commit ( )
conn . close ( )
log ( ' upload_mac_file_db ' , f " saved { len ( records ) } records to database " )
except Exception as e :
log ( ' upload_mac_file_db_error ' , str ( e ) )
2025-11-21 13:27:40 +00:00
log ( ' upload_mac_file ' , f " type= { upload_type } , success= { success } " )
if success :
notify_superadmin ( ' 批量上传MAC文件 ' , f " 类型: { upload_type } " )
return jsonify ( {
' ok ' : success ,
' output ' : output ,
' returncode ' : result . returncode
} )
except subprocess . TimeoutExpired :
return jsonify ( { ' error ' : ' 上传超时 ' , ' output ' : ' 处理时间超过5分钟 ' } ) , 500
except Exception as e :
return jsonify ( { ' error ' : str ( e ) , ' output ' : ' ' } ) , 500
@app.post ( ' /api/upload/defects-file ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_defects_file ( ) :
f = request . files . get ( ' file ' )
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
name = secure_filename ( f . filename or ' ' )
ext = ( name . split ( ' . ' ) [ - 1 ] or ' ' ) . lower ( )
rows = [ ]
if ext == ' csv ' :
text = f . stream . read ( ) . decode ( ' utf-8 ' , errors = ' ignore ' )
for l in text . splitlines ( ) :
parts = [ p . strip ( ) for p in l . split ( ' , ' ) ]
if len ( parts ) > = 2 :
rows . append ( { ' mac ' : parts [ 0 ] , ' batch ' : parts [ 1 ] } )
else :
try :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
for r in ws . iter_rows ( values_only = True ) :
mac = str ( r [ 0 ] ) . strip ( ) if r and r [ 0 ] else None
batch = str ( r [ 1 ] ) . strip ( ) if r and len ( r ) > 1 and r [ 1 ] else None
if mac and batch :
rows . append ( { ' mac ' : mac , ' batch ' : batch } )
except Exception :
return jsonify ( { ' error ' : ' parse error ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
2025-11-21 14:45:00 +00:00
now = get_beijing_time ( )
2025-11-21 13:27:40 +00:00
for r in rows :
c . execute ( ' INSERT INTO defects(mac, batch, ts) VALUES(?,?,?) ' , ( r [ ' mac ' ] , r [ ' batch ' ] , now ) )
conn . commit ( )
conn . close ( )
log ( ' upload_defects_file ' , f " count= { len ( rows ) } " )
notify_superadmin ( ' 批量上传不良明细文件 ' , f " 上传了 { len ( rows ) } 条记录 " )
return jsonify ( { ' ok ' : True , ' count ' : len ( rows ) } )
@app.route ( ' /api/validate/shipments-file ' , methods = [ ' POST ' ] )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def validate_shipments_file ( ) :
""" 验证发货记录Excel文件格式 """
f = request . files . get ( ' file ' )
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
name = secure_filename ( f . filename or ' ' )
ext = ( name . split ( ' . ' ) [ - 1 ] or ' ' ) . lower ( )
if ext not in [ ' csv ' , ' xlsx ' , ' xls ' ] :
return jsonify ( { ' valid ' : False , ' message ' : ' 文件格式不支持, 请上传CSV或Excel文件 ' } ) , 200
try :
if ext == ' csv ' :
text = f . stream . read ( ) . decode ( ' utf-8 ' , errors = ' ignore ' )
lines = [ l . strip ( ) for l in text . splitlines ( ) if l . strip ( ) ]
if not lines :
return jsonify ( { ' valid ' : False , ' message ' : ' 文件为空,没有数据 ' } ) , 200
header = [ h . strip ( ) for h in lines [ 0 ] . split ( ' , ' ) ]
header_lower = [ h . lower ( ) for h in header ]
# 检查必需的列
has_date = any ( ' 出货日期 ' in h or ' 发货日期 ' in h or ' date ' in hl for h , hl in zip ( header , header_lower ) )
has_box = any ( ' 箱号 ' in h or ' box ' in hl for h , hl in zip ( header , header_lower ) )
if not has_date :
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少必需的列:出货日期 ' } ) , 200
if not has_box :
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少必需的列:箱号 ' } ) , 200
# 检查SN列( SN1-SN20)
sn_cols = [ h for h in header if h . startswith ( ' SN ' ) and h [ 2 : ] . isdigit ( ) ]
if not sn_cols :
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少SN列( SN1, SN2, ... SN20) ' } ) , 200
data_rows = len ( lines ) - 1
return jsonify ( { ' valid ' : True , ' message ' : f ' 文件格式正确,包含 { len ( sn_cols ) } 个SN列, 共 { data_rows } 行数据 ' } ) , 200
else :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
if ws . max_row < 2 :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 文件为空,没有数据 ' } ) , 200
# 检查表头
header_row = list ( ws . iter_rows ( min_row = 1 , max_row = 1 , values_only = True ) ) [ 0 ]
header = [ str ( h ) . strip ( ) if h else ' ' for h in header_row ]
header_lower = [ h . lower ( ) for h in header ]
# 检查必需的列
has_date = any ( ' 出货日期 ' in h or ' 发货日期 ' in h or ' date ' in hl for h , hl in zip ( header , header_lower ) )
has_box = any ( ' 箱号 ' in h or ' box ' in hl for h , hl in zip ( header , header_lower ) )
if not has_date :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少必需的列:出货日期 ' } ) , 200
if not has_box :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少必需的列:箱号 ' } ) , 200
# 检查SN列
sn_cols = [ h for h in header if h . startswith ( ' SN ' ) and h [ 2 : ] . isdigit ( ) ]
if not sn_cols :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 缺少SN列( SN1, SN2, ... SN20) ' } ) , 200
data_rows = ws . max_row - 1
wb . close ( )
return jsonify ( { ' valid ' : True , ' message ' : f ' 文件格式正确,包含 { len ( sn_cols ) } 个SN列, 共 { data_rows } 行数据 ' } ) , 200
except Exception as e :
return jsonify ( { ' valid ' : False , ' message ' : f ' 读取文件失败: { str ( e ) } ' } ) , 200
@app.get ( ' /api/shipments/query-by-sn ' )
@require_login
def query_shipment_by_sn ( ) :
""" 通过 SN/MAC 号查询出货信息 """
sn = request . args . get ( ' sn ' , ' ' ) . strip ( )
if not sn :
return jsonify ( { ' error ' : ' 请提供 SN/MAC 号 ' } ) , 400
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
# 从 Redis Hash 中查询
result = r . hget ( redis_key , sn )
if result :
# 解析 JSON 数据
shipment_info = json . loads ( result )
platform = shipment_info . get ( ' platform ' , ' pdd ' ) # 默认拼多多
platform_name = { ' pdd ' : ' 拼多多 ' , ' yt ' : ' 圆通 ' , ' tx ' : ' 兔喜 ' } . get ( platform , platform )
return jsonify ( {
' found ' : True ,
' sn ' : sn ,
' date ' : shipment_info . get ( ' date ' ) ,
' box ' : shipment_info . get ( ' box ' ) ,
' platform ' : platform ,
' platform_name ' : platform_name ,
' ts ' : shipment_info . get ( ' ts ' )
} )
else :
return jsonify ( {
' found ' : False ,
' sn ' : sn ,
' message ' : ' 未找到该 SN 的出货记录 '
} )
except Exception as e :
log ( ' query_shipment_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 查询失败: { str ( e ) } ' } ) , 500
2025-11-24 03:11:13 +00:00
@app.get ( ' /api/shipments/query-by-box ' )
@require_login
def query_shipment_by_box ( ) :
""" 通过箱号查询出货信息 """
box_no = request . args . get ( ' box ' , ' ' ) . strip ( )
if not box_no :
return jsonify ( { ' error ' : ' 请提供箱号 ' } ) , 400
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
# 获取所有记录
all_records = r . hgetall ( redis_key )
# 筛选出匹配箱号的记录
matched_records = [ ]
for sn , data in all_records . items ( ) :
try :
shipment_info = json . loads ( data )
if shipment_info . get ( ' box ' ) == box_no :
platform = shipment_info . get ( ' platform ' , ' pdd ' )
platform_name = { ' pdd ' : ' 拼多多 ' , ' yt ' : ' 圆通 ' , ' tx ' : ' 兔喜 ' } . get ( platform , platform )
matched_records . append ( {
' sn ' : sn . decode ( ' utf-8 ' ) if isinstance ( sn , bytes ) else sn ,
' date ' : shipment_info . get ( ' date ' ) ,
' box ' : shipment_info . get ( ' box ' ) ,
' platform ' : platform ,
' platform_name ' : platform_name ,
' ts ' : shipment_info . get ( ' ts ' )
} )
except :
continue
if matched_records :
return jsonify ( {
' found ' : True ,
' box ' : box_no ,
' count ' : len ( matched_records ) ,
' records ' : matched_records
} )
else :
return jsonify ( {
' found ' : False ,
' box ' : box_no ,
' message ' : ' 未找到该箱号的出货记录 '
} )
except Exception as e :
log ( ' query_shipment_by_box_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 查询失败: { str ( e ) } ' } ) , 500
2025-11-21 13:27:40 +00:00
@app.post ( ' /api/shipments/update-platform ' )
@require_login
@require_any_role ( ' superadmin ' )
def update_shipments_platform ( ) :
""" 批量更新Redis中发货记录的机种字段 """
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
# 获取所有记录
all_data = r . hgetall ( redis_key )
updated_count = 0
pipe = r . pipeline ( )
for sn , value in all_data . items ( ) :
try :
info = json . loads ( value )
# 如果没有platform字段, 添加为pdd
if ' platform ' not in info :
info [ ' platform ' ] = ' pdd '
pipe . hset ( redis_key , sn , json . dumps ( info , ensure_ascii = False ) )
updated_count + = 1
except Exception :
continue
pipe . execute ( )
log ( ' update_shipments_platform ' , f ' updated { updated_count } records ' )
return jsonify ( {
' ok ' : True ,
' message ' : f ' 已更新 { updated_count } 条记录为拼多多 ' ,
' updated ' : updated_count
} )
except Exception as e :
log ( ' update_shipments_platform_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 更新失败: { str ( e ) } ' } ) , 500
@app.get ( ' /api/shipments/redis-stats ' )
@require_login
def shipments_redis_stats ( ) :
""" 获取 Redis 中发货记录的统计信息 """
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
count = r . hlen ( redis_key )
return jsonify ( {
' key ' : redis_key ,
' count ' : count ,
' exists ' : r . exists ( redis_key ) > 0
} )
except Exception as e :
log ( ' shipments_redis_stats_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 获取统计失败: { str ( e ) } ' } ) , 500
2025-11-21 14:04:43 +00:00
@app.get ( ' /api/shipments/summary ' )
@require_login
def shipments_summary ( ) :
""" 查询发货记录汇总信息(按日期范围) """
try :
start_date = request . args . get ( ' start ' )
end_date = request . args . get ( ' end ' )
if not start_date or not end_date :
return jsonify ( { ' error ' : ' 请提供开始和结束日期 ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
# 查询指定日期范围内的发货记录
c . execute ( '''
SELECT date , qty , receiver , ts
FROM shipments
WHERE date > = ? AND date < = ?
ORDER BY date DESC
''' , (start_date, end_date))
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
log ( ' shipments_summary ' , f ' start= { start_date } , end= { end_date } , count= { len ( rows ) } ' )
return jsonify ( {
' ok ' : True ,
' records ' : rows ,
' count ' : len ( rows )
} )
except Exception as e :
log ( ' shipments_summary_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 查询失败: { str ( e ) } ' } ) , 500
2025-11-21 13:27:40 +00:00
@app.post ( ' /api/shipments/clear-redis ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def clear_shipments_redis ( ) :
2025-11-21 14:04:43 +00:00
""" 清空 Redis 和 SQLite 中的发货记录数据 """
2025-11-21 13:27:40 +00:00
try :
2025-11-21 14:04:43 +00:00
# 清空 Redis
2025-11-21 13:27:40 +00:00
r = get_redis ( )
redis_key = ' shipment_sn_mapping '
2025-11-21 14:04:43 +00:00
redis_count = r . hlen ( redis_key )
2025-11-21 13:27:40 +00:00
r . delete ( redis_key )
2025-11-21 14:04:43 +00:00
# 同时清空 SQLite 中的 shipments 表
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT COUNT(*) as cnt FROM shipments ' )
sqlite_count = c . fetchone ( ) [ ' cnt ' ]
c . execute ( ' DELETE FROM shipments ' )
conn . commit ( )
conn . close ( )
log ( ' clear_shipments_all ' , f ' cleared redis= { redis_count } , sqlite= { sqlite_count } ' )
2025-11-21 13:27:40 +00:00
return jsonify ( {
' ok ' : True ,
2025-11-21 14:04:43 +00:00
' message ' : f ' 已清空 Redis { redis_count } 条记录和 SQLite { sqlite_count } 条记录 ' ,
' redis_count ' : redis_count ,
' sqlite_count ' : sqlite_count
2025-11-21 13:27:40 +00:00
} )
except Exception as e :
log ( ' clear_shipments_redis_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 清空失败: { str ( e ) } ' } ) , 500
@app.route ( ' /api/upload/shipments-file ' , methods = [ ' POST ' ] )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_shipments_file ( ) :
""" 上传发货记录Excel文件 """
f = request . files . get ( ' file ' )
platform = request . form . get ( ' platform ' ) # 获取机种参数
if not f :
return jsonify ( { ' error ' : ' 请选择文件 ' } ) , 400
if not platform or platform not in [ ' pdd ' , ' yt ' , ' tx ' ] :
return jsonify ( { ' error ' : ' 请选择机种(拼多多/圆通/兔喜) ' } ) , 400
name = secure_filename ( f . filename or ' ' )
ext = ( name . split ( ' . ' ) [ - 1 ] or ' ' ) . lower ( )
if ext not in [ ' csv ' , ' xlsx ' , ' xls ' ] :
return jsonify ( { ' error ' : ' 文件格式不支持 ' } ) , 400
try :
rows = [ ]
if ext == ' csv ' :
text = f . stream . read ( ) . decode ( ' utf-8 ' , errors = ' ignore ' )
lines = [ l . strip ( ) for l in text . splitlines ( ) if l . strip ( ) ]
if len ( lines ) < 2 :
return jsonify ( { ' error ' : ' 文件为空 ' } ) , 400
header = [ h . strip ( ) for h in lines [ 0 ] . split ( ' , ' ) ]
header_lower = [ h . lower ( ) for h in header ]
# 找到列索引
date_idx = next ( ( i for i , h in enumerate ( header ) if ' 出货日期 ' in h or ' 发货日期 ' in h or ' date ' in header_lower [ i ] ) , None )
box_idx = next ( ( i for i , h in enumerate ( header ) if ' 箱号 ' in h or ' box ' in header_lower [ i ] ) , None )
if date_idx is None or box_idx is None :
return jsonify ( { ' error ' : ' 缺少必需的列 ' } ) , 400
# 找到所有SN列( 按数字排序)
sn_cols = [ ( i , h ) for i , h in enumerate ( header ) if h . startswith ( ' SN ' ) and h [ 2 : ] . isdigit ( ) ]
sn_indices = sorted ( sn_cols , key = lambda x : int ( x [ 1 ] [ 2 : ] ) ) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
for line in lines [ 1 : ] :
parts = [ p . strip ( ) for p in line . split ( ' , ' ) ]
if len ( parts ) < = max ( date_idx , box_idx ) :
continue
# 处理日期(合并单元格时可能为空)
current_date = parts [ date_idx ] if date_idx < len ( parts ) and parts [ date_idx ] else ' '
# 如果当前行日期为空,使用上一个有效日期
if current_date :
last_valid_date = current_date
date = current_date
else :
date = last_valid_date
# 处理箱号
box = parts [ box_idx ] if box_idx < len ( parts ) and parts [ box_idx ] else ' '
# 如果没有日期或箱号,跳过这行
if not date or not box :
continue
# 收集所有SN( 横向 20 个)
sns = [ ]
for idx , _ in sn_indices :
if idx < len ( parts ) and parts [ idx ] :
sns . append ( parts [ idx ] )
# 只有当有 SN 数据时才添加记录
if sns :
rows . append ( {
' date ' : date ,
' box ' : box ,
' sns ' : sns ,
' qty ' : len ( sns )
} )
else :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
if ws . max_row < 2 :
wb . close ( )
return jsonify ( { ' error ' : ' 文件为空 ' } ) , 400
# 读取表头
header_row = list ( ws . iter_rows ( min_row = 1 , max_row = 1 , values_only = True ) ) [ 0 ]
header = [ str ( h ) . strip ( ) if h else ' ' for h in header_row ]
header_lower = [ h . lower ( ) for h in header ]
# 找到列索引
date_idx = next ( ( i for i , h in enumerate ( header ) if ' 出货日期 ' in h or ' 发货日期 ' in h or ' date ' in header_lower [ i ] ) , None )
box_idx = next ( ( i for i , h in enumerate ( header ) if ' 箱号 ' in h or ' box ' in header_lower [ i ] ) , None )
if date_idx is None or box_idx is None :
wb . close ( )
return jsonify ( { ' error ' : ' 缺少必需的列 ' } ) , 400
# 找到所有SN列( 按数字排序)
sn_cols = [ ( i , h ) for i , h in enumerate ( header ) if h . startswith ( ' SN ' ) and h [ 2 : ] . isdigit ( ) ]
sn_indices = sorted ( sn_cols , key = lambda x : int ( x [ 1 ] [ 2 : ] ) ) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
# 读取数据行
for row in ws . iter_rows ( min_row = 2 , values_only = True ) :
# 处理日期(合并单元格时可能为 None)
date_value = row [ date_idx ] if date_idx < len ( row ) else None
current_date = None
if date_value :
# 如果是 datetime.datetime 或 datetime.date 对象
if hasattr ( date_value , ' strftime ' ) :
current_date = date_value . strftime ( ' % Y- % m- %d ' )
# 如果是数字( Excel 日期序列号)
elif isinstance ( date_value , ( int , float ) ) :
try :
# Excel 日期从 1900-01-01 开始计数
from datetime import datetime as dt , timedelta
# Excel 的 bug: 1900 不是闰年但 Excel 认为是, 所以需要减1
excel_epoch = dt ( 1899 , 12 , 30 )
date_obj = excel_epoch + timedelta ( days = float ( date_value ) )
current_date = date_obj . strftime ( ' % Y- % m- %d ' )
except Exception :
current_date = str ( date_value ) . strip ( )
else :
current_date = str ( date_value ) . strip ( )
if current_date == ' None ' :
current_date = None
# 如果当前行日期为空,使用上一个有效日期
if current_date :
last_valid_date = current_date
date = current_date
else :
date = last_valid_date
# 处理箱号
box = str ( row [ box_idx ] ) . strip ( ) if box_idx < len ( row ) and row [ box_idx ] else ' '
# 如果没有日期或箱号,跳过这行
if not date or not box or box == ' None ' :
continue
# 收集所有SN( 横向 20 个)
sns = [ ]
for idx , _ in sn_indices :
if idx < len ( row ) and row [ idx ] :
sn_value = str ( row [ idx ] ) . strip ( )
if sn_value and sn_value != ' None ' :
sns . append ( sn_value )
# 只有当有 SN 数据时才添加记录
if sns :
rows . append ( {
' date ' : date ,
' box ' : box ,
' sns ' : sns ,
' qty ' : len ( sns )
} )
wb . close ( )
# 保存到 SQLite 数据库
conn = get_db ( )
c = conn . cursor ( )
# 使用北京时间( UTC+8)
from datetime import timezone , timedelta
beijing_tz = timezone ( timedelta ( hours = 8 ) )
now = datetime . now ( beijing_tz ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
total_qty = 0
for r in rows :
receiver_info = f " 箱号: { r [ ' box ' ] } "
c . execute ( ' INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?) ' ,
( r [ ' date ' ] , r [ ' qty ' ] , receiver_info , now ) )
total_qty + = r [ ' qty ' ]
conn . commit ( )
conn . close ( )
# 保存到 Redis - 使用 Hash 结构存储 MAC -> 出货信息的映射
try :
r = get_redis ( )
redis_key = ' shipment_sn_mapping ' # Redis Hash key
# 使用 pipeline 批量写入,提高性能
pipe = r . pipeline ( )
redis_count = 0
for row_data in rows :
date = row_data [ ' date ' ]
box = row_data [ ' box ' ]
sns = row_data [ ' sns ' ]
# 为每个 SN/MAC 创建映射记录
for sn in sns :
if sn : # 确保 SN 不为空
# 存储格式: MAC -> JSON(date, box, platform, timestamp)
shipment_info = json . dumps ( {
' date ' : date ,
' box ' : box ,
' platform ' : platform ,
' ts ' : now
} , ensure_ascii = False )
pipe . hset ( redis_key , sn , shipment_info )
redis_count + = 1
# 执行批量写入
pipe . execute ( )
log ( ' upload_shipments_redis ' , f " redis_key= { redis_key } , sn_count= { redis_count } " )
except Exception as redis_error :
# Redis 写入失败不影响主流程,只记录日志
log ( ' upload_shipments_redis_error ' , str ( redis_error ) )
log ( ' upload_shipments_file ' , f " boxes= { len ( rows ) } , total_qty= { total_qty } " )
notify_superadmin ( ' 批量上传发货记录文件 ' , f " 箱数: { len ( rows ) } , 总数量: { total_qty } " )
return jsonify ( { ' ok ' : True , ' count ' : len ( rows ) , ' total_qty ' : total_qty } )
except Exception as e :
log ( ' upload_shipments_file_error ' , str ( e ) )
return jsonify ( { ' error ' : f ' 处理文件失败: { str ( e ) } ' } ) , 500
2025-11-22 12:40:46 +00:00
# SOP 文件管理
SOP_DIR = os . path . join ( FRONTEND_DIR , ' sop_files ' )
os . makedirs ( SOP_DIR , exist_ok = True )
@app.get ( ' /api/sop/list ' )
@require_login
def list_sop_files ( ) :
""" 获取所有 SOP 文件列表 """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT id, filename, original_name, description, uploader, ts FROM sop_files ORDER BY id DESC ' )
rows = [ dict ( r ) for r in c . fetchall ( ) ]
conn . close ( )
return jsonify ( { ' list ' : rows } )
@app.post ( ' /api/sop/upload ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_sop_file ( ) :
""" 上传 SOP 文件 """
f = request . files . get ( ' file ' )
description = request . form . get ( ' description ' , ' ' ) . strip ( )
if not f :
return jsonify ( { ' error ' : ' 请选择文件 ' } ) , 400
# 保留原始文件名(包含中文)
original_name = f . filename or ' sop.xlsx '
# 获取扩展名
if ' . ' in original_name :
ext = original_name . rsplit ( ' . ' , 1 ) [ 1 ] . lower ( )
else :
ext = ' '
# 验证文件类型( Excel 和 Word 文件)
if ext not in [ ' xlsx ' , ' xls ' , ' csv ' , ' doc ' , ' docx ' ] :
return jsonify ( { ' error ' : ' 只支持 Excel 和 Word 文件格式(.xlsx, .xls, .csv, .doc, .docx) ' } ) , 400
# 生成唯一文件名(用于存储)
timestamp = datetime . now ( ) . strftime ( ' % Y % m %d % H % M % S ' )
# 存储文件名使用安全的名称
safe_name = secure_filename ( original_name )
if not safe_name or safe_name == ext :
# 如果 secure_filename 返回空或只有扩展名,使用时间戳
safe_name = f ' file. { ext } '
filename = f ' sop_ { timestamp } _ { safe_name } '
filepath = os . path . join ( SOP_DIR , filename )
# 保存文件
f . save ( filepath )
# 保存到数据库
conn = get_db ( )
c = conn . cursor ( )
uploader = session . get ( ' user_id ' )
# 获取上传者用户名
c . execute ( ' SELECT username FROM users WHERE id=? ' , ( uploader , ) )
user = c . fetchone ( )
uploader_name = user [ ' username ' ] if user else ' 未知 '
c . execute ( ' INSERT INTO sop_files(filename, original_name, description, uploader, ts) VALUES(?,?,?,?,?) ' ,
( filename , original_name , description , uploader_name , get_beijing_time ( ) ) )
conn . commit ( )
conn . close ( )
log ( ' upload_sop ' , f ' filename= { original_name } , description= { description } ' )
notify_superadmin ( ' 上传 SOP 文件 ' , f ' 文件名: { original_name } ' )
return jsonify ( { ' ok ' : True , ' message ' : ' SOP 文件上传成功 ' } )
@app.get ( ' /api/sop/download/<int:file_id> ' )
@require_login
def download_sop_file ( file_id ) :
""" 下载 SOP 文件 """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT filename, original_name FROM sop_files WHERE id=? ' , ( file_id , ) )
row = c . fetchone ( )
conn . close ( )
if not row :
return jsonify ( { ' error ' : ' 文件不存在 ' } ) , 404
filepath = os . path . join ( SOP_DIR , row [ ' filename ' ] )
if not os . path . exists ( filepath ) :
return jsonify ( { ' error ' : ' 文件已被删除 ' } ) , 404
log ( ' download_sop ' , f ' file_id= { file_id } , filename= { row [ " original_name " ] } ' )
from flask import send_file
return send_file (
filepath ,
as_attachment = True ,
download_name = row [ ' original_name ' ]
)
@app.post ( ' /api/sop/delete/<int:file_id> ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def delete_sop_file ( file_id ) :
""" 删除 SOP 文件 """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' SELECT filename, original_name FROM sop_files WHERE id=? ' , ( file_id , ) )
row = c . fetchone ( )
if not row :
conn . close ( )
return jsonify ( { ' error ' : ' 文件不存在 ' } ) , 404
# 删除物理文件
filepath = os . path . join ( SOP_DIR , row [ ' filename ' ] )
if os . path . exists ( filepath ) :
try :
os . remove ( filepath )
except Exception as e :
log ( ' delete_sop_file_error ' , str ( e ) )
# 删除数据库记录
c . execute ( ' DELETE FROM sop_files WHERE id=? ' , ( file_id , ) )
conn . commit ( )
conn . close ( )
log ( ' delete_sop ' , f ' file_id= { file_id } , filename= { row [ " original_name " ] } ' )
notify_superadmin ( ' 删除 SOP 文件 ' , f ' 文件名: { row [ " original_name " ] } ' )
return jsonify ( { ' ok ' : True , ' message ' : ' SOP 文件已删除 ' } )
2025-11-23 05:28:53 +00:00
# ==================== 工单管理 API ====================
@app.get ( ' /api/work-orders ' )
@require_login
def get_work_orders ( ) :
""" 获取工单列表 """
factory = request . args . get ( ' factory ' , ' ' )
order_no = request . args . get ( ' order ' , ' ' )
date = request . args . get ( ' date ' , ' ' )
conn = get_db ( )
c = conn . cursor ( )
query = ' SELECT * FROM work_orders WHERE 1=1 '
params = [ ]
if factory :
query + = ' AND factory LIKE ? '
params . append ( f ' % { factory } % ' )
if order_no :
query + = ' AND order_no LIKE ? '
params . append ( f ' % { order_no } % ' )
if date :
query + = ' AND (production_start_time LIKE ? OR production_end_time LIKE ?) '
params . append ( f ' { date } % ' )
params . append ( f ' { date } % ' )
query + = ' ORDER BY created_at DESC '
c . execute ( query , params )
rows = c . fetchall ( )
conn . close ( )
orders = [ ]
for row in rows :
orders . append ( {
' id ' : str ( row [ ' id ' ] ) ,
' factory ' : row [ ' factory ' ] ,
' orderNo ' : row [ ' order_no ' ] ,
' productModel ' : row [ ' product_model ' ] if ' product_model ' in row . keys ( ) else ' ' ,
' orderQty ' : row [ ' order_qty ' ] ,
' productionStartTime ' : row [ ' production_start_time ' ] ,
' productionEndTime ' : row [ ' production_end_time ' ] ,
' status ' : row [ ' status ' ] ,
' statusText ' : row [ ' status_text ' ] ,
' remark ' : row [ ' remark ' ] ,
' createdBy ' : row [ ' created_by ' ] ,
' createdAt ' : row [ ' created_at ' ]
} )
return jsonify ( { ' ok ' : True , ' data ' : orders } )
@app.post ( ' /api/work-orders ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def create_work_order ( ) :
""" 创建工单 """
data = request . get_json ( )
factory = data . get ( ' factory ' , ' ' ) . strip ( )
order_no = data . get ( ' orderNo ' , ' ' ) . strip ( )
product_model = data . get ( ' productModel ' , ' ' ) . strip ( )
order_qty = data . get ( ' orderQty ' , 0 )
production_start_time = data . get ( ' productionStartTime ' , ' ' )
production_end_time = data . get ( ' productionEndTime ' , ' ' )
remark = data . get ( ' remark ' , ' ' ) . strip ( )
if not factory or not order_no or not order_qty :
return jsonify ( { ' error ' : ' 请填写所有必填项 ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
username = session . get ( ' username ' , ' ' )
now = get_beijing_time ( )
c . execute ( ''' INSERT INTO work_orders(
factory , order_no , product_model , order_qty , production_start_time , production_end_time ,
status , status_text , remark , created_by , created_at , updated_at
) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? ) ''' , (
factory , order_no , product_model , order_qty , production_start_time , production_end_time ,
' issued ' , ' 已下发 ' , remark , username , now , now
) )
order_id = c . lastrowid
conn . commit ( )
conn . close ( )
log ( ' create_work_order ' , f ' 工单号: { order_no } , 工厂: { factory } , 型号: { product_model } ' )
# 如果是超级管理员添加工单,通知所有管理员
notify_admins ( ' 添加工单 ' , f ' 工单号: { order_no } , 工厂: { factory } , 数量: { order_qty } ' )
return jsonify ( { ' ok ' : True , ' id ' : order_id , ' message ' : ' 工单创建成功 ' } )
@app.put ( ' /api/work-orders/<int:order_id> ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def update_work_order ( order_id ) :
""" 更新工单 """
data = request . get_json ( )
factory = data . get ( ' factory ' , ' ' ) . strip ( )
order_no = data . get ( ' orderNo ' , ' ' ) . strip ( )
product_model = data . get ( ' productModel ' , ' ' ) . strip ( )
order_qty = data . get ( ' orderQty ' , 0 )
production_start_time = data . get ( ' productionStartTime ' , ' ' )
production_end_time = data . get ( ' productionEndTime ' , ' ' )
remark = data . get ( ' remark ' , ' ' ) . strip ( )
if not factory or not order_no or not order_qty :
return jsonify ( { ' error ' : ' 请填写所有必填项 ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
# 检查工单是否存在
c . execute ( ' SELECT id FROM work_orders WHERE id=? ' , ( order_id , ) )
if not c . fetchone ( ) :
conn . close ( )
return jsonify ( { ' error ' : ' 工单不存在 ' } ) , 404
now = get_beijing_time ( )
c . execute ( ''' UPDATE work_orders SET
factory = ? , order_no = ? , product_model = ? , order_qty = ? , production_start_time = ? , production_end_time = ? ,
remark = ? , updated_at = ?
WHERE id = ? ''' , (
factory , order_no , product_model , order_qty , production_start_time , production_end_time ,
remark , now , order_id
) )
conn . commit ( )
conn . close ( )
log ( ' update_work_order ' , f ' 工单ID: { order_id } , 工单号: { order_no } , 型号: { product_model } ' )
notify_superadmin ( ' 更新工单 ' , f ' 工单号: { order_no } , 工厂: { factory } , 型号: { product_model } ' )
return jsonify ( { ' ok ' : True , ' message ' : ' 工单更新成功 ' } )
@app.delete ( ' /api/work-orders/<int:order_id> ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def delete_work_order ( order_id ) :
""" 删除工单 """
conn = get_db ( )
c = conn . cursor ( )
# 获取工单信息用于日志
c . execute ( ' SELECT order_no, factory FROM work_orders WHERE id=? ' , ( order_id , ) )
row = c . fetchone ( )
if not row :
conn . close ( )
return jsonify ( { ' error ' : ' 工单不存在 ' } ) , 404
order_no = row [ ' order_no ' ]
factory = row [ ' factory ' ]
c . execute ( ' DELETE FROM work_orders WHERE id=? ' , ( order_id , ) )
conn . commit ( )
conn . close ( )
log ( ' delete_work_order ' , f ' 工单ID: { order_id } , 工单号: { order_no } ' )
notify_superadmin ( ' 删除工单 ' , f ' 工单号: { order_no } , 工厂: { factory } ' )
return jsonify ( { ' ok ' : True , ' message ' : ' 工单删除成功 ' } )
@app.post ( ' /api/work-orders/<int:order_id>/confirm ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def confirm_work_order ( order_id ) :
""" 确认工单 """
conn = get_db ( )
c = conn . cursor ( )
# 检查工单是否存在
c . execute ( ' SELECT order_no, factory, status FROM work_orders WHERE id=? ' , ( order_id , ) )
row = c . fetchone ( )
if not row :
conn . close ( )
return jsonify ( { ' error ' : ' 工单不存在 ' } ) , 404
order_no = row [ ' order_no ' ]
factory = row [ ' factory ' ]
current_status = row [ ' status ' ]
# 如果已经确认,返回提示
if current_status == ' confirmed ' :
conn . close ( )
return jsonify ( { ' ok ' : True , ' message ' : ' 工单已确认 ' } )
# 更新状态为已确认
now = get_beijing_time ( )
c . execute ( ''' UPDATE work_orders SET
status = ? , status_text = ? , updated_at = ?
WHERE id = ? ''' , (
' confirmed ' , ' 已确认 ' , now , order_id
) )
conn . commit ( )
conn . close ( )
log ( ' confirm_work_order ' , f ' 工单ID: { order_id } , 工单号: { order_no } ' )
notify_superadmin ( ' 确认工单 ' , f ' 工单号: { order_no } , 工厂: { factory } ' )
return jsonify ( { ' ok ' : True , ' message ' : ' 工单确认成功 ' } )
2025-11-24 01:30:08 +00:00
# ==================== 物料清单-采购 API ====================
def convert_material_purchase_to_camel ( row ) :
""" 将数据库行转换为驼峰命名的字典 """
return {
' id ' : row [ ' id ' ] ,
' title ' : row [ ' title ' ] ,
' listNo ' : row [ ' list_no ' ] ,
' planNo ' : row [ ' plan_no ' ] ,
' bomResult ' : row [ ' bom_result ' ] ,
' status ' : row [ ' status ' ] ,
' demandStatus ' : row [ ' demand_status ' ] ,
' completeRate ' : row [ ' complete_rate ' ] ,
' materialCode ' : row [ ' material_code ' ] ,
' materialName ' : row [ ' material_name ' ] ,
' batchNo ' : row [ ' batch_no ' ] ,
' level ' : row [ ' level ' ] ,
' requiredQty ' : row [ ' required_qty ' ] ,
' stockQty ' : row [ ' stock_qty ' ] ,
' shortage ' : row [ ' shortage ' ] ,
' acquireMethod ' : row [ ' acquire_method ' ] ,
' realtimeStock ' : row [ ' realtime_stock ' ] ,
' pendingQty ' : row [ ' pending_qty ' ] ,
' dispatchedQty ' : row [ ' dispatched_qty ' ] ,
' receivedQty ' : row [ ' received_qty ' ] ,
' submitter ' : row [ ' submitter ' ] ,
' submitTime ' : row [ ' submit_time ' ] ,
' updateTime ' : row [ ' update_time ' ] ,
' deleted ' : row [ ' deleted ' ] ,
' deletedAt ' : row [ ' deleted_at ' ]
}
@app.get ( ' /api/material-purchase/list ' )
@require_login
def list_material_purchase ( ) :
""" 获取物料清单列表(不包括已删除的) """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ''' SELECT * FROM material_purchase
WHERE deleted = 0
ORDER BY submit_time DESC ''' )
rows = c . fetchall ( )
conn . close ( )
return jsonify ( { ' list ' : [ convert_material_purchase_to_camel ( r ) for r in rows ] } )
@app.get ( ' /api/material-purchase/recycle-bin ' )
@require_login
def list_material_purchase_recycle_bin ( ) :
""" 获取回收站列表 """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ''' SELECT * FROM material_purchase
WHERE deleted = 1
ORDER BY deleted_at DESC ''' )
rows = c . fetchall ( )
conn . close ( )
return jsonify ( { ' list ' : [ convert_material_purchase_to_camel ( r ) for r in rows ] } )
@app.post ( ' /api/material-purchase/add ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def add_material_purchase ( ) :
""" 新增物料需求 """
data = request . get_json ( ) or { }
required_fields = [ ' title ' , ' list_no ' , ' plan_no ' , ' status ' , ' demand_status ' ,
' material_code ' , ' material_name ' , ' required_qty ' ]
for field in required_fields :
if not data . get ( field ) :
return jsonify ( { ' error ' : f ' 缺少必填字段: { field } ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
now = get_beijing_time ( )
username = session . get ( ' username ' , ' ' )
c . execute ( ''' INSERT INTO material_purchase(
title , list_no , plan_no , bom_result , status , demand_status ,
complete_rate , material_code , material_name , batch_no , level ,
required_qty , stock_qty , shortage , acquire_method , realtime_stock ,
pending_qty , dispatched_qty , received_qty , submitter , submit_time , update_time
) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? ) ''' , (
data . get ( ' title ' ) , data . get ( ' list_no ' ) , data . get ( ' plan_no ' ) ,
data . get ( ' bom_result ' ) , data . get ( ' status ' ) , data . get ( ' demand_status ' ) ,
data . get ( ' complete_rate ' ) , data . get ( ' material_code ' ) , data . get ( ' material_name ' ) ,
data . get ( ' batch_no ' ) , data . get ( ' level ' ) , data . get ( ' required_qty ' ) ,
data . get ( ' stock_qty ' ) , data . get ( ' shortage ' ) , data . get ( ' acquire_method ' ) ,
data . get ( ' realtime_stock ' ) , data . get ( ' pending_qty ' ) , data . get ( ' dispatched_qty ' ) ,
data . get ( ' received_qty ' ) , username , now , now
) )
conn . commit ( )
item_id = c . lastrowid
conn . close ( )
log ( ' add_material_purchase ' , f ' 物料编码: { data . get ( " material_code " ) } ' )
return jsonify ( { ' ok ' : True , ' id ' : item_id } )
@app.post ( ' /api/material-purchase/update ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def update_material_purchase ( ) :
""" 更新物料需求 """
data = request . get_json ( ) or { }
item_id = data . get ( ' id ' )
if not item_id :
return jsonify ( { ' error ' : ' 缺少ID ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
now = get_beijing_time ( )
c . execute ( ''' UPDATE material_purchase SET
title = ? , list_no = ? , plan_no = ? , bom_result = ? , status = ? , demand_status = ? ,
complete_rate = ? , material_code = ? , material_name = ? , batch_no = ? , level = ? ,
required_qty = ? , stock_qty = ? , shortage = ? , acquire_method = ? , realtime_stock = ? ,
pending_qty = ? , dispatched_qty = ? , received_qty = ? , update_time = ?
WHERE id = ? ''' , (
data . get ( ' title ' ) , data . get ( ' list_no ' ) , data . get ( ' plan_no ' ) ,
data . get ( ' bom_result ' ) , data . get ( ' status ' ) , data . get ( ' demand_status ' ) ,
data . get ( ' complete_rate ' ) , data . get ( ' material_code ' ) , data . get ( ' material_name ' ) ,
data . get ( ' batch_no ' ) , data . get ( ' level ' ) , data . get ( ' required_qty ' ) ,
data . get ( ' stock_qty ' ) , data . get ( ' shortage ' ) , data . get ( ' acquire_method ' ) ,
data . get ( ' realtime_stock ' ) , data . get ( ' pending_qty ' ) , data . get ( ' dispatched_qty ' ) ,
data . get ( ' received_qty ' ) , now , item_id
) )
conn . commit ( )
conn . close ( )
log ( ' update_material_purchase ' , f ' ID: { item_id } ' )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/material-purchase/delete ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def delete_material_purchase ( ) :
""" 删除物料需求(移到回收站) """
data = request . get_json ( ) or { }
item_ids = data . get ( ' ids ' , [ ] )
if not item_ids :
return jsonify ( { ' error ' : ' 缺少ID ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
now = get_beijing_time ( )
placeholders = ' , ' . join ( ' ? ' * len ( item_ids ) )
c . execute ( f ''' UPDATE material_purchase SET
deleted = 1 , deleted_at = ?
WHERE id IN ( { placeholders } ) ''' , [now] + item_ids)
conn . commit ( )
conn . close ( )
log ( ' delete_material_purchase ' , f ' 删除数量: { len ( item_ids ) } ' )
return jsonify ( { ' ok ' : True , ' count ' : len ( item_ids ) } )
@app.post ( ' /api/material-purchase/restore ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def restore_material_purchase ( ) :
""" 从回收站恢复 """
data = request . get_json ( ) or { }
item_id = data . get ( ' id ' )
if not item_id :
return jsonify ( { ' error ' : ' 缺少ID ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ''' UPDATE material_purchase SET
deleted = 0 , deleted_at = NULL
WHERE id = ? ''' , (item_id,))
conn . commit ( )
conn . close ( )
log ( ' restore_material_purchase ' , f ' ID: { item_id } ' )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/material-purchase/permanent-delete ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def permanent_delete_material_purchase ( ) :
""" 永久删除 """
data = request . get_json ( ) or { }
item_id = data . get ( ' id ' )
if not item_id :
return jsonify ( { ' error ' : ' 缺少ID ' } ) , 400
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' DELETE FROM material_purchase WHERE id=? ' , ( item_id , ) )
conn . commit ( )
conn . close ( )
log ( ' permanent_delete_material_purchase ' , f ' ID: { item_id } ' )
return jsonify ( { ' ok ' : True } )
@app.post ( ' /api/material-purchase/empty-recycle-bin ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def empty_recycle_bin ( ) :
""" 清空回收站 """
conn = get_db ( )
c = conn . cursor ( )
c . execute ( ' DELETE FROM material_purchase WHERE deleted=1 ' )
count = c . rowcount
conn . commit ( )
conn . close ( )
log ( ' empty_recycle_bin ' , f ' 清空数量: { count } ' )
return jsonify ( { ' ok ' : True , ' count ' : count } )
@app.route ( ' /api/validate/material-purchase-file ' , methods = [ ' POST ' ] )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def validate_material_purchase_file ( ) :
""" 验证物料清单Excel文件格式 """
f = request . files . get ( ' file ' )
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
name = secure_filename ( f . filename or ' ' )
ext = ( name . split ( ' . ' ) [ - 1 ] or ' ' ) . lower ( )
if ext not in [ ' xlsx ' , ' xls ' ] :
return jsonify ( { ' valid ' : False , ' message ' : ' 文件格式不支持, 请上传Excel文件( .xlsx或.xls) ' } ) , 200
try :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
if ws . max_row < 2 :
wb . close ( )
return jsonify ( { ' valid ' : False , ' message ' : ' 文件为空,没有数据 ' } ) , 200
# 检查表头
header_row = list ( ws . iter_rows ( min_row = 1 , max_row = 1 , values_only = True ) ) [ 0 ]
header = [ str ( h ) . strip ( ) if h else ' ' for h in header_row ]
# 必需的列
required_columns = [ ' 标题 ' , ' 生产计划明细物料需求清单编号 ' , ' 生产计划编号 ' , ' 状态 ' ,
' 需求状态 ' , ' 物料编码 ' , ' 物料名称 ' , ' 所需物料数 ' ]
missing_columns = [ ]
for col in required_columns :
if col not in header :
missing_columns . append ( col )
if missing_columns :
wb . close ( )
return jsonify ( {
' valid ' : False ,
' message ' : f ' 缺少必需的列: { " , " . join ( missing_columns ) } '
} ) , 200
data_rows = ws . max_row - 1
wb . close ( )
return jsonify ( {
' valid ' : True ,
' message ' : f ' 文件格式正确,共 { data_rows } 行数据 '
} ) , 200
except Exception as e :
return jsonify ( { ' valid ' : False , ' message ' : f ' 读取文件失败: { str ( e ) } ' } ) , 200
@app.post ( ' /api/upload/material-purchase-file ' )
@require_login
@require_any_role ( ' admin ' , ' superadmin ' )
def upload_material_purchase_file ( ) :
""" 上传物料清单Excel文件 """
f = request . files . get ( ' file ' )
if not f :
return jsonify ( { ' error ' : ' no file ' } ) , 400
try :
import openpyxl
wb = openpyxl . load_workbook ( f )
ws = wb . active
# 获取表头
header_row = list ( ws . iter_rows ( min_row = 1 , max_row = 1 , values_only = True ) ) [ 0 ]
header = [ str ( h ) . strip ( ) if h else ' ' for h in header_row ]
# 创建列索引映射
col_map = { h : i for i , h in enumerate ( header ) }
# 辅助函数:解析百分比字符串
def parse_percentage ( value ) :
if value is None :
return 0
if isinstance ( value , ( int , float ) ) :
return float ( value )
# 如果是字符串,去除%符号
value_str = str ( value ) . strip ( )
if value_str . endswith ( ' % ' ) :
value_str = value_str [ : - 1 ]
try :
return float ( value_str )
except :
return 0
# 辅助函数:安全转换为整数
def safe_int ( value , default = 0 ) :
if value is None :
return default
try :
return int ( float ( value ) )
except :
return default
# 读取数据
rows = [ ]
for row in ws . iter_rows ( min_row = 2 , values_only = True ) :
if not any ( row ) : # 跳过空行
continue
# 辅助函数:安全获取单元格值
def get_cell_value ( col_name , default = ' ' ) :
if col_name not in col_map :
return default
idx = col_map [ col_name ]
if idx > = len ( row ) :
return default
value = row [ idx ]
return value if value is not None else default
# 提取数据
item = {
' title ' : str ( get_cell_value ( ' 标题 ' ) or ' ' ) ,
' list_no ' : str ( get_cell_value ( ' 生产计划明细物料需求清单编号 ' ) or ' ' ) ,
' plan_no ' : str ( get_cell_value ( ' 生产计划编号 ' ) or ' ' ) ,
' bom_result ' : str ( get_cell_value ( ' 产品BOM分析结果 ' ) or ' ' ) ,
' status ' : str ( get_cell_value ( ' 状态 ' ) or ' pending ' ) ,
' demand_status ' : str ( get_cell_value ( ' 需求状态 ' ) or ' normal ' ) ,
' complete_rate ' : parse_percentage ( get_cell_value ( ' 物料齐套率 ' , 0 ) ) ,
' material_code ' : str ( get_cell_value ( ' 物料编码 ' ) or ' ' ) ,
' material_name ' : str ( get_cell_value ( ' 物料名称 ' ) or ' ' ) ,
' batch_no ' : str ( get_cell_value ( ' 物料批次号 ' ) or ' ' ) ,
' level ' : safe_int ( get_cell_value ( ' 物料层级 ' , 1 ) , 1 ) ,
' required_qty ' : safe_int ( get_cell_value ( ' 所需物料数 ' , 0 ) , 0 ) ,
' stock_qty ' : safe_int ( get_cell_value ( ' 库存现有物料数 ' , 0 ) , 0 ) ,
' shortage ' : safe_int ( get_cell_value ( ' 欠缺值 ' , 0 ) , 0 ) ,
' acquire_method ' : str ( get_cell_value ( ' 物料获取方式 ' ) or ' ' ) ,
' realtime_stock ' : safe_int ( get_cell_value ( ' 实时库存值 ' , 0 ) , 0 ) ,
' pending_qty ' : safe_int ( get_cell_value ( ' 待入库数量 ' , 0 ) , 0 ) ,
' dispatched_qty ' : safe_int ( get_cell_value ( ' 派发数量 ' , 0 ) , 0 ) ,
' received_qty ' : safe_int ( get_cell_value ( ' 入库数量 ' , 0 ) , 0 ) ,
}
# 验证必填字段
if not item [ ' title ' ] or not item [ ' list_no ' ] or not item [ ' material_code ' ] :
continue
rows . append ( item )
wb . close ( )
if not rows :
return jsonify ( { ' error ' : ' 文件中没有有效数据 ' } ) , 400
# 批量插入数据库
conn = get_db ( )
c = conn . cursor ( )
now = get_beijing_time ( )
username = session . get ( ' username ' , ' ' )
for item in rows :
c . execute ( ''' INSERT INTO material_purchase(
title , list_no , plan_no , bom_result , status , demand_status ,
complete_rate , material_code , material_name , batch_no , level ,
required_qty , stock_qty , shortage , acquire_method , realtime_stock ,
pending_qty , dispatched_qty , received_qty , submitter , submit_time , update_time
) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? ) ''' , (
item [ ' title ' ] , item [ ' list_no ' ] , item [ ' plan_no ' ] , item [ ' bom_result ' ] ,
item [ ' status ' ] , item [ ' demand_status ' ] , item [ ' complete_rate ' ] ,
item [ ' material_code ' ] , item [ ' material_name ' ] , item [ ' batch_no ' ] ,
item [ ' level ' ] , item [ ' required_qty ' ] , item [ ' stock_qty ' ] , item [ ' shortage ' ] ,
item [ ' acquire_method ' ] , item [ ' realtime_stock ' ] , item [ ' pending_qty ' ] ,
item [ ' dispatched_qty ' ] , item [ ' received_qty ' ] , username , now , now
) )
conn . commit ( )
conn . close ( )
log ( ' upload_material_purchase_file ' , f ' 导入数量: { len ( rows ) } ' )
return jsonify ( { ' ok ' : True , ' count ' : len ( rows ) , ' message ' : f ' 成功导入 { len ( rows ) } 条数据 ' } )
except Exception as e :
return jsonify ( { ' error ' : f ' 导入失败: { str ( e ) } ' } ) , 500
@app.errorhandler ( 404 )
def not_found ( e ) :
# 如果请求的是 HTML 页面(通过 Accept header 判断),返回 index.html
# 这样可以支持前端路由的直接访问
if request . path and not request . path . startswith ( ' /api/ ' ) :
accept = request . headers . get ( ' Accept ' , ' ' )
if ' text/html ' in accept :
return send_from_directory ( FRONTEND_DIR , ' index.html ' )
return jsonify ( { ' error ' : ' not found ' } ) , 404
2025-11-21 13:27:40 +00:00
init_db ( )
if __name__ == ' __main__ ' :
app . run ( host = ' 0.0.0.0 ' , port = int ( os . environ . get ( ' PORT ' , ' 5000 ' ) ) )