ERP/server/app.py
2025-11-24 16:21:13 +08:00

3678 lines
128 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import json
import sqlite3
from datetime import datetime
from functools import wraps
from flask import Flask, request, jsonify, session, send_from_directory
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
# Optional dependency: if importing redis fails for any reason the name is
# bound to None instead (get_redis checks this before creating a client).
try:
    import redis
except Exception:
    redis = None
# Process-wide Redis client, created on first use by get_redis().
_redis_client = None
# Per-platform ('pdd' / 'yt') cache used by the /api/audit/* routes: 'ts' is
# the time the list was stored and 'list' holds the cached rows.
_audit_cache = {'pdd': {'ts': 0, 'list': []}, 'yt': {'ts': 0, 'list': []}}
# Paths are resolved relative to this file: the SQLite database sits next to
# it and the frontend lives in a sibling "frontend" directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data.db')
FRONTEND_DIR = os.path.join(os.path.dirname(BASE_DIR), 'frontend')
# The frontend directory is served as static files from the URL root.
app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path='')
# NOTE(review): when APP_SECRET is unset the session signing key falls back to
# the literal 'change-me' — confirm every deployment sets APP_SECRET.
app.config['SECRET_KEY'] = os.environ.get('APP_SECRET', 'change-me')
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # cap uploaded file size at 50 MB
def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as sqlite3.Row so callers can read columns by name.
    The caller owns the connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def _ensure_column(cursor, table, column, declaration):
    """Add `column` to `table` unless PRAGMA table_info already lists it.

    `table`, `column` and `declaration` are interpolated into SQL and must
    therefore only ever be hard-coded constants, never user input.
    """
    cursor.execute(f'PRAGMA table_info({table})')
    existing = {row[1] for row in cursor.fetchall()}  # index 1 is the column name
    if column not in existing:
        cursor.execute(f'ALTER TABLE {table} ADD COLUMN {column} {declaration}')


def init_db():
    """Create the SQLite schema, apply additive migrations and seed accounts.

    Every table is created with CREATE TABLE IF NOT EXISTS, so the function is
    safe to call on an existing database. Columns that were introduced after a
    table first shipped are added through _ensure_column, which checks the
    live schema instead of swallowing whatever error ALTER TABLE raises.

    Seeding:
      * an 'admin' user is inserted when missing; its password comes from the
        ADMIN_PASSWORD environment variable and otherwise defaults to
        'admin123' (NOTE(review): confirm deployments override this default);
      * a superadmin is inserted when both SUPERADMIN_USERNAME and
        SUPERADMIN_PASSWORD are set and that username does not exist yet.

    The connection is always closed, including when a statement fails.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            role TEXT NOT NULL
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS operations_log(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            action TEXT,
            detail TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS notifications(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            username TEXT,
            action TEXT,
            detail TEXT,
            ts TEXT,
            read INTEGER DEFAULT 0
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS mac_batches(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mac TEXT,
            batch TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS stats(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            good INTEGER,
            bad INTEGER,
            fpy_good INTEGER DEFAULT 0,
            platform TEXT DEFAULT 'pdd',
            ts TEXT
        )''')
        # Additive migrations for databases created by older versions.
        _ensure_column(c, 'stats', 'fpy_good', 'INTEGER DEFAULT 0')
        # Single-quoted default: double quotes denote identifiers in standard SQL.
        _ensure_column(c, 'stats', 'platform', "TEXT DEFAULT 'pdd'")
        _ensure_column(c, 'users', 'avatar', 'TEXT')
        _ensure_column(c, 'users', 'factory', 'TEXT')
        _ensure_column(c, 'mac_batches', 'platform', "TEXT DEFAULT 'pdd'")
        c.execute('''CREATE TABLE IF NOT EXISTS defects(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mac TEXT,
            batch TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS shipments(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT,
            qty INTEGER,
            receiver TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS devices(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            status TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS environment(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            temp TEXT,
            hum TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS personnel(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            role TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS qa(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            date TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS production(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch TEXT,
            duration TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS repairs(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            qty INTEGER,
            note TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS sop_files(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            original_name TEXT NOT NULL,
            description TEXT,
            uploader TEXT,
            ts TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS work_orders(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            factory TEXT NOT NULL,
            order_no TEXT NOT NULL,
            product_model TEXT,
            order_qty INTEGER NOT NULL,
            production_start_time TEXT,
            production_end_time TEXT,
            status TEXT DEFAULT 'issued',
            status_text TEXT DEFAULT '已下发',
            remark TEXT,
            created_by TEXT,
            created_at TEXT,
            updated_at TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS material_purchase(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            list_no TEXT NOT NULL,
            plan_no TEXT NOT NULL,
            bom_result TEXT,
            status TEXT NOT NULL,
            demand_status TEXT NOT NULL,
            complete_rate REAL,
            material_code TEXT NOT NULL,
            material_name TEXT NOT NULL,
            batch_no TEXT,
            level INTEGER,
            required_qty INTEGER NOT NULL,
            stock_qty INTEGER,
            shortage INTEGER,
            acquire_method TEXT,
            realtime_stock INTEGER,
            pending_qty INTEGER,
            dispatched_qty INTEGER,
            received_qty INTEGER,
            submitter TEXT,
            submit_time TEXT,
            update_time TEXT,
            deleted INTEGER DEFAULT 0,
            deleted_at TEXT
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS customer_orders(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            order_date TEXT NOT NULL,
            order_no TEXT NOT NULL,
            customer_name TEXT NOT NULL,
            material TEXT NOT NULL,
            quantity INTEGER NOT NULL,
            unit_price REAL NOT NULL,
            created_by TEXT,
            created_at TEXT,
            updated_at TEXT
        )''')
        # Additive migrations for tables that pre-date these columns.
        _ensure_column(c, 'customer_orders', 'customer_name', 'TEXT')
        _ensure_column(c, 'work_orders', 'product_model', 'TEXT')
        conn.commit()

        # Seed the default admin account.
        c.execute('SELECT id FROM users WHERE username=?', ('admin',))
        if not c.fetchone():
            pwd = os.environ.get('ADMIN_PASSWORD', 'admin123')
            c.execute(
                'INSERT INTO users(username, password_hash, role) VALUES(?,?,?)',
                ('admin', generate_password_hash(pwd), 'admin'),
            )
            conn.commit()

        # Seed the superadmin account from the environment, if configured.
        su_user = os.environ.get('SUPERADMIN_USERNAME')
        su_pass = os.environ.get('SUPERADMIN_PASSWORD')
        if su_user and su_pass:
            c.execute('SELECT id FROM users WHERE username=?', (su_user,))
            if not c.fetchone():
                c.execute(
                    'INSERT INTO users(username, password_hash, role) VALUES(?,?,?)',
                    (su_user, generate_password_hash(su_pass), 'superadmin'),
                )
                conn.commit()
    finally:
        conn.close()
def log(action, detail=''):
    """Best-effort append of one row to operations_log.

    Records the current session's user id, the given action and detail, and a
    Beijing-time timestamp. Logging must never break the request being served,
    so every failure is swallowed; the connection is nevertheless always
    closed so a failed insert does not leak it.
    """
    conn = None
    try:
        conn = get_db()
        conn.execute(
            'INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?)',
            (session.get('user_id'), action, detail, get_beijing_time()),
        )
        conn.commit()
    except Exception:
        # Deliberate best-effort: an audit-log failure is not fatal.
        pass
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def notify_superadmin(action, detail=''):
    """Create a notification for every superadmin about the current user's action.

    Nothing is written when there is no logged-in user, when the user row no
    longer exists, or when the acting user is a superadmin. The function is
    best-effort: any failure is swallowed, and the connection is always closed.
    """
    conn = None
    try:
        user_id = session.get('user_id')
        if not user_id:
            return
        conn = get_db()
        c = conn.cursor()
        # Look up who is acting.
        c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
        user = c.fetchone()
        # Superadmins are not notified about their own operations.
        if not user or user['role'] == 'superadmin':
            return
        c.execute('SELECT id FROM users WHERE role=?', ('superadmin',))
        recipients = c.fetchall()
        now = get_beijing_time()
        c.executemany(
            'INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)',
            [(r['id'], user['username'], action, detail, now, 0) for r in recipients],
        )
        conn.commit()
    except Exception:
        # Deliberate best-effort: notifications must not break the request.
        pass
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def notify_admins(action, detail=''):
    """Create a notification for every admin about a superadmin's action.

    Nothing is written when there is no logged-in user, when the user row no
    longer exists, or when the acting user is not a superadmin. The function
    is best-effort: any failure is swallowed, and the connection is always
    closed.
    """
    conn = None
    try:
        user_id = session.get('user_id')
        if not user_id:
            return
        conn = get_db()
        c = conn.cursor()
        # Look up who is acting.
        c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
        user = c.fetchone()
        # Only operations performed by a superadmin are broadcast to admins.
        if not user or user['role'] != 'superadmin':
            return
        c.execute('SELECT id FROM users WHERE role=?', ('admin',))
        recipients = c.fetchall()
        now = get_beijing_time()
        c.executemany(
            'INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)',
            [(r['id'], user['username'], action, detail, now, 0) for r in recipients],
        )
        conn.commit()
    except Exception:
        # Deliberate best-effort: notifications must not break the request.
        pass
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def notify_admins_by_factory(action, detail='', factory=None):
    """Create a notification for admins of one factory about a superadmin's action.

    When `factory` is truthy only admins whose `factory` column equals it are
    notified; otherwise every admin is. Nothing is written when there is no
    logged-in user, when the user row no longer exists, or when the acting
    user is not a superadmin. The function is best-effort: any failure is
    swallowed, and the connection is always closed.
    """
    conn = None
    try:
        user_id = session.get('user_id')
        if not user_id:
            return
        conn = get_db()
        c = conn.cursor()
        # Look up who is acting.
        c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
        user = c.fetchone()
        # Only operations performed by a superadmin are broadcast to admins.
        if not user or user['role'] != 'superadmin':
            return
        if factory:
            c.execute('SELECT id FROM users WHERE role=? AND factory=?', ('admin', factory))
        else:
            c.execute('SELECT id FROM users WHERE role=?', ('admin',))
        recipients = c.fetchall()
        now = get_beijing_time()
        c.executemany(
            'INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)',
            [(r['id'], user['username'], action, detail, now, 0) for r in recipients],
        )
        conn.commit()
    except Exception:
        # Deliberate best-effort: notifications must not break the request.
        pass
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def get_redis():
    """Return the shared Redis client, creating it on first use.

    Connection settings are read from REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
    and REDIS_DB. Raises RuntimeError('redis missing') when the redis package
    could not be imported.
    """
    global _redis_client
    if not redis:
        raise RuntimeError('redis missing')
    if _redis_client is None:
        env = os.environ
        # NOTE(review): the host defaults to a hard-coded address and the
        # password falls back to SUPERADMIN_PASSWORD — confirm both are intended.
        _redis_client = redis.Redis(
            host=env.get('REDIS_HOST', '180.163.74.83'),
            port=int(env.get('REDIS_PORT', '6379')),
            password=env.get('REDIS_PASSWORD') or env.get('SUPERADMIN_PASSWORD'),
            db=int(env.get('REDIS_DB', '0')),
            decode_responses=True,
            socket_timeout=0.5,
            socket_connect_timeout=0.5,
        )
    return _redis_client
def get_beijing_time():
    """Return the current time at a fixed UTC+8 offset as an ISO 8601 string."""
    from datetime import timedelta, timezone
    return datetime.now(timezone(timedelta(hours=8))).isoformat()
def parse_audit_line(s):
    """Parse one raw audit entry into a dict with ts_cn / batch / mac / note.

    Two input formats are understood:

    * a JSON object — fields are read from the first present alias
      (batch|batch_no|lot, mac|mac_addr|mac_address, note|msg|message);
    * ``key=value`` pairs separated by the first of ' ', ',', ';' or '|' found
      in the line. Because a space may split "ts=YYYY-MM-DD HH:MM:SS" in two,
      a date-only timestamp value is re-joined with a following time token.

    The timestamp is the first candidate (ts_cn, ts_local, ts, ts_utc,
    timestamp, time) that looks like it carries a time of day, else the first
    non-empty candidate, else the raw line itself. For key=value input the
    note falls back to the raw line. Empty input yields a dict of Nones.
    """
    import re

    if not s:
        return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None}

    def has_time(v):
        return isinstance(v, str) and (('T' in v) or (':' in v))

    def choose_ts(d):
        candidates = [d.get('ts_cn'), d.get('ts_local'), d.get('ts'), d.get('ts_utc'), d.get('timestamp'), d.get('time')]
        # Prefer a value that includes a time of day over a bare date.
        for v in candidates:
            if has_time(v):
                return v
        for v in candidates:
            if v:
                return v
        return None

    raw_fallback = s if isinstance(s, str) else None

    try:
        obj = json.loads(s)
    except Exception:
        # Any decode failure simply means the line is not JSON.
        obj = None
    # JSON scalars and arrays are not audit records; they go through the
    # key=value path below, like any other non-JSON text.
    if isinstance(obj, dict):
        ts = choose_ts(obj) or raw_fallback
        return {
            'ts_cn': ts if ts else None,
            'batch': obj.get('batch') or obj.get('batch_no') or obj.get('lot'),
            'mac': obj.get('mac') or obj.get('mac_addr') or obj.get('mac_address'),
            'note': obj.get('note') or obj.get('msg') or obj.get('message')
        }

    parts = [s]
    for sep in (' ', ',', ';', '|'):
        if sep in s:
            parts = s.split(sep)
            break

    date_only = re.compile(r'^\d{4}-\d{2}-\d{2}$')
    time_prefix = re.compile(r'^\d{2}:\d{2}:\d{2}')
    ts_keys = ('ts_cn', 'ts_local', 'ts', 'timestamp', 'time')

    d = {}
    i = 0
    while i < len(parts):
        part = parts[i]
        if '=' in part:
            k, v = part.split('=', 1)
            kk = k.strip()
            vv = v.strip()
            # Re-join "ts=2025-01-01" with a following "10:00:00" token.
            if (kk in ts_keys and date_only.match(vv)
                    and i + 1 < len(parts) and time_prefix.match(parts[i + 1])):
                vv = vv + ' ' + parts[i + 1]
                i += 1
            d[kk] = vv
        i += 1

    ts = choose_ts(d) or raw_fallback
    return {
        'ts_cn': ts if ts else None,
        'batch': d.get('batch') or d.get('batch_no') or d.get('lot'),
        'mac': d.get('mac') or d.get('mac_addr') or d.get('mac_address'),
        'note': d.get('note') or d.get('msg') or d.get('message') or s
    }
def require_login(fn):
    """Decorator: answer 401 {'error': 'unauthorized'} unless the session has a user_id."""
    @wraps(fn)
    def guarded(*args, **kwargs):
        if session.get('user_id'):
            return fn(*args, **kwargs)
        return jsonify({'error': 'unauthorized'}), 401
    return guarded
def require_role(role):
    """Decorator factory: answer 403 {'error': 'forbidden'} unless the session role equals `role`."""
    def decorate(fn):
        @wraps(fn)
        def guarded(*args, **kwargs):
            if session.get('role') == role:
                return fn(*args, **kwargs)
            return jsonify({'error': 'forbidden'}), 403
        return guarded
    return decorate
def require_any_role(*roles):
    """Decorator factory: answer 403 {'error': 'forbidden'} unless the session role is one of `roles`."""
    def decorate(fn):
        @wraps(fn)
        def guarded(*args, **kwargs):
            if session.get('role') in roles:
                return fn(*args, **kwargs)
            return jsonify({'error': 'forbidden'}), 403
        return guarded
    return decorate
@app.route('/')
def index():
    """Serve the frontend entry page (index.html) at the site root."""
    page = 'index.html'
    return send_from_directory(FRONTEND_DIR, page)
@app.route('/index.html')
def index_html():
    """Serve the frontend entry page when it is requested by its file name."""
    page = 'index.html'
    return send_from_directory(FRONTEND_DIR, page)
# auth
@app.get('/api/auth/captcha')
def captcha():
    """Generate a 4-digit captcha, store it in the session and return it as a PNG.

    Returns JSON ``{'image': 'data:image/png;base64,...'}``. If rendering fails
    (for example Pillow is not installed) the error is logged and the response
    is ``{'image': '', 'code': <digits>}`` instead.

    NOTE(review): the fallback response discloses the code in clear text,
    which defeats the captcha — kept for backward compatibility, confirm it
    is acceptable.
    """
    # Standard-library imports happen before the Pillow import so that the
    # fallback path below still has everything it needs when Pillow is missing.
    import base64
    import io
    import random
    import secrets

    # The code guards the login form, so draw it from a CSPRNG.
    code = ''.join(secrets.choice('0123456789') for _ in range(4))
    session['captcha'] = code
    try:
        from PIL import Image, ImageDraw, ImageFont

        width, height = 120, 40
        image = Image.new('RGB', (width, height), color='#f0f4f8')
        draw = ImageDraw.Draw(image)

        # Try known system fonts first and fall back to Pillow's built-in font.
        font = None
        for font_path in (
            '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf',
            '/System/Library/Fonts/Helvetica.ttc',
        ):
            try:
                font = ImageFont.truetype(font_path, 28)
                break
            except Exception:
                continue
        if font is None:
            font = ImageFont.load_default()

        # Noise lines (cosmetic randomness only, so `random` is fine here).
        for _ in range(3):
            x1 = random.randint(0, width)
            y1 = random.randint(0, height)
            x2 = random.randint(0, width)
            y2 = random.randint(0, height)
            draw.line([(x1, y1), (x2, y2)], fill='#cbd5e1', width=1)

        # The digits, each slightly jittered and randomly coloured.
        colors = ['#3b82f6', '#2563eb', '#1e40af', '#1e3a8a']
        for i, char in enumerate(code):
            x = 20 + i * 25 + random.randint(-3, 3)
            y = 5 + random.randint(-3, 3)
            draw.text((x, y), char, font=font, fill=random.choice(colors))

        # Noise dots.
        for _ in range(50):
            draw.point((random.randint(0, width), random.randint(0, height)), fill='#94a3b8')

        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        img_base64 = base64.b64encode(buffer.getvalue()).decode()
        return jsonify({'image': f'data:image/png;base64,{img_base64}'})
    except Exception as e:
        log('captcha_error', str(e))
        # Rendering failed: hand back the already-stored code without an image.
        return jsonify({'image': '', 'code': code})
@app.post('/api/auth/login')
def login():
    """Authenticate with username, password and the captcha stored in the session.

    Expects a JSON object with ``username``, ``password`` and ``captcha``.
    The stored captcha is single-use: it is removed from the session on every
    attempt, successful or not. Responses:

    * 400 ``{'error': '验证码错误'}`` — captcha missing, not a string, or wrong;
    * 400 ``{'error': '用户名或密码错误'}`` — unknown user or wrong password;
    * 200 ``{'ok': True}`` — session now carries user_id, role and username.

    Malformed field types are rejected with the matching 400 instead of
    raising inside ``.lower()``, the SQL binding or the hash check.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    captcha_input = data.get('captcha')

    # Pop first so the captcha can never be reused, whatever the outcome.
    expected = session.pop('captcha', None)
    captcha_ok = (
        isinstance(captcha_input, str)
        and isinstance(expected, str)
        and bool(captcha_input)
        and captcha_input.lower() == expected.lower()
    )
    if not captcha_ok:
        return jsonify({'error': '验证码错误'}), 400

    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': '用户名或密码错误'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('SELECT id, password_hash, role FROM users WHERE username=?', (username,))
        row = c.fetchone()
    finally:
        conn.close()

    if not row or not check_password_hash(row['password_hash'], password):
        return jsonify({'error': '用户名或密码错误'}), 400

    session['user_id'] = row['id']
    session['role'] = row['role']
    session['username'] = username
    session.permanent = True
    log('login', username)
    return jsonify({'ok': True})
@app.get('/api/auth/me')
def me():
    """Return the current user's username, role, avatar and factory.

    All four fields are null when nobody is logged in, and also when the
    session refers to a user row that no longer exists (previously that case
    raised a TypeError and produced a 500).
    """
    anonymous = {'username': None, 'role': None, 'avatar': None, 'factory': None}
    uid = session.get('user_id')
    if not uid:
        return jsonify(anonymous)
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('SELECT username, role, avatar, factory FROM users WHERE id=?', (uid,))
        row = c.fetchone()
    finally:
        conn.close()
    if row is None:
        # Stale session: the account was removed after login.
        return jsonify(anonymous)
    return jsonify({
        'username': row['username'],
        'role': row['role'],
        'avatar': row['avatar'] if row['avatar'] else None,
        'factory': row['factory'] if row['factory'] else None
    })
@app.post('/api/auth/logout')
def logout():
    """Record the logout in the operations log, then wipe the session."""
    # Log before clearing: log() reads the user id from the session.
    log('logout')
    session.clear()
    payload = {'ok': True}
    return jsonify(payload)
@app.post('/api/user/upload-avatar')
@require_login
def upload_avatar():
    """Store an uploaded avatar image and point the user's row at it.

    The file arrives as multipart field ``avatar`` and must carry one of the
    extensions png/jpg/jpeg/gif/webp. It is saved under
    ``<frontend>/assets/avatars`` as ``avatar_<uid>_<timestamp>.<ext>``; the
    previous avatar file, if any, is removed afterwards.

    Two uploads within the same second map to the same file name. The old
    avatar is therefore only deleted when its path differs from the new one —
    otherwise the cleanup would delete the file that was just saved.

    Returns ``{'ok': True, 'avatar_url': ...}`` or a 400/401 JSON error.
    """
    uid = session.get('user_id')
    if not uid:
        return jsonify({'error': '未登录'}), 401
    if 'avatar' not in request.files:
        return jsonify({'error': '未选择文件'}), 400
    file = request.files['avatar']
    original_filename = file.filename
    if not original_filename:  # covers both '' and a missing filename
        return jsonify({'error': '未选择文件'}), 400

    # Validate the extension taken from the client-supplied name; the name
    # itself is never used on disk.
    allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
    if '.' not in original_filename:
        return jsonify({'error': '无效的文件格式'}), 400
    ext = original_filename.rsplit('.', 1)[1].lower()
    if ext not in allowed_extensions:
        return jsonify({'error': '不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式'}), 400

    avatars_dir = os.path.join(FRONTEND_DIR, 'assets', 'avatars')
    os.makedirs(avatars_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    new_filename = f'avatar_{uid}_{timestamp}.{ext}'
    file.save(os.path.join(avatars_dir, new_filename))

    avatar_url = f'./assets/avatars/{new_filename}'
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
        row = c.fetchone()
        previous = row['avatar'] if row else None
        c.execute('UPDATE users SET avatar=? WHERE id=?', (avatar_url, uid))
        conn.commit()
    finally:
        conn.close()

    # Remove the superseded file once the row points at the new one.
    if previous and previous != avatar_url and previous.startswith('./assets/avatars/'):
        old_file = os.path.join(FRONTEND_DIR, previous.replace('./', ''))
        try:
            os.remove(old_file)
        except OSError:
            # Already gone or not removable: cleanup is best-effort.
            pass

    log('upload_avatar', f'上传头像: {new_filename}')
    return jsonify({'ok': True, 'avatar_url': avatar_url})
@app.post('/api/user/reset-avatar')
@require_login
def reset_avatar():
    """Drop the user's custom avatar: delete the stored file and null the column."""
    uid = session.get('user_id')
    if not uid:
        return jsonify({'error': '未登录'}), 401

    db = get_db()
    cur = db.cursor()
    cur.execute('SELECT avatar FROM users WHERE id=?', (uid,))
    record = cur.fetchone()
    # Only files inside the avatars folder are ever deleted.
    stored = record['avatar'] if record else None
    if stored and stored.startswith('./assets/avatars/'):
        target = os.path.join(FRONTEND_DIR, stored.replace('./', ''))
        if os.path.exists(target):
            try:
                os.remove(target)
            except Exception:
                pass
    cur.execute('UPDATE users SET avatar=NULL WHERE id=?', (uid,))
    db.commit()
    db.close()

    log('reset_avatar', '恢复默认头像')
    return jsonify({'ok': True})
# dashboard
@app.get('/api/dashboard')
@require_login
def dashboard():
    """Return the dashboard figures: yield rates, shipment count, defect and bad counts.

    The shipment count is the size of the Redis hash 'shipment_sn_mapping';
    when Redis is unavailable it falls back to SUM(qty) from the SQLite
    shipments table.
    """
    def percent(part, whole):
        # Empty string when nothing has been produced yet.
        if whole > 0:
            return "{}%".format(round((part / whole) * 100, 2))
        return u''

    db = get_db()
    cur = db.cursor()
    cur.execute('SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
    totals = cur.fetchone()
    cur.execute('SELECT COUNT(1) AS total FROM defects')
    defect_row = cur.fetchone()
    db.close()

    if totals:
        good, bad, fpy_good = totals['good_total'], totals['bad_total'], totals['fpy_good_total']
    else:
        good, bad, fpy_good = 0, 0, 0

    produced = good + bad
    # Overall good rate and first-pass yield (FPY) share the same denominator.
    good_rate = percent(good, produced)
    fpy_rate = percent(fpy_good, produced)

    shipped = 0
    try:
        shipped = get_redis().hlen('shipment_sn_mapping')
    except Exception as exc:
        log('dashboard_redis_error', str(exc))
        # Redis is down or missing: use the SQLite shipment records instead.
        db = get_db()
        cur = db.cursor()
        cur.execute('SELECT SUM(qty) AS total FROM shipments')
        ship_row = cur.fetchone()
        db.close()
        shipped = (ship_row['total'] or 0) if ship_row else 0

    defect_total = (defect_row['total'] or 0) if defect_row else 0
    return jsonify({
        'fpyRate': fpy_rate,
        'goodRate': good_rate,
        'shipments': shipped,
        'defects': defect_total,
        'badCount': bad
    })
def _audit_epoch(value):
    """Convert an audit timestamp string to epoch seconds; None when empty or unparseable.

    Accepts ISO 8601 (with 'T', 'Z' or an offset), 'YYYY-MM-DD HH:MM:SS' and
    'YYYY-MM-DD'.
    """
    try:
        if not value:
            return None
        if 'T' in value or 'Z' in value or '+' in value:
            return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()
        if ' ' in value and ':' in value:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S').timestamp()
        return datetime.strptime(value, '%Y-%m-%d').timestamp()
    except Exception:
        return None


def _audit_list(platform):
    """Shared implementation of the /api/audit/<platform> listing.

    Query parameters: ``start`` / ``end`` (timestamp bounds), ``limit``
    (positive int) and ``order`` ('asc', anything else means newest first).

    Entries are read from the first existing Redis key among
    ``mac_batch_audit_<platform>``, ``audit:<platform>`` and
    ``<platform>:audit`` (list, zset, stream or plain string). When none
    yields data, up to 100 pairs from the hash ``batch_sn_mapping_<platform>``
    are returned instead. Each entry goes through parse_audit_line.

    An unfiltered request (no start/end/limit and default order) is served
    from a 3-second in-process cache and reads at most 200 entries; filtered
    requests read at most 500. Previously the defaulted ``order='desc'`` was
    counted as a filter, so the cache and the 200-entry path never ran.

    Any failure is logged and answered with ``{'list': []}``.
    """
    started = datetime.utcnow()
    try:
        q_start = request.args.get('start')
        q_end = request.args.get('end')
        q_limit = request.args.get('limit')
        q_order = request.args.get('order', 'desc')
        # Only parameters that change the result count as a filter; a missing
        # or non-'asc' order yields the default newest-first listing.
        has_filter = bool(q_start or q_end or q_limit or q_order == 'asc')

        # Cache stamps keep the original datetime.utcnow().timestamp() clock
        # so they stay comparable with any other code touching _audit_cache.
        cache = _audit_cache[platform]
        if (not has_filter) and ((datetime.utcnow().timestamp() - cache['ts']) < 3):
            return jsonify({'list': cache['list']})

        r = get_redis()
        # Audit reads may be slow: raise the socket timeout to 5 seconds.
        # NOTE(review): this mutates the shared connection pool settings, so
        # it presumably affects connections created afterwards — confirm.
        r.connection_pool.connection_kwargs['socket_timeout'] = 5

        # Cap how much is pulled from Redis.
        max_items = 500 if has_filter else 200
        primary_key = f'mac_batch_audit_{platform}'
        items = []
        for key in (primary_key, f'audit:{platform}', f'{platform}:audit'):
            try:
                if r.exists(key):
                    kind = r.type(key)
                    if kind == 'list':
                        # Negative start = "last max_items entries"; Redis
                        # clamps an out-of-range start to the list head.
                        items = r.lrange(key, -max_items, -1)
                    elif kind == 'zset':
                        items = r.zrevrange(key, 0, max_items - 1)
                    elif kind == 'stream':
                        entries = r.xrevrange(key, max='+', min='-', count=max_items)
                        items = [json.dumps(fields) for _id, fields in entries]
                    else:
                        value = r.get(key)
                        items = [value] if value else []
                    break  # the first existing key wins
            except Exception as e:
                log(f'audit_{platform}_error', f'Redis query error: {str(e)}')
                continue

        # Diagnostic probe of the primary key, written to the operations log.
        try:
            host = os.environ.get('REDIS_HOST')
            db = os.environ.get('REDIS_DB')
            tp = r.type(primary_key)
            ln = 0
            try:
                ln = r.llen(primary_key)
            except Exception:
                pass
            log(f'audit_{platform}_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln}))
        except Exception:
            pass

        # Fallback: synthesise entries from the MAC -> batch mapping hash.
        mapping_key = f'batch_sn_mapping_{platform}'
        if not items and r.exists(mapping_key) and r.type(mapping_key) == 'hash':
            try:
                pairs = []
                cursor = 0
                while True:
                    cursor, chunk = r.hscan(mapping_key, cursor=cursor, count=200)
                    for k, v in (chunk or {}).items():
                        pairs.append({'mac': k, 'batch': v})
                        if len(pairs) >= 100:
                            break
                    if cursor == 0 or len(pairs) >= 100:
                        break
                items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs]
            except Exception:
                pass

        res = [parse_audit_line(x) for x in items]

        if q_start or q_end:
            s_epoch = _audit_epoch(q_start) if q_start else None
            e_epoch = _audit_epoch(q_end) if q_end else None
            kept = []
            for entry in res:
                ts = _audit_epoch(entry.get('ts_cn'))
                # Entries without a usable timestamp cannot match a time range.
                if ts is None:
                    continue
                if s_epoch is not None and ts < s_epoch:
                    continue
                if e_epoch is not None and ts > e_epoch:
                    continue
                kept.append(entry)
            res = kept

        try:
            # Unparseable timestamps sort as epoch 0.
            res.sort(key=lambda entry: _audit_epoch(entry.get('ts_cn')) or 0,
                     reverse=(q_order != 'asc'))
        except Exception:
            res.reverse()

        if q_limit:
            try:
                lim = int(q_limit)
                if lim > 0:
                    res = res[:lim]
            except Exception:
                pass

        if not has_filter:
            _audit_cache[platform] = {'ts': datetime.utcnow().timestamp(), 'list': res}
        dur = (datetime.utcnow() - started).total_seconds()
        log(f'audit_{platform}_cost', f"{dur}s len={len(res)}")
        return jsonify({'list': res})
    except Exception as e:
        log(f'audit_{platform}_error', str(e))
        return jsonify({'list': []})


@app.get('/api/audit/pdd')
@require_login
def audit_pdd():
    """List audit entries for the 'pdd' platform (see _audit_list)."""
    return _audit_list('pdd')


@app.get('/api/audit/yt')
@require_login
def audit_yt():
    """List audit entries for the 'yt' platform (see _audit_list)."""
    return _audit_list('yt')
@app.get('/api/audit/diagnose')
@require_login
def audit_diagnose():
    """Report the Redis type and size of each audit / mapping key.

    Per-key failures are reported inline as {'error': ...}; a failure to
    obtain the Redis client yields a 500 with the error text.
    """
    watched_keys = (
        'mac_batch_audit_pdd',
        'mac_batch_audit_yt',
        'batch_sn_mapping_pdd',
        'batch_sn_mapping_yt',
    )
    try:
        client = get_redis()
        # Size lookup for the container types that have a direct length command.
        length_of = {'list': client.llen, 'zset': client.zcard, 'hash': client.hlen}
        report = {}
        for name in watched_keys:
            try:
                kind = client.type(name)
                if kind in length_of:
                    size = length_of[kind](name)
                elif kind == 'stream':
                    size = client.xinfo_stream(name).get('length')
                elif kind == 'none':
                    size = 0
                else:
                    size = 1 if client.get(name) else 0
                report[name] = {'type': kind, 'len': size}
            except Exception as exc:
                report[name] = {'error': str(exc)}
        return jsonify(report)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.get('/api/overview')
@require_login
def overview():
    """Return record counts and totals across the main SQLite tables."""
    db = get_db()
    cur = db.cursor()

    def one(sql):
        # Run an aggregate query and hand back its single row.
        cur.execute(sql)
        return cur.fetchone()

    stats_row = one('SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
    defects_row = one('SELECT COUNT(1) AS cnt FROM defects')
    mac_row = one('SELECT COUNT(1) AS cnt FROM mac_batches')
    ship_row = one('SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments')
    devices_row = one('SELECT COUNT(1) AS cnt FROM devices')
    personnel_row = one('SELECT COUNT(1) AS cnt FROM personnel')
    qa_row = one('SELECT COUNT(1) AS cnt FROM qa')
    production_row = one('SELECT COUNT(1) AS cnt FROM production')
    db.close()

    def value(row, column):
        # Missing rows and NULLs both read as 0.
        if not row:
            return 0
        return row[column] or 0

    return jsonify({
        'stats': {
            'records': value(stats_row, 'cnt'),
            'goodTotal': value(stats_row, 'good_total'),
            'badTotal': value(stats_row, 'bad_total'),
            'fpyGoodTotal': value(stats_row, 'fpy_good_total')
        },
        'defects': value(defects_row, 'cnt'),
        'mac': value(mac_row, 'cnt'),
        'shipments': {
            'records': value(ship_row, 'cnt'),
            'qtyTotal': value(ship_row, 'qty_total')
        },
        'devices': value(devices_row, 'cnt'),
        'personnel': value(personnel_row, 'cnt'),
        'qa': value(qa_row, 'cnt'),
        'production': value(production_row, 'cnt')
    })
# uploads
@app.post('/api/upload/mac')
@require_login
@require_any_role('admin','superadmin')
def upload_mac():
    """Bulk-insert MAC/batch pairs into mac_batches.

    Expects JSON ``{'rows': [{'mac': ..., 'batch': ...}, ...]}``. Rows that
    are not objects, or that lack a mac or a batch, are skipped instead of
    crashing the request. Responds 400 ``{'error': 'invalid rows'}`` when
    ``rows`` is not a list. The logged / notified count is the number of rows
    submitted, as before.
    """
    data = request.get_json() or {}
    rows = (data.get('rows') if isinstance(data, dict) else None) or []
    if not isinstance(rows, list):
        return jsonify({'error': 'invalid rows'}), 400
    now = get_beijing_time()
    records = [
        (r.get('mac'), r.get('batch'), now)
        for r in rows
        if isinstance(r, dict) and r.get('mac') and r.get('batch')
    ]
    conn = get_db()
    try:
        conn.executemany('INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?)', records)
        conn.commit()
    finally:
        conn.close()
    log('upload_mac', f"count={len(rows)}")
    notify_superadmin('上传MAC与批次', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True})
@app.post('/api/upload/stats')
@require_login
@require_any_role('admin','superadmin')
def upload_stats():
    """Record one good/bad/first-pass-good count and optional defect details.

    Expects JSON with ``good``, ``bad``, ``fpy_good`` (non-negative integers,
    default 0), ``platform`` ('pdd', 'yt' or 'tx'; anything else becomes
    'pdd') and optional ``details`` (list of ``{'mac', 'batch'}`` objects that
    are stored in the defects table).

    Counts that are negative or not convertible to int are rejected with 400
    ``{'error': 'invalid count'}`` (non-numeric input used to raise a 500).
    A ``details`` value that is not a list is ignored, and detail items that
    are not objects are skipped.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    try:
        good = int(data.get('good') or 0)
        bad = int(data.get('bad') or 0)
        fpy_good = int(data.get('fpy_good') or 0)  # first-pass good count
    except (TypeError, ValueError):
        return jsonify({'error': 'invalid count'}), 400
    if good < 0 or bad < 0 or fpy_good < 0:
        return jsonify({'error': 'invalid count'}), 400

    platform = data.get('platform') or 'pdd'  # one of pdd / yt / tx
    if platform not in ['pdd', 'yt', 'tx']:
        platform = 'pdd'

    details = data.get('details') or []
    if not isinstance(details, list):
        details = []

    now = get_beijing_time()
    defect_rows = [
        (item.get('mac'), item.get('batch'), now)
        for item in details
        if isinstance(item, dict) and item.get('mac') and item.get('batch')
    ]

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?)', (good, bad, fpy_good, platform, now))
        c.executemany('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', defect_rows)
        conn.commit()
    finally:
        conn.close()

    platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
    log('upload_stats', json.dumps({'good': good, 'bad': bad, 'fpy_good': fpy_good, 'platform': platform, 'details_count': len(details)}))
    notify_superadmin('上传良/不良统计', f"平台: {platform_name}, 良品: {good}, 不良品: {bad}, 直通良品: {fpy_good}")
    return jsonify({'ok': True})
@app.post('/api/upload/repairs')
@require_login
@require_any_role('admin','superadmin')
def upload_repairs():
    """Record one repair entry (quantity plus free-text note).

    Expects JSON with ``qty`` (non-negative integer, default 0) and optional
    ``note``. A quantity that is negative or not convertible to int is
    rejected with 400 ``{'error': 'invalid quantity'}`` (non-numeric input
    used to raise a 500).
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    try:
        qty = int(data.get('qty') or 0)
    except (TypeError, ValueError):
        return jsonify({'error': 'invalid quantity'}), 400
    if qty < 0:
        return jsonify({'error': 'invalid quantity'}), 400
    note = data.get('note') or ''

    conn = get_db()
    try:
        conn.execute('INSERT INTO repairs(qty, note, ts) VALUES(?,?,?)', (qty, note, get_beijing_time()))
        conn.commit()
    finally:
        conn.close()

    log('upload_repairs', json.dumps({'qty': qty, 'note': note}))
    notify_superadmin('上传返修记录', f"数量: {qty}")
    return jsonify({'ok': True})
@app.post('/api/upload/defects')
@require_login
@require_any_role('admin','superadmin')
def upload_defects():
    """Bulk-insert defective MAC/batch pairs into the defects table.

    Expects JSON ``{'rows': [{'mac': ..., 'batch': ...}, ...]}``. As in
    upload_mac, a ``rows`` value that is not a list is rejected with 400
    ``{'error': 'invalid rows'}``; rows that are not objects, or that lack a
    mac or a batch, are skipped. The logged / notified count is the number of
    rows submitted, as before.
    """
    data = request.get_json() or {}
    rows = (data.get('rows') if isinstance(data, dict) else None) or []
    if not isinstance(rows, list):
        return jsonify({'error': 'invalid rows'}), 400
    now = get_beijing_time()
    records = [
        (r.get('mac'), r.get('batch'), now)
        for r in rows
        if isinstance(r, dict) and r.get('mac') and r.get('batch')
    ]
    conn = get_db()
    try:
        conn.executemany('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', records)
        conn.commit()
    finally:
        conn.close()
    log('upload_defects', f"count={len(rows)}")
    notify_superadmin('上传不良明细', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True})
@app.post('/api/upload/shipments')
@require_login
@require_any_role('admin','superadmin')
def upload_shipments():
    """Record one shipment.

    Expects JSON with ``date``, ``qty`` (positive integer), ``to`` (receiver),
    ``platform`` and optional ``box_no``. Missing fields, a non-positive
    quantity, or a quantity that cannot be converted to int yield 400
    ``{'error': 'invalid payload'}`` (non-numeric input used to raise a 500).

    The shipments table is migrated on demand: the ``platform`` and ``box_no``
    columns are added when PRAGMA table_info does not list them. The
    connection is closed even when a statement fails.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    date = data.get('date')
    to = data.get('to')
    platform = data.get('platform', '')
    box_no = data.get('box_no', '')
    try:
        qty = int(data.get('qty') or 0)
    except (TypeError, ValueError):
        return jsonify({'error': 'invalid payload'}), 400
    if not date or qty <= 0 or not to or not platform:
        return jsonify({'error': 'invalid payload'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        # Add the newer columns to databases created before they existed.
        c.execute("PRAGMA table_info(shipments)")
        columns = [col[1] for col in c.fetchall()]
        if 'platform' not in columns:
            c.execute('ALTER TABLE shipments ADD COLUMN platform TEXT')
        if 'box_no' not in columns:
            c.execute('ALTER TABLE shipments ADD COLUMN box_no TEXT')
        c.execute(
            'INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?)',
            (date, qty, to, platform, box_no, get_beijing_time())
        )
        conn.commit()
    finally:
        conn.close()

    platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
    log_data = {'date': date, 'qty': qty, 'to': to, 'platform': platform}
    if box_no:
        log_data['box_no'] = box_no
    log('upload_shipments', json.dumps(log_data))
    notify_superadmin('上传发货记录', f"机种: {platform_name}, 日期: {date}, 数量: {qty}, 接收方: {to}")
    return jsonify({'ok': True})
# collect
@app.get('/api/collect/devices')
@require_login
def devices():
    """Return up to 50 device rows (name, status), highest id first, as ``{'list': [...]}``."""
    db = get_db()
    found = db.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 50').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/collect/environment')
@require_login
def environment():
    """Return ``temp`` and ``hum`` from the environment row with the highest id.

    Both values are empty strings when the query returns no row.
    """
    db = get_db()
    latest = db.execute('SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1').fetchone()
    db.close()
    if latest:
        reading = {'temp': latest['temp'], 'hum': latest['hum']}
    else:
        reading = {'temp': '', 'hum': ''}
    return jsonify(reading)
@app.get('/api/collect/personnel')
@require_login
def personnel():
    """Return up to 100 personnel rows (name, role), highest id first, as ``{'list': [...]}``."""
    db = get_db()
    found = db.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.post('/api/collect/personnel')
@require_login
@require_any_role('admin','superadmin')
def add_personnel():
    """Insert one personnel record from the JSON body (``name`` required, ``role`` optional).

    Returns:
        JSON ``{'ok': True}``, or HTTP 400 when ``name`` is empty after trimming.
    """
    payload = request.get_json() or {}
    name, role = ((payload.get(key) or '').strip() for key in ('name', 'role'))
    if not name:
        return jsonify({'error': 'invalid payload'}), 400
    db = get_db()
    db.execute(
        'INSERT INTO personnel(name, role, ts) VALUES(?,?,?)',
        (name, role, get_beijing_time()),
    )
    db.commit()
    db.close()
    log('add_personnel', name)
    notify_superadmin('添加人员信息', f"姓名: {name}, 角色: {role}")
    return jsonify({'ok': True})
@app.get('/api/collect/qa')
@require_login
def qa():
    """Return up to 100 QA rows (title, date), highest id first, as ``{'list': [...]}``."""
    db = get_db()
    found = db.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 100').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/collect/production')
@require_login
def production():
    """Return up to 100 production rows (batch, duration), highest id first, as ``{'list': [...]}``."""
    db = get_db()
    found = db.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
# export
@app.post('/api/export/excel')
@require_login
def export_excel():
    """Export every row of one data set as an ``.xlsx`` download.

    The JSON body selects the data set through ``type`` (default ``'stats'``).

    Returns:
        The workbook as an attachment; HTTP 400 ``{'error': 'invalid type'}``
        for an unknown type; HTTP 500 with the error text when anything else
        fails (including a missing ``openpyxl``).
    """
    # type -> (sheet title, header row, query)
    exports = {
        'stats': ('良不良统计', ['直通良品数', '良品数', '不良品数', '时间'],
                  'SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC'),
        'mac': ('MAC与批次', ['MAC地址', '批次号', '时间'],
                'SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC'),
        'repairs': ('返修记录', ['返修数量', '备注', '时间'],
                    'SELECT qty, note, ts FROM repairs ORDER BY id DESC'),
        'defects': ('不良明细', ['MAC地址', '批次号', '时间'],
                    'SELECT mac, batch, ts FROM defects ORDER BY id DESC'),
        'shipments': ('发货记录', ['日期', '数量', '收货方', '时间'],
                      'SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC'),
        'devices': ('设备状态', ['设备名称', '状态'],
                    'SELECT name, status FROM devices ORDER BY id DESC'),
        'personnel': ('人员信息', ['姓名', '角色'],
                      'SELECT name, role FROM personnel ORDER BY id DESC'),
        'qa': ('质检报告', ['标题', '日期'],
               'SELECT title, date FROM qa ORDER BY id DESC'),
        'production': ('时间记录', ['批次', '时长'],
                       'SELECT batch, duration FROM production ORDER BY id DESC'),
    }
    try:
        import openpyxl
        from openpyxl.styles import Font, Alignment, PatternFill
        from io import BytesIO
        from flask import send_file

        data = request.get_json() or {}
        data_type = data.get('type', 'stats')
        # Guard with isinstance so an unhashable JSON value still yields a 400.
        spec = exports.get(data_type) if isinstance(data_type, str) else None
        if spec is None:
            return jsonify({'error': 'invalid type'}), 400
        title, headers, query = spec

        # Build the sheet with a styled header row.
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = title
        ws.append(headers)
        header_fill = PatternFill(start_color='4F8CFF', end_color='4F8CFF', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')
        header_alignment = Alignment(horizontal='center', vertical='center')
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = header_alignment

        # Fetch the data; always release the connection, even on a failed query.
        conn = get_db()
        try:
            rows = conn.execute(query).fetchall()
        finally:
            conn.close()
        for row in rows:
            ws.append(list(row))

        # Size each column to its longest rendered value, capped at 50.
        for column in ws.columns:
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            ws.column_dimensions[column[0].column_letter].width = min(max_length + 2, 50)

        # Serialize to memory and send as a download.
        output = BytesIO()
        wb.save(output)
        output.seek(0)
        log('export_excel', data_type)
        filename = f'{title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        log('export_excel_error', str(e))
        return jsonify({'error': str(e)}), 500
@app.post('/api/export/pdf')
@require_login
def export_pdf():
    """Export up to 200 rows of one data set as a landscape A4 PDF download.

    The JSON body selects the data set through ``type`` (default ``'stats'``).
    The first loadable font from a list of known CJK font paths is used;
    Helvetica is the fallback when none can be registered.

    Returns:
        The PDF as an attachment; HTTP 400 ``{'error': 'invalid type'}`` for
        an unknown type; HTTP 500 with the error text when generation fails
        (including a missing ``reportlab``).
    """
    # type -> (report title, column headers, query)
    reports = {
        'stats': ('良/不良统计报表', ['直通良品数', '良品数', '不良品数', '时间'],
                  'SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200'),
        'mac': ('MAC与批次报表', ['MAC地址', '批次号', '时间'],
                'SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200'),
        'repairs': ('返修记录报表', ['返修数量', '备注', '时间'],
                    'SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200'),
        'defects': ('不良明细报表', ['MAC地址', '批次号', '时间'],
                    'SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200'),
        'shipments': ('发货记录报表', ['日期', '数量', '收货方', '时间'],
                      'SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200'),
        'devices': ('设备状态报表', ['设备名称', '状态'],
                    'SELECT name, status FROM devices ORDER BY id DESC LIMIT 200'),
        'personnel': ('人员信息报表', ['姓名', '角色'],
                      'SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200'),
        'qa': ('质检报告', ['标题', '日期'],
               'SELECT title, date FROM qa ORDER BY id DESC LIMIT 200'),
        'production': ('生产时间记录', ['批次', '时长'],
                       'SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200'),
    }
    try:
        from reportlab.lib import colors
        from reportlab.lib.pagesizes import A4, landscape
        from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.pdfbase import pdfmetrics
        from reportlab.pdfbase.ttfonts import TTFont
        from reportlab.lib.enums import TA_CENTER
        from io import BytesIO
        from flask import send_file

        data = request.get_json() or {}
        data_type = data.get('type', 'stats')
        # Guard with isinstance so an unhashable JSON value still yields a 400.
        spec = reports.get(data_type) if isinstance(data_type, str) else None
        if spec is None:
            return jsonify({'error': 'invalid type'}), 400
        title, headers, query = spec

        # Register a CJK-capable font, trying each known location in turn.
        font_name = 'Helvetica'
        font_paths = [
            '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
            '/usr/share/fonts/truetype/arphic/uming.ttc',
            '/System/Library/Fonts/PingFang.ttc',
            'C:\\Windows\\Fonts\\simhei.ttf'
        ]
        for font_path in font_paths:
            if not os.path.exists(font_path):
                continue
            try:
                pdfmetrics.registerFont(TTFont('ChineseFont', font_path))
            except Exception as e:
                # A font that exists but cannot be loaded must not end the
                # search: record the problem and try the next candidate.
                log('pdf_font_warning', f'无法加载中文字体: {str(e)}')
                continue
            font_name = 'ChineseFont'
            break

        # Fetch the data; always release the connection, even on a failed query.
        conn = get_db()
        try:
            rows = conn.execute(query).fetchall()
        finally:
            conn.close()

        # Document skeleton and shared styles.
        buffer = BytesIO()
        doc = SimpleDocTemplate(
            buffer,
            pagesize=landscape(A4),
            topMargin=1.5*cm,
            bottomMargin=1.5*cm,
            leftMargin=1.5*cm,
            rightMargin=1.5*cm
        )
        styles = getSampleStyleSheet()
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontName=font_name,
            fontSize=18,
            textColor=colors.HexColor('#4F8CFF'),
            spaceAfter=20,
            alignment=TA_CENTER,
            leading=24
        )
        elements = [Paragraph(title, title_style), Spacer(1, 0.5*cm)]

        if not rows:
            # Placeholder shown instead of an empty table.
            no_data_style = ParagraphStyle(
                'NoData',
                parent=styles['Normal'],
                fontName=font_name,
                fontSize=12,
                textColor=colors.grey,
                alignment=TA_CENTER
            )
            elements.append(Paragraph('暂无数据', no_data_style))
        else:
            # Header row followed by stringified data rows (NULL -> '').
            table_data = [headers]
            table_data.extend(
                [str(val) if val is not None else '' for val in row] for row in rows
            )
            table = Table(table_data, repeatRows=1)
            table.setStyle(TableStyle([
                # Header row
                ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F8CFF')),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), font_name),
                ('FONTSIZE', (0, 0), (-1, 0), 10),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
                ('TOPPADDING', (0, 0), (-1, 0), 8),
                # Data rows
                ('BACKGROUND', (0, 1), (-1, -1), colors.white),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                ('FONTNAME', (0, 1), (-1, -1), font_name),
                ('FONTSIZE', (0, 1), (-1, -1), 8),
                ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F5F7FA')]),
                ('TOPPADDING', (0, 1), (-1, -1), 5),
                ('BOTTOMPADDING', (0, 1), (-1, -1), 5),
            ]))
            elements.append(table)

        # Footer with export time and row count.
        elements.append(Spacer(1, 0.5*cm))
        footer_style = ParagraphStyle(
            'Footer',
            parent=styles['Normal'],
            fontName=font_name,
            fontSize=8,
            textColor=colors.grey,
            alignment=TA_CENTER
        )
        footer_text = f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | 共 {len(rows)} 条记录'
        elements.append(Paragraph(footer_text, footer_style))

        doc.build(elements)
        buffer.seek(0)
        log('export_pdf', data_type)
        # A '/' in the title would act as a path separator in the download name.
        safe_title = title.replace('/', '_')
        filename = f'{safe_title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf'
        return send_file(
            buffer,
            mimetype='application/pdf',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        log('export_pdf_error', str(e))
        return jsonify({'error': f'PDF导出失败: {str(e)}'}), 500
# lists
@app.get('/api/list/mac')
@require_login
def list_mac():
    """Return up to 200 ``mac_batches`` rows (mac, batch, platform, ts), highest id first."""
    db = get_db()
    found = db.execute('SELECT mac, batch, platform, ts FROM mac_batches ORDER BY id DESC LIMIT 200').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/list/stats')
@require_login
def list_stats():
    """Return up to 200 ``stats`` rows (good, bad, fpy_good, platform, ts), highest id first."""
    db = get_db()
    found = db.execute('SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/list/repairs')
@require_login
def list_repairs():
    """Return up to 200 ``repairs`` rows (qty, note, ts), highest id first."""
    db = get_db()
    found = db.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/list/defects')
@require_login
def list_defects():
    """Return up to 200 ``defects`` rows (mac, batch, ts), highest id first."""
    db = get_db()
    found = db.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/list/shipments')
@require_login
def list_shipments():
    """Return up to 200 ``shipments`` rows (date, qty, receiver, ts), highest id first."""
    db = get_db()
    found = db.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
# admin management
@app.get('/api/admin/users')
@require_login
@require_any_role('superadmin')
def list_users():
    """Return every user (username, role, factory) in ascending id order; superadmin only."""
    db = get_db()
    found = db.execute('SELECT username, role, factory FROM users ORDER BY id ASC').fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.post('/api/admin/reset-password')
@require_login
@require_any_role('superadmin')
def reset_password():
    """Replace the password hash of the user named in the JSON body (superadmin only).

    Expects ``username`` and ``new_password``.

    Returns:
        JSON ``{'ok': True}``; HTTP 400 when either value is missing;
        HTTP 404 when no such user exists.
    """
    payload = request.get_json() or {}
    username, new_password = payload.get('username'), payload.get('new_password')
    if not (username and new_password):
        return jsonify({'error': 'invalid payload'}), 400
    db = get_db()
    account = db.execute('SELECT id FROM users WHERE username=?', (username,)).fetchone()
    if not account:
        db.close()
        return jsonify({'error': 'user not found'}), 404
    hashed = generate_password_hash(new_password)
    db.execute('UPDATE users SET password_hash=? WHERE id=?', (hashed, account['id']))
    db.commit()
    db.close()
    log('reset_password', username)
    return jsonify({'ok': True})
@app.post('/api/admin/change-password')
@require_login
@require_any_role('superadmin')
def change_password():
    """Store a new password hash for the user named in the JSON body (superadmin only).

    Expects ``username`` and ``new_password``.

    Returns:
        JSON ``{'ok': True}``; HTTP 400 when either value is missing;
        HTTP 404 when no such user exists.
    """
    body = request.get_json() or {}
    username = body.get('username')
    new_password = body.get('new_password')
    if not username or not new_password:
        return jsonify({'error': 'invalid payload'}), 400
    db = get_db()
    target = db.execute('SELECT id FROM users WHERE username=?', (username,)).fetchone()
    if target:
        db.execute(
            'UPDATE users SET password_hash=? WHERE id=?',
            (generate_password_hash(new_password), target['id']),
        )
        db.commit()
        db.close()
        log('change_password', username)
        return jsonify({'ok': True})
    db.close()
    return jsonify({'error': 'user not found'}), 404
@app.post('/api/admin/update-user-factory')
@require_login
@require_any_role('superadmin')
def update_user_factory():
    """Set the ``factory`` of the user named in the JSON body (superadmin only).

    Returns:
        JSON ``{'ok': True, 'message': ...}``; HTTP 400 when ``username`` or
        the trimmed ``factory`` is empty; HTTP 404 when no such user exists.
    """
    payload = request.get_json() or {}
    username = payload.get('username')
    factory = (payload.get('factory') or '').strip()
    # Validate in this order: the username error takes precedence.
    for value, problem in ((username, '用户名不能为空'), (factory, '工厂名称不能为空')):
        if not value:
            return jsonify({'error': problem}), 400
    db = get_db()
    account = db.execute('SELECT id FROM users WHERE username=?', (username,)).fetchone()
    if not account:
        db.close()
        return jsonify({'error': '用户不存在'}), 404
    db.execute('UPDATE users SET factory=? WHERE id=?', (factory, account['id']))
    db.commit()
    db.close()
    log('update_user_factory', f'username={username}, factory={factory}')
    return jsonify({'ok': True, 'message': f'已更新用户 {username} 的所属工厂为 {factory}'})
@app.post('/api/admin/add-user')
@require_login
@require_any_role('superadmin')
def add_user():
    """Create a user from the JSON body (superadmin only).

    Expects ``username``, ``password``, ``factory`` and optionally ``role``
    (``admin`` by default; ``admin`` or ``superadmin`` accepted).

    Returns:
        JSON ``{'ok': True, 'message': ...}``; HTTP 400 for a missing value,
        an unsupported role or an existing username; HTTP 500 when the
        insert fails.
    """
    payload = request.get_json() or {}
    username = (payload.get('username') or '').strip()
    password = payload.get('password')
    role = (payload.get('role') or 'admin').strip()
    factory = (payload.get('factory') or '').strip()
    if not (username and password):
        return jsonify({'error': '用户名和密码不能为空'}), 400
    if not factory:
        return jsonify({'error': '所属工厂不能为空'}), 400
    if role not in ['admin', 'superadmin']:
        return jsonify({'error': '角色必须是 admin 或 superadmin'}), 400
    db = get_db()
    # Reject duplicates up front.
    existing = db.execute('SELECT id FROM users WHERE username=?', (username,)).fetchone()
    if existing:
        db.close()
        return jsonify({'error': '用户名已存在'}), 400
    try:
        db.execute(
            'INSERT INTO users(username, password_hash, role, factory) VALUES(?,?,?,?)',
            (username, generate_password_hash(password), role, factory),
        )
        db.commit()
        db.close()
        log('add_user', f'username={username}, role={role}, factory={factory}')
        return jsonify({'ok': True, 'message': f'用户 {username} 创建成功'})
    except Exception as e:
        db.close()
        return jsonify({'error': f'创建用户失败:{str(e)}'}), 500
@app.post('/api/admin/delete-user')
@require_login
@require_any_role('superadmin')
def delete_user():
    """Delete the user named in the JSON body (superadmin only).

    Refuses to delete the account whose username matches
    ``session['user']['username']`` and the last remaining ``superadmin``.

    Returns:
        JSON ``{'ok': True, 'message': ...}``; HTTP 400 for an empty
        username or a refused deletion; HTTP 404 when no such user exists;
        HTTP 500 when the delete fails.
    """
    payload = request.get_json() or {}
    username = (payload.get('username') or '').strip()
    if not username:
        return jsonify({'error': '用户名不能为空'}), 400
    # NOTE(review): the notification handlers read session['user_id'];
    # confirm session['user'] is populated at login, otherwise this guard never fires.
    me = session.get('user')
    if me and me.get('username') == username:
        return jsonify({'error': '不能删除当前登录的用户'}), 400
    db = get_db()
    account = db.execute('SELECT id, role FROM users WHERE username=?', (username,)).fetchone()
    if not account:
        db.close()
        return jsonify({'error': '用户不存在'}), 404
    # Never remove the only superadmin left.
    if account['role'] == 'superadmin':
        remaining = db.execute(
            'SELECT COUNT(*) as cnt FROM users WHERE role=?', ('superadmin',)
        ).fetchone()['cnt']
        if remaining <= 1:
            db.close()
            return jsonify({'error': '不能删除最后一个超级管理员'}), 400
    try:
        db.execute('DELETE FROM users WHERE username=?', (username,))
        db.commit()
        db.close()
        log('delete_user', f'username={username}')
        return jsonify({'ok': True, 'message': f'用户 {username} 已删除'})
    except Exception as e:
        db.close()
        return jsonify({'error': f'删除用户失败:{str(e)}'}), 500
@app.post('/api/admin/clear')
@require_login
@require_any_role('superadmin')
def clear_module():
    """Delete every row of the table behind the requested ``module`` (superadmin only).

    For ``shipments`` the Redis hash ``shipment_sn_mapping`` is deleted as
    well; a Redis failure is logged and does not fail the request.

    Returns:
        JSON ``{'ok': True}``, or HTTP 400 for an unknown module.
    """
    payload = request.get_json() or {}
    module = payload.get('module')
    # Whitelist: only these names may reach the DELETE statement below.
    table = {
        'mac': 'mac_batches',
        'stats': 'stats',
        'defects': 'defects',
        'shipments': 'shipments',
        'devices': 'devices',
        'environment': 'environment',
        'personnel': 'personnel',
        'qa': 'qa',
        'production': 'production'
    }.get(module)
    if not table:
        return jsonify({'error': 'invalid module'}), 400
    db = get_db()
    db.execute(f'DELETE FROM {table}')
    db.commit()
    db.close()
    if module == 'shipments':
        # Keep the Redis SN lookup in step with the emptied table.
        try:
            client = get_redis()
            redis_key = 'shipment_sn_mapping'
            removed = client.hlen(redis_key)
            client.delete(redis_key)
            log('clear_module_redis', f'shipments: cleared {removed} records from redis')
        except Exception as e:
            log('clear_module_redis_error', str(e))
    log('clear_module', module)
    return jsonify({'ok': True})
# notifications
@app.get('/api/notifications')
@require_login
@require_any_role('superadmin', 'admin')
def get_notifications():
    """Return up to 100 notifications for ``session['user_id']``, highest id first."""
    uid = session.get('user_id')
    db = get_db()
    found = db.execute(
        'SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100',
        (uid,),
    ).fetchall()
    payload = {'list': list(map(dict, found))}
    db.close()
    return jsonify(payload)
@app.get('/api/notifications/unread-count')
@require_login
@require_any_role('superadmin', 'admin')
def get_unread_count():
    """Return ``{'count': n}``, the number of notifications with ``read=0`` for ``session['user_id']``."""
    uid = session.get('user_id')
    db = get_db()
    tally = db.execute(
        'SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0', (uid,)
    ).fetchone()
    db.close()
    unread = tally['count'] if tally else 0
    return jsonify({'count': unread})
@app.post('/api/notifications/mark-read')
@require_login
@require_any_role('superadmin', 'admin')
def mark_notification_read():
    """Mark one notification of ``session['user_id']`` as read.

    Returns:
        JSON ``{'ok': True}``, or HTTP 400 when the JSON body has no ``id``.
    """
    payload = request.get_json() or {}
    notification_id = payload.get('id')
    if not notification_id:
        return jsonify({'error': 'invalid id'}), 400
    uid = session.get('user_id')
    db = get_db()
    # The user_id filter keeps users from touching each other's notifications.
    db.execute('UPDATE notifications SET read=1 WHERE id=? AND user_id=?', (notification_id, uid))
    db.commit()
    db.close()
    return jsonify({'ok': True})
@app.post('/api/notifications/mark-all-read')
@require_login
@require_any_role('superadmin', 'admin')
def mark_all_notifications_read():
    """Mark every notification of ``session['user_id']`` as read and return ``{'ok': True}``."""
    uid = session.get('user_id')
    db = get_db()
    db.execute('UPDATE notifications SET read=1 WHERE user_id=?', (uid,))
    db.commit()
    db.close()
    return jsonify({'ok': True})
@app.post('/api/notifications/delete-read')
@require_login
@require_any_role('superadmin', 'admin')
def delete_read_notifications():
    """Delete the read notifications of ``session['user_id']``.

    Returns:
        JSON ``{'ok': True, 'count': n}`` where ``n`` is the cursor's row count.
    """
    uid = session.get('user_id')
    db = get_db()
    removed = db.execute('DELETE FROM notifications WHERE user_id=? AND read=1', (uid,)).rowcount
    db.commit()
    db.close()
    return jsonify({'ok': True, 'count': removed})
@app.errorhandler(404)
def not_found(_):
    """Answer a 404 with the JSON body ``{'error': 'not found'}``."""
    body = {'error': 'not found'}
    return jsonify(body), 404
@app.route('/api/validate/mac-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_mac_file():
    """Check that an uploaded CSV/Excel file has the two-column MAC/batch layout.

    The file must have exactly two columns: one whose header contains
    ``mac`` (or ``sn_mac`` / ``sn-mac``) and one whose header contains
    ``批次`` or ``batch``; matching is case-insensitive.

    Returns:
        HTTP 400 ``{'error': 'no file'}`` when nothing was uploaded; otherwise
        HTTP 200 with ``{'valid': bool, 'message': str}``.
    """
    upload = request.files.get('file')
    if not upload:
        return jsonify({'error': 'no file'}), 400
    safe_name = secure_filename(upload.filename or '')
    ext = (safe_name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200

    def check_header(header, data_rows):
        """Return ``(valid, message)`` for a header row shared by both file formats."""
        lowered = [h.lower() for h in header]
        has_mac = any('mac' in h and 'sn' not in h for h in lowered)
        has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in lowered)
        has_batch = any('批次' in h or 'batch' in h for h in lowered)
        if not (has_mac or has_sn_mac):
            return False, f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'
        if not has_batch:
            return False, f'缺少必需的列:批次号(当前列:{", ".join(header)}'
        mac_col = 'MAC' if has_mac else 'SN_MAC'
        return True, f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'

    try:
        if ext == 'csv':
            text = upload.stream.read().decode('utf-8', errors='ignore')
            lines = [stripped for stripped in (raw.strip() for raw in text.splitlines()) if stripped]
            if not lines:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            # First non-empty line is the header.
            header = [h.strip() for h in lines[0].split(',')]
            if len(header) != 2:
                return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{len(header)}'}), 200
            # Log the header for debugging.
            log('validate_mac_file_csv', f'headers: {header}')
            valid, message = check_header(header, len(lines) - 1)
            return jsonify({'valid': valid, 'message': message}), 200

        import openpyxl
        wb = openpyxl.load_workbook(upload)
        ws = wb.active
        if ws.max_row < 2:
            wb.close()
            return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
        if ws.max_column != 2:
            wb.close()
            return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{ws.max_column}'}), 200
        first_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
        header = [str(h).strip() if h else '' for h in first_row]
        # Log the header for debugging.
        log('validate_mac_file', f'headers: {header}')
        valid, message = check_header(header, ws.max_row - 1)
        wb.close()
        return jsonify({'valid': valid, 'message': message}), 200
    except Exception as e:
        return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.post('/api/upload/mac-file')
@require_login
@require_any_role('admin','superadmin')
def upload_mac_file():
    """Save an uploaded MAC workbook and run the external batch-import script on it.

    The form field ``type`` (``pdd``, ``yt`` or ``tx``; default ``pdd``)
    selects the fixed file name the upload is saved under and is passed to
    the script. When the script succeeds and prints a JSON list between its
    result markers, each record with ``mac`` and ``batch`` is also inserted
    into ``mac_batches``.

    Returns:
        JSON ``{'ok': bool, 'output': str, 'returncode': int}``; HTTP 400 for
        a missing file or unknown type; HTTP 500 on timeout or other failure.
    """
    import subprocess
    f = request.files.get('file')
    upload_type = request.form.get('type', 'pdd')  # pdd, yt, or tx
    if not f:
        return jsonify({'error': 'no file'}), 400
    if upload_type not in ['pdd', 'yt', 'tx']:
        return jsonify({'error': 'invalid type'}), 400
    # NOTE(review): the upload is saved under a fixed per-type name, so two
    # concurrent uploads of the same type overwrite each other — confirm
    # this is acceptable for the deployment.
    temp_dir = '/home/hyx/work/batch_import_xlsx'
    os.makedirs(temp_dir, exist_ok=True)
    temp_path = os.path.join(temp_dir, f'sn_test_{upload_type}.xlsx')
    f.save(temp_path)
    # Run the batch_import.py script.
    script_path = '/home/hyx/work/生产管理系统/batch_import.py'
    python_path = '/home/hyx/work/.venv/bin/python'
    try:
        result = subprocess.run(
            [python_path, script_path, upload_type],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute limit
        )
        output = result.stdout + result.stderr
        success = result.returncode == 0
        # Mirror the records the script reports as imported into SQLite.
        if success:
            import re
            json_match = re.search(r'=== 成功导入的数据 ===\n([\s\S]*?)\n=== 数据输出结束 ===', output)
            if json_match:
                try:
                    records = json.loads(json_match.group(1).strip())
                    if records and isinstance(records, list):
                        from datetime import timezone, timedelta
                        # Timestamp in Beijing time (UTC+8).
                        now = datetime.now(timezone(timedelta(hours=8))).isoformat()
                        saved = 0
                        conn = get_db()
                        try:
                            c = conn.cursor()
                            for record in records:
                                mac = record.get('mac')
                                batch = record.get('batch')
                                if mac and batch:
                                    c.execute('INSERT INTO mac_batches(mac, batch, platform, ts) VALUES(?,?,?,?)', (mac, batch, upload_type, now))
                                    saved += 1
                            conn.commit()
                        finally:
                            # Release the connection even when an insert fails.
                            conn.close()
                        log('upload_mac_file_db', f"saved {saved} records to database")
                except Exception as e:
                    # The SQLite mirror is best-effort; the import itself succeeded.
                    log('upload_mac_file_db_error', str(e))
        log('upload_mac_file', f"type={upload_type}, success={success}")
        if success:
            notify_superadmin('批量上传MAC文件', f"类型: {upload_type}")
        return jsonify({
            'ok': success,
            'output': output,
            'returncode': result.returncode
        })
    except subprocess.TimeoutExpired:
        return jsonify({'error': '上传超时', 'output': '处理时间超过5分钟'}), 500
    except Exception as e:
        return jsonify({'error': str(e), 'output': ''}), 500
@app.post('/api/upload/defects-file')
@require_login
@require_any_role('admin','superadmin')
def upload_defects_file():
    """Insert defect records (mac, batch) read from an uploaded CSV or Excel file.

    The first two columns of each row are taken as mac and batch. Rows with
    an empty mac or batch are skipped in both formats. Any extension other
    than ``csv`` is parsed with openpyxl.

    Returns:
        JSON ``{'ok': True, 'count': n}``; HTTP 400 when no file was
        uploaded or the workbook cannot be parsed.
    """
    f = request.files.get('file')
    if not f:
        return jsonify({'error': 'no file'}), 400
    name = secure_filename(f.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    rows = []
    if ext == 'csv':
        text = f.stream.read().decode('utf-8', errors='ignore')
        for line in text.splitlines():
            parts = [p.strip() for p in line.split(',')]
            # Require both fields, matching the Excel branch below.
            if len(parts) >= 2 and parts[0] and parts[1]:
                rows.append((parts[0], parts[1]))
    else:
        try:
            import openpyxl
            wb = openpyxl.load_workbook(f)
            ws = wb.active
            for r in ws.iter_rows(values_only=True):
                mac = str(r[0]).strip() if r and r[0] else None
                batch = str(r[1]).strip() if r and len(r) > 1 and r[1] else None
                if mac and batch:
                    rows.append((mac, batch))
        except Exception:
            return jsonify({'error': 'parse error'}), 400
    now = get_beijing_time()
    conn = get_db()
    try:
        conn.executemany(
            'INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)',
            [(mac, batch, now) for mac, batch in rows],
        )
        conn.commit()
    finally:
        # Release the connection even when an insert fails.
        conn.close()
    log('upload_defects_file', f"count={len(rows)}")
    notify_superadmin('批量上传不良明细文件', f"上传了 {len(rows)} 条记录")
    return jsonify({'ok': True, 'count': len(rows)})
@app.route('/api/validate/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_shipments_file():
    """Check that an uploaded shipment CSV/Excel file has the expected header.

    The header needs a date column (``出货日期``, ``发货日期`` or ``date``), a
    box column (``箱号`` or ``box``) and at least one column named ``SN``
    followed by digits.

    Returns:
        HTTP 400 ``{'error': 'no file'}`` when nothing was uploaded; otherwise
        HTTP 200 with ``{'valid': bool, 'message': str}``.
    """
    upload = request.files.get('file')
    if not upload:
        return jsonify({'error': 'no file'}), 400
    safe_name = secure_filename(upload.filename or '')
    ext = (safe_name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200

    def inspect_header(header):
        """Return ``(problem, sn_cols)``; ``problem`` is None when the header is acceptable."""
        pairs = [(h, h.lower()) for h in header]
        has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in pairs)
        has_box = any('箱号' in h or 'box' in hl for h, hl in pairs)
        if not has_date:
            return '缺少必需的列:出货日期', []
        if not has_box:
            return '缺少必需的列:箱号', []
        sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
        if not sn_cols:
            return '缺少SN列SN1, SN2, ... SN20', sn_cols
        return None, sn_cols

    try:
        if ext == 'csv':
            text = upload.stream.read().decode('utf-8', errors='ignore')
            lines = [stripped for stripped in (raw.strip() for raw in text.splitlines()) if stripped]
            if not lines:
                return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
            header = [h.strip() for h in lines[0].split(',')]
            problem, sn_cols = inspect_header(header)
            if problem is not None:
                return jsonify({'valid': False, 'message': problem}), 200
            data_rows = len(lines) - 1
            return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200

        import openpyxl
        wb = openpyxl.load_workbook(upload)
        ws = wb.active
        if ws.max_row < 2:
            wb.close()
            return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
        first_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
        header = [str(h).strip() if h else '' for h in first_row]
        problem, sn_cols = inspect_header(header)
        if problem is not None:
            wb.close()
            return jsonify({'valid': False, 'message': problem}), 200
        data_rows = ws.max_row - 1
        wb.close()
        return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200
    except Exception as e:
        return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.get('/api/shipments/query-by-sn')
@require_login
def query_shipment_by_sn():
    """Look up one SN/MAC in the Redis hash ``shipment_sn_mapping``.

    Returns:
        JSON with ``found`` plus the shipment fields when the SN exists, or
        ``found: False`` with a message; HTTP 400 when ``sn`` is empty;
        HTTP 500 when the lookup fails.
    """
    sn = request.args.get('sn', '').strip()
    if not sn:
        return jsonify({'error': '请提供 SN/MAC 号'}), 400
    try:
        client = get_redis()
        redis_key = 'shipment_sn_mapping'
        raw = client.hget(redis_key, sn)
        if not raw:
            return jsonify({
                'found': False,
                'sn': sn,
                'message': '未找到该 SN 的出货记录'
            })
        # The hash value is a JSON document describing the shipment.
        info = json.loads(raw)
        platform = info.get('platform', 'pdd')  # entries without a platform count as pdd
        names = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}
        return jsonify({
            'found': True,
            'sn': sn,
            'date': info.get('date'),
            'box': info.get('box'),
            'platform': platform,
            'platform_name': names.get(platform, platform),
            'ts': info.get('ts')
        })
    except Exception as e:
        log('query_shipment_error', str(e))
        return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.get('/api/shipments/query-by-box')
@require_login
def query_shipment_by_box():
    """List every SN in the Redis hash ``shipment_sn_mapping`` whose ``box`` equals the query.

    The whole hash is fetched and filtered in Python; entries that cannot be
    decoded are skipped.

    Returns:
        JSON with ``found``, ``count`` and ``records`` when matches exist, or
        ``found: False`` with a message; HTTP 400 when ``box`` is empty;
        HTTP 500 when the lookup fails.
    """
    box_no = request.args.get('box', '').strip()
    if not box_no:
        return jsonify({'error': '请提供箱号'}), 400
    platform_names = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}
    try:
        r = get_redis()
        redis_key = 'shipment_sn_mapping'
        matched_records = []
        for sn, data in r.hgetall(redis_key).items():
            try:
                shipment_info = json.loads(data)
                if shipment_info.get('box') != box_no:
                    continue
                platform = shipment_info.get('platform', 'pdd')
                matched_records.append({
                    'sn': sn.decode('utf-8') if isinstance(sn, bytes) else sn,
                    'date': shipment_info.get('date'),
                    'box': shipment_info.get('box'),
                    'platform': platform,
                    'platform_name': platform_names.get(platform, platform),
                    'ts': shipment_info.get('ts')
                })
            except Exception:
                # Best-effort: skip a malformed entry, but let
                # KeyboardInterrupt / SystemExit propagate.
                continue
        if matched_records:
            return jsonify({
                'found': True,
                'box': box_no,
                'count': len(matched_records),
                'records': matched_records
            })
        return jsonify({
            'found': False,
            'box': box_no,
            'message': '未找到该箱号的出货记录'
        })
    except Exception as e:
        log('query_shipment_by_box_error', str(e))
        return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.post('/api/shipments/update-platform')
@require_login
@require_any_role('superadmin')
def update_shipments_platform():
    """Add ``platform: 'pdd'`` to every ``shipment_sn_mapping`` entry lacking a platform (superadmin only).

    Returns:
        JSON ``{'ok': True, 'message': ..., 'updated': n}``, or HTTP 500 when
        the update fails.
    """
    try:
        client = get_redis()
        redis_key = 'shipment_sn_mapping'
        entries = client.hgetall(redis_key)
        batch = client.pipeline()
        touched = 0
        for sn, raw in entries.items():
            try:
                info = json.loads(raw)
                if 'platform' in info:
                    continue
                # Entries without a platform are treated as pdd.
                info['platform'] = 'pdd'
                batch.hset(redis_key, sn, json.dumps(info, ensure_ascii=False))
                touched += 1
            except Exception:
                continue
        batch.execute()
        log('update_shipments_platform', f'updated {touched} records')
        return jsonify({
            'ok': True,
            'message': f'已更新 {touched} 条记录为拼多多',
            'updated': touched
        })
    except Exception as e:
        log('update_shipments_platform_error', str(e))
        return jsonify({'error': f'更新失败: {str(e)}'}), 500
@app.get('/api/shipments/redis-stats')
@require_login
def shipments_redis_stats():
    """Report the size and existence of the Redis hash ``shipment_sn_mapping``.

    Returns:
        JSON ``{'key', 'count', 'exists'}``, or HTTP 500 when Redis fails.
    """
    try:
        client = get_redis()
        redis_key = 'shipment_sn_mapping'
        size = client.hlen(redis_key)
        present = client.exists(redis_key) > 0
        return jsonify({'key': redis_key, 'count': size, 'exists': present})
    except Exception as e:
        log('shipments_redis_stats_error', str(e))
        return jsonify({'error': f'获取统计失败:{str(e)}'}), 500
@app.get('/api/shipments/summary')
@require_login
def shipments_summary():
    """Return the shipment rows whose ``date`` lies between ``start`` and ``end`` (inclusive).

    Both bounds come from the query string and are compared with the stored
    ``date`` values by SQLite; rows are ordered by date, newest first.

    Returns:
        JSON ``{'ok': True, 'records': [...], 'count': n}``; HTTP 400 when a
        bound is missing; HTTP 500 when the query fails.
    """
    try:
        start_date = request.args.get('start')
        end_date = request.args.get('end')
        if not start_date or not end_date:
            return jsonify({'error': '请提供开始和结束日期'}), 400
        conn = get_db()
        try:
            found = conn.execute(
                'SELECT date, qty, receiver, ts FROM shipments '
                'WHERE date >= ? AND date <= ? ORDER BY date DESC',
                (start_date, end_date),
            ).fetchall()
        finally:
            # Release the connection even when the query fails.
            conn.close()
        rows = [dict(r) for r in found]
        log('shipments_summary', f'start={start_date}, end={end_date}, count={len(rows)}')
        return jsonify({
            'ok': True,
            'records': rows,
            'count': len(rows)
        })
    except Exception as e:
        log('shipments_summary_error', str(e))
        return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.post('/api/shipments/clear-redis')
@require_login
@require_any_role('admin','superadmin')
def clear_shipments_redis():
    """Delete the Redis hash ``shipment_sn_mapping`` and every row of the ``shipments`` table.

    Redis is cleared first, then SQLite.

    Returns:
        JSON with ``ok``, ``message`` and the number of entries removed from
        each store, or HTTP 500 when either step fails.
    """
    try:
        # Redis side.
        client = get_redis()
        redis_key = 'shipment_sn_mapping'
        redis_count = client.hlen(redis_key)
        client.delete(redis_key)
        # SQLite side: count first so the response can report it.
        db = get_db()
        sqlite_count = db.execute('SELECT COUNT(*) as cnt FROM shipments').fetchone()['cnt']
        db.execute('DELETE FROM shipments')
        db.commit()
        db.close()
        log('clear_shipments_all', f'cleared redis={redis_count}, sqlite={sqlite_count}')
        summary = {
            'ok': True,
            'message': f'已清空 Redis {redis_count} 条记录和 SQLite {sqlite_count} 条记录',
            'redis_count': redis_count,
            'sqlite_count': sqlite_count
        }
        return jsonify(summary)
    except Exception as e:
        log('clear_shipments_redis_error', str(e))
        return jsonify({'error': f'清空失败:{str(e)}'}), 500
@app.route('/api/upload/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def upload_shipments_file():
    """Import shipment records from an uploaded CSV/Excel sheet.

    Form fields: ``file`` (the sheet) and ``platform`` (one of ``pdd``,
    ``yt``, ``tx``).  The sheet needs a shipping-date column, a box-number
    column and ``SN<number>`` columns.  Every row that has a box number and at
    least one SN becomes one ``shipments`` row in SQLite, and every SN is also
    written to the ``shipment_sn_mapping`` Redis hash.  A Redis failure is
    logged but does not fail the request.

    Returns JSON ``{'ok', 'count', 'total_qty'}`` on success, a 400 for
    invalid input and a 500 for unexpected errors.
    """
    f = request.files.get('file')
    platform = request.form.get('platform')
    if not f:
        return jsonify({'error': '请选择文件'}), 400
    if not platform or platform not in ['pdd', 'yt', 'tx']:
        return jsonify({'error': '请选择机种(拼多多/圆通/兔喜)'}), 400
    name = secure_filename(f.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    if ext not in ['csv', 'xlsx', 'xls']:
        return jsonify({'error': '文件格式不支持'}), 400
    try:
        if ext == 'csv':
            rows, problem = _parse_shipments_csv(f.stream.read())
        else:
            rows, problem = _parse_shipments_workbook(f)
        if problem:
            return jsonify({'error': problem}), 400

        # Timestamps are recorded in Beijing time (UTC+8).
        from datetime import timezone, timedelta
        now = datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')

        # Persist one SQLite row per box; always release the connection.
        conn = get_db()
        try:
            conn.executemany(
                'INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?)',
                [(item['date'], item['qty'], f"箱号:{item['box']}", now) for item in rows],
            )
            conn.commit()
        finally:
            conn.close()
        total_qty = sum(item['qty'] for item in rows)

        # Mirror every SN into the Redis hash: SN -> JSON(date, box, platform, ts).
        try:
            client = get_redis()
            redis_key = 'shipment_sn_mapping'
            pipe = client.pipeline()  # batch the writes into one round trip
            redis_count = 0
            for item in rows:
                for sn in item['sns']:
                    if not sn:
                        continue
                    shipment_info = json.dumps({
                        'date': item['date'],
                        'box': item['box'],
                        'platform': platform,
                        'ts': now
                    }, ensure_ascii=False)
                    pipe.hset(redis_key, sn, shipment_info)
                    redis_count += 1
            pipe.execute()
            log('upload_shipments_redis', f"redis_key={redis_key}, sn_count={redis_count}")
        except Exception as redis_error:
            # Best effort: a Redis failure must not undo the SQLite import.
            log('upload_shipments_redis_error', str(redis_error))

        log('upload_shipments_file', f"boxes={len(rows)}, total_qty={total_qty}")
        notify_superadmin('批量上传发货记录文件', f"箱数: {len(rows)}, 总数量: {total_qty}")
        return jsonify({'ok': True, 'count': len(rows), 'total_qty': total_qty})
    except Exception as e:
        log('upload_shipments_file_error', str(e))
        return jsonify({'error': f'处理文件失败:{str(e)}'}), 500


def _shipment_header_columns(header):
    """Locate the date, box and SN columns in a shipment sheet header.

    Returns ``(date_idx, box_idx, sn_indices)``.  The first two are ``None``
    when no matching column exists; ``sn_indices`` lists the positions of the
    ``SN<number>`` columns ordered by their numeric suffix.
    """
    date_idx = box_idx = None
    sn_columns = []
    for idx, title in enumerate(header):
        lowered = title.lower()
        if date_idx is None and ('出货日期' in title or '发货日期' in title or 'date' in lowered):
            date_idx = idx
        if box_idx is None and ('箱号' in title or 'box' in lowered):
            box_idx = idx
        if title.startswith('SN') and title[2:].isdigit():
            sn_columns.append((int(title[2:]), idx))
    sn_columns.sort()
    return date_idx, box_idx, [idx for _, idx in sn_columns]


def _build_shipment_rows(records):
    """Turn ``(date, box, sns)`` tuples into shipment row dicts.

    A blank date inherits the last non-blank date seen (merged date cells are
    only filled on their first row).  Rows without a date, a box number or
    any SN are dropped.
    """
    rows = []
    last_valid_date = None
    for date, box, sns in records:
        if date:
            last_valid_date = date
        else:
            date = last_valid_date
        if not date or not box or not sns:
            continue
        rows.append({'date': date, 'box': box, 'sns': sns, 'qty': len(sns)})
    return rows


def _parse_shipments_csv(raw):
    """Parse CSV bytes into shipment rows.

    Returns ``(rows, error_message)``; ``error_message`` is ``None`` on
    success.  The text is decoded as UTF-8 with an optional BOM removed, and
    split with the ``csv`` module so quoted fields containing commas survive.
    """
    import csv

    text = raw.decode('utf-8-sig', errors='ignore')
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if len(lines) < 2:
        return [], '文件为空'
    header = [h.strip() for h in next(csv.reader(lines[:1]))]
    date_idx, box_idx, sn_indices = _shipment_header_columns(header)
    if date_idx is None or box_idx is None:
        return [], '缺少必需的列'

    def records():
        for cells in csv.reader(lines[1:]):
            parts = [cell.strip() for cell in cells]
            # Rows too short to hold both key columns are ignored entirely.
            if len(parts) <= max(date_idx, box_idx):
                continue
            sns = [parts[i] for i in sn_indices if i < len(parts) and parts[i]]
            yield parts[date_idx], parts[box_idx], sns

    return _build_shipment_rows(records()), None


def _shipment_date_text(value):
    """Normalise an Excel date cell to ``YYYY-MM-DD`` text (``None`` if blank)."""
    from datetime import timedelta

    if not value:
        return None
    if hasattr(value, 'strftime'):  # datetime / date cell
        return value.strftime('%Y-%m-%d')
    if isinstance(value, (int, float)):
        # Excel serial number: day 0 is 1899-12-30, which compensates for
        # Excel's 1-based counting and its fictitious 1900-02-29.
        try:
            return (datetime(1899, 12, 30) + timedelta(days=float(value))).strftime('%Y-%m-%d')
        except (OverflowError, ValueError):
            return str(value).strip()
    text = str(value).strip()
    return None if text == 'None' else text


def _parse_shipments_workbook(file_obj):
    """Parse the active sheet of an uploaded workbook into shipment rows.

    Returns ``(rows, error_message)``; ``error_message`` is ``None`` on
    success.  The workbook is always closed, including on errors.

    NOTE(review): the caller lets ``xls`` through its extension check, but
    openpyxl documents support for the xlsx/xlsm family only -- a legacy
    binary .xls presumably fails in ``load_workbook`` and surfaces as a 500;
    confirm whether .xls should be rejected up front.
    """
    import openpyxl

    wb = openpyxl.load_workbook(file_obj)
    try:
        ws = wb.active
        if ws.max_row < 2:
            return [], '文件为空'
        header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), ())
        header = [str(h).strip() if h else '' for h in header_row]
        date_idx, box_idx, sn_indices = _shipment_header_columns(header)
        if date_idx is None or box_idx is None:
            return [], '缺少必需的列'

        def records():
            for row in ws.iter_rows(min_row=2, values_only=True):
                date = _shipment_date_text(row[date_idx] if date_idx < len(row) else None)
                raw_box = row[box_idx] if box_idx < len(row) else None
                box = str(raw_box).strip() if raw_box else ''
                if box == 'None':
                    box = ''
                sns = []
                for idx in sn_indices:
                    if idx < len(row) and row[idx]:
                        sn_value = str(row[idx]).strip()
                        if sn_value and sn_value != 'None':
                            sns.append(sn_value)
                yield date, box, sns

        return _build_shipment_rows(records()), None
    finally:
        wb.close()
# SOP file management: uploaded SOP documents are stored in a `sop_files`
# folder inside the frontend directory.
SOP_DIR = os.path.join(FRONTEND_DIR, 'sop_files')
# Create the folder when the module is loaded; exist_ok makes this a no-op
# if it is already there.
os.makedirs(SOP_DIR, exist_ok=True)
@app.get('/api/sop/list')
@require_login
def list_sop_files():
    """Return every SOP file record, newest id first, as ``{'list': [...]}``."""
    conn = get_db()
    listing = conn.execute(
        'SELECT id, filename, original_name, description, uploader, ts FROM sop_files ORDER BY id DESC'
    ).fetchall()
    entries = [dict(entry) for entry in listing]
    conn.close()
    return jsonify({'list': entries})
@app.post('/api/sop/upload')
@require_login
@require_any_role('admin','superadmin')
def upload_sop_file():
    """Store an uploaded SOP document and register it in ``sop_files``.

    Accepts the multipart fields ``file`` and optional ``description``.  Only
    Excel/CSV/Word extensions are allowed.  The file is saved under
    ``SOP_DIR`` with a timestamped, sanitised name, while the original name
    (which may contain non-ASCII characters) is kept in the database for
    display and download.
    """
    upload = request.files.get('file')
    description = request.form.get('description', '').strip()
    if not upload:
        return jsonify({'error': '请选择文件'}), 400

    # Keep the client-supplied name untouched for display purposes.
    original_name = upload.filename or 'sop.xlsx'
    ext = original_name.rsplit('.', 1)[1].lower() if '.' in original_name else ''
    if ext not in ('xlsx', 'xls', 'csv', 'doc', 'docx'):
        return jsonify({'error': '只支持 Excel 和 Word 文件格式(.xlsx, .xls, .csv, .doc, .docx'}), 400

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    safe_name = secure_filename(original_name)
    if safe_name in ('', ext):
        # secure_filename dropped everything but (at most) the extension,
        # so fall back to a generic base name.
        safe_name = f'file.{ext}'
    filename = f'sop_{timestamp}_{safe_name}'
    upload.save(os.path.join(SOP_DIR, filename))

    conn = get_db()
    cursor = conn.cursor()
    # Resolve the uploader's user name from the session's user id.
    cursor.execute('SELECT username FROM users WHERE id=?', (session.get('user_id'),))
    account = cursor.fetchone()
    uploader_name = account['username'] if account else '未知'
    cursor.execute(
        'INSERT INTO sop_files(filename, original_name, description, uploader, ts) VALUES(?,?,?,?,?)',
        (filename, original_name, description, uploader_name, get_beijing_time()),
    )
    conn.commit()
    conn.close()

    log('upload_sop', f'filename={original_name}, description={description}')
    notify_superadmin('上传 SOP 文件', f'文件名: {original_name}')
    return jsonify({'ok': True, 'message': 'SOP 文件上传成功'})
@app.get('/api/sop/download/<int:file_id>')
@require_login
def download_sop_file(file_id):
    """Send the stored SOP file for ``file_id`` as an attachment.

    Responds 404 when the record is unknown or the file is missing on disk.
    The download is offered under the originally uploaded file name.
    """
    conn = get_db()
    record = conn.execute(
        'SELECT filename, original_name FROM sop_files WHERE id=?', (file_id,)
    ).fetchone()
    conn.close()
    if not record:
        return jsonify({'error': '文件不存在'}), 404

    stored_path = os.path.join(SOP_DIR, record['filename'])
    if not os.path.exists(stored_path):
        return jsonify({'error': '文件已被删除'}), 404

    log('download_sop', f'file_id={file_id}, filename={record["original_name"]}')
    from flask import send_file
    return send_file(stored_path, as_attachment=True, download_name=record['original_name'])
@app.post('/api/sop/delete/<int:file_id>')
@require_login
@require_any_role('admin','superadmin')
def delete_sop_file(file_id):
    """Delete an SOP file from disk and remove its database record.

    Responds 404 when the record does not exist.  A failure to remove the
    physical file is logged and does not stop the record from being deleted.
    """
    conn = get_db()
    cursor = conn.cursor()
    cursor.execute('SELECT filename, original_name FROM sop_files WHERE id=?', (file_id,))
    record = cursor.fetchone()
    if not record:
        conn.close()
        return jsonify({'error': '文件不存在'}), 404

    stored_path = os.path.join(SOP_DIR, record['filename'])
    if os.path.exists(stored_path):
        try:
            os.remove(stored_path)
        except Exception as exc:
            log('delete_sop_file_error', str(exc))

    cursor.execute('DELETE FROM sop_files WHERE id=?', (file_id,))
    conn.commit()
    conn.close()

    shown_name = record["original_name"]
    log('delete_sop', f'file_id={file_id}, filename={shown_name}')
    notify_superadmin('删除 SOP 文件', f'文件名: {shown_name}')
    return jsonify({'ok': True, 'message': 'SOP 文件已删除'})
# ==================== 工单管理 API ====================
@app.get('/api/work-orders')
@require_login
def get_work_orders():
    """List work orders, optionally filtered by factory, order number and date.

    Query parameters: ``factory`` and ``order`` (substring matches) and
    ``date`` (prefix match on the production start or end time).  A user with
    role ``admin`` who has a factory assigned only sees that factory's orders.
    Results are ordered by creation time, newest first.
    """
    factory = request.args.get('factory', '')
    order_no = request.args.get('order', '')
    date = request.args.get('date', '')
    user_id = session.get('user_id')
    user_role = session.get('role')

    conn = get_db()
    cursor = conn.cursor()
    cursor.execute('SELECT factory FROM users WHERE id=?', (user_id,))
    account = cursor.fetchone()
    user_factory = None
    if account and account['factory']:
        user_factory = account['factory']

    # Collect WHERE conditions and their bound values side by side.
    conditions = ['1=1']
    params = []
    if user_role == 'admin' and user_factory:
        conditions.append('factory = ?')
        params.append(user_factory)
    if factory:
        conditions.append('factory LIKE ?')
        params.append(f'%{factory}%')
    if order_no:
        conditions.append('order_no LIKE ?')
        params.append(f'%{order_no}%')
    if date:
        conditions.append('(production_start_time LIKE ? OR production_end_time LIKE ?)')
        params.extend([f'{date}%', f'{date}%'])
    query = 'SELECT * FROM work_orders WHERE ' + ' AND '.join(conditions) + ' ORDER BY created_at DESC'

    cursor.execute(query, params)
    found = cursor.fetchall()
    conn.close()

    orders = [
        {
            'id': str(entry['id']),
            'factory': entry['factory'],
            'orderNo': entry['order_no'],
            # Fall back to '' when the product_model column is absent.
            'productModel': entry['product_model'] if 'product_model' in entry.keys() else '',
            'orderQty': entry['order_qty'],
            'productionStartTime': entry['production_start_time'],
            'productionEndTime': entry['production_end_time'],
            'status': entry['status'],
            'statusText': entry['status_text'],
            'remark': entry['remark'],
            'createdBy': entry['created_by'],
            'createdAt': entry['created_at'],
        }
        for entry in found
    ]
    return jsonify({'ok': True, 'data': orders})
@app.post('/api/work-orders')
@require_login
@require_any_role('admin','superadmin')
def create_work_order():
    """Create a work order from a JSON body.

    Expected keys: ``factory``, ``orderNo``, ``orderQty`` (all required),
    plus optional ``productModel``, ``productionStartTime``,
    ``productionEndTime`` and ``remark``.  For users with role ``admin`` the
    factory is always taken from their own account; an admin without a
    factory is rejected.  New orders start in status ``issued``.

    Returns ``{'ok', 'id', 'message'}`` on success or a 400 with an ``error``
    message for invalid input.
    """
    # A missing/null body or a non-object body must yield a 400, not a crash.
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': '请填写所有必填项'}), 400

    def text_field(key):
        # Tolerate null and non-string JSON values instead of raising.
        return str(data.get(key) or '').strip()

    factory = text_field('factory')
    order_no = text_field('orderNo')
    product_model = text_field('productModel')
    order_qty = data.get('orderQty', 0)
    production_start_time = data.get('productionStartTime', '')
    production_end_time = data.get('productionEndTime', '')
    remark = text_field('remark')

    user_id = session.get('user_id')
    user_role = session.get('role')
    username = session.get('username', '')

    conn = get_db()
    try:
        c = conn.cursor()
        # Plain admins may only create orders for their own factory.
        if user_role == 'admin':
            c.execute('SELECT factory FROM users WHERE id=?', (user_id,))
            user_row = c.fetchone()
            if user_row and user_row['factory']:
                factory = user_row['factory']
            else:
                return jsonify({'error': '您的账户未设置所属工厂,请联系超级管理员'}), 400
        if not factory or not order_no or not order_qty:
            return jsonify({'error': '请填写所有必填项'}), 400
        now = get_beijing_time()
        c.execute('''INSERT INTO work_orders(
            factory, order_no, product_model, order_qty, production_start_time, production_end_time,
            status, status_text, remark, created_by, created_at, updated_at
        ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)''', (
            factory, order_no, product_model, order_qty, production_start_time, production_end_time,
            'issued', '已下发', remark, username, now, now
        ))
        order_id = c.lastrowid
        conn.commit()
    finally:
        # Release the connection on every path, including SQL errors.
        conn.close()

    log('create_work_order', f'工单号: {order_no}, 工厂: {factory}, 型号: {product_model}')
    # Let the admins of the target factory know about the new order.
    notify_admins_by_factory('添加工单', f'工单号: {order_no}, 工厂: {factory}, 数量: {order_qty}', factory=factory)
    return jsonify({'ok': True, 'id': order_id, 'message': '工单创建成功'})
@app.put('/api/work-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def update_work_order(order_id):
    """Update an existing work order (superadmin only).

    The JSON body carries the same keys as creation: ``factory``, ``orderNo``
    and ``orderQty`` are required; ``productModel``, ``productionStartTime``,
    ``productionEndTime`` and ``remark`` are optional.  Responds 400 for
    missing required fields and 404 when the order does not exist.
    """
    # A missing/null body or a non-object body must yield a 400, not a crash.
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': '请填写所有必填项'}), 400

    def text_field(key):
        # Tolerate null and non-string JSON values instead of raising.
        return str(data.get(key) or '').strip()

    factory = text_field('factory')
    order_no = text_field('orderNo')
    product_model = text_field('productModel')
    order_qty = data.get('orderQty', 0)
    production_start_time = data.get('productionStartTime', '')
    production_end_time = data.get('productionEndTime', '')
    remark = text_field('remark')
    if not factory or not order_no or not order_qty:
        return jsonify({'error': '请填写所有必填项'}), 400

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('SELECT id FROM work_orders WHERE id=?', (order_id,))
        if not c.fetchone():
            return jsonify({'error': '工单不存在'}), 404
        now = get_beijing_time()
        c.execute('''UPDATE work_orders SET
            factory=?, order_no=?, product_model=?, order_qty=?, production_start_time=?, production_end_time=?,
            remark=?, updated_at=?
            WHERE id=?''', (
            factory, order_no, product_model, order_qty, production_start_time, production_end_time,
            remark, now, order_id
        ))
        conn.commit()
    finally:
        # Release the connection on every path, including SQL errors.
        conn.close()

    log('update_work_order', f'工单ID: {order_id}, 工单号: {order_no}, 型号: {product_model}')
    notify_admins_by_factory('更新工单', f'工单号: {order_no}, 工厂: {factory}, 型号: {product_model}', factory=factory)
    return jsonify({'ok': True, 'message': '工单更新成功'})
@app.delete('/api/work-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def delete_work_order(order_id):
    """Delete a work order (superadmin only); 404 when it does not exist."""
    conn = get_db()
    cursor = conn.cursor()
    # Read the order first: its number and factory feed the log and notification.
    cursor.execute('SELECT order_no, factory FROM work_orders WHERE id=?', (order_id,))
    existing = cursor.fetchone()
    if not existing:
        conn.close()
        return jsonify({'error': '工单不存在'}), 404

    order_no, factory = existing['order_no'], existing['factory']
    cursor.execute('DELETE FROM work_orders WHERE id=?', (order_id,))
    conn.commit()
    conn.close()

    log('delete_work_order', f'工单ID: {order_id}, 工单号: {order_no}')
    notify_admins_by_factory('删除工单', f'工单号: {order_no}, 工厂: {factory}', factory=factory)
    return jsonify({'ok': True, 'message': '工单删除成功'})
@app.post('/api/work-orders/<int:order_id>/confirm')
@require_login
@require_any_role('admin','superadmin')
def confirm_work_order(order_id):
    """Mark a work order as confirmed.

    Responds 404 for an unknown order and 403 when a user with role ``admin``
    tries to confirm an order of another factory (or has no factory set).
    Confirming an already confirmed order succeeds without changing anything.
    """
    user_id = session.get('user_id')
    user_role = session.get('role')

    conn = get_db()
    cursor = conn.cursor()
    cursor.execute('SELECT order_no, factory, status FROM work_orders WHERE id=?', (order_id,))
    existing = cursor.fetchone()
    if not existing:
        conn.close()
        return jsonify({'error': '工单不存在'}), 404
    order_no = existing['order_no']
    factory = existing['factory']
    current_status = existing['status']

    if user_role == 'admin':
        # Plain admins may only act on orders of their own factory.
        cursor.execute('SELECT factory FROM users WHERE id=?', (user_id,))
        account = cursor.fetchone()
        user_factory = None
        if account and account['factory']:
            user_factory = account['factory']
        if not user_factory or factory != user_factory:
            conn.close()
            return jsonify({'error': '您只能确认自己工厂的工单'}), 403

    if current_status == 'confirmed':
        conn.close()
        return jsonify({'ok': True, 'message': '工单已确认'})

    cursor.execute(
        '''UPDATE work_orders SET
        status=?, status_text=?, updated_at=?
        WHERE id=?''',
        ('confirmed', '已确认', get_beijing_time(), order_id),
    )
    conn.commit()
    conn.close()

    log('confirm_work_order', f'工单ID: {order_id}, 工单号: {order_no}')
    notify_superadmin('确认工单', f'工单号: {order_no}, 工厂: {factory}')
    return jsonify({'ok': True, 'message': '工单确认成功'})
# ==================== 物料清单-采购 API ====================
def convert_material_purchase_to_camel(row):
    """Map a ``material_purchase`` row to a dict with camelCase keys.

    ``row`` is any mapping indexable by the snake_case column names; the
    result keeps the values unchanged under their camelCase equivalents.
    """
    key_pairs = (
        ('id', 'id'),
        ('title', 'title'),
        ('listNo', 'list_no'),
        ('planNo', 'plan_no'),
        ('bomResult', 'bom_result'),
        ('status', 'status'),
        ('demandStatus', 'demand_status'),
        ('completeRate', 'complete_rate'),
        ('materialCode', 'material_code'),
        ('materialName', 'material_name'),
        ('batchNo', 'batch_no'),
        ('level', 'level'),
        ('requiredQty', 'required_qty'),
        ('stockQty', 'stock_qty'),
        ('shortage', 'shortage'),
        ('acquireMethod', 'acquire_method'),
        ('realtimeStock', 'realtime_stock'),
        ('pendingQty', 'pending_qty'),
        ('dispatchedQty', 'dispatched_qty'),
        ('receivedQty', 'received_qty'),
        ('submitter', 'submitter'),
        ('submitTime', 'submit_time'),
        ('updateTime', 'update_time'),
        ('deleted', 'deleted'),
        ('deletedAt', 'deleted_at'),
    )
    converted = {}
    for camel_key, column in key_pairs:
        converted[camel_key] = row[column]
    return converted
@app.get('/api/material-purchase/list')
@require_login
@require_any_role('superadmin')
def list_material_purchase():
    """Return all material requirements that are not in the recycle bin, newest submission first."""
    conn = get_db()
    active = conn.execute(
        '''SELECT * FROM material_purchase
        WHERE deleted=0
        ORDER BY submit_time DESC'''
    ).fetchall()
    conn.close()
    return jsonify({'list': list(map(convert_material_purchase_to_camel, active))})
@app.get('/api/material-purchase/recycle-bin')
@require_login
@require_any_role('superadmin')
def list_material_purchase_recycle_bin():
    """Return the soft-deleted material requirements, most recently deleted first."""
    conn = get_db()
    binned = conn.execute(
        '''SELECT * FROM material_purchase
        WHERE deleted=1
        ORDER BY deleted_at DESC'''
    ).fetchall()
    conn.close()
    return jsonify({'list': list(map(convert_material_purchase_to_camel, binned))})
@app.post('/api/material-purchase/add')
@require_login
@require_any_role('superadmin')
def add_material_purchase():
    """Insert one material requirement from a JSON body with snake_case keys.

    Responds 400 naming the first required field that is missing or falsy.
    The submitter is taken from the session and both timestamps are set to
    the current Beijing time.  Returns ``{'ok', 'id'}``.
    """
    data = request.get_json() or {}
    mandatory = ('title', 'list_no', 'plan_no', 'status', 'demand_status',
                 'material_code', 'material_name', 'required_qty')
    absent = next((field for field in mandatory if not data.get(field)), None)
    if absent is not None:
        return jsonify({'error': f'缺少必填字段: {absent}'}), 400

    # Payload keys in the same order as the INSERT column list.
    payload_columns = (
        'title', 'list_no', 'plan_no', 'bom_result', 'status', 'demand_status',
        'complete_rate', 'material_code', 'material_name', 'batch_no', 'level',
        'required_qty', 'stock_qty', 'shortage', 'acquire_method', 'realtime_stock',
        'pending_qty', 'dispatched_qty', 'received_qty',
    )
    conn = get_db()
    cursor = conn.cursor()
    now = get_beijing_time()
    username = session.get('username', '')
    values = [data.get(column) for column in payload_columns]
    values.extend([username, now, now])
    cursor.execute('''INSERT INTO material_purchase(
        title, list_no, plan_no, bom_result, status, demand_status,
        complete_rate, material_code, material_name, batch_no, level,
        required_qty, stock_qty, shortage, acquire_method, realtime_stock,
        pending_qty, dispatched_qty, received_qty, submitter, submit_time, update_time
    ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', values)
    conn.commit()
    item_id = cursor.lastrowid
    conn.close()

    log('add_material_purchase', f'物料编码: {data.get("material_code")}')
    return jsonify({'ok': True, 'id': item_id})
@app.post('/api/material-purchase/update')
@require_login
@require_any_role('superadmin')
def update_material_purchase():
    """Overwrite every editable column of one material requirement.

    The JSON body must carry ``id``; all other snake_case keys are written as
    given, so a key missing from the body is stored as NULL.  ``update_time``
    is refreshed.  Responds 400 when ``id`` is missing.
    """
    data = request.get_json() or {}
    item_id = data.get('id')
    if not item_id:
        return jsonify({'error': '缺少ID'}), 400

    # Payload keys in the same order as the SET clause.
    payload_columns = (
        'title', 'list_no', 'plan_no', 'bom_result', 'status', 'demand_status',
        'complete_rate', 'material_code', 'material_name', 'batch_no', 'level',
        'required_qty', 'stock_qty', 'shortage', 'acquire_method', 'realtime_stock',
        'pending_qty', 'dispatched_qty', 'received_qty',
    )
    conn = get_db()
    cursor = conn.cursor()
    now = get_beijing_time()
    values = [data.get(column) for column in payload_columns]
    values.extend([now, item_id])
    cursor.execute('''UPDATE material_purchase SET
        title=?, list_no=?, plan_no=?, bom_result=?, status=?, demand_status=?,
        complete_rate=?, material_code=?, material_name=?, batch_no=?, level=?,
        required_qty=?, stock_qty=?, shortage=?, acquire_method=?, realtime_stock=?,
        pending_qty=?, dispatched_qty=?, received_qty=?, update_time=?
        WHERE id=?''', values)
    conn.commit()
    conn.close()

    log('update_material_purchase', f'ID: {item_id}')
    return jsonify({'ok': True})
@app.post('/api/material-purchase/delete')
@require_login
@require_any_role('superadmin')
def delete_material_purchase():
    """Soft-delete material requirements by moving them to the recycle bin.

    The JSON body must carry ``ids``: a non-empty list of record ids.  The
    matching rows get ``deleted=1`` and a ``deleted_at`` timestamp.  Responds
    400 when ``ids`` is missing, empty or not a list of scalar ids.
    ``count`` in the response is the number of ids submitted.
    """
    data = request.get_json() or {}
    item_ids = data.get('ids', []) if isinstance(data, dict) else []
    # Reject anything that is not a list of scalars: a bare number or string
    # would otherwise fail in len() / list concatenation and produce a 500.
    if (not isinstance(item_ids, list) or not item_ids
            or not all(isinstance(i, (int, float, str)) for i in item_ids)):
        return jsonify({'error': '缺少ID'}), 400

    conn = get_db()
    try:
        now = get_beijing_time()
        # Only the '?' markers are interpolated; the ids themselves are bound.
        placeholders = ','.join('?' * len(item_ids))
        conn.execute(f'''UPDATE material_purchase SET
            deleted=1, deleted_at=?
            WHERE id IN ({placeholders})''', [now] + item_ids)
        conn.commit()
    finally:
        # Release the connection on every path, including SQL errors.
        conn.close()

    log('delete_material_purchase', f'删除数量: {len(item_ids)}')
    return jsonify({'ok': True, 'count': len(item_ids)})
@app.post('/api/material-purchase/restore')
@require_login
@require_any_role('superadmin')
def restore_material_purchase():
    """Restore one material requirement from the recycle bin (400 without ``id``)."""
    item_id = (request.get_json() or {}).get('id')
    if not item_id:
        return jsonify({'error': '缺少ID'}), 400

    conn = get_db()
    conn.execute(
        '''UPDATE material_purchase SET
        deleted=0, deleted_at=NULL
        WHERE id=?''',
        (item_id,),
    )
    conn.commit()
    conn.close()

    log('restore_material_purchase', f'ID: {item_id}')
    return jsonify({'ok': True})
@app.post('/api/material-purchase/permanent-delete')
@require_login
@require_any_role('superadmin')
def permanent_delete_material_purchase():
    """Remove one material requirement row for good (400 without ``id``)."""
    item_id = (request.get_json() or {}).get('id')
    if not item_id:
        return jsonify({'error': '缺少ID'}), 400

    conn = get_db()
    conn.execute('DELETE FROM material_purchase WHERE id=?', (item_id,))
    conn.commit()
    conn.close()

    log('permanent_delete_material_purchase', f'ID: {item_id}')
    return jsonify({'ok': True})
@app.post('/api/material-purchase/empty-recycle-bin')
@require_login
@require_any_role('superadmin')
def empty_recycle_bin():
    """Permanently delete every soft-deleted material requirement and report how many were removed."""
    conn = get_db()
    purge = conn.execute('DELETE FROM material_purchase WHERE deleted=1')
    removed = purge.rowcount
    conn.commit()
    conn.close()

    log('empty_recycle_bin', f'清空数量: {removed}')
    return jsonify({'ok': True, 'count': removed})
@app.route('/api/validate/material-purchase-file', methods=['POST'])
@require_login
@require_any_role('superadmin')
def validate_material_purchase_file():
    """Check that an uploaded Excel file looks like a material-list import.

    Responds 400 when no file was sent.  Every other outcome is a 200 with
    ``{'valid': bool, 'message': str}``: the file must have an Excel
    extension, contain at least one data row and carry all required header
    columns in its first row.
    """
    upload = request.files.get('file')
    if not upload:
        return jsonify({'error': 'no file'}), 400

    def verdict(valid, message):
        # Validation results are always delivered with HTTP 200.
        return jsonify({'valid': valid, 'message': message}), 200

    name = secure_filename(upload.filename or '')
    ext = (name.split('.')[-1] or '').lower()
    if ext not in ('xlsx', 'xls'):
        return verdict(False, '文件格式不支持请上传Excel文件.xlsx或.xls')
    try:
        import openpyxl
        wb = openpyxl.load_workbook(upload)
        ws = wb.active
        if ws.max_row < 2:
            wb.close()
            return verdict(False, '文件为空,没有数据')

        first_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
        header = [str(h).strip() if h else '' for h in first_row]
        required_columns = ['标题', '生产计划明细物料需求清单编号', '生产计划编号', '状态',
                            '需求状态', '物料编码', '物料名称', '所需物料数']
        missing_columns = [col for col in required_columns if col not in header]
        if missing_columns:
            wb.close()
            joined = ", ".join(missing_columns)
            return verdict(False, f'缺少必需的列:{joined}')

        data_rows = ws.max_row - 1  # everything below the header row
        wb.close()
        return verdict(True, f'文件格式正确,共{data_rows}行数据')
    except Exception as exc:
        return verdict(False, f'读取文件失败:{str(exc)}')
@app.post('/api/upload/material-purchase-file')
@require_login
@require_any_role('superadmin')
def upload_material_purchase_file():
    """Import material requirements from the active sheet of an Excel file.

    The first row is the header; columns are matched by their exact header
    text.  Completely empty rows, and rows lacking a title, list number or
    material code, are skipped.  Every remaining row is inserted into
    ``material_purchase`` with the session user as submitter.

    Returns ``{'ok', 'count', 'message'}`` on success, a 400 when no file or
    no usable row was supplied, and a 500 (after logging) on any other error.
    """
    f = request.files.get('file')
    if not f:
        return jsonify({'error': 'no file'}), 400

    def parse_percentage(value):
        """Convert a completion-rate cell to a float; unparsable values become 0.

        NOTE(review): numeric cells are returned unchanged while text such as
        '85%' yields 85.0, so a percent-formatted numeric cell (0.85) ends up
        on a different scale -- confirm which scale complete_rate expects.
        """
        if value is None:
            return 0
        if isinstance(value, (int, float)):
            return float(value)
        value_str = str(value).strip()
        if value_str.endswith('%'):
            value_str = value_str[:-1]
        try:
            return float(value_str)
        except ValueError:
            return 0

    def safe_int(value, default=0):
        """Convert a cell to int, falling back to ``default`` when impossible."""
        if value is None:
            return default
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return default

    def cell(row, col_map, col_name, default=''):
        """Fetch a cell by header name; ``default`` if the column or value is absent."""
        idx = col_map.get(col_name)
        if idx is None or idx >= len(row):
            return default
        value = row[idx]
        return value if value is not None else default

    try:
        import openpyxl
        wb = openpyxl.load_workbook(f)
        try:
            ws = wb.active
            header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
            header = [str(h).strip() if h else '' for h in header_row]
            col_map = {h: i for i, h in enumerate(header)}

            rows = []
            for row in ws.iter_rows(min_row=2, values_only=True):
                if not any(row):  # skip rows without any truthy cell
                    continue
                item = {
                    'title': str(cell(row, col_map, '标题') or ''),
                    'list_no': str(cell(row, col_map, '生产计划明细物料需求清单编号') or ''),
                    'plan_no': str(cell(row, col_map, '生产计划编号') or ''),
                    'bom_result': str(cell(row, col_map, '产品BOM分析结果') or ''),
                    'status': str(cell(row, col_map, '状态') or 'pending'),
                    'demand_status': str(cell(row, col_map, '需求状态') or 'normal'),
                    'complete_rate': parse_percentage(cell(row, col_map, '物料齐套率', 0)),
                    'material_code': str(cell(row, col_map, '物料编码') or ''),
                    'material_name': str(cell(row, col_map, '物料名称') or ''),
                    'batch_no': str(cell(row, col_map, '物料批次号') or ''),
                    'level': safe_int(cell(row, col_map, '物料层级', 1), 1),
                    'required_qty': safe_int(cell(row, col_map, '所需物料数', 0), 0),
                    'stock_qty': safe_int(cell(row, col_map, '库存现有物料数', 0), 0),
                    'shortage': safe_int(cell(row, col_map, '欠缺值', 0), 0),
                    'acquire_method': str(cell(row, col_map, '物料获取方式') or ''),
                    'realtime_stock': safe_int(cell(row, col_map, '实时库存值', 0), 0),
                    'pending_qty': safe_int(cell(row, col_map, '待入库数量', 0), 0),
                    'dispatched_qty': safe_int(cell(row, col_map, '派发数量', 0), 0),
                    'received_qty': safe_int(cell(row, col_map, '入库数量', 0), 0),
                }
                # Rows without the identifying fields are silently dropped.
                if not item['title'] or not item['list_no'] or not item['material_code']:
                    continue
                rows.append(item)
        finally:
            # Close the workbook even when reading fails part-way through.
            wb.close()

        if not rows:
            return jsonify({'error': '文件中没有有效数据'}), 400

        conn = get_db()
        try:
            now = get_beijing_time()
            username = session.get('username', '')
            conn.executemany('''INSERT INTO material_purchase(
                title, list_no, plan_no, bom_result, status, demand_status,
                complete_rate, material_code, material_name, batch_no, level,
                required_qty, stock_qty, shortage, acquire_method, realtime_stock,
                pending_qty, dispatched_qty, received_qty, submitter, submit_time, update_time
            ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', [
                (
                    item['title'], item['list_no'], item['plan_no'], item['bom_result'],
                    item['status'], item['demand_status'], item['complete_rate'],
                    item['material_code'], item['material_name'], item['batch_no'],
                    item['level'], item['required_qty'], item['stock_qty'], item['shortage'],
                    item['acquire_method'], item['realtime_stock'], item['pending_qty'],
                    item['dispatched_qty'], item['received_qty'], username, now, now
                )
                for item in rows
            ])
            conn.commit()
        finally:
            # Release the connection on every path, including SQL errors.
            conn.close()

        log('upload_material_purchase_file', f'导入数量: {len(rows)}')
        return jsonify({'ok': True, 'count': len(rows), 'message': f'成功导入{len(rows)}条数据'})
    except Exception as e:
        # Record the failure like the other upload handlers do.
        log('upload_material_purchase_file_error', str(e))
        return jsonify({'error': f'导入失败:{str(e)}'}), 500
# ==================== 客户订单 API ====================
@app.get('/api/customer-orders')
@require_login
@require_any_role('superadmin')
def get_customer_orders():
    """Return all customer orders, newest order date (then highest id) first."""
    conn = get_db()
    found = conn.execute(
        '''SELECT id, order_date, order_no, customer_name, material, quantity, unit_price,
        created_by, created_at, updated_at
        FROM customer_orders
        ORDER BY order_date DESC, id DESC'''
    ).fetchall()
    conn.close()

    orders = [
        {
            'id': entry['id'],
            'order_date': entry['order_date'],
            'order_no': entry['order_no'],
            # Fall back to '' when the customer_name column is absent.
            'customer_name': entry['customer_name'] if 'customer_name' in entry.keys() else '',
            'material': entry['material'],
            'quantity': entry['quantity'],
            'unit_price': entry['unit_price'],
            'created_by': entry['created_by'],
            'created_at': entry['created_at'],
            'updated_at': entry['updated_at'],
        }
        for entry in found
    ]
    return jsonify({'list': orders})
@app.post('/api/customer-orders')
@require_login
@require_any_role('superadmin')
def create_customer_order():
    """Create a customer order from a JSON body.

    Required text keys: ``order_date``, ``order_no``, ``customer_name`` and
    ``material``.  ``quantity`` must be a number greater than 0 and
    ``unit_price`` a number that is not negative; numeric strings are
    accepted and converted.  Returns ``{'ok', 'id', 'message'}`` on success
    or a 400 with an ``error`` message for invalid input.
    """
    import math

    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({'error': '请填写所有必填项'}), 400

    def text_field(key):
        # Tolerate null and non-string JSON values instead of raising.
        return str(data.get(key) or '').strip()

    def number_field(key):
        """Return the field as a finite int/float, or None when it is not numeric."""
        value = data.get(key, 0)
        if isinstance(value, str):
            try:
                parsed = float(value.strip())
            except ValueError:
                return None
            if not math.isfinite(parsed):
                return None
            return int(parsed) if parsed.is_integer() else parsed
        if isinstance(value, (int, float)) and math.isfinite(value):
            return value
        return None

    order_date = text_field('order_date')
    order_no = text_field('order_no')
    customer_name = text_field('customer_name')
    material = text_field('material')
    quantity = number_field('quantity')
    unit_price = number_field('unit_price')

    if not order_date or not order_no or not customer_name or not material:
        return jsonify({'error': '请填写所有必填项'}), 400
    # Non-numeric input is answered with a 400 instead of a TypeError/500.
    if quantity is None or quantity <= 0:
        return jsonify({'error': '订单数量必须大于0'}), 400
    if unit_price is None or unit_price < 0:
        return jsonify({'error': '单价不能为负数'}), 400

    username = session.get('username', '')
    now = get_beijing_time()
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('''INSERT INTO customer_orders(
            order_date, order_no, customer_name, material, quantity, unit_price,
            created_by, created_at, updated_at
        ) VALUES(?,?,?,?,?,?,?,?,?)''', (
            order_date, order_no, customer_name, material, quantity, unit_price,
            username, now, now
        ))
        order_id = c.lastrowid
        conn.commit()
    finally:
        # Release the connection on every path, including SQL errors.
        conn.close()

    log('create_customer_order', f'客户: {customer_name}, 订单号: {order_no}, 物料: {material}, 数量: {quantity}')
    return jsonify({'ok': True, 'id': order_id, 'message': '订单创建成功'})
@app.delete('/api/customer-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def delete_customer_order(order_id):
    """Delete a customer order; 404 when it does not exist."""
    conn = get_db()
    cursor = conn.cursor()
    # Read the order first so the log entry can name it.
    cursor.execute('SELECT order_no, customer_name, material FROM customer_orders WHERE id=?', (order_id,))
    existing = cursor.fetchone()
    if not existing:
        conn.close()
        return jsonify({'error': '订单不存在'}), 404

    order_no = existing['order_no']
    customer_name = ''
    if 'customer_name' in existing.keys():
        customer_name = existing['customer_name']
    material = existing['material']

    cursor.execute('DELETE FROM customer_orders WHERE id=?', (order_id,))
    conn.commit()
    conn.close()

    log('delete_customer_order', f'订单ID: {order_id}, 客户: {customer_name}, 订单号: {order_no}')
    return jsonify({'ok': True, 'message': '订单删除成功'})
@app.errorhandler(404)
def not_found(e):
    """Handle 404s: serve the SPA shell for page requests, JSON otherwise.

    A request outside ``/api/`` whose ``Accept`` header mentions ``text/html``
    receives ``index.html`` so that front-end routes can be opened directly.
    Everything else gets a JSON 404.
    """
    path = request.path
    is_page_request = bool(path) and not path.startswith('/api/')
    if is_page_request and 'text/html' in request.headers.get('Accept', ''):
        return send_from_directory(FRONTEND_DIR, 'index.html')
    return jsonify({'error': 'not found'}), 404
# Runs whenever this module is loaded (import or direct execution); init_db
# issues CREATE TABLE IF NOT EXISTS statements, so repeated runs are harmless.
init_db()
if __name__ == '__main__':
    # Executed as a script: start Flask's built-in server on all interfaces,
    # using the PORT environment variable (default 5000).
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', '5000')))