ERP/server/app.py

2266 lines
80 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import json
import sqlite3
from datetime import datetime
from functools import wraps
from flask import Flask, request, jsonify, session, send_from_directory
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
try:
import redis
except Exception:
redis = None
_redis_client = None
_audit_cache = {'pdd': {'ts': 0, 'list': []}, 'yt': {'ts': 0, 'list': []}}
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data.db')
FRONTEND_DIR = os.path.join(os.path.dirname(BASE_DIR), 'frontend')
app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path='')
app.config['SECRET_KEY'] = os.environ.get('APP_SECRET', 'change-me')
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db()
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL
)''')
c.execute('''CREATE TABLE IF NOT EXISTS operations_log(
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
action TEXT,
detail TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS notifications(
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
username TEXT,
action TEXT,
detail TEXT,
ts TEXT,
read INTEGER DEFAULT 0
)''')
c.execute('''CREATE TABLE IF NOT EXISTS mac_batches(
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac TEXT,
batch TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS stats(
id INTEGER PRIMARY KEY AUTOINCREMENT,
good INTEGER,
bad INTEGER,
fpy_good INTEGER DEFAULT 0,
platform TEXT DEFAULT 'pdd',
ts TEXT
)''')
# 为已存在的表添加列(如果不存在)
try:
c.execute('ALTER TABLE stats ADD COLUMN fpy_good INTEGER DEFAULT 0')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE stats ADD COLUMN platform TEXT DEFAULT "pdd"')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE users ADD COLUMN avatar TEXT')
except Exception:
pass # 列已存在
c.execute('''CREATE TABLE IF NOT EXISTS defects(
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac TEXT,
batch TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS shipments(
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT,
qty INTEGER,
receiver TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS devices(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
status TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS environment(
id INTEGER PRIMARY KEY AUTOINCREMENT,
temp TEXT,
hum TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS personnel(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
role TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS qa(
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
date TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS production(
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch TEXT,
duration TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS repairs(
id INTEGER PRIMARY KEY AUTOINCREMENT,
qty INTEGER,
note TEXT,
ts TEXT
)''')
conn.commit()
# create default admin
c.execute('SELECT id FROM users WHERE username=?', ('admin',))
if not c.fetchone():
pwd = os.environ.get('ADMIN_PASSWORD', 'admin123')
c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', (
'admin', generate_password_hash(pwd), 'admin'
))
conn.commit()
# create superadmin from env
su_user = os.environ.get('SUPERADMIN_USERNAME')
su_pass = os.environ.get('SUPERADMIN_PASSWORD')
if su_user and su_pass:
c.execute('SELECT id FROM users WHERE username=?', (su_user,))
if not c.fetchone():
c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', (
su_user, generate_password_hash(su_pass), 'superadmin'
))
conn.commit()
conn.close()
def log(action, detail=''):
try:
conn = get_db()
c = conn.cursor()
c.execute('INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?)', (
session.get('user_id'), action, detail, datetime.utcnow().isoformat()
))
conn.commit()
conn.close()
except Exception:
pass
def notify_superadmin(action, detail=''):
"""为超级管理员创建通知"""
try:
user_id = session.get('user_id')
if not user_id:
return
conn = get_db()
c = conn.cursor()
# 获取当前用户信息
c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
user = c.fetchone()
if not user:
conn.close()
return
# 如果是超级管理员自己的操作,不创建通知
if user['role'] == 'superadmin':
conn.close()
return
# 为所有超级管理员创建通知
c.execute('SELECT id FROM users WHERE role=?', ('superadmin',))
superadmins = c.fetchall()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).isoformat()
for admin in superadmins:
c.execute('INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)', (
admin['id'], user['username'], action, detail, now, 0
))
conn.commit()
conn.close()
except Exception:
pass
def get_redis():
global _redis_client
if not redis:
raise RuntimeError('redis missing')
if _redis_client is not None:
return _redis_client
host = os.environ.get('REDIS_HOST', '180.163.74.83')
port = int(os.environ.get('REDIS_PORT', '6379'))
password = os.environ.get('REDIS_PASSWORD') or os.environ.get('SUPERADMIN_PASSWORD')
db = int(os.environ.get('REDIS_DB', '0'))
_redis_client = redis.Redis(host=host, port=port, password=password, db=db, decode_responses=True, socket_timeout=0.5, socket_connect_timeout=0.5)
return _redis_client
def parse_audit_line(s):
if not s:
return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None}
def normalize_ts(ts):
try:
from datetime import datetime, timezone, timedelta
# Attempt ISO parsing
# Support "Z" UTC suffix and offsets like +08:00
if ts.endswith('Z'):
dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
dt = datetime.fromisoformat(ts)
# Convert to Beijing time (UTC+8)
bj = dt.astimezone(timezone(timedelta(hours=8)))
return bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return ts
def has_time(v):
return isinstance(v, str) and (('T' in v) or (':' in v))
def choose_ts(d):
candidates = [d.get('ts_cn'), d.get('ts_local'), d.get('ts'), d.get('ts_utc'), d.get('timestamp'), d.get('time')]
for v in candidates:
if has_time(v):
return v
for v in candidates:
if v:
return v
return None
try:
obj = json.loads(s)
ts = choose_ts(obj)
if not ts:
ts = s if isinstance(s, str) else None
return {
'ts_cn': ts if ts else None,
'batch': obj.get('batch') or obj.get('batch_no') or obj.get('lot'),
'mac': obj.get('mac') or obj.get('mac_addr') or obj.get('mac_address'),
'note': obj.get('note') or obj.get('msg') or obj.get('message')
}
except Exception:
pass
d = {}
parts = []
for sep in [' ', ',', ';', '|']:
if sep in s:
parts = s.split(sep)
break
if not parts:
parts = [s]
i = 0
while i < len(parts):
part = parts[i]
if '=' in part:
k, v = part.split('=', 1)
kk = k.strip()
vv = v.strip()
try:
import re
if kk in ('ts_cn', 'ts_local', 'ts', 'timestamp', 'time'):
if re.match(r'^\d{4}-\d{2}-\d{2}$', vv) and i + 1 < len(parts) and re.match(r'^\d{2}:\d{2}:\d{2}', parts[i+1]):
vv = vv + ' ' + parts[i+1]
i += 1
except Exception:
pass
d[kk] = vv
i += 1
ts = choose_ts(d)
if not ts:
ts = s if isinstance(s, str) else None
return {
'ts_cn': ts if ts else None,
'batch': d.get('batch') or d.get('batch_no') or d.get('lot'),
'mac': d.get('mac') or d.get('mac_addr') or d.get('mac_address'),
'note': d.get('note') or d.get('msg') or d.get('message') or s
}
def require_login(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if not session.get('user_id'):
return jsonify({'error': 'unauthorized'}), 401
return fn(*args, **kwargs)
return wrapper
def require_role(role):
def deco(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if session.get('role') != role:
return jsonify({'error': 'forbidden'}), 403
return fn(*args, **kwargs)
return wrapper
return deco
def require_any_role(*roles):
def deco(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if session.get('role') not in roles:
return jsonify({'error': 'forbidden'}), 403
return fn(*args, **kwargs)
return wrapper
return deco
@app.route('/')
def index():
return send_from_directory(FRONTEND_DIR, 'index.html')
# auth
@app.post('/api/auth/login')
def login():
data = request.get_json() or {}
username = data.get('username')
password = data.get('password')
conn = get_db()
c = conn.cursor()
c.execute('SELECT id, password_hash, role FROM users WHERE username=?', (username,))
row = c.fetchone()
conn.close()
if not row or not check_password_hash(row['password_hash'], password or ''):
return jsonify({'error': 'invalid credentials'}), 400
session['user_id'] = row['id']
session['role'] = row['role']
session.permanent = True
log('login', username)
return jsonify({'ok': True})
@app.get('/api/auth/me')
def me():
uid = session.get('user_id')
if not uid:
return jsonify({'username': None, 'role': None, 'avatar': None})
conn = get_db()
c = conn.cursor()
c.execute('SELECT username, role, avatar FROM users WHERE id=?', (uid,))
row = c.fetchone()
conn.close()
return jsonify({'username': row['username'], 'role': row['role'], 'avatar': row['avatar'] if row['avatar'] else None})
@app.post('/api/auth/logout')
def logout():
log('logout')
session.clear()
return jsonify({'ok': True})
@app.post('/api/user/upload-avatar')
@require_login
def upload_avatar():
uid = session.get('user_id')
if not uid:
return jsonify({'error': '未登录'}), 401
if 'avatar' not in request.files:
return jsonify({'error': '未选择文件'}), 400
file = request.files['avatar']
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 验证文件类型
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# 先从原始文件名获取扩展名
original_filename = file.filename
if '.' not in original_filename:
return jsonify({'error': '无效的文件格式'}), 400
ext = original_filename.rsplit('.', 1)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式'}), 400
# 创建avatars目录
avatars_dir = os.path.join(FRONTEND_DIR, 'assets', 'avatars')
os.makedirs(avatars_dir, exist_ok=True)
# 生成唯一文件名
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
new_filename = f'avatar_{uid}_{timestamp}.{ext}'
filepath = os.path.join(avatars_dir, new_filename)
# 保存文件
file.save(filepath)
# 更新数据库
avatar_url = f'./assets/avatars/{new_filename}'
conn = get_db()
c = conn.cursor()
# 删除旧头像文件(如果存在)
c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
row = c.fetchone()
if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'):
old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', ''))
if os.path.exists(old_file):
try:
os.remove(old_file)
except Exception:
pass
c.execute('UPDATE users SET avatar=? WHERE id=?', (avatar_url, uid))
conn.commit()
conn.close()
log('upload_avatar', f'上传头像: {new_filename}')
return jsonify({'ok': True, 'avatar_url': avatar_url})
@app.post('/api/user/reset-avatar')
@require_login
def reset_avatar():
uid = session.get('user_id')
if not uid:
return jsonify({'error': '未登录'}), 401
conn = get_db()
c = conn.cursor()
# 删除旧头像文件(如果存在)
c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
row = c.fetchone()
if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'):
old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', ''))
if os.path.exists(old_file):
try:
os.remove(old_file)
except Exception:
pass
c.execute('UPDATE users SET avatar=NULL WHERE id=?', (uid,))
conn.commit()
conn.close()
log('reset_avatar', '恢复默认头像')
return jsonify({'ok': True})
# dashboard
@app.get('/api/dashboard')
@require_login
def dashboard():
conn = get_db()
c = conn.cursor()
c.execute('SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
s = c.fetchone()
c.execute('SELECT COUNT(1) AS total FROM defects')
defects = c.fetchone()
conn.close()
good = s['good_total'] if s else 0
bad = s['bad_total'] if s else 0
fpy_good = s['fpy_good_total'] if s else 0
# 计算总良品率
rate = "{}%".format(round((good/(good+bad)) * 100, 2)) if (good+bad) > 0 else u''
# 计算直通良品率FPY = First Pass Yield
total_produced = good + bad
fpy_rate = "{}%".format(round((fpy_good/total_produced) * 100, 2)) if total_produced > 0 else u''
# 从 Redis 获取发货数量SN 记录数)
shipments_count = 0
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
shipments_count = r.hlen(redis_key)
except Exception as e:
log('dashboard_redis_error', str(e))
# Redis 失败时回退到 SQLite
conn = get_db()
c = conn.cursor()
c.execute('SELECT SUM(qty) AS total FROM shipments')
ship = c.fetchone()
conn.close()
shipments_count = (ship['total'] or 0) if ship else 0
return jsonify({
'fpyRate': fpy_rate,
'goodRate': rate,
'shipments': shipments_count,
'defects': (defects['total'] or 0) if defects else 0,
'badCount': bad
})
@app.get('/api/audit/pdd')
@require_login
def audit_pdd():
start = datetime.utcnow()
try:
q_start = request.args.get('start')
q_end = request.args.get('end')
q_limit = request.args.get('limit')
q_order = request.args.get('order', 'desc')
has_filter = bool(q_start or q_end or q_limit or q_order)
# 缓存优化3秒内不重复查询
if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['pdd']['ts']) < 3):
return jsonify({'list': _audit_cache['pdd']['list']})
r = get_redis()
# 设置Redis超时为5秒
r.connection_pool.connection_kwargs['socket_timeout'] = 5
items = []
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in ['mac_batch_audit_pdd', 'audit:pdd', 'pdd:audit']:
try:
if r.exists(key):
t = r.type(key)
if t == 'list':
# 限制最大查询数量
total = r.llen(key)
if has_filter:
items = r.lrange(key, max(0, total - max_items), -1)
else:
items = r.lrange(key, -200, -1)
elif t == 'zset':
items = r.zrevrange(key, 0, max_items - 1)
elif t == 'stream':
entries = r.xrevrange(key, max='+', min='-', count=max_items)
items = [json.dumps(v) for _id, v in entries]
else:
v = r.get(key)
items = [v] if v else []
break
except Exception as e:
log('audit_pdd_error', f'Redis query error: {str(e)}')
continue
try:
host = os.environ.get('REDIS_HOST')
db = os.environ.get('REDIS_DB')
tp = r.type('mac_batch_audit_pdd')
ln = 0
try:
ln = r.llen('mac_batch_audit_pdd')
except Exception:
pass
log('audit_pdd_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln}))
except Exception:
pass
if not items and r.exists('batch_sn_mapping_pdd') and r.type('batch_sn_mapping_pdd') == 'hash':
try:
pairs = []
cursor = 0
while True:
cursor, res = r.hscan('batch_sn_mapping_pdd', cursor=cursor, count=200)
for k, v in (res or {}).items():
pairs.append({'mac': k, 'batch': v})
if len(pairs) >= 100:
break
if cursor == 0 or len(pairs) >= 100:
break
items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs]
except Exception:
pass
res = [parse_audit_line(x) for x in items]
if q_start or q_end:
def to_epoch(s):
try:
if not s:
return None
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return None
s_epoch = to_epoch(q_start) if q_start else None
e_epoch = to_epoch(q_end) if q_end else None
tmp = []
for r0 in res:
ts = to_epoch(r0.get('ts_cn'))
if ts is None:
continue
if s_epoch is not None and ts < s_epoch:
continue
if e_epoch is not None and ts > e_epoch:
continue
tmp.append(r0)
res = tmp
try:
def to_key(r):
s = r.get('ts_cn') or ''
try:
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return 0
res.sort(key=to_key, reverse=(q_order != 'asc'))
except Exception:
res.reverse()
if q_limit:
try:
lim = int(q_limit)
if lim > 0:
res = res[:lim]
except Exception:
pass
if not has_filter:
_audit_cache['pdd'] = {'ts': datetime.utcnow().timestamp(), 'list': res}
dur = (datetime.utcnow() - start).total_seconds()
log('audit_pdd_cost', f"{dur}s len={len(res)}")
return jsonify({'list': res})
except Exception as e:
log('audit_pdd_error', str(e))
return jsonify({'list': []})
@app.get('/api/audit/yt')
@require_login
def audit_yt():
start = datetime.utcnow()
try:
q_start = request.args.get('start')
q_end = request.args.get('end')
q_limit = request.args.get('limit')
q_order = request.args.get('order', 'desc')
has_filter = bool(q_start or q_end or q_limit or q_order)
# 缓存优化3秒内不重复查询
if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['yt']['ts']) < 3):
return jsonify({'list': _audit_cache['yt']['list']})
r = get_redis()
# 设置Redis超时为5秒
r.connection_pool.connection_kwargs['socket_timeout'] = 5
items = []
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in ['mac_batch_audit_yt', 'audit:yt', 'yt:audit']:
try:
if r.exists(key):
t = r.type(key)
if t == 'list':
# 限制最大查询数量
total = r.llen(key)
if has_filter:
items = r.lrange(key, max(0, total - max_items), -1)
else:
items = r.lrange(key, -200, -1)
elif t == 'zset':
items = r.zrevrange(key, 0, max_items - 1)
elif t == 'stream':
entries = r.xrevrange(key, max='+', min='-', count=max_items)
items = [json.dumps(v) for _id, v in entries]
else:
v = r.get(key)
items = [v] if v else []
break
except Exception as e:
log('audit_yt_error', f'Redis query error: {str(e)}')
continue
try:
host = os.environ.get('REDIS_HOST')
db = os.environ.get('REDIS_DB')
tp = r.type('mac_batch_audit_yt')
ln = 0
try:
ln = r.llen('mac_batch_audit_yt')
except Exception:
pass
log('audit_yt_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln}))
except Exception:
pass
if not items and r.exists('batch_sn_mapping_yt') and r.type('batch_sn_mapping_yt') == 'hash':
try:
pairs = []
cursor = 0
while True:
cursor, res = r.hscan('batch_sn_mapping_yt', cursor=cursor, count=200)
for k, v in (res or {}).items():
pairs.append({'mac': k, 'batch': v})
if len(pairs) >= 100:
break
if cursor == 0 or len(pairs) >= 100:
break
items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs]
except Exception:
pass
res = [parse_audit_line(x) for x in items]
if q_start or q_end:
def to_epoch(s):
try:
if not s:
return None
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return None
s_epoch = to_epoch(q_start) if q_start else None
e_epoch = to_epoch(q_end) if q_end else None
tmp = []
for r0 in res:
ts = to_epoch(r0.get('ts_cn'))
if ts is None:
continue
if s_epoch is not None and ts < s_epoch:
continue
if e_epoch is not None and ts > e_epoch:
continue
tmp.append(r0)
res = tmp
try:
def to_key(r):
s = r.get('ts_cn') or ''
try:
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return 0
res.sort(key=to_key, reverse=(q_order != 'asc'))
except Exception:
res.reverse()
if q_limit:
try:
lim = int(q_limit)
if lim > 0:
res = res[:lim]
except Exception:
pass
if not has_filter:
_audit_cache['yt'] = {'ts': datetime.utcnow().timestamp(), 'list': res}
dur = (datetime.utcnow() - start).total_seconds()
log('audit_yt_cost', f"{dur}s len={len(res)}")
return jsonify({'list': res})
except Exception as e:
log('audit_yt_error', str(e))
return jsonify({'list': []})
@app.get('/api/audit/diagnose')
@require_login
def audit_diagnose():
try:
r = get_redis()
result = {}
for key in ['mac_batch_audit_pdd', 'mac_batch_audit_yt', 'batch_sn_mapping_pdd', 'batch_sn_mapping_yt']:
try:
t = r.type(key)
if t == 'list':
result[key] = {'type': t, 'len': r.llen(key)}
elif t == 'zset':
result[key] = {'type': t, 'len': r.zcard(key)}
elif t == 'stream':
info = r.xinfo_stream(key)
result[key] = {'type': t, 'len': info.get('length')}
elif t == 'hash':
result[key] = {'type': t, 'len': r.hlen(key)}
elif t == 'none':
result[key] = {'type': t, 'len': 0}
else:
v = r.get(key)
result[key] = {'type': t, 'len': 1 if v else 0}
except Exception as e:
result[key] = {'error': str(e)}
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.get('/api/overview')
@require_login
def overview():
conn = get_db()
c = conn.cursor()
c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
stats_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM defects')
defects_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM mac_batches')
mac_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments')
ship_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM devices')
devices_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM personnel')
personnel_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM qa')
qa_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM production')
production_row = c.fetchone()
conn.close()
return jsonify({
'stats': {
'records': (stats_row['cnt'] or 0) if stats_row else 0,
'goodTotal': (stats_row['good_total'] or 0) if stats_row else 0,
'badTotal': (stats_row['bad_total'] or 0) if stats_row else 0,
'fpyGoodTotal': (stats_row['fpy_good_total'] or 0) if stats_row else 0
},
'defects': (defects_row['cnt'] or 0) if defects_row else 0,
'mac': (mac_row['cnt'] or 0) if mac_row else 0,
'shipments': {
'records': (ship_row['cnt'] or 0) if ship_row else 0,
'qtyTotal': (ship_row['qty_total'] or 0) if ship_row else 0
},
'devices': (devices_row['cnt'] or 0) if devices_row else 0,
'personnel': (personnel_row['cnt'] or 0) if personnel_row else 0,
'qa': (qa_row['cnt'] or 0) if qa_row else 0,
'production': (production_row['cnt'] or 0) if production_row else 0
})
# uploads
@app.post('/api/upload/mac')
@require_login
@require_any_role('admin','superadmin')
def upload_mac():
data = request.get_json() or {}
rows = data.get('rows') or []
if not isinstance(rows, list):
return jsonify({'error': 'invalid rows'}), 400
conn = get_db()
c = conn.cursor()
now = datetime.utcnow().isoformat()
for r in rows:
mac = (r or {}).get('mac')
batch = (r or {}).get('batch')
if not mac or not batch:
continue
c.execute('INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
log('upload_mac', f"count={len(rows)}")
notify_superadmin('上传MAC与批次', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True})
@app.post('/api/upload/stats')
@require_login
@require_any_role('admin','superadmin')
def upload_stats():
data = request.get_json() or {}
good = int(data.get('good') or 0)
bad = int(data.get('bad') or 0)
fpy_good = int(data.get('fpy_good') or 0) # 直通良品数
platform = data.get('platform') or 'pdd' # 平台pdd/yt/tx
details = data.get('details') or []
if good < 0 or bad < 0 or fpy_good < 0:
return jsonify({'error': 'invalid count'}), 400
if platform not in ['pdd', 'yt', 'tx']:
platform = 'pdd'
conn = get_db()
c = conn.cursor()
now = datetime.utcnow().isoformat()
# 保存统计数据
c.execute('INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?)', (good, bad, fpy_good, platform, now))
# 如果有不良明细保存到defects表
if details and isinstance(details, list):
for item in details:
mac = (item or {}).get('mac')
batch = (item or {}).get('batch')
if mac and batch:
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
log('upload_stats', json.dumps({'good': good, 'bad': bad, 'fpy_good': fpy_good, 'platform': platform, 'details_count': len(details)}))
notify_superadmin('上传良/不良统计', f"平台: {platform_name}, 良品: {good}, 不良品: {bad}, 直通良品: {fpy_good}")
return jsonify({'ok': True})
@app.post('/api/upload/repairs')
@require_login
@require_any_role('admin','superadmin')
def upload_repairs():
data = request.get_json() or {}
qty = int(data.get('qty') or 0)
note = data.get('note') or ''
if qty < 0:
return jsonify({'error': 'invalid quantity'}), 400
conn = get_db()
c = conn.cursor()
now = datetime.utcnow().isoformat()
c.execute('INSERT INTO repairs(qty, note, ts) VALUES(?,?,?)', (qty, note, now))
conn.commit()
conn.close()
log('upload_repairs', json.dumps({'qty': qty, 'note': note}))
notify_superadmin('上传返修记录', f"数量: {qty}")
return jsonify({'ok': True})
@app.post('/api/upload/defects')
@require_login
@require_any_role('admin','superadmin')
def upload_defects():
data = request.get_json() or {}
rows = data.get('rows') or []
conn = get_db()
c = conn.cursor()
now = datetime.utcnow().isoformat()
for r in rows:
mac = (r or {}).get('mac')
batch = (r or {}).get('batch')
if not mac or not batch:
continue
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
log('upload_defects', f"count={len(rows)}")
notify_superadmin('上传不良明细', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True})
@app.post('/api/upload/shipments')
@require_login
@require_any_role('admin','superadmin')
def upload_shipments():
data = request.get_json() or {}
date = data.get('date')
qty = int(data.get('qty') or 0)
to = data.get('to')
platform = data.get('platform', '')
box_no = data.get('box_no', '')
if not date or qty <= 0 or not to or not platform:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
# 检查shipments表是否有platform和box_no列如果没有则添加
c.execute("PRAGMA table_info(shipments)")
columns = [col[1] for col in c.fetchall()]
if 'platform' not in columns:
c.execute('ALTER TABLE shipments ADD COLUMN platform TEXT')
if 'box_no' not in columns:
c.execute('ALTER TABLE shipments ADD COLUMN box_no TEXT')
c.execute(
'INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?)',
(date, qty, to, platform, box_no, datetime.utcnow().isoformat())
)
conn.commit()
conn.close()
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
log_data = {'date': date, 'qty': qty, 'to': to, 'platform': platform}
if box_no:
log_data['box_no'] = box_no
log('upload_shipments', json.dumps(log_data))
notify_superadmin('上传发货记录', f"机种: {platform_name}, 日期: {date}, 数量: {qty}, 接收方: {to}")
return jsonify({'ok': True})
# collect
@app.get('/api/collect/devices')
@require_login
def devices():
conn = get_db()
c = conn.cursor()
c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 50')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/collect/environment')
@require_login
def environment():
conn = get_db()
c = conn.cursor()
c.execute('SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1')
r = c.fetchone()
conn.close()
return jsonify({'temp': (r['temp'] if r else ''), 'hum': (r['hum'] if r else '')})
@app.get('/api/collect/personnel')
@require_login
def personnel():
conn = get_db()
c = conn.cursor()
c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/collect/personnel')
@require_login
@require_any_role('admin','superadmin')
def add_personnel():
data = request.get_json() or {}
name = (data.get('name') or '').strip()
role = (data.get('role') or '').strip()
if not name:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('INSERT INTO personnel(name, role, ts) VALUES(?,?,?)', (name, role, datetime.utcnow().isoformat()))
conn.commit()
conn.close()
log('add_personnel', name)
notify_superadmin('添加人员信息', f"姓名: {name}, 角色: {role}")
return jsonify({'ok': True})
@app.get('/api/collect/qa')
@require_login
def qa():
conn = get_db()
c = conn.cursor()
c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/collect/production')
@require_login
def production():
conn = get_db()
c = conn.cursor()
c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
# export
@app.post('/api/export/excel')
@require_login
def export_excel():
try:
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill
from io import BytesIO
data = request.get_json() or {}
data_type = data.get('type', 'stats')
# 创建工作簿
wb = openpyxl.Workbook()
ws = wb.active
# 设置标题样式
header_fill = PatternFill(start_color='4F8CFF', end_color='4F8CFF', fill_type='solid')
header_font = Font(bold=True, color='FFFFFF')
header_alignment = Alignment(horizontal='center', vertical='center')
conn = get_db()
c = conn.cursor()
# 根据类型导出不同的数据
if data_type == 'stats':
ws.title = '良不良统计'
ws.append(['直通良品数', '良品数', '不良品数', '时间'])
c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC')
elif data_type == 'mac':
ws.title = 'MAC与批次'
ws.append(['MAC地址', '批次号', '时间'])
c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC')
elif data_type == 'repairs':
ws.title = '返修记录'
ws.append(['返修数量', '备注', '时间'])
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC')
elif data_type == 'defects':
ws.title = '不良明细'
ws.append(['MAC地址', '批次号', '时间'])
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC')
elif data_type == 'shipments':
ws.title = '发货记录'
ws.append(['日期', '数量', '收货方', '时间'])
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC')
elif data_type == 'devices':
ws.title = '设备状态'
ws.append(['设备名称', '状态'])
c.execute('SELECT name, status FROM devices ORDER BY id DESC')
elif data_type == 'personnel':
ws.title = '人员信息'
ws.append(['姓名', '角色'])
c.execute('SELECT name, role FROM personnel ORDER BY id DESC')
elif data_type == 'qa':
ws.title = '质检报告'
ws.append(['标题', '日期'])
c.execute('SELECT title, date FROM qa ORDER BY id DESC')
elif data_type == 'production':
ws.title = '时间记录'
ws.append(['批次', '时长'])
c.execute('SELECT batch, duration FROM production ORDER BY id DESC')
else:
conn.close()
return jsonify({'error': 'invalid type'}), 400
# 应用标题样式
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = header_alignment
# 写入数据
rows = c.fetchall()
for row in rows:
ws.append(list(row))
conn.close()
# 自动调整列宽
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width
# 保存到内存
output = BytesIO()
wb.save(output)
output.seek(0)
log('export_excel', data_type)
# 返回文件
from flask import send_file
filename = f'{ws.title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_excel_error', str(e))
return jsonify({'error': str(e)}), 500
@app.post('/api/export/pdf')
@require_login
def export_pdf():
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.enums import TA_CENTER
from io import BytesIO
data = request.get_json() or {}
data_type = data.get('type', 'stats')
# 注册中文字体
font_name = 'Helvetica'
try:
# 尝试常见的中文字体路径
font_paths = [
'/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
'/usr/share/fonts/truetype/arphic/uming.ttc',
'/System/Library/Fonts/PingFang.ttc',
'C:\\Windows\\Fonts\\simhei.ttf'
]
for font_path in font_paths:
if os.path.exists(font_path):
pdfmetrics.registerFont(TTFont('ChineseFont', font_path))
font_name = 'ChineseFont'
break
except Exception as e:
log('pdf_font_warning', f'无法加载中文字体: {str(e)}')
# 创建PDF
buffer = BytesIO()
doc = SimpleDocTemplate(
buffer,
pagesize=landscape(A4),
topMargin=1.5*cm,
bottomMargin=1.5*cm,
leftMargin=1.5*cm,
rightMargin=1.5*cm
)
elements = []
# 样式
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontName=font_name,
fontSize=18,
textColor=colors.HexColor('#4F8CFF'),
spaceAfter=20,
alignment=TA_CENTER,
leading=24
)
conn = get_db()
c = conn.cursor()
# 根据类型导出不同的数据
if data_type == 'stats':
title = '良/不良统计报表'
headers = ['直通良品数', '良品数', '不良品数', '时间']
c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200')
elif data_type == 'mac':
title = 'MAC与批次报表'
headers = ['MAC地址', '批次号', '时间']
c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200')
elif data_type == 'repairs':
title = '返修记录报表'
headers = ['返修数量', '备注', '时间']
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200')
elif data_type == 'defects':
title = '不良明细报表'
headers = ['MAC地址', '批次号', '时间']
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200')
elif data_type == 'shipments':
title = '发货记录报表'
headers = ['日期', '数量', '收货方', '时间']
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200')
elif data_type == 'devices':
title = '设备状态报表'
headers = ['设备名称', '状态']
c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 200')
elif data_type == 'personnel':
title = '人员信息报表'
headers = ['姓名', '角色']
c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200')
elif data_type == 'qa':
title = '质检报告'
headers = ['标题', '日期']
c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 200')
elif data_type == 'production':
title = '生产时间记录'
headers = ['批次', '时长']
c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200')
else:
conn.close()
return jsonify({'error': 'invalid type'}), 400
# 添加标题
elements.append(Paragraph(title, title_style))
elements.append(Spacer(1, 0.5*cm))
# 获取数据
rows = c.fetchall()
conn.close()
if len(rows) == 0:
# 没有数据时的提示
no_data_style = ParagraphStyle(
'NoData',
parent=styles['Normal'],
fontName=font_name,
fontSize=12,
textColor=colors.grey,
alignment=TA_CENTER
)
elements.append(Paragraph('暂无数据', no_data_style))
else:
# 构建表格数据
table_data = [headers]
for row in rows:
table_data.append([str(val) if val is not None else '' for val in row])
# 创建表格
table = Table(table_data, repeatRows=1)
table.setStyle(TableStyle([
# 表头样式
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F8CFF')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('FONTNAME', (0, 0), (-1, 0), font_name),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 8),
('TOPPADDING', (0, 0), (-1, 0), 8),
# 数据行样式
('BACKGROUND', (0, 1), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('FONTNAME', (0, 1), (-1, -1), font_name),
('FONTSIZE', (0, 1), (-1, -1), 8),
('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F5F7FA')]),
('TOPPADDING', (0, 1), (-1, -1), 5),
('BOTTOMPADDING', (0, 1), (-1, -1), 5),
]))
elements.append(table)
# 添加页脚信息
elements.append(Spacer(1, 0.5*cm))
footer_style = ParagraphStyle(
'Footer',
parent=styles['Normal'],
fontName=font_name,
fontSize=8,
textColor=colors.grey,
alignment=TA_CENTER
)
footer_text = f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | 共 {len(rows)} 条记录'
elements.append(Paragraph(footer_text, footer_style))
# 生成PDF
doc.build(elements)
buffer.seek(0)
log('export_pdf', data_type)
# 返回文件
from flask import send_file
filename = f'{title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf'
return send_file(
buffer,
mimetype='application/pdf',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_pdf_error', str(e))
return jsonify({'error': f'PDF导出失败: {str(e)}'}), 500
# lists
@app.get('/api/list/mac')
@require_login
def list_mac():
conn = get_db()
c = conn.cursor()
c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/stats')
@require_login
def list_stats():
conn = get_db()
c = conn.cursor()
c.execute('SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/repairs')
@require_login
def list_repairs():
conn = get_db()
c = conn.cursor()
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/defects')
@require_login
def list_defects():
conn = get_db()
c = conn.cursor()
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/shipments')
@require_login
def list_shipments():
conn = get_db()
c = conn.cursor()
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
# admin management
@app.get('/api/admin/users')
@require_login
@require_any_role('superadmin')
def list_users():
conn = get_db()
c = conn.cursor()
c.execute('SELECT username, role FROM users ORDER BY id ASC')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/admin/reset-password')
@require_login
@require_any_role('superadmin')
def reset_password():
data = request.get_json() or {}
username = data.get('username')
new_password = data.get('new_password')
if not username or not new_password:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM users WHERE username=?', (username,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': 'user not found'}), 404
c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id']))
conn.commit()
conn.close()
log('reset_password', username)
return jsonify({'ok': True})
@app.post('/api/admin/change-password')
@require_login
@require_any_role('superadmin')
def change_password():
data = request.get_json() or {}
username = data.get('username')
new_password = data.get('new_password')
if not username or not new_password:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM users WHERE username=?', (username,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': 'user not found'}), 404
c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id']))
conn.commit()
conn.close()
log('change_password', username)
return jsonify({'ok': True})
@app.post('/api/admin/add-user')
@require_login
@require_any_role('superadmin')
def add_user():
"""添加新用户"""
data = request.get_json() or {}
username = (data.get('username') or '').strip()
password = data.get('password')
role = (data.get('role') or 'admin').strip()
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
if role not in ['admin', 'superadmin']:
return jsonify({'error': '角色必须是 admin 或 superadmin'}), 400
conn = get_db()
c = conn.cursor()
# 检查用户名是否已存在
c.execute('SELECT id FROM users WHERE username=?', (username,))
if c.fetchone():
conn.close()
return jsonify({'error': '用户名已存在'}), 400
# 创建新用户
try:
c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)',
(username, generate_password_hash(password), role))
conn.commit()
conn.close()
log('add_user', f'username={username}, role={role}')
return jsonify({'ok': True, 'message': f'用户 {username} 创建成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'创建用户失败:{str(e)}'}), 500
@app.post('/api/admin/clear')
@require_login
@require_any_role('superadmin')
def clear_module():
data = request.get_json() or {}
module = data.get('module')
tables = {
'mac': 'mac_batches',
'stats': 'stats',
'defects': 'defects',
'shipments': 'shipments',
'devices': 'devices',
'environment': 'environment',
'personnel': 'personnel',
'qa': 'qa',
'production': 'production'
}
table = tables.get(module)
if not table:
return jsonify({'error': 'invalid module'}), 400
conn = get_db()
c = conn.cursor()
c.execute(f'DELETE FROM {table}')
conn.commit()
conn.close()
log('clear_module', module)
return jsonify({'ok': True})
# notifications
@app.get('/api/notifications')
@require_login
@require_any_role('superadmin')
def get_notifications():
"""获取当前用户的通知列表"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100', (user_id,))
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/notifications/unread-count')
@require_login
@require_any_role('superadmin')
def get_unread_count():
"""获取未读通知数量"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0', (user_id,))
row = c.fetchone()
conn.close()
return jsonify({'count': row['count'] if row else 0})
@app.post('/api/notifications/mark-read')
@require_login
@require_any_role('superadmin')
def mark_notification_read():
"""标记通知为已读"""
data = request.get_json() or {}
notification_id = data.get('id')
if not notification_id:
return jsonify({'error': 'invalid id'}), 400
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('UPDATE notifications SET read=1 WHERE id=? AND user_id=?', (notification_id, user_id))
conn.commit()
conn.close()
return jsonify({'ok': True})
@app.post('/api/notifications/mark-all-read')
@require_login
@require_any_role('superadmin')
def mark_all_notifications_read():
"""标记所有通知为已读"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('UPDATE notifications SET read=1 WHERE user_id=?', (user_id,))
conn.commit()
conn.close()
return jsonify({'ok': True})
@app.post('/api/notifications/delete-read')
@require_login
@require_any_role('superadmin')
def delete_read_notifications():
"""删除所有已读通知"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('DELETE FROM notifications WHERE user_id=? AND read=1', (user_id,))
deleted_count = c.rowcount
conn.commit()
conn.close()
return jsonify({'ok': True, 'count': deleted_count})
@app.errorhandler(404)
def not_found(_):
return jsonify({'error': 'not found'}), 404
@app.route('/api/validate/mac-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_mac_file():
"""验证Excel文件格式是否符合要求"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200
try:
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
# 检查第一行(表头)
header = [h.strip() for h in lines[0].split(',')]
if len(header) != 2:
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{len(header)}'}), 200
# 记录表头用于调试
log('validate_mac_file_csv', f'headers: {header}')
# 更灵活的表头检查(不区分大小写)
header_lower = [h.lower() for h in header]
has_mac = any('mac' in h and 'sn' not in h for h in header_lower)
has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower)
has_batch = any('批次' in h or 'batch' in h for h in header_lower)
if not (has_mac or has_sn_mac):
return jsonify({'valid': False, 'message': f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'}), 200
if not has_batch:
return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)}'}), 200
data_rows = len(lines) - 1
mac_col = 'MAC' if has_mac else 'SN_MAC'
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
if ws.max_column != 2:
wb.close()
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{ws.max_column}'}), 200
# 检查表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
# 记录表头用于调试
log('validate_mac_file', f'headers: {header}')
# 更灵活的表头检查(不区分大小写)
header_lower = [h.lower() for h in header]
has_mac = any('mac' in h and 'sn' not in h for h in header_lower)
has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower)
has_batch = any('批次' in h or 'batch' in h for h in header_lower)
if not (has_mac or has_sn_mac):
wb.close()
return jsonify({'valid': False, 'message': f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'}), 200
if not has_batch:
wb.close()
return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)}'}), 200
data_rows = ws.max_row - 1
mac_col = 'MAC' if has_mac else 'SN_MAC'
wb.close()
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
except Exception as e:
return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.post('/api/upload/mac-file')
@require_login
@require_any_role('admin','superadmin')
def upload_mac_file():
import subprocess
import tempfile
f = request.files.get('file')
upload_type = request.form.get('type', 'pdd') # pdd, yt, or tx
if not f:
return jsonify({'error': 'no file'}), 400
if upload_type not in ['pdd', 'yt', 'tx']:
return jsonify({'error': 'invalid type'}), 400
# 保存上传的文件到临时位置
name = secure_filename(f.filename or 'upload.xlsx')
temp_dir = '/home/hyx/work/batch_import_xlsx'
os.makedirs(temp_dir, exist_ok=True)
# 根据类型确定文件名
if upload_type == 'yt':
temp_path = os.path.join(temp_dir, 'sn_test_yt.xlsx')
elif upload_type == 'pdd':
temp_path = os.path.join(temp_dir, 'sn_test_pdd.xlsx')
else:
temp_path = os.path.join(temp_dir, 'sn_test_tx.xlsx')
f.save(temp_path)
# 调用batch_import.py脚本
script_path = '/home/hyx/work/生产管理系统/batch_import.py'
python_path = '/home/hyx/work/.venv/bin/python'
try:
result = subprocess.run(
[python_path, script_path, upload_type],
capture_output=True,
text=True,
timeout=300 # 5分钟超时
)
output = result.stdout + result.stderr
success = result.returncode == 0
log('upload_mac_file', f"type={upload_type}, success={success}")
if success:
notify_superadmin('批量上传MAC文件', f"类型: {upload_type}")
return jsonify({
'ok': success,
'output': output,
'returncode': result.returncode
})
except subprocess.TimeoutExpired:
return jsonify({'error': '上传超时', 'output': '处理时间超过5分钟'}), 500
except Exception as e:
return jsonify({'error': str(e), 'output': ''}), 500
@app.post('/api/upload/defects-file')
@require_login
@require_any_role('admin','superadmin')
def upload_defects_file():
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
rows = []
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
for l in text.splitlines():
parts = [p.strip() for p in l.split(',')]
if len(parts) >= 2:
rows.append({'mac': parts[0], 'batch': parts[1]})
else:
try:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
for r in ws.iter_rows(values_only=True):
mac = str(r[0]).strip() if r and r[0] else None
batch = str(r[1]).strip() if r and len(r) > 1 and r[1] else None
if mac and batch:
rows.append({'mac': mac, 'batch': batch})
except Exception:
return jsonify({'error': 'parse error'}), 400
conn = get_db()
c = conn.cursor()
now = datetime.utcnow().isoformat()
for r in rows:
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (r['mac'], r['batch'], now))
conn.commit()
conn.close()
log('upload_defects_file', f"count={len(rows)}")
notify_superadmin('批量上传不良明细文件', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True, 'count': len(rows)})
@app.route('/api/validate/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_shipments_file():
"""验证发货记录Excel文件格式"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200
try:
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
header = [h.strip() for h in lines[0].split(',')]
header_lower = [h.lower() for h in header]
# 检查必需的列
has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower))
has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower))
if not has_date:
return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200
if not has_box:
return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200
# 检查SN列SN1-SN20
sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
if not sn_cols:
return jsonify({'valid': False, 'message': '缺少SN列SN1, SN2, ... SN20'}), 200
data_rows = len(lines) - 1
return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
# 检查表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
header_lower = [h.lower() for h in header]
# 检查必需的列
has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower))
has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower))
if not has_date:
wb.close()
return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200
if not has_box:
wb.close()
return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200
# 检查SN列
sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
if not sn_cols:
wb.close()
return jsonify({'valid': False, 'message': '缺少SN列SN1, SN2, ... SN20'}), 200
data_rows = ws.max_row - 1
wb.close()
return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200
except Exception as e:
return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.get('/api/shipments/query-by-sn')
@require_login
def query_shipment_by_sn():
"""通过 SN/MAC 号查询出货信息"""
sn = request.args.get('sn', '').strip()
if not sn:
return jsonify({'error': '请提供 SN/MAC 号'}), 400
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 从 Redis Hash 中查询
result = r.hget(redis_key, sn)
if result:
# 解析 JSON 数据
shipment_info = json.loads(result)
platform = shipment_info.get('platform', 'pdd') # 默认拼多多
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
return jsonify({
'found': True,
'sn': sn,
'date': shipment_info.get('date'),
'box': shipment_info.get('box'),
'platform': platform,
'platform_name': platform_name,
'ts': shipment_info.get('ts')
})
else:
return jsonify({
'found': False,
'sn': sn,
'message': '未找到该 SN 的出货记录'
})
except Exception as e:
log('query_shipment_error', str(e))
return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.post('/api/shipments/update-platform')
@require_login
@require_any_role('superadmin')
def update_shipments_platform():
"""批量更新Redis中发货记录的机种字段"""
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 获取所有记录
all_data = r.hgetall(redis_key)
updated_count = 0
pipe = r.pipeline()
for sn, value in all_data.items():
try:
info = json.loads(value)
# 如果没有platform字段添加为pdd
if 'platform' not in info:
info['platform'] = 'pdd'
pipe.hset(redis_key, sn, json.dumps(info, ensure_ascii=False))
updated_count += 1
except Exception:
continue
pipe.execute()
log('update_shipments_platform', f'updated {updated_count} records')
return jsonify({
'ok': True,
'message': f'已更新 {updated_count} 条记录为拼多多',
'updated': updated_count
})
except Exception as e:
log('update_shipments_platform_error', str(e))
return jsonify({'error': f'更新失败: {str(e)}'}), 500
@app.get('/api/shipments/redis-stats')
@require_login
def shipments_redis_stats():
"""获取 Redis 中发货记录的统计信息"""
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
count = r.hlen(redis_key)
return jsonify({
'key': redis_key,
'count': count,
'exists': r.exists(redis_key) > 0
})
except Exception as e:
log('shipments_redis_stats_error', str(e))
return jsonify({'error': f'获取统计失败:{str(e)}'}), 500
@app.post('/api/shipments/clear-redis')
@require_login
@require_any_role('admin','superadmin')
def clear_shipments_redis():
"""清空 Redis 中的发货记录数据"""
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 获取删除前的数量
count_before = r.hlen(redis_key)
# 删除整个 Hash
r.delete(redis_key)
log('clear_shipments_redis', f'cleared {count_before} records')
return jsonify({
'ok': True,
'message': f'已清空 {count_before} 条发货记录',
'count': count_before
})
except Exception as e:
log('clear_shipments_redis_error', str(e))
return jsonify({'error': f'清空失败:{str(e)}'}), 500
@app.route('/api/upload/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def upload_shipments_file():
"""上传发货记录Excel文件"""
f = request.files.get('file')
platform = request.form.get('platform') # 获取机种参数
if not f:
return jsonify({'error': '请选择文件'}), 400
if not platform or platform not in ['pdd', 'yt', 'tx']:
return jsonify({'error': '请选择机种(拼多多/圆通/兔喜)'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'error': '文件格式不支持'}), 400
try:
rows = []
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if len(lines) < 2:
return jsonify({'error': '文件为空'}), 400
header = [h.strip() for h in lines[0].split(',')]
header_lower = [h.lower() for h in header]
# 找到列索引
date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None)
box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None)
if date_idx is None or box_idx is None:
return jsonify({'error': '缺少必需的列'}), 400
# 找到所有SN列按数字排序
sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()]
sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
for line in lines[1:]:
parts = [p.strip() for p in line.split(',')]
if len(parts) <= max(date_idx, box_idx):
continue
# 处理日期(合并单元格时可能为空)
current_date = parts[date_idx] if date_idx < len(parts) and parts[date_idx] else ''
# 如果当前行日期为空,使用上一个有效日期
if current_date:
last_valid_date = current_date
date = current_date
else:
date = last_valid_date
# 处理箱号
box = parts[box_idx] if box_idx < len(parts) and parts[box_idx] else ''
# 如果没有日期或箱号,跳过这行
if not date or not box:
continue
# 收集所有SN横向 20 个)
sns = []
for idx, _ in sn_indices:
if idx < len(parts) and parts[idx]:
sns.append(parts[idx])
# 只有当有 SN 数据时才添加记录
if sns:
rows.append({
'date': date,
'box': box,
'sns': sns,
'qty': len(sns)
})
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'error': '文件为空'}), 400
# 读取表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
header_lower = [h.lower() for h in header]
# 找到列索引
date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None)
box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None)
if date_idx is None or box_idx is None:
wb.close()
return jsonify({'error': '缺少必需的列'}), 400
# 找到所有SN列按数字排序
sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()]
sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
# 读取数据行
for row in ws.iter_rows(min_row=2, values_only=True):
# 处理日期(合并单元格时可能为 None
date_value = row[date_idx] if date_idx < len(row) else None
current_date = None
if date_value:
# 如果是 datetime.datetime 或 datetime.date 对象
if hasattr(date_value, 'strftime'):
current_date = date_value.strftime('%Y-%m-%d')
# 如果是数字Excel 日期序列号)
elif isinstance(date_value, (int, float)):
try:
# Excel 日期从 1900-01-01 开始计数
from datetime import datetime as dt, timedelta
# Excel 的 bug1900 不是闰年但 Excel 认为是所以需要减1
excel_epoch = dt(1899, 12, 30)
date_obj = excel_epoch + timedelta(days=float(date_value))
current_date = date_obj.strftime('%Y-%m-%d')
except Exception:
current_date = str(date_value).strip()
else:
current_date = str(date_value).strip()
if current_date == 'None':
current_date = None
# 如果当前行日期为空,使用上一个有效日期
if current_date:
last_valid_date = current_date
date = current_date
else:
date = last_valid_date
# 处理箱号
box = str(row[box_idx]).strip() if box_idx < len(row) and row[box_idx] else ''
# 如果没有日期或箱号,跳过这行
if not date or not box or box == 'None':
continue
# 收集所有SN横向 20 个)
sns = []
for idx, _ in sn_indices:
if idx < len(row) and row[idx]:
sn_value = str(row[idx]).strip()
if sn_value and sn_value != 'None':
sns.append(sn_value)
# 只有当有 SN 数据时才添加记录
if sns:
rows.append({
'date': date,
'box': box,
'sns': sns,
'qty': len(sns)
})
wb.close()
# 保存到 SQLite 数据库
conn = get_db()
c = conn.cursor()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S')
total_qty = 0
for r in rows:
receiver_info = f"箱号:{r['box']}"
c.execute('INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?)',
(r['date'], r['qty'], receiver_info, now))
total_qty += r['qty']
conn.commit()
conn.close()
# 保存到 Redis - 使用 Hash 结构存储 MAC -> 出货信息的映射
try:
r = get_redis()
redis_key = 'shipment_sn_mapping' # Redis Hash key
# 使用 pipeline 批量写入,提高性能
pipe = r.pipeline()
redis_count = 0
for row_data in rows:
date = row_data['date']
box = row_data['box']
sns = row_data['sns']
# 为每个 SN/MAC 创建映射记录
for sn in sns:
if sn: # 确保 SN 不为空
# 存储格式: MAC -> JSON(date, box, platform, timestamp)
shipment_info = json.dumps({
'date': date,
'box': box,
'platform': platform,
'ts': now
}, ensure_ascii=False)
pipe.hset(redis_key, sn, shipment_info)
redis_count += 1
# 执行批量写入
pipe.execute()
log('upload_shipments_redis', f"redis_key={redis_key}, sn_count={redis_count}")
except Exception as redis_error:
# Redis 写入失败不影响主流程,只记录日志
log('upload_shipments_redis_error', str(redis_error))
log('upload_shipments_file', f"boxes={len(rows)}, total_qty={total_qty}")
notify_superadmin('批量上传发货记录文件', f"箱数: {len(rows)}, 总数量: {total_qty}")
return jsonify({'ok': True, 'count': len(rows), 'total_qty': total_qty})
except Exception as e:
log('upload_shipments_file_error', str(e))
return jsonify({'error': f'处理文件失败:{str(e)}'}), 500
init_db()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', '5000')))