# -*- coding: utf-8 -*- import os import json import sqlite3 from datetime import datetime from functools import wraps from flask import Flask, request, jsonify, session, send_from_directory from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename try: import redis except Exception: redis = None _redis_client = None _audit_cache = {'pdd': {'ts': 0, 'list': []}, 'yt': {'ts': 0, 'list': []}} BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DB_PATH = os.path.join(BASE_DIR, 'data.db') FRONTEND_DIR = os.path.join(os.path.dirname(BASE_DIR), 'frontend') app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path='') app.config['SECRET_KEY'] = os.environ.get('APP_SECRET', 'change-me') def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db() c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL )''') c.execute('''CREATE TABLE IF NOT EXISTS operations_log( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, action TEXT, detail TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS notifications( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, username TEXT, action TEXT, detail TEXT, ts TEXT, read INTEGER DEFAULT 0 )''') c.execute('''CREATE TABLE IF NOT EXISTS mac_batches( id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT, batch TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS stats( id INTEGER PRIMARY KEY AUTOINCREMENT, good INTEGER, bad INTEGER, fpy_good INTEGER DEFAULT 0, platform TEXT DEFAULT 'pdd', ts TEXT )''') # 为已存在的表添加列(如果不存在) try: c.execute('ALTER TABLE stats ADD COLUMN fpy_good INTEGER DEFAULT 0') except Exception: pass # 列已存在 try: c.execute('ALTER TABLE stats ADD COLUMN platform TEXT DEFAULT "pdd"') except Exception: pass # 列已存在 try: c.execute('ALTER TABLE users ADD COLUMN avatar TEXT') except Exception: pass # 列已存在 c.execute('''CREATE TABLE IF NOT EXISTS defects( id INTEGER PRIMARY KEY AUTOINCREMENT, mac TEXT, batch TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS shipments( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT, qty INTEGER, receiver TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS devices( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, status TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS environment( id INTEGER PRIMARY KEY AUTOINCREMENT, temp TEXT, hum TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS personnel( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, role TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS qa( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, date TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS production( id INTEGER PRIMARY KEY AUTOINCREMENT, batch TEXT, duration TEXT, ts TEXT )''') c.execute('''CREATE TABLE IF NOT EXISTS repairs( id INTEGER PRIMARY KEY AUTOINCREMENT, qty INTEGER, note TEXT, ts TEXT )''') conn.commit() # create default admin c.execute('SELECT id FROM users WHERE username=?', ('admin',)) if not c.fetchone(): pwd = os.environ.get('ADMIN_PASSWORD', 'admin123') c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', ( 'admin', generate_password_hash(pwd), 'admin' )) conn.commit() # create superadmin from env su_user = os.environ.get('SUPERADMIN_USERNAME') su_pass = os.environ.get('SUPERADMIN_PASSWORD') if su_user and su_pass: c.execute('SELECT id FROM users WHERE username=?', (su_user,)) if not c.fetchone(): c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', ( su_user, generate_password_hash(su_pass), 'superadmin' )) conn.commit() conn.close() def log(action, detail=''): try: conn = get_db() c = conn.cursor() c.execute('INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?)', ( session.get('user_id'), action, detail, datetime.utcnow().isoformat() )) conn.commit() conn.close() except Exception: pass def notify_superadmin(action, detail=''): """为超级管理员创建通知""" try: user_id = session.get('user_id') if not user_id: return conn = get_db() c = conn.cursor() # 获取当前用户信息 c.execute('SELECT username, role FROM users WHERE id=?', (user_id,)) user = c.fetchone() if not user: conn.close() return # 如果是超级管理员自己的操作,不创建通知 if user['role'] == 'superadmin': conn.close() return # 为所有超级管理员创建通知 c.execute('SELECT id FROM users WHERE role=?', ('superadmin',)) superadmins = c.fetchall() # 使用北京时间(UTC+8) from datetime import timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) now = datetime.now(beijing_tz).isoformat() for admin in superadmins: c.execute('INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)', ( admin['id'], user['username'], action, detail, now, 0 )) conn.commit() conn.close() except Exception: pass def get_redis(): global _redis_client if not redis: raise RuntimeError('redis missing') if _redis_client is not None: return _redis_client host = os.environ.get('REDIS_HOST', '180.163.74.83') port = int(os.environ.get('REDIS_PORT', '6379')) password = os.environ.get('REDIS_PASSWORD') or os.environ.get('SUPERADMIN_PASSWORD') db = int(os.environ.get('REDIS_DB', '0')) _redis_client = redis.Redis(host=host, port=port, password=password, db=db, decode_responses=True, socket_timeout=0.5, socket_connect_timeout=0.5) return _redis_client def parse_audit_line(s): if not s: return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None} def normalize_ts(ts): try: from datetime import datetime, timezone, timedelta # Attempt ISO parsing # Support "Z" UTC suffix and offsets like +08:00 if ts.endswith('Z'): dt = datetime.fromisoformat(ts.replace('Z', '+00:00')) else: dt = datetime.fromisoformat(ts) # Convert to Beijing time (UTC+8) bj = dt.astimezone(timezone(timedelta(hours=8))) return bj.strftime('%Y-%m-%d %H:%M:%S') except Exception: return ts def has_time(v): return isinstance(v, str) and (('T' in v) or (':' in v)) def choose_ts(d): candidates = [d.get('ts_cn'), d.get('ts_local'), d.get('ts'), d.get('ts_utc'), d.get('timestamp'), d.get('time')] for v in candidates: if has_time(v): return v for v in candidates: if v: return v return None try: obj = json.loads(s) ts = choose_ts(obj) if not ts: ts = s if isinstance(s, str) else None return { 'ts_cn': ts if ts else None, 'batch': obj.get('batch') or obj.get('batch_no') or obj.get('lot'), 'mac': obj.get('mac') or obj.get('mac_addr') or obj.get('mac_address'), 'note': obj.get('note') or obj.get('msg') or obj.get('message') } except Exception: pass d = {} parts = [] for sep in [' ', ',', ';', '|']: if sep in s: parts = s.split(sep) break if not parts: parts = [s] i = 0 while i < len(parts): part = parts[i] if '=' in part: k, v = part.split('=', 1) kk = k.strip() vv = v.strip() try: import re if kk in ('ts_cn', 'ts_local', 'ts', 'timestamp', 'time'): if re.match(r'^\d{4}-\d{2}-\d{2}$', vv) and i + 1 < len(parts) and re.match(r'^\d{2}:\d{2}:\d{2}', parts[i+1]): vv = vv + ' ' + parts[i+1] i += 1 except Exception: pass d[kk] = vv i += 1 ts = choose_ts(d) if not ts: ts = s if isinstance(s, str) else None return { 'ts_cn': ts if ts else None, 'batch': d.get('batch') or d.get('batch_no') or d.get('lot'), 'mac': d.get('mac') or d.get('mac_addr') or d.get('mac_address'), 'note': d.get('note') or d.get('msg') or d.get('message') or s } def require_login(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get('user_id'): return jsonify({'error': 'unauthorized'}), 401 return fn(*args, **kwargs) return wrapper def require_role(role): def deco(fn): @wraps(fn) def wrapper(*args, **kwargs): if session.get('role') != role: return jsonify({'error': 'forbidden'}), 403 return fn(*args, **kwargs) return wrapper return deco def require_any_role(*roles): def deco(fn): @wraps(fn) def wrapper(*args, **kwargs): if session.get('role') not in roles: return jsonify({'error': 'forbidden'}), 403 return fn(*args, **kwargs) return wrapper return deco @app.route('/') def index(): return send_from_directory(FRONTEND_DIR, 'index.html') # auth @app.post('/api/auth/login') def login(): data = request.get_json() or {} username = data.get('username') password = data.get('password') conn = get_db() c = conn.cursor() c.execute('SELECT id, password_hash, role FROM users WHERE username=?', (username,)) row = c.fetchone() conn.close() if not row or not check_password_hash(row['password_hash'], password or ''): return jsonify({'error': 'invalid credentials'}), 400 session['user_id'] = row['id'] session['role'] = row['role'] session.permanent = True log('login', username) return jsonify({'ok': True}) @app.get('/api/auth/me') def me(): uid = session.get('user_id') if not uid: return jsonify({'username': None, 'role': None, 'avatar': None}) conn = get_db() c = conn.cursor() c.execute('SELECT username, role, avatar FROM users WHERE id=?', (uid,)) row = c.fetchone() conn.close() return jsonify({'username': row['username'], 'role': row['role'], 'avatar': row['avatar'] if row['avatar'] else None}) @app.post('/api/auth/logout') def logout(): log('logout') session.clear() return jsonify({'ok': True}) @app.post('/api/user/upload-avatar') @require_login def upload_avatar(): uid = session.get('user_id') if not uid: return jsonify({'error': '未登录'}), 401 if 'avatar' not in request.files: return jsonify({'error': '未选择文件'}), 400 file = request.files['avatar'] if file.filename == '': return jsonify({'error': '未选择文件'}), 400 # 验证文件类型 allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'} filename = secure_filename(file.filename) if '.' not in filename: return jsonify({'error': '无效的文件格式'}), 400 ext = filename.rsplit('.', 1)[1].lower() if ext not in allowed_extensions: return jsonify({'error': '不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式'}), 400 # 创建avatars目录 avatars_dir = os.path.join(FRONTEND_DIR, 'assets', 'avatars') os.makedirs(avatars_dir, exist_ok=True) # 生成唯一文件名 timestamp = datetime.now().strftime('%Y%m%d%H%M%S') new_filename = f'avatar_{uid}_{timestamp}.{ext}' filepath = os.path.join(avatars_dir, new_filename) # 保存文件 file.save(filepath) # 更新数据库 avatar_url = f'./assets/avatars/{new_filename}' conn = get_db() c = conn.cursor() # 删除旧头像文件(如果存在) c.execute('SELECT avatar FROM users WHERE id=?', (uid,)) row = c.fetchone() if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'): old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', '')) if os.path.exists(old_file): try: os.remove(old_file) except Exception: pass c.execute('UPDATE users SET avatar=? WHERE id=?', (avatar_url, uid)) conn.commit() conn.close() log('upload_avatar', f'上传头像: {new_filename}') return jsonify({'ok': True, 'avatar_url': avatar_url}) @app.post('/api/user/reset-avatar') @require_login def reset_avatar(): uid = session.get('user_id') if not uid: return jsonify({'error': '未登录'}), 401 conn = get_db() c = conn.cursor() # 删除旧头像文件(如果存在) c.execute('SELECT avatar FROM users WHERE id=?', (uid,)) row = c.fetchone() if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'): old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', '')) if os.path.exists(old_file): try: os.remove(old_file) except Exception: pass c.execute('UPDATE users SET avatar=NULL WHERE id=?', (uid,)) conn.commit() conn.close() log('reset_avatar', '恢复默认头像') return jsonify({'ok': True}) # dashboard @app.get('/api/dashboard') @require_login def dashboard(): conn = get_db() c = conn.cursor() c.execute('SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats') s = c.fetchone() c.execute('SELECT COUNT(1) AS total FROM defects') defects = c.fetchone() conn.close() good = s['good_total'] if s else 0 bad = s['bad_total'] if s else 0 fpy_good = s['fpy_good_total'] if s else 0 # 计算总良品率 rate = "{}%".format(round((good/(good+bad)) * 100, 2)) if (good+bad) > 0 else u'—' # 计算直通良品率(FPY = First Pass Yield) total_produced = good + bad fpy_rate = "{}%".format(round((fpy_good/total_produced) * 100, 2)) if total_produced > 0 else u'—' # 从 Redis 获取发货数量(SN 记录数) shipments_count = 0 try: r = get_redis() redis_key = 'shipment_sn_mapping' shipments_count = r.hlen(redis_key) except Exception as e: log('dashboard_redis_error', str(e)) # Redis 失败时回退到 SQLite conn = get_db() c = conn.cursor() c.execute('SELECT SUM(qty) AS total FROM shipments') ship = c.fetchone() conn.close() shipments_count = (ship['total'] or 0) if ship else 0 return jsonify({ 'fpyRate': fpy_rate, 'goodRate': rate, 'shipments': shipments_count, 'defects': (defects['total'] or 0) if defects else 0, 'badCount': bad }) @app.get('/api/audit/pdd') @require_login def audit_pdd(): start = datetime.utcnow() try: q_start = request.args.get('start') q_end = request.args.get('end') q_limit = request.args.get('limit') q_order = request.args.get('order', 'desc') has_filter = bool(q_start or q_end or q_limit or q_order) # 缓存优化:3秒内不重复查询 if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['pdd']['ts']) < 3): return jsonify({'list': _audit_cache['pdd']['list']}) r = get_redis() # 设置Redis超时为5秒 r.connection_pool.connection_kwargs['socket_timeout'] = 5 items = [] # 限制最大返回数量,避免数据过大 max_items = 500 if has_filter else 200 for key in ['mac_batch_audit_pdd', 'audit:pdd', 'pdd:audit']: try: if r.exists(key): t = r.type(key) if t == 'list': # 限制最大查询数量 total = r.llen(key) if has_filter: items = r.lrange(key, max(0, total - max_items), -1) else: items = r.lrange(key, -200, -1) elif t == 'zset': items = r.zrevrange(key, 0, max_items - 1) elif t == 'stream': entries = r.xrevrange(key, max='+', min='-', count=max_items) items = [json.dumps(v) for _id, v in entries] else: v = r.get(key) items = [v] if v else [] break except Exception as e: log('audit_pdd_error', f'Redis query error: {str(e)}') continue try: host = os.environ.get('REDIS_HOST') db = os.environ.get('REDIS_DB') tp = r.type('mac_batch_audit_pdd') ln = 0 try: ln = r.llen('mac_batch_audit_pdd') except Exception: pass log('audit_pdd_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln})) except Exception: pass if not items and r.exists('batch_sn_mapping_pdd') and r.type('batch_sn_mapping_pdd') == 'hash': try: pairs = [] cursor = 0 while True: cursor, res = r.hscan('batch_sn_mapping_pdd', cursor=cursor, count=200) for k, v in (res or {}).items(): pairs.append({'mac': k, 'batch': v}) if len(pairs) >= 100: break if cursor == 0 or len(pairs) >= 100: break items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs] except Exception: pass res = [parse_audit_line(x) for x in items] if q_start or q_end: def to_epoch(s): try: if not s: return None if 'T' in s or 'Z' in s or '+' in s: return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp() if ' ' in s and ':' in s: return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp() return datetime.strptime(s, '%Y-%m-%d').timestamp() except Exception: return None s_epoch = to_epoch(q_start) if q_start else None e_epoch = to_epoch(q_end) if q_end else None tmp = [] for r0 in res: ts = to_epoch(r0.get('ts_cn')) if ts is None: continue if s_epoch is not None and ts < s_epoch: continue if e_epoch is not None and ts > e_epoch: continue tmp.append(r0) res = tmp try: def to_key(r): s = r.get('ts_cn') or '' try: if 'T' in s or 'Z' in s or '+' in s: return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp() if ' ' in s and ':' in s: return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp() return datetime.strptime(s, '%Y-%m-%d').timestamp() except Exception: return 0 res.sort(key=to_key, reverse=(q_order != 'asc')) except Exception: res.reverse() if q_limit: try: lim = int(q_limit) if lim > 0: res = res[:lim] except Exception: pass if not has_filter: _audit_cache['pdd'] = {'ts': datetime.utcnow().timestamp(), 'list': res} dur = (datetime.utcnow() - start).total_seconds() log('audit_pdd_cost', f"{dur}s len={len(res)}") return jsonify({'list': res}) except Exception as e: log('audit_pdd_error', str(e)) return jsonify({'list': []}) @app.get('/api/audit/yt') @require_login def audit_yt(): start = datetime.utcnow() try: q_start = request.args.get('start') q_end = request.args.get('end') q_limit = request.args.get('limit') q_order = request.args.get('order', 'desc') has_filter = bool(q_start or q_end or q_limit or q_order) # 缓存优化:3秒内不重复查询 if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['yt']['ts']) < 3): return jsonify({'list': _audit_cache['yt']['list']}) r = get_redis() # 设置Redis超时为5秒 r.connection_pool.connection_kwargs['socket_timeout'] = 5 items = [] # 限制最大返回数量,避免数据过大 max_items = 500 if has_filter else 200 for key in ['mac_batch_audit_yt', 'audit:yt', 'yt:audit']: try: if r.exists(key): t = r.type(key) if t == 'list': # 限制最大查询数量 total = r.llen(key) if has_filter: items = r.lrange(key, max(0, total - max_items), -1) else: items = r.lrange(key, -200, -1) elif t == 'zset': items = r.zrevrange(key, 0, max_items - 1) elif t == 'stream': entries = r.xrevrange(key, max='+', min='-', count=max_items) items = [json.dumps(v) for _id, v in entries] else: v = r.get(key) items = [v] if v else [] break except Exception as e: log('audit_yt_error', f'Redis query error: {str(e)}') continue try: host = os.environ.get('REDIS_HOST') db = os.environ.get('REDIS_DB') tp = r.type('mac_batch_audit_yt') ln = 0 try: ln = r.llen('mac_batch_audit_yt') except Exception: pass log('audit_yt_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln})) except Exception: pass if not items and r.exists('batch_sn_mapping_yt') and r.type('batch_sn_mapping_yt') == 'hash': try: pairs = [] cursor = 0 while True: cursor, res = r.hscan('batch_sn_mapping_yt', cursor=cursor, count=200) for k, v in (res or {}).items(): pairs.append({'mac': k, 'batch': v}) if len(pairs) >= 100: break if cursor == 0 or len(pairs) >= 100: break items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs] except Exception: pass res = [parse_audit_line(x) for x in items] if q_start or q_end: def to_epoch(s): try: if not s: return None if 'T' in s or 'Z' in s or '+' in s: return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp() if ' ' in s and ':' in s: return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp() return datetime.strptime(s, '%Y-%m-%d').timestamp() except Exception: return None s_epoch = to_epoch(q_start) if q_start else None e_epoch = to_epoch(q_end) if q_end else None tmp = [] for r0 in res: ts = to_epoch(r0.get('ts_cn')) if ts is None: continue if s_epoch is not None and ts < s_epoch: continue if e_epoch is not None and ts > e_epoch: continue tmp.append(r0) res = tmp try: def to_key(r): s = r.get('ts_cn') or '' try: if 'T' in s or 'Z' in s or '+' in s: return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp() if ' ' in s and ':' in s: return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp() return datetime.strptime(s, '%Y-%m-%d').timestamp() except Exception: return 0 res.sort(key=to_key, reverse=(q_order != 'asc')) except Exception: res.reverse() if q_limit: try: lim = int(q_limit) if lim > 0: res = res[:lim] except Exception: pass if not has_filter: _audit_cache['yt'] = {'ts': datetime.utcnow().timestamp(), 'list': res} dur = (datetime.utcnow() - start).total_seconds() log('audit_yt_cost', f"{dur}s len={len(res)}") return jsonify({'list': res}) except Exception as e: log('audit_yt_error', str(e)) return jsonify({'list': []}) @app.get('/api/audit/diagnose') @require_login def audit_diagnose(): try: r = get_redis() result = {} for key in ['mac_batch_audit_pdd', 'mac_batch_audit_yt', 'batch_sn_mapping_pdd', 'batch_sn_mapping_yt']: try: t = r.type(key) if t == 'list': result[key] = {'type': t, 'len': r.llen(key)} elif t == 'zset': result[key] = {'type': t, 'len': r.zcard(key)} elif t == 'stream': info = r.xinfo_stream(key) result[key] = {'type': t, 'len': info.get('length')} elif t == 'hash': result[key] = {'type': t, 'len': r.hlen(key)} elif t == 'none': result[key] = {'type': t, 'len': 0} else: v = r.get(key) result[key] = {'type': t, 'len': 1 if v else 0} except Exception as e: result[key] = {'error': str(e)} return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.get('/api/overview') @require_login def overview(): conn = get_db() c = conn.cursor() c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats') stats_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM defects') defects_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM mac_batches') mac_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments') ship_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM devices') devices_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM personnel') personnel_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM qa') qa_row = c.fetchone() c.execute('SELECT COUNT(1) AS cnt FROM production') production_row = c.fetchone() conn.close() return jsonify({ 'stats': { 'records': (stats_row['cnt'] or 0) if stats_row else 0, 'goodTotal': (stats_row['good_total'] or 0) if stats_row else 0, 'badTotal': (stats_row['bad_total'] or 0) if stats_row else 0, 'fpyGoodTotal': (stats_row['fpy_good_total'] or 0) if stats_row else 0 }, 'defects': (defects_row['cnt'] or 0) if defects_row else 0, 'mac': (mac_row['cnt'] or 0) if mac_row else 0, 'shipments': { 'records': (ship_row['cnt'] or 0) if ship_row else 0, 'qtyTotal': (ship_row['qty_total'] or 0) if ship_row else 0 }, 'devices': (devices_row['cnt'] or 0) if devices_row else 0, 'personnel': (personnel_row['cnt'] or 0) if personnel_row else 0, 'qa': (qa_row['cnt'] or 0) if qa_row else 0, 'production': (production_row['cnt'] or 0) if production_row else 0 }) # uploads @app.post('/api/upload/mac') @require_login @require_any_role('admin','superadmin') def upload_mac(): data = request.get_json() or {} rows = data.get('rows') or [] if not isinstance(rows, list): return jsonify({'error': 'invalid rows'}), 400 conn = get_db() c = conn.cursor() now = datetime.utcnow().isoformat() for r in rows: mac = (r or {}).get('mac') batch = (r or {}).get('batch') if not mac or not batch: continue c.execute('INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now)) conn.commit() conn.close() log('upload_mac', f"count={len(rows)}") notify_superadmin('上传MAC与批次', f"上传了 {len(rows)} 条记录") return jsonify({'ok': True}) @app.post('/api/upload/stats') @require_login @require_any_role('admin','superadmin') def upload_stats(): data = request.get_json() or {} good = int(data.get('good') or 0) bad = int(data.get('bad') or 0) fpy_good = int(data.get('fpy_good') or 0) # 直通良品数 platform = data.get('platform') or 'pdd' # 平台:pdd/yt/tx details = data.get('details') or [] if good < 0 or bad < 0 or fpy_good < 0: return jsonify({'error': 'invalid count'}), 400 if platform not in ['pdd', 'yt', 'tx']: platform = 'pdd' conn = get_db() c = conn.cursor() now = datetime.utcnow().isoformat() # 保存统计数据 c.execute('INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?)', (good, bad, fpy_good, platform, now)) # 如果有不良明细,保存到defects表 if details and isinstance(details, list): for item in details: mac = (item or {}).get('mac') batch = (item or {}).get('batch') if mac and batch: c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now)) conn.commit() conn.close() platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform) log('upload_stats', json.dumps({'good': good, 'bad': bad, 'fpy_good': fpy_good, 'platform': platform, 'details_count': len(details)})) notify_superadmin('上传良/不良统计', f"平台: {platform_name}, 良品: {good}, 不良品: {bad}, 直通良品: {fpy_good}") return jsonify({'ok': True}) @app.post('/api/upload/repairs') @require_login @require_any_role('admin','superadmin') def upload_repairs(): data = request.get_json() or {} qty = int(data.get('qty') or 0) note = data.get('note') or '' if qty < 0: return jsonify({'error': 'invalid quantity'}), 400 conn = get_db() c = conn.cursor() now = datetime.utcnow().isoformat() c.execute('INSERT INTO repairs(qty, note, ts) VALUES(?,?,?)', (qty, note, now)) conn.commit() conn.close() log('upload_repairs', json.dumps({'qty': qty, 'note': note})) notify_superadmin('上传返修记录', f"数量: {qty}") return jsonify({'ok': True}) @app.post('/api/upload/defects') @require_login @require_any_role('admin','superadmin') def upload_defects(): data = request.get_json() or {} rows = data.get('rows') or [] conn = get_db() c = conn.cursor() now = datetime.utcnow().isoformat() for r in rows: mac = (r or {}).get('mac') batch = (r or {}).get('batch') if not mac or not batch: continue c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now)) conn.commit() conn.close() log('upload_defects', f"count={len(rows)}") notify_superadmin('上传不良明细', f"上传了 {len(rows)} 条记录") return jsonify({'ok': True}) @app.post('/api/upload/shipments') @require_login @require_any_role('admin','superadmin') def upload_shipments(): data = request.get_json() or {} date = data.get('date') qty = int(data.get('qty') or 0) to = data.get('to') platform = data.get('platform', '') box_no = data.get('box_no', '') if not date or qty <= 0 or not to or not platform: return jsonify({'error': 'invalid payload'}), 400 conn = get_db() c = conn.cursor() # 检查shipments表是否有platform和box_no列,如果没有则添加 c.execute("PRAGMA table_info(shipments)") columns = [col[1] for col in c.fetchall()] if 'platform' not in columns: c.execute('ALTER TABLE shipments ADD COLUMN platform TEXT') if 'box_no' not in columns: c.execute('ALTER TABLE shipments ADD COLUMN box_no TEXT') c.execute( 'INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?)', (date, qty, to, platform, box_no, datetime.utcnow().isoformat()) ) conn.commit() conn.close() platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform) log_data = {'date': date, 'qty': qty, 'to': to, 'platform': platform} if box_no: log_data['box_no'] = box_no log('upload_shipments', json.dumps(log_data)) notify_superadmin('上传发货记录', f"机种: {platform_name}, 日期: {date}, 数量: {qty}, 接收方: {to}") return jsonify({'ok': True}) # collect @app.get('/api/collect/devices') @require_login def devices(): conn = get_db() c = conn.cursor() c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 50') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/collect/environment') @require_login def environment(): conn = get_db() c = conn.cursor() c.execute('SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1') r = c.fetchone() conn.close() return jsonify({'temp': (r['temp'] if r else '—'), 'hum': (r['hum'] if r else '—')}) @app.get('/api/collect/personnel') @require_login def personnel(): conn = get_db() c = conn.cursor() c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.post('/api/collect/personnel') @require_login @require_any_role('admin','superadmin') def add_personnel(): data = request.get_json() or {} name = (data.get('name') or '').strip() role = (data.get('role') or '').strip() if not name: return jsonify({'error': 'invalid payload'}), 400 conn = get_db() c = conn.cursor() c.execute('INSERT INTO personnel(name, role, ts) VALUES(?,?,?)', (name, role, datetime.utcnow().isoformat())) conn.commit() conn.close() log('add_personnel', name) notify_superadmin('添加人员信息', f"姓名: {name}, 角色: {role}") return jsonify({'ok': True}) @app.get('/api/collect/qa') @require_login def qa(): conn = get_db() c = conn.cursor() c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 100') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/collect/production') @require_login def production(): conn = get_db() c = conn.cursor() c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) # export @app.post('/api/export/excel') @require_login def export_excel(): try: import openpyxl from openpyxl.styles import Font, Alignment, PatternFill from io import BytesIO data = request.get_json() or {} data_type = data.get('type', 'stats') # 创建工作簿 wb = openpyxl.Workbook() ws = wb.active # 设置标题样式 header_fill = PatternFill(start_color='4F8CFF', end_color='4F8CFF', fill_type='solid') header_font = Font(bold=True, color='FFFFFF') header_alignment = Alignment(horizontal='center', vertical='center') conn = get_db() c = conn.cursor() # 根据类型导出不同的数据 if data_type == 'stats': ws.title = '良不良统计' ws.append(['直通良品数', '良品数', '不良品数', '时间']) c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC') elif data_type == 'mac': ws.title = 'MAC与批次' ws.append(['MAC地址', '批次号', '时间']) c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC') elif data_type == 'repairs': ws.title = '返修记录' ws.append(['返修数量', '备注', '时间']) c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC') elif data_type == 'defects': ws.title = '不良明细' ws.append(['MAC地址', '批次号', '时间']) c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC') elif data_type == 'shipments': ws.title = '发货记录' ws.append(['日期', '数量', '收货方', '时间']) c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC') elif data_type == 'devices': ws.title = '设备状态' ws.append(['设备名称', '状态']) c.execute('SELECT name, status FROM devices ORDER BY id DESC') elif data_type == 'personnel': ws.title = '人员信息' ws.append(['姓名', '角色']) c.execute('SELECT name, role FROM personnel ORDER BY id DESC') elif data_type == 'qa': ws.title = '质检报告' ws.append(['标题', '日期']) c.execute('SELECT title, date FROM qa ORDER BY id DESC') elif data_type == 'production': ws.title = '时间记录' ws.append(['批次', '时长']) c.execute('SELECT batch, duration FROM production ORDER BY id DESC') else: conn.close() return jsonify({'error': 'invalid type'}), 400 # 应用标题样式 for cell in ws[1]: cell.fill = header_fill cell.font = header_font cell.alignment = header_alignment # 写入数据 rows = c.fetchall() for row in rows: ws.append(list(row)) conn.close() # 自动调整列宽 for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max_length + 2, 50) ws.column_dimensions[column_letter].width = adjusted_width # 保存到内存 output = BytesIO() wb.save(output) output.seek(0) log('export_excel', data_type) # 返回文件 from flask import send_file filename = f'{ws.title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx' return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=filename ) except Exception as e: log('export_excel_error', str(e)) return jsonify({'error': str(e)}), 500 @app.post('/api/export/pdf') @require_login def export_pdf(): try: from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import cm from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont from reportlab.lib.enums import TA_CENTER from io import BytesIO data = request.get_json() or {} data_type = data.get('type', 'stats') # 注册中文字体 font_name = 'Helvetica' try: # 尝试常见的中文字体路径 font_paths = [ '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc', '/usr/share/fonts/truetype/arphic/uming.ttc', '/System/Library/Fonts/PingFang.ttc', 'C:\\Windows\\Fonts\\simhei.ttf' ] for font_path in font_paths: if os.path.exists(font_path): pdfmetrics.registerFont(TTFont('ChineseFont', font_path)) font_name = 'ChineseFont' break except Exception as e: log('pdf_font_warning', f'无法加载中文字体: {str(e)}') # 创建PDF buffer = BytesIO() doc = SimpleDocTemplate( buffer, pagesize=landscape(A4), topMargin=1.5*cm, bottomMargin=1.5*cm, leftMargin=1.5*cm, rightMargin=1.5*cm ) elements = [] # 样式 styles = getSampleStyleSheet() title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontName=font_name, fontSize=18, textColor=colors.HexColor('#4F8CFF'), spaceAfter=20, alignment=TA_CENTER, leading=24 ) conn = get_db() c = conn.cursor() # 根据类型导出不同的数据 if data_type == 'stats': title = '良/不良统计报表' headers = ['直通良品数', '良品数', '不良品数', '时间'] c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200') elif data_type == 'mac': title = 'MAC与批次报表' headers = ['MAC地址', '批次号', '时间'] c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200') elif data_type == 'repairs': title = '返修记录报表' headers = ['返修数量', '备注', '时间'] c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200') elif data_type == 'defects': title = '不良明细报表' headers = ['MAC地址', '批次号', '时间'] c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200') elif data_type == 'shipments': title = '发货记录报表' headers = ['日期', '数量', '收货方', '时间'] c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200') elif data_type == 'devices': title = '设备状态报表' headers = ['设备名称', '状态'] c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 200') elif data_type == 'personnel': title = '人员信息报表' headers = ['姓名', '角色'] c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200') elif data_type == 'qa': title = '质检报告' headers = ['标题', '日期'] c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 200') elif data_type == 'production': title = '生产时间记录' headers = ['批次', '时长'] c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200') else: conn.close() return jsonify({'error': 'invalid type'}), 400 # 添加标题 elements.append(Paragraph(title, title_style)) elements.append(Spacer(1, 0.5*cm)) # 获取数据 rows = c.fetchall() conn.close() if len(rows) == 0: # 没有数据时的提示 no_data_style = ParagraphStyle( 'NoData', parent=styles['Normal'], fontName=font_name, fontSize=12, textColor=colors.grey, alignment=TA_CENTER ) elements.append(Paragraph('暂无数据', no_data_style)) else: # 构建表格数据 table_data = [headers] for row in rows: table_data.append([str(val) if val is not None else '' for val in row]) # 创建表格 table = Table(table_data, repeatRows=1) table.setStyle(TableStyle([ # 表头样式 ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F8CFF')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), ('FONTNAME', (0, 0), (-1, 0), font_name), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 8), ('TOPPADDING', (0, 0), (-1, 0), 8), # 数据行样式 ('BACKGROUND', (0, 1), (-1, -1), colors.white), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ('FONTNAME', (0, 1), (-1, -1), font_name), ('FONTSIZE', (0, 1), (-1, -1), 8), ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F5F7FA')]), ('TOPPADDING', (0, 1), (-1, -1), 5), ('BOTTOMPADDING', (0, 1), (-1, -1), 5), ])) elements.append(table) # 添加页脚信息 elements.append(Spacer(1, 0.5*cm)) footer_style = ParagraphStyle( 'Footer', parent=styles['Normal'], fontName=font_name, fontSize=8, textColor=colors.grey, alignment=TA_CENTER ) footer_text = f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | 共 {len(rows)} 条记录' elements.append(Paragraph(footer_text, footer_style)) # 生成PDF doc.build(elements) buffer.seek(0) log('export_pdf', data_type) # 返回文件 from flask import send_file filename = f'{title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf' return send_file( buffer, mimetype='application/pdf', as_attachment=True, download_name=filename ) except Exception as e: log('export_pdf_error', str(e)) return jsonify({'error': f'PDF导出失败: {str(e)}'}), 500 # lists @app.get('/api/list/mac') @require_login def list_mac(): conn = get_db() c = conn.cursor() c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/list/stats') @require_login def list_stats(): conn = get_db() c = conn.cursor() c.execute('SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/list/repairs') @require_login def list_repairs(): conn = get_db() c = conn.cursor() c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/list/defects') @require_login def list_defects(): conn = get_db() c = conn.cursor() c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/list/shipments') @require_login def list_shipments(): conn = get_db() c = conn.cursor() c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) # admin management @app.get('/api/admin/users') @require_login @require_any_role('superadmin') def list_users(): conn = get_db() c = conn.cursor() c.execute('SELECT username, role FROM users ORDER BY id ASC') rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.post('/api/admin/reset-password') @require_login @require_any_role('superadmin') def reset_password(): data = request.get_json() or {} username = data.get('username') new_password = data.get('new_password') if not username or not new_password: return jsonify({'error': 'invalid payload'}), 400 conn = get_db() c = conn.cursor() c.execute('SELECT id FROM users WHERE username=?', (username,)) row = c.fetchone() if not row: conn.close() return jsonify({'error': 'user not found'}), 404 c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id'])) conn.commit() conn.close() log('reset_password', username) return jsonify({'ok': True}) @app.post('/api/admin/change-password') @require_login @require_any_role('superadmin') def change_password(): data = request.get_json() or {} username = data.get('username') new_password = data.get('new_password') if not username or not new_password: return jsonify({'error': 'invalid payload'}), 400 conn = get_db() c = conn.cursor() c.execute('SELECT id FROM users WHERE username=?', (username,)) row = c.fetchone() if not row: conn.close() return jsonify({'error': 'user not found'}), 404 c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id'])) conn.commit() conn.close() log('change_password', username) return jsonify({'ok': True}) @app.post('/api/admin/add-user') @require_login @require_any_role('superadmin') def add_user(): """添加新用户""" data = request.get_json() or {} username = (data.get('username') or '').strip() password = data.get('password') role = (data.get('role') or 'admin').strip() if not username or not password: return jsonify({'error': '用户名和密码不能为空'}), 400 if role not in ['admin', 'superadmin']: return jsonify({'error': '角色必须是 admin 或 superadmin'}), 400 conn = get_db() c = conn.cursor() # 检查用户名是否已存在 c.execute('SELECT id FROM users WHERE username=?', (username,)) if c.fetchone(): conn.close() return jsonify({'error': '用户名已存在'}), 400 # 创建新用户 try: c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', (username, generate_password_hash(password), role)) conn.commit() conn.close() log('add_user', f'username={username}, role={role}') return jsonify({'ok': True, 'message': f'用户 {username} 创建成功'}) except Exception as e: conn.close() return jsonify({'error': f'创建用户失败:{str(e)}'}), 500 @app.post('/api/admin/clear') @require_login @require_any_role('superadmin') def clear_module(): data = request.get_json() or {} module = data.get('module') tables = { 'mac': 'mac_batches', 'stats': 'stats', 'defects': 'defects', 'shipments': 'shipments', 'devices': 'devices', 'environment': 'environment', 'personnel': 'personnel', 'qa': 'qa', 'production': 'production' } table = tables.get(module) if not table: return jsonify({'error': 'invalid module'}), 400 conn = get_db() c = conn.cursor() c.execute(f'DELETE FROM {table}') conn.commit() conn.close() log('clear_module', module) return jsonify({'ok': True}) # notifications @app.get('/api/notifications') @require_login @require_any_role('superadmin') def get_notifications(): """获取当前用户的通知列表""" user_id = session.get('user_id') conn = get_db() c = conn.cursor() c.execute('SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100', (user_id,)) rows = [dict(r) for r in c.fetchall()] conn.close() return jsonify({'list': rows}) @app.get('/api/notifications/unread-count') @require_login @require_any_role('superadmin') def get_unread_count(): """获取未读通知数量""" user_id = session.get('user_id') conn = get_db() c = conn.cursor() c.execute('SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0', (user_id,)) row = c.fetchone() conn.close() return jsonify({'count': row['count'] if row else 0}) @app.post('/api/notifications/mark-read') @require_login @require_any_role('superadmin') def mark_notification_read(): """标记通知为已读""" data = request.get_json() or {} notification_id = data.get('id') if not notification_id: return jsonify({'error': 'invalid id'}), 400 user_id = session.get('user_id') conn = get_db() c = conn.cursor() c.execute('UPDATE notifications SET read=1 WHERE id=? AND user_id=?', (notification_id, user_id)) conn.commit() conn.close() return jsonify({'ok': True}) @app.post('/api/notifications/mark-all-read') @require_login @require_any_role('superadmin') def mark_all_notifications_read(): """标记所有通知为已读""" user_id = session.get('user_id') conn = get_db() c = conn.cursor() c.execute('UPDATE notifications SET read=1 WHERE user_id=?', (user_id,)) conn.commit() conn.close() return jsonify({'ok': True}) @app.post('/api/notifications/delete-read') @require_login @require_any_role('superadmin') def delete_read_notifications(): """删除所有已读通知""" user_id = session.get('user_id') conn = get_db() c = conn.cursor() c.execute('DELETE FROM notifications WHERE user_id=? AND read=1', (user_id,)) deleted_count = c.rowcount conn.commit() conn.close() return jsonify({'ok': True, 'count': deleted_count}) @app.errorhandler(404) def not_found(_): return jsonify({'error': 'not found'}), 404 @app.route('/api/validate/mac-file', methods=['POST']) @require_login @require_any_role('admin','superadmin') def validate_mac_file(): """验证Excel文件格式是否符合要求""" f = request.files.get('file') if not f: return jsonify({'error': 'no file'}), 400 name = secure_filename(f.filename or '') ext = (name.split('.')[-1] or '').lower() if ext not in ['csv', 'xlsx', 'xls']: return jsonify({'valid': False, 'message': '文件格式不支持,请上传CSV或Excel文件'}), 200 try: if ext == 'csv': text = f.stream.read().decode('utf-8', errors='ignore') lines = [l.strip() for l in text.splitlines() if l.strip()] if not lines: return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200 # 检查第一行(表头) header = [h.strip() for h in lines[0].split(',')] if len(header) != 2: return jsonify({'valid': False, 'message': f'文件应该只包含2列数据,当前有{len(header)}列'}), 200 # 记录表头用于调试 log('validate_mac_file_csv', f'headers: {header}') # 更灵活的表头检查(不区分大小写) header_lower = [h.lower() for h in header] has_mac = any('mac' in h and 'sn' not in h for h in header_lower) has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower) has_batch = any('批次' in h or 'batch' in h for h in header_lower) if not (has_mac or has_sn_mac): return jsonify({'valid': False, 'message': f'缺少必需的列:MAC 或 SN_MAC(当前列:{", ".join(header)})'}), 200 if not has_batch: return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)})'}), 200 data_rows = len(lines) - 1 mac_col = 'MAC' if has_mac else 'SN_MAC' return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200 else: import openpyxl wb = openpyxl.load_workbook(f) ws = wb.active if ws.max_row < 2: wb.close() return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200 if ws.max_column != 2: wb.close() return jsonify({'valid': False, 'message': f'文件应该只包含2列数据,当前有{ws.max_column}列'}), 200 # 检查表头 header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0] header = [str(h).strip() if h else '' for h in header_row] # 记录表头用于调试 log('validate_mac_file', f'headers: {header}') # 更灵活的表头检查(不区分大小写) header_lower = [h.lower() for h in header] has_mac = any('mac' in h and 'sn' not in h for h in header_lower) has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower) has_batch = any('批次' in h or 'batch' in h for h in header_lower) if not (has_mac or has_sn_mac): wb.close() return jsonify({'valid': False, 'message': f'缺少必需的列:MAC 或 SN_MAC(当前列:{", ".join(header)})'}), 200 if not has_batch: wb.close() return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)})'}), 200 data_rows = ws.max_row - 1 mac_col = 'MAC' if has_mac else 'SN_MAC' wb.close() return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200 except Exception as e: return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200 @app.post('/api/upload/mac-file') @require_login @require_any_role('admin','superadmin') def upload_mac_file(): import subprocess import tempfile f = request.files.get('file') upload_type = request.form.get('type', 'pdd') # pdd, yt, or tx if not f: return jsonify({'error': 'no file'}), 400 if upload_type not in ['pdd', 'yt', 'tx']: return jsonify({'error': 'invalid type'}), 400 # 保存上传的文件到临时位置 name = secure_filename(f.filename or 'upload.xlsx') temp_dir = '/home/hyx/work/batch_import_xlsx' os.makedirs(temp_dir, exist_ok=True) # 根据类型确定文件名 if upload_type == 'yt': temp_path = os.path.join(temp_dir, 'sn_test_yt.xlsx') elif upload_type == 'pdd': temp_path = os.path.join(temp_dir, 'sn_test_pdd.xlsx') else: temp_path = os.path.join(temp_dir, 'sn_test_tx.xlsx') f.save(temp_path) # 调用batch_import.py脚本 script_path = '/home/hyx/work/生产管理系统/batch_import.py' python_path = '/home/hyx/work/.venv/bin/python' try: result = subprocess.run( [python_path, script_path, upload_type], capture_output=True, text=True, timeout=300 # 5分钟超时 ) output = result.stdout + result.stderr success = result.returncode == 0 log('upload_mac_file', f"type={upload_type}, success={success}") if success: notify_superadmin('批量上传MAC文件', f"类型: {upload_type}") return jsonify({ 'ok': success, 'output': output, 'returncode': result.returncode }) except subprocess.TimeoutExpired: return jsonify({'error': '上传超时', 'output': '处理时间超过5分钟'}), 500 except Exception as e: return jsonify({'error': str(e), 'output': ''}), 500 @app.post('/api/upload/defects-file') @require_login @require_any_role('admin','superadmin') def upload_defects_file(): f = request.files.get('file') if not f: return jsonify({'error': 'no file'}), 400 name = secure_filename(f.filename or '') ext = (name.split('.')[-1] or '').lower() rows = [] if ext == 'csv': text = f.stream.read().decode('utf-8', errors='ignore') for l in text.splitlines(): parts = [p.strip() for p in l.split(',')] if len(parts) >= 2: rows.append({'mac': parts[0], 'batch': parts[1]}) else: try: import openpyxl wb = openpyxl.load_workbook(f) ws = wb.active for r in ws.iter_rows(values_only=True): mac = str(r[0]).strip() if r and r[0] else None batch = str(r[1]).strip() if r and len(r) > 1 and r[1] else None if mac and batch: rows.append({'mac': mac, 'batch': batch}) except Exception: return jsonify({'error': 'parse error'}), 400 conn = get_db() c = conn.cursor() now = datetime.utcnow().isoformat() for r in rows: c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (r['mac'], r['batch'], now)) conn.commit() conn.close() log('upload_defects_file', f"count={len(rows)}") notify_superadmin('批量上传不良明细文件', f"上传了 {len(rows)} 条记录") return jsonify({'ok': True, 'count': len(rows)}) @app.route('/api/validate/shipments-file', methods=['POST']) @require_login @require_any_role('admin','superadmin') def validate_shipments_file(): """验证发货记录Excel文件格式""" f = request.files.get('file') if not f: return jsonify({'error': 'no file'}), 400 name = secure_filename(f.filename or '') ext = (name.split('.')[-1] or '').lower() if ext not in ['csv', 'xlsx', 'xls']: return jsonify({'valid': False, 'message': '文件格式不支持,请上传CSV或Excel文件'}), 200 try: if ext == 'csv': text = f.stream.read().decode('utf-8', errors='ignore') lines = [l.strip() for l in text.splitlines() if l.strip()] if not lines: return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200 header = [h.strip() for h in lines[0].split(',')] header_lower = [h.lower() for h in header] # 检查必需的列 has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower)) has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower)) if not has_date: return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200 if not has_box: return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200 # 检查SN列(SN1-SN20) sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()] if not sn_cols: return jsonify({'valid': False, 'message': '缺少SN列(SN1, SN2, ... SN20)'}), 200 data_rows = len(lines) - 1 return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列,共{data_rows}行数据'}), 200 else: import openpyxl wb = openpyxl.load_workbook(f) ws = wb.active if ws.max_row < 2: wb.close() return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200 # 检查表头 header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0] header = [str(h).strip() if h else '' for h in header_row] header_lower = [h.lower() for h in header] # 检查必需的列 has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower)) has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower)) if not has_date: wb.close() return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200 if not has_box: wb.close() return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200 # 检查SN列 sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()] if not sn_cols: wb.close() return jsonify({'valid': False, 'message': '缺少SN列(SN1, SN2, ... SN20)'}), 200 data_rows = ws.max_row - 1 wb.close() return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列,共{data_rows}行数据'}), 200 except Exception as e: return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200 @app.get('/api/shipments/query-by-sn') @require_login def query_shipment_by_sn(): """通过 SN/MAC 号查询出货信息""" sn = request.args.get('sn', '').strip() if not sn: return jsonify({'error': '请提供 SN/MAC 号'}), 400 try: r = get_redis() redis_key = 'shipment_sn_mapping' # 从 Redis Hash 中查询 result = r.hget(redis_key, sn) if result: # 解析 JSON 数据 shipment_info = json.loads(result) platform = shipment_info.get('platform', 'pdd') # 默认拼多多 platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform) return jsonify({ 'found': True, 'sn': sn, 'date': shipment_info.get('date'), 'box': shipment_info.get('box'), 'platform': platform, 'platform_name': platform_name, 'ts': shipment_info.get('ts') }) else: return jsonify({ 'found': False, 'sn': sn, 'message': '未找到该 SN 的出货记录' }) except Exception as e: log('query_shipment_error', str(e)) return jsonify({'error': f'查询失败:{str(e)}'}), 500 @app.post('/api/shipments/update-platform') @require_login @require_any_role('superadmin') def update_shipments_platform(): """批量更新Redis中发货记录的机种字段""" try: r = get_redis() redis_key = 'shipment_sn_mapping' # 获取所有记录 all_data = r.hgetall(redis_key) updated_count = 0 pipe = r.pipeline() for sn, value in all_data.items(): try: info = json.loads(value) # 如果没有platform字段,添加为pdd if 'platform' not in info: info['platform'] = 'pdd' pipe.hset(redis_key, sn, json.dumps(info, ensure_ascii=False)) updated_count += 1 except Exception: continue pipe.execute() log('update_shipments_platform', f'updated {updated_count} records') return jsonify({ 'ok': True, 'message': f'已更新 {updated_count} 条记录为拼多多', 'updated': updated_count }) except Exception as e: log('update_shipments_platform_error', str(e)) return jsonify({'error': f'更新失败: {str(e)}'}), 500 @app.get('/api/shipments/redis-stats') @require_login def shipments_redis_stats(): """获取 Redis 中发货记录的统计信息""" try: r = get_redis() redis_key = 'shipment_sn_mapping' count = r.hlen(redis_key) return jsonify({ 'key': redis_key, 'count': count, 'exists': r.exists(redis_key) > 0 }) except Exception as e: log('shipments_redis_stats_error', str(e)) return jsonify({'error': f'获取统计失败:{str(e)}'}), 500 @app.post('/api/shipments/clear-redis') @require_login @require_any_role('admin','superadmin') def clear_shipments_redis(): """清空 Redis 中的发货记录数据""" try: r = get_redis() redis_key = 'shipment_sn_mapping' # 获取删除前的数量 count_before = r.hlen(redis_key) # 删除整个 Hash r.delete(redis_key) log('clear_shipments_redis', f'cleared {count_before} records') return jsonify({ 'ok': True, 'message': f'已清空 {count_before} 条发货记录', 'count': count_before }) except Exception as e: log('clear_shipments_redis_error', str(e)) return jsonify({'error': f'清空失败:{str(e)}'}), 500 @app.route('/api/upload/shipments-file', methods=['POST']) @require_login @require_any_role('admin','superadmin') def upload_shipments_file(): """上传发货记录Excel文件""" f = request.files.get('file') platform = request.form.get('platform') # 获取机种参数 if not f: return jsonify({'error': '请选择文件'}), 400 if not platform or platform not in ['pdd', 'yt', 'tx']: return jsonify({'error': '请选择机种(拼多多/圆通/兔喜)'}), 400 name = secure_filename(f.filename or '') ext = (name.split('.')[-1] or '').lower() if ext not in ['csv', 'xlsx', 'xls']: return jsonify({'error': '文件格式不支持'}), 400 try: rows = [] if ext == 'csv': text = f.stream.read().decode('utf-8', errors='ignore') lines = [l.strip() for l in text.splitlines() if l.strip()] if len(lines) < 2: return jsonify({'error': '文件为空'}), 400 header = [h.strip() for h in lines[0].split(',')] header_lower = [h.lower() for h in header] # 找到列索引 date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None) box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None) if date_idx is None or box_idx is None: return jsonify({'error': '缺少必需的列'}), 400 # 找到所有SN列(按数字排序) sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()] sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序 # 记录上一个有效的日期(用于处理合并单元格) last_valid_date = None for line in lines[1:]: parts = [p.strip() for p in line.split(',')] if len(parts) <= max(date_idx, box_idx): continue # 处理日期(合并单元格时可能为空) current_date = parts[date_idx] if date_idx < len(parts) and parts[date_idx] else '' # 如果当前行日期为空,使用上一个有效日期 if current_date: last_valid_date = current_date date = current_date else: date = last_valid_date # 处理箱号 box = parts[box_idx] if box_idx < len(parts) and parts[box_idx] else '' # 如果没有日期或箱号,跳过这行 if not date or not box: continue # 收集所有SN(横向 20 个) sns = [] for idx, _ in sn_indices: if idx < len(parts) and parts[idx]: sns.append(parts[idx]) # 只有当有 SN 数据时才添加记录 if sns: rows.append({ 'date': date, 'box': box, 'sns': sns, 'qty': len(sns) }) else: import openpyxl wb = openpyxl.load_workbook(f) ws = wb.active if ws.max_row < 2: wb.close() return jsonify({'error': '文件为空'}), 400 # 读取表头 header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0] header = [str(h).strip() if h else '' for h in header_row] header_lower = [h.lower() for h in header] # 找到列索引 date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None) box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None) if date_idx is None or box_idx is None: wb.close() return jsonify({'error': '缺少必需的列'}), 400 # 找到所有SN列(按数字排序) sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()] sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序 # 记录上一个有效的日期(用于处理合并单元格) last_valid_date = None # 读取数据行 for row in ws.iter_rows(min_row=2, values_only=True): # 处理日期(合并单元格时可能为 None) date_value = row[date_idx] if date_idx < len(row) else None current_date = None if date_value: # 如果是 datetime.datetime 或 datetime.date 对象 if hasattr(date_value, 'strftime'): current_date = date_value.strftime('%Y-%m-%d') # 如果是数字(Excel 日期序列号) elif isinstance(date_value, (int, float)): try: # Excel 日期从 1900-01-01 开始计数 from datetime import datetime as dt, timedelta # Excel 的 bug:1900 不是闰年但 Excel 认为是,所以需要减1 excel_epoch = dt(1899, 12, 30) date_obj = excel_epoch + timedelta(days=float(date_value)) current_date = date_obj.strftime('%Y-%m-%d') except Exception: current_date = str(date_value).strip() else: current_date = str(date_value).strip() if current_date == 'None': current_date = None # 如果当前行日期为空,使用上一个有效日期 if current_date: last_valid_date = current_date date = current_date else: date = last_valid_date # 处理箱号 box = str(row[box_idx]).strip() if box_idx < len(row) and row[box_idx] else '' # 如果没有日期或箱号,跳过这行 if not date or not box or box == 'None': continue # 收集所有SN(横向 20 个) sns = [] for idx, _ in sn_indices: if idx < len(row) and row[idx]: sn_value = str(row[idx]).strip() if sn_value and sn_value != 'None': sns.append(sn_value) # 只有当有 SN 数据时才添加记录 if sns: rows.append({ 'date': date, 'box': box, 'sns': sns, 'qty': len(sns) }) wb.close() # 保存到 SQLite 数据库 conn = get_db() c = conn.cursor() # 使用北京时间(UTC+8) from datetime import timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) now = datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S') total_qty = 0 for r in rows: receiver_info = f"箱号:{r['box']}" c.execute('INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?)', (r['date'], r['qty'], receiver_info, now)) total_qty += r['qty'] conn.commit() conn.close() # 保存到 Redis - 使用 Hash 结构存储 MAC -> 出货信息的映射 try: r = get_redis() redis_key = 'shipment_sn_mapping' # Redis Hash key # 使用 pipeline 批量写入,提高性能 pipe = r.pipeline() redis_count = 0 for row_data in rows: date = row_data['date'] box = row_data['box'] sns = row_data['sns'] # 为每个 SN/MAC 创建映射记录 for sn in sns: if sn: # 确保 SN 不为空 # 存储格式: MAC -> JSON(date, box, platform, timestamp) shipment_info = json.dumps({ 'date': date, 'box': box, 'platform': platform, 'ts': now }, ensure_ascii=False) pipe.hset(redis_key, sn, shipment_info) redis_count += 1 # 执行批量写入 pipe.execute() log('upload_shipments_redis', f"redis_key={redis_key}, sn_count={redis_count}") except Exception as redis_error: # Redis 写入失败不影响主流程,只记录日志 log('upload_shipments_redis_error', str(redis_error)) log('upload_shipments_file', f"boxes={len(rows)}, total_qty={total_qty}") notify_superadmin('批量上传发货记录文件', f"箱数: {len(rows)}, 总数量: {total_qty}") return jsonify({'ok': True, 'count': len(rows), 'total_qty': total_qty}) except Exception as e: log('upload_shipments_file_error', str(e)) return jsonify({'error': f'处理文件失败:{str(e)}'}), 500 init_db() if __name__ == '__main__': app.run(host='0.0.0.0', port=int(os.environ.get('PORT', '5000')))