ERP/server/app.py
2025-12-08 11:20:28 +08:00

5284 lines
186 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import json
import sqlite3
from datetime import datetime
from functools import wraps
from flask import Flask, request, jsonify, session, send_from_directory
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
try:
import redis
except Exception:
redis = None
_redis_client = None
_audit_cache = {'pdd': {'ts': 0, 'list': []}, 'yt': {'ts': 0, 'list': []}}
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'data.db')
FRONTEND_DIR = os.path.join(os.path.dirname(BASE_DIR), 'frontend')
app = Flask(__name__, static_folder=FRONTEND_DIR, static_url_path='')
app.config['SECRET_KEY'] = os.environ.get('APP_SECRET', 'change-me')
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 限制上传文件大小为50MB
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db()
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users(
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL
)''')
c.execute('''CREATE TABLE IF NOT EXISTS operations_log(
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
action TEXT,
detail TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS notifications(
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
username TEXT,
action TEXT,
detail TEXT,
ts TEXT,
read INTEGER DEFAULT 0
)''')
c.execute('''CREATE TABLE IF NOT EXISTS mac_batches(
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac TEXT,
batch TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS stats(
id INTEGER PRIMARY KEY AUTOINCREMENT,
good INTEGER,
bad INTEGER,
fpy_good INTEGER DEFAULT 0,
platform TEXT DEFAULT 'pdd',
ts TEXT
)''')
# 为已存在的表添加列(如果不存在)
try:
c.execute('ALTER TABLE stats ADD COLUMN fpy_good INTEGER DEFAULT 0')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE stats ADD COLUMN platform TEXT DEFAULT "pdd"')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE users ADD COLUMN avatar TEXT')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE users ADD COLUMN factory TEXT')
except Exception:
pass # 列已存在
try:
c.execute('ALTER TABLE mac_batches ADD COLUMN platform TEXT DEFAULT "pdd"')
except Exception:
pass # 列已存在
c.execute('''CREATE TABLE IF NOT EXISTS defects(
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac TEXT,
batch TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS shipments(
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT,
qty INTEGER,
receiver TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS devices(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
status TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS environment(
id INTEGER PRIMARY KEY AUTOINCREMENT,
temp TEXT,
hum TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS personnel(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
role TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS qa(
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
date TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS production(
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch TEXT,
duration TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS repairs(
id INTEGER PRIMARY KEY AUTOINCREMENT,
qty INTEGER,
note TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS sop_files(
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
original_name TEXT NOT NULL,
description TEXT,
uploader TEXT,
ts TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS work_orders(
id INTEGER PRIMARY KEY AUTOINCREMENT,
factory TEXT NOT NULL,
order_no TEXT NOT NULL,
product_model TEXT,
order_qty INTEGER NOT NULL,
production_start_time TEXT,
production_end_time TEXT,
status TEXT DEFAULT 'issued',
status_text TEXT DEFAULT '已下发',
remark TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS material_purchase(
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
list_no TEXT NOT NULL,
plan_no TEXT NOT NULL,
bom_result TEXT,
status TEXT NOT NULL,
demand_status TEXT NOT NULL,
complete_rate REAL,
material_code TEXT NOT NULL,
material_name TEXT NOT NULL,
batch_no TEXT,
level INTEGER,
required_qty INTEGER NOT NULL,
stock_qty INTEGER,
shortage INTEGER,
acquire_method TEXT,
realtime_stock INTEGER,
pending_qty INTEGER,
dispatched_qty INTEGER,
received_qty INTEGER,
submitter TEXT,
submit_time TEXT,
update_time TEXT,
deleted INTEGER DEFAULT 0,
deleted_at TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS customer_orders(
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_date TEXT NOT NULL,
order_no TEXT NOT NULL,
customer_name TEXT NOT NULL,
material TEXT NOT NULL,
quantity INTEGER NOT NULL,
unit_price REAL NOT NULL,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS reconciliations(
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_date TEXT NOT NULL,
contract_no TEXT NOT NULL,
material_name TEXT NOT NULL,
spec_model TEXT NOT NULL,
transport_no TEXT,
quantity INTEGER NOT NULL,
unit TEXT NOT NULL,
unit_price REAL NOT NULL,
total_amount REAL NOT NULL,
delivery_date TEXT,
shipment_date TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
# 为已存在的表添加列(如果不存在)
try:
c.execute('ALTER TABLE customer_orders ADD COLUMN customer_name TEXT')
except Exception:
pass # 列已存在
# 为已存在的表添加列(如果不存在)
try:
c.execute('ALTER TABLE work_orders ADD COLUMN product_model TEXT')
except Exception:
pass # 列已存在
# BOM物料清单表 - 定义产品的物料组成
c.execute('''CREATE TABLE IF NOT EXISTS bom(
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_code TEXT NOT NULL,
product_name TEXT NOT NULL,
material_code TEXT NOT NULL,
material_name TEXT NOT NULL,
unit_qty REAL NOT NULL,
unit TEXT DEFAULT 'pcs',
min_package INTEGER DEFAULT 1,
supplier TEXT,
remark TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
# 期初库存表 - 记录物料的初始库存
c.execute('''CREATE TABLE IF NOT EXISTS initial_stock(
id INTEGER PRIMARY KEY AUTOINCREMENT,
material_code TEXT NOT NULL UNIQUE,
material_name TEXT NOT NULL,
stock_qty INTEGER DEFAULT 0,
unit TEXT DEFAULT 'pcs',
min_package INTEGER DEFAULT 1,
supplier TEXT,
remark TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
# 采购需求清单表 - 记录计算后的采购需求
c.execute('''CREATE TABLE IF NOT EXISTS purchase_demand(
id INTEGER PRIMARY KEY AUTOINCREMENT,
demand_no TEXT NOT NULL,
material_code TEXT NOT NULL,
material_name TEXT NOT NULL,
order_qty INTEGER DEFAULT 0,
bom_unit_qty REAL DEFAULT 0,
total_demand INTEGER DEFAULT 0,
initial_stock INTEGER DEFAULT 0,
net_demand INTEGER DEFAULT 0,
min_package INTEGER DEFAULT 1,
actual_purchase_qty INTEGER DEFAULT 0,
unit TEXT DEFAULT 'pcs',
supplier TEXT,
status TEXT DEFAULT 'pending',
remark TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
conn.commit()
# create default admin
c.execute('SELECT id FROM users WHERE username=?', ('admin',))
if not c.fetchone():
pwd = os.environ.get('ADMIN_PASSWORD', 'admin123')
c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', (
'admin', generate_password_hash(pwd), 'admin'
))
conn.commit()
# create superadmin from env
su_user = os.environ.get('SUPERADMIN_USERNAME')
su_pass = os.environ.get('SUPERADMIN_PASSWORD')
if su_user and su_pass:
c.execute('SELECT id FROM users WHERE username=?', (su_user,))
if not c.fetchone():
c.execute('INSERT INTO users(username, password_hash, role) VALUES(?,?,?)', (
su_user, generate_password_hash(su_pass), 'superadmin'
))
conn.commit()
conn.close()
def log(action, detail=''):
try:
conn = get_db()
c = conn.cursor()
c.execute('INSERT INTO operations_log(user_id, action, detail, ts) VALUES(?,?,?,?)', (
session.get('user_id'), action, detail, get_beijing_time()
))
conn.commit()
conn.close()
except Exception:
pass
def notify_superadmin(action, detail=''):
"""为超级管理员创建通知"""
try:
user_id = session.get('user_id')
if not user_id:
return
conn = get_db()
c = conn.cursor()
# 获取当前用户信息
c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
user = c.fetchone()
if not user:
conn.close()
return
# 如果是超级管理员自己的操作,不创建通知
if user['role'] == 'superadmin':
conn.close()
return
# 为所有超级管理员创建通知
c.execute('SELECT id FROM users WHERE role=?', ('superadmin',))
superadmins = c.fetchall()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).isoformat()
for admin in superadmins:
c.execute('INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)', (
admin['id'], user['username'], action, detail, now, 0
))
conn.commit()
conn.close()
except Exception:
pass
def notify_admins(action, detail=''):
"""为管理员创建通知(超级管理员操作时使用)"""
try:
user_id = session.get('user_id')
if not user_id:
return
conn = get_db()
c = conn.cursor()
# 获取当前用户信息
c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
user = c.fetchone()
if not user:
conn.close()
return
# 只有超级管理员的操作才通知管理员
if user['role'] != 'superadmin':
conn.close()
return
# 为所有管理员创建通知
c.execute('SELECT id FROM users WHERE role=?', ('admin',))
admins = c.fetchall()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).isoformat()
for admin in admins:
c.execute('INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)', (
admin['id'], user['username'], action, detail, now, 0
))
conn.commit()
conn.close()
except Exception:
pass
def notify_admins_by_factory(action, detail='', factory=None):
"""为指定工厂的管理员创建通知(超级管理员操作时使用)"""
try:
user_id = session.get('user_id')
if not user_id:
return
conn = get_db()
c = conn.cursor()
# 获取当前用户信息
c.execute('SELECT username, role FROM users WHERE id=?', (user_id,))
user = c.fetchone()
if not user:
conn.close()
return
# 只有超级管理员的操作才通知管理员
if user['role'] != 'superadmin':
conn.close()
return
# 如果指定了工厂,只通知该工厂的管理员;否则通知所有管理员
if factory:
c.execute('SELECT id FROM users WHERE role=? AND factory=?', ('admin', factory))
else:
c.execute('SELECT id FROM users WHERE role=?', ('admin',))
admins = c.fetchall()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).isoformat()
for admin in admins:
c.execute('INSERT INTO notifications(user_id, username, action, detail, ts, read) VALUES(?,?,?,?,?,?)', (
admin['id'], user['username'], action, detail, now, 0
))
conn.commit()
conn.close()
except Exception:
pass
def get_redis():
global _redis_client
if not redis:
raise RuntimeError('redis missing')
if _redis_client is not None:
return _redis_client
host = os.environ.get('REDIS_HOST', '180.163.74.83')
port = int(os.environ.get('REDIS_PORT', '6379'))
password = os.environ.get('REDIS_PASSWORD') or os.environ.get('SUPERADMIN_PASSWORD')
db = int(os.environ.get('REDIS_DB', '0'))
_redis_client = redis.Redis(host=host, port=port, password=password, db=db, decode_responses=True, socket_timeout=0.5, socket_connect_timeout=0.5)
return _redis_client
def get_beijing_time():
"""获取北京时间UTC+8的ISO格式字符串"""
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(beijing_tz).isoformat()
def format_date_to_slash(date_str):
"""将日期格式统一转换为 YYYY/MM/DD 格式空值返回None"""
if not date_str or str(date_str).strip() == '':
return None
date_str = str(date_str).strip()
# 去掉时间部分
if ' ' in date_str:
date_str = date_str.split()[0]
# 将 YYYY-MM-DD 转换为 YYYY/MM/DD
if '-' in date_str:
return date_str.replace('-', '/')
return date_str
def parse_audit_line(s):
if not s:
return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None}
def normalize_ts(ts):
try:
from datetime import datetime, timezone, timedelta
# Attempt ISO parsing
# Support "Z" UTC suffix and offsets like +08:00
if ts.endswith('Z'):
dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
else:
dt = datetime.fromisoformat(ts)
# Convert to Beijing time (UTC+8)
bj = dt.astimezone(timezone(timedelta(hours=8)))
return bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return ts
def has_time(v):
return isinstance(v, str) and (('T' in v) or (':' in v))
def choose_ts(d):
candidates = [d.get('ts_cn'), d.get('ts_local'), d.get('ts'), d.get('ts_utc'), d.get('timestamp'), d.get('time')]
for v in candidates:
if has_time(v):
return v
for v in candidates:
if v:
return v
return None
try:
obj = json.loads(s)
ts = choose_ts(obj)
if not ts:
ts = s if isinstance(s, str) else None
return {
'ts_cn': ts if ts else None,
'batch': obj.get('batch') or obj.get('batch_no') or obj.get('lot'),
'mac': obj.get('mac') or obj.get('mac_addr') or obj.get('mac_address'),
'note': obj.get('note') or obj.get('msg') or obj.get('message')
}
except Exception:
pass
d = {}
parts = []
for sep in [' ', ',', ';', '|']:
if sep in s:
parts = s.split(sep)
break
if not parts:
parts = [s]
i = 0
while i < len(parts):
part = parts[i]
if '=' in part:
k, v = part.split('=', 1)
kk = k.strip()
vv = v.strip()
try:
import re
if kk in ('ts_cn', 'ts_local', 'ts', 'timestamp', 'time'):
if re.match(r'^\d{4}-\d{2}-\d{2}$', vv) and i + 1 < len(parts) and re.match(r'^\d{2}:\d{2}:\d{2}', parts[i+1]):
vv = vv + ' ' + parts[i+1]
i += 1
except Exception:
pass
d[kk] = vv
i += 1
ts = choose_ts(d)
if not ts:
ts = s if isinstance(s, str) else None
return {
'ts_cn': ts if ts else None,
'batch': d.get('batch') or d.get('batch_no') or d.get('lot'),
'mac': d.get('mac') or d.get('mac_addr') or d.get('mac_address'),
'note': d.get('note') or d.get('msg') or d.get('message') or s
}
def require_login(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if not session.get('user_id'):
return jsonify({'error': 'unauthorized'}), 401
return fn(*args, **kwargs)
return wrapper
def require_role(role):
def deco(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if session.get('role') != role:
return jsonify({'error': 'forbidden'}), 403
return fn(*args, **kwargs)
return wrapper
return deco
def require_any_role(*roles):
def deco(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if session.get('role') not in roles:
return jsonify({'error': 'forbidden'}), 403
return fn(*args, **kwargs)
return wrapper
return deco
@app.route('/')
def index():
return send_from_directory(FRONTEND_DIR, 'index.html')
@app.route('/index.html')
def index_html():
return send_from_directory(FRONTEND_DIR, 'index.html')
# auth
@app.get('/api/auth/captcha')
def captcha():
"""生成验证码图片"""
try:
from PIL import Image, ImageDraw, ImageFont
import random
import io
import base64
# 生成4位随机数字
code = ''.join([str(random.randint(0, 9)) for _ in range(4)])
# 将验证码存储到session
session['captcha'] = code
# 创建图片
width, height = 120, 40
image = Image.new('RGB', (width, height), color='#f0f4f8')
draw = ImageDraw.Draw(image)
# 尝试使用系统字体,如果失败则使用默认字体
try:
font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', 28)
except:
try:
font = ImageFont.truetype('/System/Library/Fonts/Helvetica.ttc', 28)
except:
font = ImageFont.load_default()
# 绘制干扰线
for _ in range(3):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line([(x1, y1), (x2, y2)], fill='#cbd5e1', width=1)
# 绘制验证码文字
colors = ['#3b82f6', '#2563eb', '#1e40af', '#1e3a8a']
for i, char in enumerate(code):
x = 20 + i * 25 + random.randint(-3, 3)
y = 5 + random.randint(-3, 3)
color = random.choice(colors)
draw.text((x, y), char, font=font, fill=color)
# 绘制干扰点
for _ in range(50):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill='#94a3b8')
# 转换为base64
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return jsonify({'image': f'data:image/png;base64,{img_base64}'})
except Exception as e:
log('captcha_error', str(e))
# 如果生成失败,返回简单的验证码
code = ''.join([str(random.randint(0, 9)) for _ in range(4)])
session['captcha'] = code
return jsonify({'image': '', 'code': code})
@app.post('/api/auth/login')
def login():
data = request.get_json() or {}
username = data.get('username')
password = data.get('password')
captcha = data.get('captcha')
# 验证验证码
session_captcha = session.get('captcha', '').lower()
if not captcha or captcha.lower() != session_captcha:
# 清除验证码,防止重复使用
session.pop('captcha', None)
return jsonify({'error': '验证码错误'}), 400
# 清除验证码,防止重复使用
session.pop('captcha', None)
conn = get_db()
c = conn.cursor()
c.execute('SELECT id, password_hash, role FROM users WHERE username=?', (username,))
row = c.fetchone()
conn.close()
if not row or not check_password_hash(row['password_hash'], password or ''):
return jsonify({'error': '用户名或密码错误'}), 400
session['user_id'] = row['id']
session['role'] = row['role']
session['username'] = username
session.permanent = True
log('login', username)
return jsonify({'ok': True})
@app.get('/api/auth/me')
def me():
uid = session.get('user_id')
if not uid:
return jsonify({'username': None, 'role': None, 'avatar': None, 'factory': None})
conn = get_db()
c = conn.cursor()
c.execute('SELECT username, role, avatar, factory FROM users WHERE id=?', (uid,))
row = c.fetchone()
conn.close()
return jsonify({
'username': row['username'],
'role': row['role'],
'avatar': row['avatar'] if row['avatar'] else None,
'factory': row['factory'] if row['factory'] else None
})
@app.post('/api/auth/logout')
def logout():
log('logout')
session.clear()
return jsonify({'ok': True})
@app.post('/api/user/upload-avatar')
@require_login
def upload_avatar():
uid = session.get('user_id')
if not uid:
return jsonify({'error': '未登录'}), 401
if 'avatar' not in request.files:
return jsonify({'error': '未选择文件'}), 400
file = request.files['avatar']
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 验证文件类型
allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# 先从原始文件名获取扩展名
original_filename = file.filename
if '.' not in original_filename:
return jsonify({'error': '无效的文件格式'}), 400
ext = original_filename.rsplit('.', 1)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '不支持的文件格式,请上传 PNG、JPG、GIF 或 WEBP 格式'}), 400
# 创建avatars目录
avatars_dir = os.path.join(FRONTEND_DIR, 'assets', 'avatars')
os.makedirs(avatars_dir, exist_ok=True)
# 生成唯一文件名
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
new_filename = f'avatar_{uid}_{timestamp}.{ext}'
filepath = os.path.join(avatars_dir, new_filename)
# 保存文件
file.save(filepath)
# 更新数据库
avatar_url = f'./assets/avatars/{new_filename}'
conn = get_db()
c = conn.cursor()
# 删除旧头像文件(如果存在)
c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
row = c.fetchone()
if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'):
old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', ''))
if os.path.exists(old_file):
try:
os.remove(old_file)
except Exception:
pass
c.execute('UPDATE users SET avatar=? WHERE id=?', (avatar_url, uid))
conn.commit()
conn.close()
log('upload_avatar', f'上传头像: {new_filename}')
return jsonify({'ok': True, 'avatar_url': avatar_url})
@app.post('/api/user/reset-avatar')
@require_login
def reset_avatar():
uid = session.get('user_id')
if not uid:
return jsonify({'error': '未登录'}), 401
conn = get_db()
c = conn.cursor()
# 删除旧头像文件(如果存在)
c.execute('SELECT avatar FROM users WHERE id=?', (uid,))
row = c.fetchone()
if row and row['avatar'] and row['avatar'].startswith('./assets/avatars/'):
old_file = os.path.join(FRONTEND_DIR, row['avatar'].replace('./', ''))
if os.path.exists(old_file):
try:
os.remove(old_file)
except Exception:
pass
c.execute('UPDATE users SET avatar=NULL WHERE id=?', (uid,))
conn.commit()
conn.close()
log('reset_avatar', '恢复默认头像')
return jsonify({'ok': True})
# dashboard
@app.get('/api/dashboard')
@require_login
def dashboard():
conn = get_db()
c = conn.cursor()
c.execute('SELECT COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
s = c.fetchone()
c.execute('SELECT COUNT(1) AS total FROM defects')
defects = c.fetchone()
conn.close()
good = s['good_total'] if s else 0
bad = s['bad_total'] if s else 0
fpy_good = s['fpy_good_total'] if s else 0
# 计算总良品率
rate = "{}%".format(round((good/(good+bad)) * 100, 2)) if (good+bad) > 0 else u''
# 计算直通良品率FPY = First Pass Yield
total_produced = good + bad
fpy_rate = "{}%".format(round((fpy_good/total_produced) * 100, 2)) if total_produced > 0 else u''
# 从 Redis 获取发货数量SN 记录数)
shipments_count = 0
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
shipments_count = r.hlen(redis_key)
except Exception as e:
log('dashboard_redis_error', str(e))
# Redis 失败时回退到 SQLite
conn = get_db()
c = conn.cursor()
c.execute('SELECT SUM(qty) AS total FROM shipments')
ship = c.fetchone()
conn.close()
shipments_count = (ship['total'] or 0) if ship else 0
return jsonify({
'fpyRate': fpy_rate,
'goodRate': rate,
'shipments': shipments_count,
'defects': (defects['total'] or 0) if defects else 0,
'badCount': bad
})
@app.get('/api/audit/pdd')
@require_login
def audit_pdd():
start = datetime.utcnow()
try:
q_start = request.args.get('start')
q_end = request.args.get('end')
q_limit = request.args.get('limit')
q_order = request.args.get('order', 'desc')
has_filter = bool(q_start or q_end or q_limit or q_order)
# 缓存优化3秒内不重复查询
if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['pdd']['ts']) < 3):
return jsonify({'list': _audit_cache['pdd']['list']})
r = get_redis()
# 设置Redis超时为5秒
r.connection_pool.connection_kwargs['socket_timeout'] = 5
items = []
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in ['mac_batch_audit_pdd', 'audit:pdd', 'pdd:audit']:
try:
if r.exists(key):
t = r.type(key)
if t == 'list':
# 限制最大查询数量
total = r.llen(key)
if has_filter:
items = r.lrange(key, max(0, total - max_items), -1)
else:
items = r.lrange(key, -200, -1)
elif t == 'zset':
items = r.zrevrange(key, 0, max_items - 1)
elif t == 'stream':
entries = r.xrevrange(key, max='+', min='-', count=max_items)
items = [json.dumps(v) for _id, v in entries]
else:
v = r.get(key)
items = [v] if v else []
break
except Exception as e:
log('audit_pdd_error', f'Redis query error: {str(e)}')
continue
try:
host = os.environ.get('REDIS_HOST')
db = os.environ.get('REDIS_DB')
tp = r.type('mac_batch_audit_pdd')
ln = 0
try:
ln = r.llen('mac_batch_audit_pdd')
except Exception:
pass
log('audit_pdd_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln}))
except Exception:
pass
if not items and r.exists('batch_sn_mapping_pdd') and r.type('batch_sn_mapping_pdd') == 'hash':
try:
pairs = []
cursor = 0
while True:
cursor, res = r.hscan('batch_sn_mapping_pdd', cursor=cursor, count=200)
for k, v in (res or {}).items():
pairs.append({'mac': k, 'batch': v})
if len(pairs) >= 100:
break
if cursor == 0 or len(pairs) >= 100:
break
items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs]
except Exception:
pass
res = [parse_audit_line(x) for x in items]
if q_start or q_end:
def to_epoch(s):
try:
if not s:
return None
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return None
s_epoch = to_epoch(q_start) if q_start else None
e_epoch = to_epoch(q_end) if q_end else None
tmp = []
for r0 in res:
ts = to_epoch(r0.get('ts_cn'))
if ts is None:
continue
if s_epoch is not None and ts < s_epoch:
continue
if e_epoch is not None and ts > e_epoch:
continue
tmp.append(r0)
res = tmp
try:
def to_key(r):
s = r.get('ts_cn') or ''
try:
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return 0
res.sort(key=to_key, reverse=(q_order != 'asc'))
except Exception:
res.reverse()
if q_limit:
try:
lim = int(q_limit)
if lim > 0:
res = res[:lim]
except Exception:
pass
if not has_filter:
_audit_cache['pdd'] = {'ts': datetime.utcnow().timestamp(), 'list': res}
dur = (datetime.utcnow() - start).total_seconds()
log('audit_pdd_cost', f"{dur}s len={len(res)}")
return jsonify({'list': res})
except Exception as e:
log('audit_pdd_error', str(e))
return jsonify({'list': []})
@app.get('/api/audit/yt')
@require_login
def audit_yt():
start = datetime.utcnow()
try:
q_start = request.args.get('start')
q_end = request.args.get('end')
q_limit = request.args.get('limit')
q_order = request.args.get('order', 'desc')
has_filter = bool(q_start or q_end or q_limit or q_order)
# 缓存优化3秒内不重复查询
if (not has_filter) and ((datetime.utcnow().timestamp() - _audit_cache['yt']['ts']) < 3):
return jsonify({'list': _audit_cache['yt']['list']})
r = get_redis()
# 设置Redis超时为5秒
r.connection_pool.connection_kwargs['socket_timeout'] = 5
items = []
# 限制最大返回数量,避免数据过大
max_items = 500 if has_filter else 200
for key in ['mac_batch_audit_yt', 'audit:yt', 'yt:audit']:
try:
if r.exists(key):
t = r.type(key)
if t == 'list':
# 限制最大查询数量
total = r.llen(key)
if has_filter:
items = r.lrange(key, max(0, total - max_items), -1)
else:
items = r.lrange(key, -200, -1)
elif t == 'zset':
items = r.zrevrange(key, 0, max_items - 1)
elif t == 'stream':
entries = r.xrevrange(key, max='+', min='-', count=max_items)
items = [json.dumps(v) for _id, v in entries]
else:
v = r.get(key)
items = [v] if v else []
break
except Exception as e:
log('audit_yt_error', f'Redis query error: {str(e)}')
continue
try:
host = os.environ.get('REDIS_HOST')
db = os.environ.get('REDIS_DB')
tp = r.type('mac_batch_audit_yt')
ln = 0
try:
ln = r.llen('mac_batch_audit_yt')
except Exception:
pass
log('audit_yt_probe', json.dumps({'host': host, 'db': db, 'type': tp, 'len': ln}))
except Exception:
pass
if not items and r.exists('batch_sn_mapping_yt') and r.type('batch_sn_mapping_yt') == 'hash':
try:
pairs = []
cursor = 0
while True:
cursor, res = r.hscan('batch_sn_mapping_yt', cursor=cursor, count=200)
for k, v in (res or {}).items():
pairs.append({'mac': k, 'batch': v})
if len(pairs) >= 100:
break
if cursor == 0 or len(pairs) >= 100:
break
items = [json.dumps({'mac': p['mac'], 'batch': p['batch'], 'note': 'mapping'}) for p in pairs]
except Exception:
pass
res = [parse_audit_line(x) for x in items]
if q_start or q_end:
def to_epoch(s):
try:
if not s:
return None
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return None
s_epoch = to_epoch(q_start) if q_start else None
e_epoch = to_epoch(q_end) if q_end else None
tmp = []
for r0 in res:
ts = to_epoch(r0.get('ts_cn'))
if ts is None:
continue
if s_epoch is not None and ts < s_epoch:
continue
if e_epoch is not None and ts > e_epoch:
continue
tmp.append(r0)
res = tmp
try:
def to_key(r):
s = r.get('ts_cn') or ''
try:
if 'T' in s or 'Z' in s or '+' in s:
return datetime.fromisoformat(s.replace('Z','+00:00')).timestamp()
if ' ' in s and ':' in s:
return datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp()
return datetime.strptime(s, '%Y-%m-%d').timestamp()
except Exception:
return 0
res.sort(key=to_key, reverse=(q_order != 'asc'))
except Exception:
res.reverse()
if q_limit:
try:
lim = int(q_limit)
if lim > 0:
res = res[:lim]
except Exception:
pass
if not has_filter:
_audit_cache['yt'] = {'ts': datetime.utcnow().timestamp(), 'list': res}
dur = (datetime.utcnow() - start).total_seconds()
log('audit_yt_cost', f"{dur}s len={len(res)}")
return jsonify({'list': res})
except Exception as e:
log('audit_yt_error', str(e))
return jsonify({'list': []})
@app.get('/api/audit/diagnose')
@require_login
def audit_diagnose():
try:
r = get_redis()
result = {}
for key in ['mac_batch_audit_pdd', 'mac_batch_audit_yt', 'batch_sn_mapping_pdd', 'batch_sn_mapping_yt']:
try:
t = r.type(key)
if t == 'list':
result[key] = {'type': t, 'len': r.llen(key)}
elif t == 'zset':
result[key] = {'type': t, 'len': r.zcard(key)}
elif t == 'stream':
info = r.xinfo_stream(key)
result[key] = {'type': t, 'len': info.get('length')}
elif t == 'hash':
result[key] = {'type': t, 'len': r.hlen(key)}
elif t == 'none':
result[key] = {'type': t, 'len': 0}
else:
v = r.get(key)
result[key] = {'type': t, 'len': 1 if v else 0}
except Exception as e:
result[key] = {'error': str(e)}
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.get('/api/overview')
@require_login
def overview():
conn = get_db()
c = conn.cursor()
c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(good),0) AS good_total, COALESCE(SUM(bad),0) AS bad_total, COALESCE(SUM(fpy_good),0) AS fpy_good_total FROM stats')
stats_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM defects')
defects_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM mac_batches')
mac_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt, COALESCE(SUM(qty),0) AS qty_total FROM shipments')
ship_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM devices')
devices_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM personnel')
personnel_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM qa')
qa_row = c.fetchone()
c.execute('SELECT COUNT(1) AS cnt FROM production')
production_row = c.fetchone()
conn.close()
return jsonify({
'stats': {
'records': (stats_row['cnt'] or 0) if stats_row else 0,
'goodTotal': (stats_row['good_total'] or 0) if stats_row else 0,
'badTotal': (stats_row['bad_total'] or 0) if stats_row else 0,
'fpyGoodTotal': (stats_row['fpy_good_total'] or 0) if stats_row else 0
},
'defects': (defects_row['cnt'] or 0) if defects_row else 0,
'mac': (mac_row['cnt'] or 0) if mac_row else 0,
'shipments': {
'records': (ship_row['cnt'] or 0) if ship_row else 0,
'qtyTotal': (ship_row['qty_total'] or 0) if ship_row else 0
},
'devices': (devices_row['cnt'] or 0) if devices_row else 0,
'personnel': (personnel_row['cnt'] or 0) if personnel_row else 0,
'qa': (qa_row['cnt'] or 0) if qa_row else 0,
'production': (production_row['cnt'] or 0) if production_row else 0
})
# uploads
@app.post('/api/upload/mac')
@require_login
@require_any_role('admin','superadmin')
def upload_mac():
data = request.get_json() or {}
rows = data.get('rows') or []
if not isinstance(rows, list):
return jsonify({'error': 'invalid rows'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
for r in rows:
mac = (r or {}).get('mac')
batch = (r or {}).get('batch')
if not mac or not batch:
continue
c.execute('INSERT INTO mac_batches(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
log('upload_mac', f"count={len(rows)}")
notify_superadmin('上传MAC与批次', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True})
@app.post('/api/upload/stats')
@require_login
@require_any_role('admin','superadmin')
def upload_stats():
data = request.get_json() or {}
good = int(data.get('good') or 0)
bad = int(data.get('bad') or 0)
fpy_good = int(data.get('fpy_good') or 0) # 直通良品数
platform = data.get('platform') or 'pdd' # 平台pdd/yt/tx
details = data.get('details') or []
if good < 0 or bad < 0 or fpy_good < 0:
return jsonify({'error': 'invalid count'}), 400
if platform not in ['pdd', 'yt', 'tx']:
platform = 'pdd'
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
# 保存统计数据
c.execute('INSERT INTO stats(good,bad,fpy_good,platform,ts) VALUES(?,?,?,?,?)', (good, bad, fpy_good, platform, now))
# 如果有不良明细保存到defects表
if details and isinstance(details, list):
for item in details:
mac = (item or {}).get('mac')
batch = (item or {}).get('batch')
if mac and batch:
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
log('upload_stats', json.dumps({'good': good, 'bad': bad, 'fpy_good': fpy_good, 'platform': platform, 'details_count': len(details)}))
notify_superadmin('上传良/不良统计', f"平台: {platform_name}, 良品: {good}, 不良品: {bad}, 直通良品: {fpy_good}")
return jsonify({'ok': True})
@app.post('/api/upload/repairs')
@require_login
@require_any_role('admin','superadmin')
def upload_repairs():
data = request.get_json() or {}
qty = int(data.get('qty') or 0)
note = data.get('note') or ''
if qty < 0:
return jsonify({'error': 'invalid quantity'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
c.execute('INSERT INTO repairs(qty, note, ts) VALUES(?,?,?)', (qty, note, now))
conn.commit()
conn.close()
log('upload_repairs', json.dumps({'qty': qty, 'note': note}))
notify_superadmin('上传返修记录', f"数量: {qty}")
return jsonify({'ok': True})
@app.post('/api/upload/defects')
@require_login
@require_any_role('admin','superadmin')
def upload_defects():
data = request.get_json() or {}
rows = data.get('rows') or []
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
for r in rows:
mac = (r or {}).get('mac')
batch = (r or {}).get('batch')
if not mac or not batch:
continue
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (mac, batch, now))
conn.commit()
conn.close()
log('upload_defects', f"count={len(rows)}")
notify_superadmin('上传不良明细', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True})
@app.post('/api/upload/shipments')
@require_login
@require_any_role('admin','superadmin')
def upload_shipments():
data = request.get_json() or {}
date = data.get('date')
qty = int(data.get('qty') or 0)
to = data.get('to')
platform = data.get('platform', '')
box_no = data.get('box_no', '')
if not date or qty <= 0 or not to or not platform:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
# 检查shipments表是否有platform和box_no列如果没有则添加
c.execute("PRAGMA table_info(shipments)")
columns = [col[1] for col in c.fetchall()]
if 'platform' not in columns:
c.execute('ALTER TABLE shipments ADD COLUMN platform TEXT')
if 'box_no' not in columns:
c.execute('ALTER TABLE shipments ADD COLUMN box_no TEXT')
c.execute(
'INSERT INTO shipments(date, qty, receiver, platform, box_no, ts) VALUES(?,?,?,?,?,?)',
(date, qty, to, platform, box_no, get_beijing_time())
)
conn.commit()
conn.close()
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
log_data = {'date': date, 'qty': qty, 'to': to, 'platform': platform}
if box_no:
log_data['box_no'] = box_no
log('upload_shipments', json.dumps(log_data))
notify_superadmin('上传发货记录', f"机种: {platform_name}, 日期: {date}, 数量: {qty}, 接收方: {to}")
return jsonify({'ok': True})
# collect
@app.get('/api/collect/devices')
@require_login
def devices():
conn = get_db()
c = conn.cursor()
c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 50')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/collect/environment')
@require_login
def environment():
conn = get_db()
c = conn.cursor()
c.execute('SELECT temp, hum FROM environment ORDER BY id DESC LIMIT 1')
r = c.fetchone()
conn.close()
return jsonify({'temp': (r['temp'] if r else ''), 'hum': (r['hum'] if r else '')})
@app.get('/api/collect/personnel')
@require_login
def personnel():
conn = get_db()
c = conn.cursor()
c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/collect/personnel')
@require_login
@require_any_role('admin','superadmin')
def add_personnel():
data = request.get_json() or {}
name = (data.get('name') or '').strip()
role = (data.get('role') or '').strip()
if not name:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('INSERT INTO personnel(name, role, ts) VALUES(?,?,?)', (name, role, get_beijing_time()))
conn.commit()
conn.close()
log('add_personnel', name)
notify_superadmin('添加人员信息', f"姓名: {name}, 角色: {role}")
return jsonify({'ok': True})
@app.get('/api/collect/qa')
@require_login
def qa():
conn = get_db()
c = conn.cursor()
c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/collect/production')
@require_login
def production():
conn = get_db()
c = conn.cursor()
c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 100')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
# export
@app.post('/api/export/excel')
@require_login
def export_excel():
try:
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill
from io import BytesIO
data = request.get_json() or {}
data_type = data.get('type', 'stats')
# 创建工作簿
wb = openpyxl.Workbook()
ws = wb.active
# 设置标题样式
header_fill = PatternFill(start_color='4F8CFF', end_color='4F8CFF', fill_type='solid')
header_font = Font(bold=True, color='FFFFFF')
header_alignment = Alignment(horizontal='center', vertical='center')
conn = get_db()
c = conn.cursor()
# 根据类型导出不同的数据
if data_type == 'stats':
ws.title = '良不良统计'
ws.append(['直通良品数', '良品数', '不良品数', '时间'])
c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC')
elif data_type == 'mac':
ws.title = 'MAC与批次'
ws.append(['MAC地址', '批次号', '时间'])
c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC')
elif data_type == 'repairs':
ws.title = '返修记录'
ws.append(['返修数量', '备注', '时间'])
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC')
elif data_type == 'defects':
ws.title = '不良明细'
ws.append(['MAC地址', '批次号', '时间'])
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC')
elif data_type == 'shipments':
ws.title = '发货记录'
ws.append(['日期', '数量', '收货方', '时间'])
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC')
elif data_type == 'devices':
ws.title = '设备状态'
ws.append(['设备名称', '状态'])
c.execute('SELECT name, status FROM devices ORDER BY id DESC')
elif data_type == 'personnel':
ws.title = '人员信息'
ws.append(['姓名', '角色'])
c.execute('SELECT name, role FROM personnel ORDER BY id DESC')
elif data_type == 'qa':
ws.title = '质检报告'
ws.append(['标题', '日期'])
c.execute('SELECT title, date FROM qa ORDER BY id DESC')
elif data_type == 'production':
ws.title = '时间记录'
ws.append(['批次', '时长'])
c.execute('SELECT batch, duration FROM production ORDER BY id DESC')
else:
conn.close()
return jsonify({'error': 'invalid type'}), 400
# 应用标题样式
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = header_alignment
# 写入数据
rows = c.fetchall()
for row in rows:
ws.append(list(row))
conn.close()
# 自动调整列宽
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width
# 保存到内存
output = BytesIO()
wb.save(output)
output.seek(0)
log('export_excel', data_type)
# 返回文件
from flask import send_file
filename = f'{ws.title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_excel_error', str(e))
return jsonify({'error': str(e)}), 500
@app.post('/api/export/pdf')
@require_login
def export_pdf():
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.enums import TA_CENTER
from io import BytesIO
data = request.get_json() or {}
data_type = data.get('type', 'stats')
# 注册中文字体
font_name = 'Helvetica'
try:
# 尝试常见的中文字体路径
font_paths = [
'/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',
'/usr/share/fonts/truetype/arphic/uming.ttc',
'/System/Library/Fonts/PingFang.ttc',
'C:\\Windows\\Fonts\\simhei.ttf'
]
for font_path in font_paths:
if os.path.exists(font_path):
pdfmetrics.registerFont(TTFont('ChineseFont', font_path))
font_name = 'ChineseFont'
break
except Exception as e:
log('pdf_font_warning', f'无法加载中文字体: {str(e)}')
# 创建PDF
buffer = BytesIO()
doc = SimpleDocTemplate(
buffer,
pagesize=landscape(A4),
topMargin=1.5*cm,
bottomMargin=1.5*cm,
leftMargin=1.5*cm,
rightMargin=1.5*cm
)
elements = []
# 样式
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontName=font_name,
fontSize=18,
textColor=colors.HexColor('#4F8CFF'),
spaceAfter=20,
alignment=TA_CENTER,
leading=24
)
conn = get_db()
c = conn.cursor()
# 根据类型导出不同的数据
if data_type == 'stats':
title = '良/不良统计报表'
headers = ['直通良品数', '良品数', '不良品数', '时间']
c.execute('SELECT fpy_good, good, bad, ts FROM stats ORDER BY id DESC LIMIT 200')
elif data_type == 'mac':
title = 'MAC与批次报表'
headers = ['MAC地址', '批次号', '时间']
c.execute('SELECT mac, batch, ts FROM mac_batches ORDER BY id DESC LIMIT 200')
elif data_type == 'repairs':
title = '返修记录报表'
headers = ['返修数量', '备注', '时间']
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200')
elif data_type == 'defects':
title = '不良明细报表'
headers = ['MAC地址', '批次号', '时间']
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200')
elif data_type == 'shipments':
title = '发货记录报表'
headers = ['日期', '数量', '收货方', '时间']
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200')
elif data_type == 'devices':
title = '设备状态报表'
headers = ['设备名称', '状态']
c.execute('SELECT name, status FROM devices ORDER BY id DESC LIMIT 200')
elif data_type == 'personnel':
title = '人员信息报表'
headers = ['姓名', '角色']
c.execute('SELECT name, role FROM personnel ORDER BY id DESC LIMIT 200')
elif data_type == 'qa':
title = '质检报告'
headers = ['标题', '日期']
c.execute('SELECT title, date FROM qa ORDER BY id DESC LIMIT 200')
elif data_type == 'production':
title = '生产时间记录'
headers = ['批次', '时长']
c.execute('SELECT batch, duration FROM production ORDER BY id DESC LIMIT 200')
else:
conn.close()
return jsonify({'error': 'invalid type'}), 400
# 添加标题
elements.append(Paragraph(title, title_style))
elements.append(Spacer(1, 0.5*cm))
# 获取数据
rows = c.fetchall()
conn.close()
if len(rows) == 0:
# 没有数据时的提示
no_data_style = ParagraphStyle(
'NoData',
parent=styles['Normal'],
fontName=font_name,
fontSize=12,
textColor=colors.grey,
alignment=TA_CENTER
)
elements.append(Paragraph('暂无数据', no_data_style))
else:
# 构建表格数据
table_data = [headers]
for row in rows:
table_data.append([str(val) if val is not None else '' for val in row])
# 创建表格
table = Table(table_data, repeatRows=1)
table.setStyle(TableStyle([
# 表头样式
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F8CFF')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('FONTNAME', (0, 0), (-1, 0), font_name),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 8),
('TOPPADDING', (0, 0), (-1, 0), 8),
# 数据行样式
('BACKGROUND', (0, 1), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('FONTNAME', (0, 1), (-1, -1), font_name),
('FONTSIZE', (0, 1), (-1, -1), 8),
('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F5F7FA')]),
('TOPPADDING', (0, 1), (-1, -1), 5),
('BOTTOMPADDING', (0, 1), (-1, -1), 5),
]))
elements.append(table)
# 添加页脚信息
elements.append(Spacer(1, 0.5*cm))
footer_style = ParagraphStyle(
'Footer',
parent=styles['Normal'],
fontName=font_name,
fontSize=8,
textColor=colors.grey,
alignment=TA_CENTER
)
footer_text = f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | 共 {len(rows)} 条记录'
elements.append(Paragraph(footer_text, footer_style))
# 生成PDF
doc.build(elements)
buffer.seek(0)
log('export_pdf', data_type)
# 返回文件
from flask import send_file
filename = f'{title}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf'
return send_file(
buffer,
mimetype='application/pdf',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_pdf_error', str(e))
return jsonify({'error': f'PDF导出失败: {str(e)}'}), 500
# lists
@app.get('/api/list/mac')
@require_login
def list_mac():
conn = get_db()
c = conn.cursor()
c.execute('SELECT mac, batch, platform, ts FROM mac_batches ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/stats')
@require_login
def list_stats():
conn = get_db()
c = conn.cursor()
c.execute('SELECT good, bad, fpy_good, platform, ts FROM stats ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/repairs')
@require_login
def list_repairs():
conn = get_db()
c = conn.cursor()
c.execute('SELECT qty, note, ts FROM repairs ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/defects')
@require_login
def list_defects():
conn = get_db()
c = conn.cursor()
c.execute('SELECT mac, batch, ts FROM defects ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/list/shipments')
@require_login
def list_shipments():
conn = get_db()
c = conn.cursor()
c.execute('SELECT date, qty, receiver, ts FROM shipments ORDER BY id DESC LIMIT 200')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
# admin management
@app.get('/api/admin/users')
@require_login
@require_any_role('superadmin')
def list_users():
conn = get_db()
c = conn.cursor()
c.execute('SELECT username, role, factory FROM users ORDER BY id ASC')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/admin/reset-password')
@require_login
@require_any_role('superadmin')
def reset_password():
data = request.get_json() or {}
username = data.get('username')
new_password = data.get('new_password')
if not username or not new_password:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM users WHERE username=?', (username,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': 'user not found'}), 404
c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id']))
conn.commit()
conn.close()
log('reset_password', username)
return jsonify({'ok': True})
@app.post('/api/admin/change-password')
@require_login
@require_any_role('superadmin')
def change_password():
data = request.get_json() or {}
username = data.get('username')
new_password = data.get('new_password')
if not username or not new_password:
return jsonify({'error': 'invalid payload'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM users WHERE username=?', (username,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': 'user not found'}), 404
c.execute('UPDATE users SET password_hash=? WHERE id=?', (generate_password_hash(new_password), row['id']))
conn.commit()
conn.close()
log('change_password', username)
return jsonify({'ok': True})
@app.post('/api/admin/update-user-factory')
@require_login
@require_any_role('superadmin')
def update_user_factory():
"""更新用户所属工厂"""
data = request.get_json() or {}
username = data.get('username')
factory = (data.get('factory') or '').strip()
if not username:
return jsonify({'error': '用户名不能为空'}), 400
if not factory:
return jsonify({'error': '工厂名称不能为空'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM users WHERE username=?', (username,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '用户不存在'}), 404
c.execute('UPDATE users SET factory=? WHERE id=?', (factory, row['id']))
conn.commit()
conn.close()
log('update_user_factory', f'username={username}, factory={factory}')
return jsonify({'ok': True, 'message': f'已更新用户 {username} 的所属工厂为 {factory}'})
@app.post('/api/admin/add-user')
@require_login
@require_any_role('superadmin')
def add_user():
"""添加新用户"""
data = request.get_json() or {}
username = (data.get('username') or '').strip()
password = data.get('password')
role = (data.get('role') or 'admin').strip()
factory = (data.get('factory') or '').strip()
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
if not factory:
return jsonify({'error': '所属工厂不能为空'}), 400
if role not in ['admin', 'superadmin']:
return jsonify({'error': '角色必须是 admin 或 superadmin'}), 400
conn = get_db()
c = conn.cursor()
# 检查用户名是否已存在
c.execute('SELECT id FROM users WHERE username=?', (username,))
if c.fetchone():
conn.close()
return jsonify({'error': '用户名已存在'}), 400
# 创建新用户
try:
c.execute('INSERT INTO users(username, password_hash, role, factory) VALUES(?,?,?,?)',
(username, generate_password_hash(password), role, factory))
conn.commit()
conn.close()
log('add_user', f'username={username}, role={role}, factory={factory}')
return jsonify({'ok': True, 'message': f'用户 {username} 创建成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'创建用户失败:{str(e)}'}), 500
@app.post('/api/admin/delete-user')
@require_login
@require_any_role('superadmin')
def delete_user():
"""删除用户"""
data = request.get_json() or {}
username = (data.get('username') or '').strip()
if not username:
return jsonify({'error': '用户名不能为空'}), 400
# 获取当前登录用户
current_user = session.get('user')
if current_user and current_user.get('username') == username:
return jsonify({'error': '不能删除当前登录的用户'}), 400
conn = get_db()
c = conn.cursor()
# 检查用户是否存在
c.execute('SELECT id, role FROM users WHERE username=?', (username,))
user = c.fetchone()
if not user:
conn.close()
return jsonify({'error': '用户不存在'}), 404
# 检查是否是最后一个超级管理员
if user['role'] == 'superadmin':
c.execute('SELECT COUNT(*) as cnt FROM users WHERE role=?', ('superadmin',))
count = c.fetchone()['cnt']
if count <= 1:
conn.close()
return jsonify({'error': '不能删除最后一个超级管理员'}), 400
# 删除用户
try:
c.execute('DELETE FROM users WHERE username=?', (username,))
conn.commit()
conn.close()
log('delete_user', f'username={username}')
return jsonify({'ok': True, 'message': f'用户 {username} 已删除'})
except Exception as e:
conn.close()
return jsonify({'error': f'删除用户失败:{str(e)}'}), 500
@app.post('/api/admin/clear')
@require_login
@require_any_role('superadmin')
def clear_module():
data = request.get_json() or {}
module = data.get('module')
tables = {
'mac': 'mac_batches',
'stats': 'stats',
'defects': 'defects',
'shipments': 'shipments',
'devices': 'devices',
'environment': 'environment',
'personnel': 'personnel',
'qa': 'qa',
'production': 'production'
}
table = tables.get(module)
if not table:
return jsonify({'error': 'invalid module'}), 400
# 清空 SQLite 表
conn = get_db()
c = conn.cursor()
c.execute(f'DELETE FROM {table}')
conn.commit()
conn.close()
# 如果是清空发货记录,同时清空 Redis
if module == 'shipments':
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
redis_count = r.hlen(redis_key)
r.delete(redis_key)
log('clear_module_redis', f'shipments: cleared {redis_count} records from redis')
except Exception as e:
log('clear_module_redis_error', str(e))
log('clear_module', module)
return jsonify({'ok': True})
# notifications
@app.get('/api/notifications')
@require_login
@require_any_role('superadmin', 'admin')
def get_notifications():
"""获取当前用户的通知列表"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('SELECT id, username, action, detail, ts, read FROM notifications WHERE user_id=? ORDER BY id DESC LIMIT 100', (user_id,))
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.get('/api/notifications/unread-count')
@require_login
@require_any_role('superadmin', 'admin')
def get_unread_count():
"""获取未读通知数量"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('SELECT COUNT(*) as count FROM notifications WHERE user_id=? AND read=0', (user_id,))
row = c.fetchone()
conn.close()
return jsonify({'count': row['count'] if row else 0})
@app.post('/api/notifications/mark-read')
@require_login
@require_any_role('superadmin', 'admin')
def mark_notification_read():
"""标记通知为已读"""
data = request.get_json() or {}
notification_id = data.get('id')
if not notification_id:
return jsonify({'error': 'invalid id'}), 400
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('UPDATE notifications SET read=1 WHERE id=? AND user_id=?', (notification_id, user_id))
conn.commit()
conn.close()
return jsonify({'ok': True})
@app.post('/api/notifications/mark-all-read')
@require_login
@require_any_role('superadmin', 'admin')
def mark_all_notifications_read():
"""标记所有通知为已读"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('UPDATE notifications SET read=1 WHERE user_id=?', (user_id,))
conn.commit()
conn.close()
return jsonify({'ok': True})
@app.post('/api/notifications/delete-read')
@require_login
@require_any_role('superadmin', 'admin')
def delete_read_notifications():
"""删除所有已读通知"""
user_id = session.get('user_id')
conn = get_db()
c = conn.cursor()
c.execute('DELETE FROM notifications WHERE user_id=? AND read=1', (user_id,))
deleted_count = c.rowcount
conn.commit()
conn.close()
return jsonify({'ok': True, 'count': deleted_count})
@app.errorhandler(404)
def not_found(_):
return jsonify({'error': 'not found'}), 404
@app.route('/api/validate/mac-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_mac_file():
"""验证Excel文件格式是否符合要求"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200
try:
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
# 检查第一行(表头)
header = [h.strip() for h in lines[0].split(',')]
if len(header) != 2:
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{len(header)}'}), 200
# 记录表头用于调试
log('validate_mac_file_csv', f'headers: {header}')
# 更灵活的表头检查(不区分大小写)
header_lower = [h.lower() for h in header]
has_mac = any('mac' in h and 'sn' not in h for h in header_lower)
has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower)
has_batch = any('批次' in h or 'batch' in h for h in header_lower)
if not (has_mac or has_sn_mac):
return jsonify({'valid': False, 'message': f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'}), 200
if not has_batch:
return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)}'}), 200
data_rows = len(lines) - 1
mac_col = 'MAC' if has_mac else 'SN_MAC'
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
else:
# 使用pandas读取Excel文件支持.xlsx和.xls格式
import pandas as pd
import io
# 将文件流保存到BytesIO对象
file_content = f.stream.read()
f.stream.seek(0) # 重置流位置
file_io = io.BytesIO(file_content)
try:
# 根据扩展名选择引擎
if ext == 'xls':
df = pd.read_excel(file_io, engine='xlrd')
else:
df = pd.read_excel(file_io)
except Exception as e:
return jsonify({'valid': False, 'message': f'读取Excel文件失败{str(e)}'}), 200
if len(df) == 0:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
if len(df.columns) != 2:
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{len(df.columns)}'}), 200
# 检查表头
header = [str(h).strip() for h in df.columns]
# 记录表头用于调试
log('validate_mac_file', f'headers: {header}')
# 更灵活的表头检查(不区分大小写)
header_lower = [h.lower() for h in header]
has_mac = any('mac' in h and 'sn' not in h for h in header_lower)
has_sn_mac = any('sn_mac' in h or 'sn-mac' in h for h in header_lower)
has_batch = any('批次' in h or 'batch' in h for h in header_lower)
if not (has_mac or has_sn_mac):
return jsonify({'valid': False, 'message': f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'}), 200
if not has_batch:
return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)}'}), 200
data_rows = len(df)
mac_col = 'MAC' if has_mac else 'SN_MAC'
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
except Exception as e:
return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.post('/api/upload/mac-file')
@require_login
@require_any_role('admin','superadmin')
def upload_mac_file():
import subprocess
import tempfile
f = request.files.get('file')
upload_type = request.form.get('type', 'pdd') # pdd, yt, or tx
if not f:
return jsonify({'error': 'no file'}), 400
if upload_type not in ['pdd', 'yt', 'tx']:
return jsonify({'error': 'invalid type'}), 400
# 保存上传的文件到临时位置
name = secure_filename(f.filename or 'upload.xlsx')
temp_dir = '/home/hyx/work/batch_import_xlsx'
os.makedirs(temp_dir, exist_ok=True)
# 检测文件扩展名,保持原始格式
original_ext = '.xls' if name.lower().endswith('.xls') and not name.lower().endswith('.xlsx') else '.xlsx'
# 根据类型确定基础文件名
if upload_type == 'yt':
base_name = 'sn_test_yt'
elif upload_type == 'pdd':
base_name = 'sn_test_pdd'
else:
base_name = 'sn_test_tx'
# 删除旧文件(.xlsx 和 .xls 都删除,确保只保留最新的)
for old_ext in ['.xlsx', '.xls']:
old_path = os.path.join(temp_dir, f'{base_name}{old_ext}')
if os.path.exists(old_path):
try:
os.remove(old_path)
log('upload_mac_file', f'removed old file: {old_path}')
except Exception as e:
log('upload_mac_file_error', f'failed to remove old file: {e}')
# 保存新文件
temp_path = os.path.join(temp_dir, f'{base_name}{original_ext}')
f.save(temp_path)
# 调用batch_import.py脚本
script_path = '/home/hyx/work/生产管理系统/test_py/batch_import.py'
python_path = '/home/hyx/work/.venv/bin/python'
try:
result = subprocess.run(
[python_path, script_path, upload_type],
capture_output=True,
text=True,
timeout=300 # 5分钟超时
)
output = result.stdout + result.stderr
success = result.returncode == 0
# 解析输出中的成功导入数据同时保存到SQLite数据库
if success:
import re
json_match = re.search(r'=== 成功导入的数据 ===\n([\s\S]*?)\n=== 数据输出结束 ===', output)
if json_match:
try:
import json as json_lib
records = json_lib.loads(json_match.group(1).strip())
if records and isinstance(records, list):
# 保存到SQLite数据库使用北京时间UTC+8
conn = get_db()
c = conn.cursor()
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).isoformat()
for record in records:
mac = record.get('mac')
batch = record.get('batch')
if mac and batch:
c.execute('INSERT INTO mac_batches(mac, batch, platform, ts) VALUES(?,?,?,?)', (mac, batch, upload_type, now))
conn.commit()
conn.close()
log('upload_mac_file_db', f"saved {len(records)} records to database")
except Exception as e:
log('upload_mac_file_db_error', str(e))
log('upload_mac_file', f"type={upload_type}, success={success}")
if success:
notify_superadmin('批量上传MAC文件', f"类型: {upload_type}")
return jsonify({
'ok': success,
'output': output,
'returncode': result.returncode
})
except subprocess.TimeoutExpired:
return jsonify({'error': '上传超时', 'output': '处理时间超过5分钟'}), 500
except Exception as e:
return jsonify({'error': str(e), 'output': ''}), 500
@app.post('/api/upload/defects-file')
@require_login
@require_any_role('admin','superadmin')
def upload_defects_file():
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
rows = []
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
for l in text.splitlines():
parts = [p.strip() for p in l.split(',')]
if len(parts) >= 2:
rows.append({'mac': parts[0], 'batch': parts[1]})
else:
try:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
for r in ws.iter_rows(values_only=True):
mac = str(r[0]).strip() if r and r[0] else None
batch = str(r[1]).strip() if r and len(r) > 1 and r[1] else None
if mac and batch:
rows.append({'mac': mac, 'batch': batch})
except Exception:
return jsonify({'error': 'parse error'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
for r in rows:
c.execute('INSERT INTO defects(mac, batch, ts) VALUES(?,?,?)', (r['mac'], r['batch'], now))
conn.commit()
conn.close()
log('upload_defects_file', f"count={len(rows)}")
notify_superadmin('批量上传不良明细文件', f"上传了 {len(rows)} 条记录")
return jsonify({'ok': True, 'count': len(rows)})
@app.route('/api/validate/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def validate_shipments_file():
"""验证发货记录Excel文件格式"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'valid': False, 'message': '文件格式不支持请上传CSV或Excel文件'}), 200
try:
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if not lines:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
header = [h.strip() for h in lines[0].split(',')]
header_lower = [h.lower() for h in header]
# 检查必需的列
has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower))
has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower))
if not has_date:
return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200
if not has_box:
return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200
# 检查SN列SN1-SN20
sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
if not sn_cols:
return jsonify({'valid': False, 'message': '缺少SN列SN1, SN2, ... SN20'}), 200
data_rows = len(lines) - 1
return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
# 检查表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
header_lower = [h.lower() for h in header]
# 检查必需的列
has_date = any('出货日期' in h or '发货日期' in h or 'date' in hl for h, hl in zip(header, header_lower))
has_box = any('箱号' in h or 'box' in hl for h, hl in zip(header, header_lower))
if not has_date:
wb.close()
return jsonify({'valid': False, 'message': '缺少必需的列:出货日期'}), 200
if not has_box:
wb.close()
return jsonify({'valid': False, 'message': '缺少必需的列:箱号'}), 200
# 检查SN列
sn_cols = [h for h in header if h.startswith('SN') and h[2:].isdigit()]
if not sn_cols:
wb.close()
return jsonify({'valid': False, 'message': '缺少SN列SN1, SN2, ... SN20'}), 200
data_rows = ws.max_row - 1
wb.close()
return jsonify({'valid': True, 'message': f'文件格式正确,包含{len(sn_cols)}个SN列{data_rows}行数据'}), 200
except Exception as e:
return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.get('/api/shipments/query-by-sn')
@require_login
def query_shipment_by_sn():
"""通过 SN/MAC 号查询出货信息"""
sn = request.args.get('sn', '').strip()
if not sn:
return jsonify({'error': '请提供 SN/MAC 号'}), 400
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 从 Redis Hash 中查询
result = r.hget(redis_key, sn)
if result:
# 解析 JSON 数据
shipment_info = json.loads(result)
platform = shipment_info.get('platform', 'pdd') # 默认拼多多
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
return jsonify({
'found': True,
'sn': sn,
'date': shipment_info.get('date'),
'box': shipment_info.get('box'),
'platform': platform,
'platform_name': platform_name,
'ts': shipment_info.get('ts')
})
else:
return jsonify({
'found': False,
'sn': sn,
'message': '未找到该 SN 的出货记录'
})
except Exception as e:
log('query_shipment_error', str(e))
return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.get('/api/shipments/query-by-box')
@require_login
def query_shipment_by_box():
"""通过箱号查询出货信息"""
box_no = request.args.get('box', '').strip()
if not box_no:
return jsonify({'error': '请提供箱号'}), 400
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 获取所有记录
all_records = r.hgetall(redis_key)
# 筛选出匹配箱号的记录
matched_records = []
for sn, data in all_records.items():
try:
shipment_info = json.loads(data)
if shipment_info.get('box') == box_no:
platform = shipment_info.get('platform', 'pdd')
platform_name = {'pdd': '拼多多', 'yt': '圆通', 'tx': '兔喜'}.get(platform, platform)
matched_records.append({
'sn': sn.decode('utf-8') if isinstance(sn, bytes) else sn,
'date': shipment_info.get('date'),
'box': shipment_info.get('box'),
'platform': platform,
'platform_name': platform_name,
'ts': shipment_info.get('ts')
})
except:
continue
if matched_records:
return jsonify({
'found': True,
'box': box_no,
'count': len(matched_records),
'records': matched_records
})
else:
return jsonify({
'found': False,
'box': box_no,
'message': '未找到该箱号的出货记录'
})
except Exception as e:
log('query_shipment_by_box_error', str(e))
return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.post('/api/shipments/update-platform')
@require_login
@require_any_role('superadmin')
def update_shipments_platform():
"""批量更新Redis中发货记录的机种字段"""
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
# 获取所有记录
all_data = r.hgetall(redis_key)
updated_count = 0
pipe = r.pipeline()
for sn, value in all_data.items():
try:
info = json.loads(value)
# 如果没有platform字段添加为pdd
if 'platform' not in info:
info['platform'] = 'pdd'
pipe.hset(redis_key, sn, json.dumps(info, ensure_ascii=False))
updated_count += 1
except Exception:
continue
pipe.execute()
log('update_shipments_platform', f'updated {updated_count} records')
return jsonify({
'ok': True,
'message': f'已更新 {updated_count} 条记录为拼多多',
'updated': updated_count
})
except Exception as e:
log('update_shipments_platform_error', str(e))
return jsonify({'error': f'更新失败: {str(e)}'}), 500
@app.get('/api/shipments/redis-stats')
@require_login
def shipments_redis_stats():
"""获取 Redis 中发货记录的统计信息"""
try:
r = get_redis()
redis_key = 'shipment_sn_mapping'
count = r.hlen(redis_key)
return jsonify({
'key': redis_key,
'count': count,
'exists': r.exists(redis_key) > 0
})
except Exception as e:
log('shipments_redis_stats_error', str(e))
return jsonify({'error': f'获取统计失败:{str(e)}'}), 500
@app.get('/api/shipments/summary')
@require_login
def shipments_summary():
"""查询发货记录汇总信息(按日期范围)"""
try:
start_date = request.args.get('start')
end_date = request.args.get('end')
if not start_date or not end_date:
return jsonify({'error': '请提供开始和结束日期'}), 400
conn = get_db()
c = conn.cursor()
# 查询指定日期范围内的发货记录
c.execute('''
SELECT date, qty, receiver, ts
FROM shipments
WHERE date >= ? AND date <= ?
ORDER BY date DESC
''', (start_date, end_date))
rows = [dict(r) for r in c.fetchall()]
conn.close()
log('shipments_summary', f'start={start_date}, end={end_date}, count={len(rows)}')
return jsonify({
'ok': True,
'records': rows,
'count': len(rows)
})
except Exception as e:
log('shipments_summary_error', str(e))
return jsonify({'error': f'查询失败:{str(e)}'}), 500
@app.post('/api/shipments/clear-redis')
@require_login
@require_any_role('admin','superadmin')
def clear_shipments_redis():
"""清空 Redis 和 SQLite 中的发货记录数据"""
try:
# 清空 Redis
r = get_redis()
redis_key = 'shipment_sn_mapping'
redis_count = r.hlen(redis_key)
r.delete(redis_key)
# 同时清空 SQLite 中的 shipments 表
conn = get_db()
c = conn.cursor()
c.execute('SELECT COUNT(*) as cnt FROM shipments')
sqlite_count = c.fetchone()['cnt']
c.execute('DELETE FROM shipments')
conn.commit()
conn.close()
log('clear_shipments_all', f'cleared redis={redis_count}, sqlite={sqlite_count}')
return jsonify({
'ok': True,
'message': f'已清空 Redis {redis_count} 条记录和 SQLite {sqlite_count} 条记录',
'redis_count': redis_count,
'sqlite_count': sqlite_count
})
except Exception as e:
log('clear_shipments_redis_error', str(e))
return jsonify({'error': f'清空失败:{str(e)}'}), 500
@app.route('/api/upload/shipments-file', methods=['POST'])
@require_login
@require_any_role('admin','superadmin')
def upload_shipments_file():
"""上传发货记录Excel文件"""
f = request.files.get('file')
platform = request.form.get('platform') # 获取机种参数
if not f:
return jsonify({'error': '请选择文件'}), 400
if not platform or platform not in ['pdd', 'yt', 'tx']:
return jsonify({'error': '请选择机种(拼多多/圆通/兔喜)'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['csv', 'xlsx', 'xls']:
return jsonify({'error': '文件格式不支持'}), 400
try:
rows = []
if ext == 'csv':
text = f.stream.read().decode('utf-8', errors='ignore')
lines = [l.strip() for l in text.splitlines() if l.strip()]
if len(lines) < 2:
return jsonify({'error': '文件为空'}), 400
header = [h.strip() for h in lines[0].split(',')]
header_lower = [h.lower() for h in header]
# 找到列索引
date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None)
box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None)
if date_idx is None or box_idx is None:
return jsonify({'error': '缺少必需的列'}), 400
# 找到所有SN列按数字排序
sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()]
sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
for line in lines[1:]:
parts = [p.strip() for p in line.split(',')]
if len(parts) <= max(date_idx, box_idx):
continue
# 处理日期(合并单元格时可能为空)
current_date = parts[date_idx] if date_idx < len(parts) and parts[date_idx] else ''
# 如果当前行日期为空,使用上一个有效日期
if current_date:
last_valid_date = current_date
date = current_date
else:
date = last_valid_date
# 处理箱号
box = parts[box_idx] if box_idx < len(parts) and parts[box_idx] else ''
# 如果没有日期或箱号,跳过这行
if not date or not box:
continue
# 收集所有SN横向 20 个)
sns = []
for idx, _ in sn_indices:
if idx < len(parts) and parts[idx]:
sns.append(parts[idx])
# 只有当有 SN 数据时才添加记录
if sns:
rows.append({
'date': date,
'box': box,
'sns': sns,
'qty': len(sns)
})
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'error': '文件为空'}), 400
# 读取表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
header_lower = [h.lower() for h in header]
# 找到列索引
date_idx = next((i for i, h in enumerate(header) if '出货日期' in h or '发货日期' in h or 'date' in header_lower[i]), None)
box_idx = next((i for i, h in enumerate(header) if '箱号' in h or 'box' in header_lower[i]), None)
if date_idx is None or box_idx is None:
wb.close()
return jsonify({'error': '缺少必需的列'}), 400
# 找到所有SN列按数字排序
sn_cols = [(i, h) for i, h in enumerate(header) if h.startswith('SN') and h[2:].isdigit()]
sn_indices = sorted(sn_cols, key=lambda x: int(x[1][2:])) # 按 SN 后面的数字排序
# 记录上一个有效的日期(用于处理合并单元格)
last_valid_date = None
# 读取数据行
for row in ws.iter_rows(min_row=2, values_only=True):
# 处理日期(合并单元格时可能为 None
date_value = row[date_idx] if date_idx < len(row) else None
current_date = None
if date_value:
# 如果是 datetime.datetime 或 datetime.date 对象
if hasattr(date_value, 'strftime'):
current_date = date_value.strftime('%Y-%m-%d')
# 如果是数字Excel 日期序列号)
elif isinstance(date_value, (int, float)):
try:
# Excel 日期从 1900-01-01 开始计数
from datetime import datetime as dt, timedelta
# Excel 的 bug1900 不是闰年但 Excel 认为是所以需要减1
excel_epoch = dt(1899, 12, 30)
date_obj = excel_epoch + timedelta(days=float(date_value))
current_date = date_obj.strftime('%Y-%m-%d')
except Exception:
current_date = str(date_value).strip()
else:
current_date = str(date_value).strip()
if current_date == 'None':
current_date = None
# 如果当前行日期为空,使用上一个有效日期
if current_date:
last_valid_date = current_date
date = current_date
else:
date = last_valid_date
# 处理箱号
box = str(row[box_idx]).strip() if box_idx < len(row) and row[box_idx] else ''
# 如果没有日期或箱号,跳过这行
if not date or not box or box == 'None':
continue
# 收集所有SN横向 20 个)
sns = []
for idx, _ in sn_indices:
if idx < len(row) and row[idx]:
sn_value = str(row[idx]).strip()
if sn_value and sn_value != 'None':
sns.append(sn_value)
# 只有当有 SN 数据时才添加记录
if sns:
rows.append({
'date': date,
'box': box,
'sns': sns,
'qty': len(sns)
})
wb.close()
# 保存到 SQLite 数据库
conn = get_db()
c = conn.cursor()
# 使用北京时间UTC+8
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S')
total_qty = 0
for r in rows:
receiver_info = f"箱号:{r['box']}"
c.execute('INSERT INTO shipments(date, qty, receiver, ts) VALUES(?,?,?,?)',
(r['date'], r['qty'], receiver_info, now))
total_qty += r['qty']
conn.commit()
conn.close()
# 保存到 Redis - 使用 Hash 结构存储 MAC -> 出货信息的映射
try:
r = get_redis()
redis_key = 'shipment_sn_mapping' # Redis Hash key
# 使用 pipeline 批量写入,提高性能
pipe = r.pipeline()
redis_count = 0
for row_data in rows:
date = row_data['date']
box = row_data['box']
sns = row_data['sns']
# 为每个 SN/MAC 创建映射记录
for sn in sns:
if sn: # 确保 SN 不为空
# 存储格式: MAC -> JSON(date, box, platform, timestamp)
shipment_info = json.dumps({
'date': date,
'box': box,
'platform': platform,
'ts': now
}, ensure_ascii=False)
pipe.hset(redis_key, sn, shipment_info)
redis_count += 1
# 执行批量写入
pipe.execute()
log('upload_shipments_redis', f"redis_key={redis_key}, sn_count={redis_count}")
except Exception as redis_error:
# Redis 写入失败不影响主流程,只记录日志
log('upload_shipments_redis_error', str(redis_error))
log('upload_shipments_file', f"boxes={len(rows)}, total_qty={total_qty}")
notify_superadmin('批量上传发货记录文件', f"箱数: {len(rows)}, 总数量: {total_qty}")
return jsonify({'ok': True, 'count': len(rows), 'total_qty': total_qty})
except Exception as e:
log('upload_shipments_file_error', str(e))
return jsonify({'error': f'处理文件失败:{str(e)}'}), 500
# SOP 文件管理
SOP_DIR = os.path.join(FRONTEND_DIR, 'sop_files')
os.makedirs(SOP_DIR, exist_ok=True)
@app.get('/api/sop/list')
@require_login
def list_sop_files():
"""获取所有 SOP 文件列表"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT id, filename, original_name, description, uploader, ts FROM sop_files ORDER BY id DESC')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/sop/upload')
@require_login
@require_any_role('admin','superadmin')
def upload_sop_file():
"""上传 SOP 文件"""
f = request.files.get('file')
description = request.form.get('description', '').strip()
if not f:
return jsonify({'error': '请选择文件'}), 400
# 保留原始文件名(包含中文)
original_name = f.filename or 'sop.xlsx'
# 获取扩展名
if '.' in original_name:
ext = original_name.rsplit('.', 1)[1].lower()
else:
ext = ''
# 验证文件类型Excel 和 Word 文件)
if ext not in ['xlsx', 'xls', 'csv', 'doc', 'docx']:
return jsonify({'error': '只支持 Excel 和 Word 文件格式(.xlsx, .xls, .csv, .doc, .docx'}), 400
# 生成唯一文件名(用于存储)
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
# 存储文件名使用安全的名称
safe_name = secure_filename(original_name)
if not safe_name or safe_name == ext:
# 如果 secure_filename 返回空或只有扩展名,使用时间戳
safe_name = f'file.{ext}'
filename = f'sop_{timestamp}_{safe_name}'
filepath = os.path.join(SOP_DIR, filename)
# 保存文件
f.save(filepath)
# 保存到数据库
conn = get_db()
c = conn.cursor()
uploader = session.get('user_id')
# 获取上传者用户名
c.execute('SELECT username FROM users WHERE id=?', (uploader,))
user = c.fetchone()
uploader_name = user['username'] if user else '未知'
c.execute('INSERT INTO sop_files(filename, original_name, description, uploader, ts) VALUES(?,?,?,?,?)',
(filename, original_name, description, uploader_name, get_beijing_time()))
conn.commit()
conn.close()
log('upload_sop', f'filename={original_name}, description={description}')
notify_superadmin('上传 SOP 文件', f'文件名: {original_name}')
return jsonify({'ok': True, 'message': 'SOP 文件上传成功'})
@app.get('/api/sop/download/<int:file_id>')
@require_login
def download_sop_file(file_id):
"""下载 SOP 文件"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT filename, original_name FROM sop_files WHERE id=?', (file_id,))
row = c.fetchone()
conn.close()
if not row:
return jsonify({'error': '文件不存在'}), 404
filepath = os.path.join(SOP_DIR, row['filename'])
if not os.path.exists(filepath):
return jsonify({'error': '文件已被删除'}), 404
log('download_sop', f'file_id={file_id}, filename={row["original_name"]}')
from flask import send_file
return send_file(
filepath,
as_attachment=True,
download_name=row['original_name']
)
@app.post('/api/sop/delete/<int:file_id>')
@require_login
@require_any_role('admin','superadmin')
def delete_sop_file(file_id):
"""删除 SOP 文件"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT filename, original_name FROM sop_files WHERE id=?', (file_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '文件不存在'}), 404
# 删除物理文件
filepath = os.path.join(SOP_DIR, row['filename'])
if os.path.exists(filepath):
try:
os.remove(filepath)
except Exception as e:
log('delete_sop_file_error', str(e))
# 删除数据库记录
c.execute('DELETE FROM sop_files WHERE id=?', (file_id,))
conn.commit()
conn.close()
log('delete_sop', f'file_id={file_id}, filename={row["original_name"]}')
notify_superadmin('删除 SOP 文件', f'文件名: {row["original_name"]}')
return jsonify({'ok': True, 'message': 'SOP 文件已删除'})
# ==================== 工单管理 API ====================
@app.get('/api/work-orders')
@require_login
def get_work_orders():
"""获取工单列表"""
factory = request.args.get('factory', '')
order_no = request.args.get('order', '')
date = request.args.get('date', '')
# 获取当前用户信息
user_id = session.get('user_id')
user_role = session.get('role')
conn = get_db()
c = conn.cursor()
# 获取用户所属工厂
c.execute('SELECT factory FROM users WHERE id=?', (user_id,))
user_row = c.fetchone()
user_factory = user_row['factory'] if user_row and user_row['factory'] else None
query = 'SELECT * FROM work_orders WHERE 1=1'
params = []
# 如果是管理员(非超级管理员),只能看到自己工厂的订单
if user_role == 'admin' and user_factory:
query += ' AND factory = ?'
params.append(user_factory)
if factory:
query += ' AND factory LIKE ?'
params.append(f'%{factory}%')
if order_no:
query += ' AND order_no LIKE ?'
params.append(f'%{order_no}%')
if date:
query += ' AND (production_start_time LIKE ? OR production_end_time LIKE ?)'
params.append(f'{date}%')
params.append(f'{date}%')
query += ' ORDER BY created_at DESC'
c.execute(query, params)
rows = c.fetchall()
conn.close()
orders = []
for row in rows:
orders.append({
'id': str(row['id']),
'factory': row['factory'],
'orderNo': row['order_no'],
'productModel': row['product_model'] if 'product_model' in row.keys() else '',
'orderQty': row['order_qty'],
'productionStartTime': row['production_start_time'],
'productionEndTime': row['production_end_time'],
'status': row['status'],
'statusText': row['status_text'],
'remark': row['remark'],
'createdBy': row['created_by'],
'createdAt': row['created_at']
})
return jsonify({'ok': True, 'data': orders})
@app.post('/api/work-orders')
@require_login
@require_any_role('admin','superadmin')
def create_work_order():
"""创建工单"""
data = request.get_json()
factory = data.get('factory', '').strip()
order_no = data.get('orderNo', '').strip()
product_model = data.get('productModel', '').strip()
order_qty = data.get('orderQty', 0)
production_start_time = data.get('productionStartTime', '')
production_end_time = data.get('productionEndTime', '')
remark = data.get('remark', '').strip()
# 获取当前用户信息
user_id = session.get('user_id')
user_role = session.get('role')
username = session.get('username', '')
conn = get_db()
c = conn.cursor()
# 如果是管理员(非超级管理员),强制使用用户自己的工厂
if user_role == 'admin':
c.execute('SELECT factory FROM users WHERE id=?', (user_id,))
user_row = c.fetchone()
if user_row and user_row['factory']:
factory = user_row['factory']
else:
conn.close()
return jsonify({'error': '您的账户未设置所属工厂,请联系超级管理员'}), 400
if not factory or not order_no or not order_qty:
conn.close()
return jsonify({'error': '请填写所有必填项'}), 400
now = get_beijing_time()
c.execute('''INSERT INTO work_orders(
factory, order_no, product_model, order_qty, production_start_time, production_end_time,
status, status_text, remark, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)''', (
factory, order_no, product_model, order_qty, production_start_time, production_end_time,
'issued', '已下发', remark, username, now, now
))
order_id = c.lastrowid
conn.commit()
conn.close()
log('create_work_order', f'工单号: {order_no}, 工厂: {factory}, 型号: {product_model}')
# 如果是超级管理员添加工单,通知该工厂的管理员
notify_admins_by_factory('添加工单', f'工单号: {order_no}, 工厂: {factory}, 数量: {order_qty}', factory=factory)
return jsonify({'ok': True, 'id': order_id, 'message': '工单创建成功'})
@app.put('/api/work-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def update_work_order(order_id):
"""更新工单(仅超级管理员)"""
data = request.get_json()
factory = data.get('factory', '').strip()
order_no = data.get('orderNo', '').strip()
product_model = data.get('productModel', '').strip()
order_qty = data.get('orderQty', 0)
production_start_time = data.get('productionStartTime', '')
production_end_time = data.get('productionEndTime', '')
remark = data.get('remark', '').strip()
if not factory or not order_no or not order_qty:
return jsonify({'error': '请填写所有必填项'}), 400
conn = get_db()
c = conn.cursor()
# 检查工单是否存在
c.execute('SELECT id FROM work_orders WHERE id=?', (order_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '工单不存在'}), 404
now = get_beijing_time()
c.execute('''UPDATE work_orders SET
factory=?, order_no=?, product_model=?, order_qty=?, production_start_time=?, production_end_time=?,
remark=?, updated_at=?
WHERE id=?''', (
factory, order_no, product_model, order_qty, production_start_time, production_end_time,
remark, now, order_id
))
conn.commit()
conn.close()
log('update_work_order', f'工单ID: {order_id}, 工单号: {order_no}, 型号: {product_model}')
notify_admins_by_factory('更新工单', f'工单号: {order_no}, 工厂: {factory}, 型号: {product_model}', factory=factory)
return jsonify({'ok': True, 'message': '工单更新成功'})
@app.delete('/api/work-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def delete_work_order(order_id):
"""删除工单(仅超级管理员)"""
conn = get_db()
c = conn.cursor()
# 获取工单信息用于日志
c.execute('SELECT order_no, factory FROM work_orders WHERE id=?', (order_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '工单不存在'}), 404
order_no = row['order_no']
factory = row['factory']
c.execute('DELETE FROM work_orders WHERE id=?', (order_id,))
conn.commit()
conn.close()
log('delete_work_order', f'工单ID: {order_id}, 工单号: {order_no}')
notify_admins_by_factory('删除工单', f'工单号: {order_no}, 工厂: {factory}', factory=factory)
return jsonify({'ok': True, 'message': '工单删除成功'})
@app.post('/api/work-orders/<int:order_id>/confirm')
@require_login
@require_any_role('admin','superadmin')
def confirm_work_order(order_id):
"""确认工单"""
# 获取当前用户信息
user_id = session.get('user_id')
user_role = session.get('role')
conn = get_db()
c = conn.cursor()
# 检查工单是否存在
c.execute('SELECT order_no, factory, status FROM work_orders WHERE id=?', (order_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '工单不存在'}), 404
order_no = row['order_no']
factory = row['factory']
current_status = row['status']
# 如果是管理员(非超级管理员),检查工单是否属于自己的工厂
if user_role == 'admin':
c.execute('SELECT factory FROM users WHERE id=?', (user_id,))
user_row = c.fetchone()
user_factory = user_row['factory'] if user_row and user_row['factory'] else None
if not user_factory or factory != user_factory:
conn.close()
return jsonify({'error': '您只能确认自己工厂的工单'}), 403
# 如果已经确认,返回提示
if current_status == 'confirmed':
conn.close()
return jsonify({'ok': True, 'message': '工单已确认'})
# 更新状态为已确认
now = get_beijing_time()
c.execute('''UPDATE work_orders SET
status=?, status_text=?, updated_at=?
WHERE id=?''', (
'confirmed', '已确认', now, order_id
))
conn.commit()
conn.close()
log('confirm_work_order', f'工单ID: {order_id}, 工单号: {order_no}')
notify_superadmin('确认工单', f'工单号: {order_no}, 工厂: {factory}')
return jsonify({'ok': True, 'message': '工单确认成功'})
# ==================== 物料清单-采购 API ====================
def convert_material_purchase_to_camel(row):
"""将数据库行转换为驼峰命名的字典"""
return {
'id': row['id'],
'title': row['title'],
'listNo': row['list_no'],
'planNo': row['plan_no'],
'bomResult': row['bom_result'],
'status': row['status'],
'demandStatus': row['demand_status'],
'completeRate': row['complete_rate'],
'materialCode': row['material_code'],
'materialName': row['material_name'],
'batchNo': row['batch_no'],
'level': row['level'],
'requiredQty': row['required_qty'],
'stockQty': row['stock_qty'],
'shortage': row['shortage'],
'acquireMethod': row['acquire_method'],
'realtimeStock': row['realtime_stock'],
'pendingQty': row['pending_qty'],
'dispatchedQty': row['dispatched_qty'],
'receivedQty': row['received_qty'],
'submitter': row['submitter'],
'submitTime': row['submit_time'],
'updateTime': row['update_time'],
'deleted': row['deleted'],
'deletedAt': row['deleted_at']
}
@app.get('/api/material-purchase/list')
@require_login
@require_any_role('superadmin')
def list_material_purchase():
"""获取物料清单列表(不包括已删除的)"""
conn = get_db()
c = conn.cursor()
c.execute('''SELECT * FROM material_purchase
WHERE deleted=0
ORDER BY submit_time DESC''')
rows = c.fetchall()
conn.close()
return jsonify({'list': [convert_material_purchase_to_camel(r) for r in rows]})
@app.get('/api/material-purchase/recycle-bin')
@require_login
@require_any_role('superadmin')
def list_material_purchase_recycle_bin():
"""获取回收站列表"""
conn = get_db()
c = conn.cursor()
c.execute('''SELECT * FROM material_purchase
WHERE deleted=1
ORDER BY deleted_at DESC''')
rows = c.fetchall()
conn.close()
return jsonify({'list': [convert_material_purchase_to_camel(r) for r in rows]})
@app.post('/api/material-purchase/add')
@require_login
@require_any_role('superadmin')
def add_material_purchase():
"""新增物料需求"""
data = request.get_json() or {}
required_fields = ['title', 'list_no', 'plan_no', 'status', 'demand_status',
'material_code', 'material_name', 'required_qty']
for field in required_fields:
if not data.get(field):
return jsonify({'error': f'缺少必填字段: {field}'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
c.execute('''INSERT INTO material_purchase(
title, list_no, plan_no, bom_result, status, demand_status,
complete_rate, material_code, material_name, batch_no, level,
required_qty, stock_qty, shortage, acquire_method, realtime_stock,
pending_qty, dispatched_qty, received_qty, submitter, submit_time, update_time
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', (
data.get('title'), data.get('list_no'), data.get('plan_no'),
data.get('bom_result'), data.get('status'), data.get('demand_status'),
data.get('complete_rate'), data.get('material_code'), data.get('material_name'),
data.get('batch_no'), data.get('level'), data.get('required_qty'),
data.get('stock_qty'), data.get('shortage'), data.get('acquire_method'),
data.get('realtime_stock'), data.get('pending_qty'), data.get('dispatched_qty'),
data.get('received_qty'), username, now, now
))
conn.commit()
item_id = c.lastrowid
conn.close()
log('add_material_purchase', f'物料编码: {data.get("material_code")}')
return jsonify({'ok': True, 'id': item_id})
@app.post('/api/material-purchase/update')
@require_login
@require_any_role('superadmin')
def update_material_purchase():
"""更新物料需求"""
data = request.get_json() or {}
item_id = data.get('id')
if not item_id:
return jsonify({'error': '缺少ID'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
c.execute('''UPDATE material_purchase SET
title=?, list_no=?, plan_no=?, bom_result=?, status=?, demand_status=?,
complete_rate=?, material_code=?, material_name=?, batch_no=?, level=?,
required_qty=?, stock_qty=?, shortage=?, acquire_method=?, realtime_stock=?,
pending_qty=?, dispatched_qty=?, received_qty=?, update_time=?
WHERE id=?''', (
data.get('title'), data.get('list_no'), data.get('plan_no'),
data.get('bom_result'), data.get('status'), data.get('demand_status'),
data.get('complete_rate'), data.get('material_code'), data.get('material_name'),
data.get('batch_no'), data.get('level'), data.get('required_qty'),
data.get('stock_qty'), data.get('shortage'), data.get('acquire_method'),
data.get('realtime_stock'), data.get('pending_qty'), data.get('dispatched_qty'),
data.get('received_qty'), now, item_id
))
conn.commit()
conn.close()
log('update_material_purchase', f'ID: {item_id}')
return jsonify({'ok': True})
@app.post('/api/material-purchase/delete')
@require_login
@require_any_role('superadmin')
def delete_material_purchase():
"""删除物料需求(移到回收站)"""
data = request.get_json() or {}
item_ids = data.get('ids', [])
if not item_ids:
return jsonify({'error': '缺少ID'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
placeholders = ','.join('?' * len(item_ids))
c.execute(f'''UPDATE material_purchase SET
deleted=1, deleted_at=?
WHERE id IN ({placeholders})''', [now] + item_ids)
conn.commit()
conn.close()
log('delete_material_purchase', f'删除数量: {len(item_ids)}')
return jsonify({'ok': True, 'count': len(item_ids)})
@app.post('/api/material-purchase/restore')
@require_login
@require_any_role('superadmin')
def restore_material_purchase():
"""从回收站恢复"""
data = request.get_json() or {}
item_id = data.get('id')
if not item_id:
return jsonify({'error': '缺少ID'}), 400
conn = get_db()
c = conn.cursor()
c.execute('''UPDATE material_purchase SET
deleted=0, deleted_at=NULL
WHERE id=?''', (item_id,))
conn.commit()
conn.close()
log('restore_material_purchase', f'ID: {item_id}')
return jsonify({'ok': True})
@app.post('/api/material-purchase/permanent-delete')
@require_login
@require_any_role('superadmin')
def permanent_delete_material_purchase():
"""永久删除"""
data = request.get_json() or {}
item_id = data.get('id')
if not item_id:
return jsonify({'error': '缺少ID'}), 400
conn = get_db()
c = conn.cursor()
c.execute('DELETE FROM material_purchase WHERE id=?', (item_id,))
conn.commit()
conn.close()
log('permanent_delete_material_purchase', f'ID: {item_id}')
return jsonify({'ok': True})
@app.post('/api/material-purchase/empty-recycle-bin')
@require_login
@require_any_role('superadmin')
def empty_recycle_bin():
"""清空回收站"""
conn = get_db()
c = conn.cursor()
c.execute('DELETE FROM material_purchase WHERE deleted=1')
count = c.rowcount
conn.commit()
conn.close()
log('empty_recycle_bin', f'清空数量: {count}')
return jsonify({'ok': True, 'count': count})
@app.route('/api/validate/material-purchase-file', methods=['POST'])
@require_login
@require_any_role('superadmin')
def validate_material_purchase_file():
"""验证物料清单Excel文件格式"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
name = secure_filename(f.filename or '')
ext = (name.split('.')[-1] or '').lower()
if ext not in ['xlsx', 'xls']:
return jsonify({'valid': False, 'message': '文件格式不支持请上传Excel文件.xlsx或.xls'}), 200
try:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
if ws.max_row < 2:
wb.close()
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
# 检查表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
# 必需的列
required_columns = ['标题', '生产计划明细物料需求清单编号', '生产计划编号', '状态',
'需求状态', '物料编码', '物料名称', '所需物料数']
missing_columns = []
for col in required_columns:
if col not in header:
missing_columns.append(col)
if missing_columns:
wb.close()
return jsonify({
'valid': False,
'message': f'缺少必需的列:{", ".join(missing_columns)}'
}), 200
data_rows = ws.max_row - 1
wb.close()
return jsonify({
'valid': True,
'message': f'文件格式正确,共{data_rows}行数据'
}), 200
except Exception as e:
return jsonify({'valid': False, 'message': f'读取文件失败:{str(e)}'}), 200
@app.post('/api/upload/material-purchase-file')
@require_login
@require_any_role('superadmin')
def upload_material_purchase_file():
"""上传物料清单Excel文件"""
f = request.files.get('file')
if not f:
return jsonify({'error': 'no file'}), 400
try:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
# 获取表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
# 创建列索引映射
col_map = {h: i for i, h in enumerate(header)}
# 辅助函数:解析百分比字符串
def parse_percentage(value):
if value is None:
return 0
if isinstance(value, (int, float)):
return float(value)
# 如果是字符串,去除%符号
value_str = str(value).strip()
if value_str.endswith('%'):
value_str = value_str[:-1]
try:
return float(value_str)
except:
return 0
# 辅助函数:安全转换为整数
def safe_int(value, default=0):
if value is None:
return default
try:
return int(float(value))
except:
return default
# 读取数据
rows = []
for row in ws.iter_rows(min_row=2, values_only=True):
if not any(row): # 跳过空行
continue
# 辅助函数:安全获取单元格值
def get_cell_value(col_name, default=''):
if col_name not in col_map:
return default
idx = col_map[col_name]
if idx >= len(row):
return default
value = row[idx]
return value if value is not None else default
# 提取数据
item = {
'title': str(get_cell_value('标题') or ''),
'list_no': str(get_cell_value('生产计划明细物料需求清单编号') or ''),
'plan_no': str(get_cell_value('生产计划编号') or ''),
'bom_result': str(get_cell_value('产品BOM分析结果') or ''),
'status': str(get_cell_value('状态') or 'pending'),
'demand_status': str(get_cell_value('需求状态') or 'normal'),
'complete_rate': parse_percentage(get_cell_value('物料齐套率', 0)),
'material_code': str(get_cell_value('物料编码') or ''),
'material_name': str(get_cell_value('物料名称') or ''),
'batch_no': str(get_cell_value('物料批次号') or ''),
'level': safe_int(get_cell_value('物料层级', 1), 1),
'required_qty': safe_int(get_cell_value('所需物料数', 0), 0),
'stock_qty': safe_int(get_cell_value('库存现有物料数', 0), 0),
'shortage': safe_int(get_cell_value('欠缺值', 0), 0),
'acquire_method': str(get_cell_value('物料获取方式') or ''),
'realtime_stock': safe_int(get_cell_value('实时库存值', 0), 0),
'pending_qty': safe_int(get_cell_value('待入库数量', 0), 0),
'dispatched_qty': safe_int(get_cell_value('派发数量', 0), 0),
'received_qty': safe_int(get_cell_value('入库数量', 0), 0),
}
# 验证必填字段
if not item['title'] or not item['list_no'] or not item['material_code']:
continue
rows.append(item)
wb.close()
if not rows:
return jsonify({'error': '文件中没有有效数据'}), 400
# 批量插入数据库
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
for item in rows:
c.execute('''INSERT INTO material_purchase(
title, list_no, plan_no, bom_result, status, demand_status,
complete_rate, material_code, material_name, batch_no, level,
required_qty, stock_qty, shortage, acquire_method, realtime_stock,
pending_qty, dispatched_qty, received_qty, submitter, submit_time, update_time
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', (
item['title'], item['list_no'], item['plan_no'], item['bom_result'],
item['status'], item['demand_status'], item['complete_rate'],
item['material_code'], item['material_name'], item['batch_no'],
item['level'], item['required_qty'], item['stock_qty'], item['shortage'],
item['acquire_method'], item['realtime_stock'], item['pending_qty'],
item['dispatched_qty'], item['received_qty'], username, now, now
))
conn.commit()
conn.close()
log('upload_material_purchase_file', f'导入数量: {len(rows)}')
return jsonify({'ok': True, 'count': len(rows), 'message': f'成功导入{len(rows)}条数据'})
except Exception as e:
return jsonify({'error': f'导入失败:{str(e)}'}), 500
# ==================== 客户订单 API ====================
@app.get('/api/customer-orders')
@require_login
@require_any_role('superadmin')
def get_customer_orders():
"""获取客户订单列表"""
conn = get_db()
c = conn.cursor()
c.execute('''SELECT id, order_date, order_no, customer_name, material, quantity, unit_price,
created_by, created_at, updated_at
FROM customer_orders
ORDER BY order_date DESC, id DESC''')
rows = c.fetchall()
conn.close()
orders = []
for row in rows:
orders.append({
'id': row['id'],
'order_date': row['order_date'],
'order_no': row['order_no'],
'customer_name': row['customer_name'] if 'customer_name' in row.keys() else '',
'material': row['material'],
'quantity': row['quantity'],
'unit_price': row['unit_price'],
'created_by': row['created_by'],
'created_at': row['created_at'],
'updated_at': row['updated_at']
})
return jsonify({'list': orders})
@app.post('/api/customer-orders')
@require_login
@require_any_role('superadmin')
def create_customer_order():
"""创建客户订单"""
data = request.get_json() or {}
order_date = data.get('order_date', '').strip()
order_no = data.get('order_no', '').strip()
customer_name = data.get('customer_name', '').strip()
material = data.get('material', '').strip()
quantity = data.get('quantity', 0)
unit_price = data.get('unit_price', 0)
if not order_date or not order_no or not customer_name or not material:
return jsonify({'error': '请填写所有必填项'}), 400
if quantity <= 0:
return jsonify({'error': '订单数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
username = session.get('username', '')
now = get_beijing_time()
conn = get_db()
c = conn.cursor()
c.execute('''INSERT INTO customer_orders(
order_date, order_no, customer_name, material, quantity, unit_price,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?)''', (
order_date, order_no, customer_name, material, quantity, unit_price,
username, now, now
))
order_id = c.lastrowid
conn.commit()
conn.close()
log('create_customer_order', f'客户: {customer_name}, 订单号: {order_no}, 物料: {material}, 数量: {quantity}')
return jsonify({'ok': True, 'id': order_id, 'message': '订单创建成功'})
@app.put('/api/customer-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def update_customer_order(order_id):
"""更新客户订单"""
data = request.get_json()
order_date = data.get('order_date', '').strip()
order_no = data.get('order_no', '').strip()
customer_name = data.get('customer_name', '').strip()
material = data.get('material', '').strip()
quantity = data.get('quantity')
unit_price = data.get('unit_price')
if not all([order_date, order_no, customer_name, material, quantity is not None, unit_price is not None]):
return jsonify({'error': '请填写所有必填项'}), 400
try:
quantity = int(quantity)
unit_price = float(unit_price)
except (ValueError, TypeError):
return jsonify({'error': '数量和单价必须是有效数字'}), 400
if quantity <= 0:
return jsonify({'error': '数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
conn = get_db()
c = conn.cursor()
# 检查订单是否存在
c.execute('SELECT id FROM customer_orders WHERE id=?', (order_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '订单不存在'}), 404
# 更新订单
c.execute('''UPDATE customer_orders
SET order_date=?, order_no=?, customer_name=?, material=?,
quantity=?, unit_price=?, updated_at=?, updated_by=?
WHERE id=?''',
(order_date, order_no, customer_name, material, quantity, unit_price,
datetime.now(timezone(timedelta(hours=8))).isoformat(),
session.get('username', 'unknown'), order_id))
conn.commit()
conn.close()
log('update_customer_order', f'订单ID: {order_id}, 客户: {customer_name}, 订单号: {order_no}')
return jsonify({'ok': True, 'message': '订单更新成功'})
@app.delete('/api/customer-orders/<int:order_id>')
@require_login
@require_any_role('superadmin')
def delete_customer_order(order_id):
"""删除客户订单"""
conn = get_db()
c = conn.cursor()
# 获取订单信息用于日志
c.execute('SELECT order_no, customer_name, material FROM customer_orders WHERE id=?', (order_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '订单不存在'}), 404
order_no = row['order_no']
customer_name = row['customer_name'] if 'customer_name' in row.keys() else ''
material = row['material']
c.execute('DELETE FROM customer_orders WHERE id=?', (order_id,))
conn.commit()
conn.close()
log('delete_customer_order', f'订单ID: {order_id}, 客户: {customer_name}, 订单号: {order_no}')
return jsonify({'ok': True, 'message': '订单删除成功'})
# 对账单管理
@app.get('/api/reconciliations')
@require_login
def get_reconciliations():
"""获取对账单列表"""
conn = get_db()
c = conn.cursor()
c.execute('''
SELECT id, order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
FROM reconciliations
ORDER BY id ASC
''')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/reconciliations')
@require_login
@require_any_role('admin', 'superadmin')
def create_reconciliation():
"""创建对账单"""
data = request.get_json() or {}
order_date = format_date_to_slash(data.get('order_date'))
contract_no = (data.get('contract_no') or '').strip()
material_name = (data.get('material_name') or '').strip()
spec_model = (data.get('spec_model') or '').strip()
transport_no = (data.get('transport_no') or '').strip()
quantity = data.get('quantity')
unit = (data.get('unit') or 'pcs').strip()
unit_price = data.get('unit_price')
total_amount = data.get('total_amount')
delivery_date = format_date_to_slash(data.get('delivery_date'))
shipment_date = format_date_to_slash(data.get('shipment_date'))
# 验证必填字段
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price is not None]):
return jsonify({'error': '请填写所有必填字段'}), 400
try:
quantity = int(quantity)
unit_price = float(unit_price)
total_amount = float(total_amount) if total_amount else quantity * unit_price
except (ValueError, TypeError):
return jsonify({'error': '数量、单价或金额格式不正确'}), 400
if quantity <= 0:
return jsonify({'error': '数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
try:
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
username, now, now
))
conn.commit()
reconciliation_id = c.lastrowid
conn.close()
log('create_reconciliation', f'合同号: {contract_no}, 物料: {material_name}, 数量: {quantity}')
notify_superadmin('新增对账单', f'合同号: {contract_no}, 物料: {material_name}')
return jsonify({'ok': True, 'id': reconciliation_id, 'message': '对账单创建成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'创建失败:{str(e)}'}), 500
@app.put('/api/reconciliations/<int:reconciliation_id>')
@require_login
@require_any_role('admin', 'superadmin')
def update_reconciliation(reconciliation_id):
"""更新对账单"""
data = request.get_json() or {}
order_date = format_date_to_slash(data.get('order_date'))
contract_no = (data.get('contract_no') or '').strip()
material_name = (data.get('material_name') or '').strip()
spec_model = (data.get('spec_model') or '').strip()
transport_no = (data.get('transport_no') or '').strip()
quantity = data.get('quantity')
unit = (data.get('unit') or 'pcs').strip()
unit_price = data.get('unit_price')
total_amount = data.get('total_amount')
delivery_date = format_date_to_slash(data.get('delivery_date'))
shipment_date = format_date_to_slash(data.get('shipment_date'))
# 验证必填字段
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price is not None]):
return jsonify({'error': '请填写所有必填字段'}), 400
try:
quantity = int(quantity)
unit_price = float(unit_price)
total_amount = float(total_amount) if total_amount else quantity * unit_price
except (ValueError, TypeError):
return jsonify({'error': '数量、单价或金额格式不正确'}), 400
if quantity <= 0:
return jsonify({'error': '数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
conn = get_db()
c = conn.cursor()
# 检查对账单是否存在
c.execute('SELECT id FROM reconciliations WHERE id=?', (reconciliation_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '对账单不存在'}), 404
now = get_beijing_time()
try:
c.execute('''
UPDATE reconciliations SET
order_date=?, contract_no=?, material_name=?, spec_model=?, transport_no=?,
quantity=?, unit=?, unit_price=?, total_amount=?, delivery_date=?, shipment_date=?,
updated_at=?
WHERE id=?
''', (
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
now, reconciliation_id
))
conn.commit()
conn.close()
log('update_reconciliation', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}, 物料: {material_name}')
notify_superadmin('更新对账单', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}')
return jsonify({'ok': True, 'message': '对账单更新成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'更新失败:{str(e)}'}), 500
@app.delete('/api/reconciliations/<int:reconciliation_id>')
@require_login
@require_any_role('admin', 'superadmin')
def delete_reconciliation(reconciliation_id):
"""删除对账单"""
conn = get_db()
c = conn.cursor()
# 获取对账单信息用于日志
c.execute('SELECT contract_no, material_name FROM reconciliations WHERE id=?', (reconciliation_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '对账单不存在'}), 404
contract_no = row['contract_no']
material_name = row['material_name']
c.execute('DELETE FROM reconciliations WHERE id=?', (reconciliation_id,))
conn.commit()
conn.close()
log('delete_reconciliation', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}')
return jsonify({'ok': True, 'message': '对账单删除成功'})
@app.post('/api/reconciliations/batch-delete')
@require_login
@require_any_role('admin', 'superadmin')
def batch_delete_reconciliations():
"""批量删除对账单"""
data = request.get_json()
ids = data.get('ids', [])
if not ids or not isinstance(ids, list):
return jsonify({'error': '请提供要删除的对账单ID列表'}), 400
if len(ids) == 0:
return jsonify({'error': '请至少选择一条对账单'}), 400
conn = get_db()
c = conn.cursor()
try:
# 获取对账单信息用于日志
placeholders = ','.join('?' * len(ids))
c.execute(f'SELECT id, contract_no, material_name FROM reconciliations WHERE id IN ({placeholders})', ids)
rows = c.fetchall()
if len(rows) == 0:
conn.close()
return jsonify({'error': '未找到要删除的对账单'}), 404
# 批量删除
c.execute(f'DELETE FROM reconciliations WHERE id IN ({placeholders})', ids)
conn.commit()
# 记录日志
deleted_info = ', '.join([f"ID:{row['id']}({row['contract_no']})" for row in rows])
log('batch_delete_reconciliations', f'批量删除 {len(rows)} 条对账单: {deleted_info}')
conn.close()
return jsonify({'ok': True, 'message': f'成功删除 {len(rows)} 条对账单'})
except Exception as e:
conn.close()
print(f'批量删除对账单失败: {e}')
return jsonify({'error': f'批量删除失败: {str(e)}'}), 500
@app.get('/api/reconciliations/export')
@require_login
def export_reconciliations():
"""导出对账单为 xlsx 格式"""
try:
import pandas as pd
from io import BytesIO
from flask import send_file
conn = get_db()
c = conn.cursor()
c.execute('''
SELECT order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date
FROM reconciliations
ORDER BY id ASC
''')
rows = c.fetchall()
conn.close()
if not rows:
return jsonify({'error': '暂无数据可导出'}), 400
# 转换为 DataFrame
data = []
for idx, row in enumerate(rows, start=1):
data.append({
'序号': idx,
'下单时间': row['order_date'] or '',
'合同编号': row['contract_no'] or '',
'物料名称': row['material_name'] or '',
'规格型号': row['spec_model'] or '',
'运输单号': row['transport_no'] or '',
'数量': row['quantity'] or 0,
'单位': row['unit'] or '',
'含税单价': row['unit_price'] or 0,
'含税金额': row['total_amount'] or 0,
'交货日期': row['delivery_date'] or '',
'出货日期': row['shipment_date'] or ''
})
df = pd.DataFrame(data)
# 创建 Excel 文件
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='对账单')
# 获取工作表并设置列宽和行高
worksheet = writer.sheets['对账单']
# 导入样式
from openpyxl.styles import Alignment, Font
# 设置列宽按指定宽度Excel列宽需要稍微增加以达到实际显示效果
column_widths = {
'序号': 8.7,
'下单时间': 13.5,
'合同编号': 14.2,
'物料名称': 14,
'规格型号': 25,
'运输单号': 32,
'数量': 16.7,
'单位': 6.5,
'含税单价': 9.5,
'含税金额': 8.8,
'交货日期': 13.2,
'出货日期': 11.7
}
for idx, col in enumerate(df.columns):
col_letter = chr(65 + idx) # A, B, C, ...
if col in column_widths:
worksheet.column_dimensions[col_letter].width = column_widths[col]
else:
worksheet.column_dimensions[col_letter].width = 15 # 默认宽度
# 设置所有行的行高为39并设置单元格居中对齐、宋体字体和自动换行
for row_idx in range(1, len(df) + 2): # +2 因为包含表头行且从1开始
worksheet.row_dimensions[row_idx].height = 39
# 设置该行所有单元格居中对齐、宋体字体和自动换行
for col_idx in range(1, len(df.columns) + 1):
cell = worksheet.cell(row=row_idx, column=col_idx)
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
cell.font = Font(name='宋体', size=11)
output.seek(0)
# 生成文件名(包含当前日期)
from datetime import datetime
filename = f'对账单_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
log('export_reconciliations', f'导出对账单,共 {len(rows)} 条记录')
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_reconciliations_error', str(e))
return jsonify({'error': f'导出失败:{str(e)}'}), 500
@app.post('/api/reconciliations/upload-shipment')
@require_login
@require_any_role('admin', 'superadmin')
def upload_shipment():
"""上传发货单并解析生成对账单"""
if 'file' not in request.files:
return jsonify({'error': '未选择文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 验证文件类型
allowed_extensions = {'xls', 'xlsx'}
if '.' not in file.filename:
return jsonify({'error': '无效的文件格式'}), 400
ext = file.filename.rsplit('.', 1)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '不支持的文件格式,请上传 XLS 或 XLSX 格式'}), 400
try:
import pandas as pd
import numpy as np
from io import BytesIO
# 读取Excel文件
file_content = file.read()
df = pd.read_excel(BytesIO(file_content), header=None)
# 日期格式化函数:统一转换为 YYYY/MM/DD 格式
def format_date(date_val):
if pd.isna(date_val):
return ''
if isinstance(date_val, pd.Timestamp):
return date_val.strftime('%Y/%m/%d')
date_str = str(date_val).strip()
# 去掉时间部分
if ' ' in date_str:
date_str = date_str.split()[0]
# 将 YYYY-MM-DD 转换为 YYYY/MM/DD
if '-' in date_str:
return date_str.replace('-', '/')
return date_str
# 提取头部信息
shipment_date = None
transport_method = None
# 解析发货日期第1行索引2
if len(df) > 1 and len(df.columns) > 2:
shipment_date_raw = df.iloc[1, 2]
if pd.notna(shipment_date_raw):
shipment_date = format_date(shipment_date_raw)
# 解析供货方式第2行索引2
if len(df) > 2 and len(df.columns) > 2:
transport_method_raw = df.iloc[2, 2]
if pd.notna(transport_method_raw):
transport_method = str(transport_method_raw)
# 找到表格数据的起始行(序号、采购单号、物料编码...
header_row = None
for i in range(len(df)):
if df.iloc[i, 0] == '序号':
header_row = i
break
if header_row is None:
return jsonify({'error': '无法识别发货单格式,未找到表格头部'}), 400
# 从表格起始行读取数据
data_df = pd.read_excel(BytesIO(file_content), header=header_row)
# 过滤掉合计行和备注行(只保留序号为数字的行)
valid_data = data_df[data_df['序号'].apply(lambda x: isinstance(x, (int, float)) and not pd.isna(x))]
if len(valid_data) == 0:
return jsonify({'error': '发货单中没有有效的数据行'}), 400
# 获取客户订单数据用于查找单价和下单时间
conn = get_db()
c = conn.cursor()
c.execute('SELECT order_no, order_date, material, unit_price FROM customer_orders')
customer_orders_list = c.fetchall()
# 构建字典(支持一个订单号对应多个物料)
customer_orders = {}
for row in customer_orders_list:
order_no = row['order_no']
if order_no not in customer_orders:
customer_orders[order_no] = []
customer_orders[order_no].append({
'order_date': row['order_date'],
'material': row['material'],
'unit_price': row['unit_price']
})
# 解析每一行数据并插入对账单
now = get_beijing_time()
username = session.get('username', '')
success_count = 0
error_rows = []
for idx, row in valid_data.iterrows():
try:
# 提取数据
contract_no = row.get('采购单号')
if pd.isna(contract_no):
# 如果采购单号为空,尝试使用上一行的采购单号
if success_count > 0:
contract_no = last_contract_no
else:
error_rows.append(f"{int(row['序号'])}行:采购单号为空")
continue
else:
contract_no = str(contract_no).strip()
last_contract_no = contract_no
material_code = row.get('物料编码')
if pd.isna(material_code):
error_rows.append(f"{int(row['序号'])}行:物料编码为空")
continue
material_code = str(material_code).strip().replace('\n', ' ')
spec_model = row.get('规格型号')
if pd.isna(spec_model):
spec_model = ''
else:
spec_model = str(spec_model).strip()
quantity = row.get('实送数量')
if pd.isna(quantity):
error_rows.append(f"{int(row['序号'])}行:实送数量为空")
continue
quantity = int(float(quantity))
# 单位统一设置为 pcs
unit = 'pcs'
# 使用发货单头部的供货方式作为运输单号(统一)
transport_no = transport_method or ''
# 从客户订单中查找单价和下单时间
unit_price = 0
order_date = shipment_date or ''
# 提取物料编码的第一部分(去掉换行符后的内容)
material_code_key = material_code.split('\n')[0].split()[0].strip() if material_code else ''
# 遍历客户订单查找匹配的物料
if contract_no in customer_orders:
for order_info in customer_orders[contract_no]:
# 提取订单中的物料编码(第一部分)
order_material = order_info['material'].split('\n')[0].split()[0].strip()
# 匹配物料编码
if material_code_key and order_material and material_code_key in order_material:
unit_price = order_info['unit_price']
# 格式化下单时间为 YYYY/MM/DD 格式
order_date = format_date(order_info['order_date']) or shipment_date
break
# 如果未找到匹配,检查是否是飞机盒,设置默认单价
if unit_price == 0 and '飞机盒' in material_code:
unit_price = 2
# 计算含税金额
total_amount = quantity * unit_price
# 插入对账单
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
order_date, contract_no, material_code, spec_model, transport_no,
quantity, unit, unit_price, total_amount, shipment_date, shipment_date,
username, now, now
))
success_count += 1
except Exception as e:
error_rows.append(f"{int(row['序号'])}行:{str(e)}")
continue
conn.commit()
conn.close()
log('upload_shipment', f'上传发货单,成功导入 {success_count} 条记录')
notify_superadmin('上传发货单', f'成功导入 {success_count} 条对账单记录')
result = {
'ok': True,
'success_count': success_count,
'message': f'成功导入 {success_count} 条对账单记录'
}
if error_rows:
result['errors'] = error_rows
result['message'] += f'{len(error_rows)} 条记录失败'
return jsonify(result)
except Exception as e:
log('upload_shipment_error', str(e))
return jsonify({'error': f'解析发货单失败:{str(e)}'}), 500
# ==================== BOM物料清单 API ====================
@app.get('/api/bom')
@require_login
@require_any_role('superadmin', 'admin')
def get_bom_list():
"""获取BOM列表"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT * FROM bom ORDER BY product_code, id')
rows = c.fetchall()
conn.close()
return jsonify({'list': [dict(r) for r in rows]})
@app.get('/api/bom/products')
@require_login
@require_any_role('superadmin', 'admin')
def get_bom_products():
"""获取BOM中所有产品列表去重"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT DISTINCT product_code, product_name FROM bom ORDER BY product_code')
rows = c.fetchall()
conn.close()
return jsonify({'list': [dict(r) for r in rows]})
@app.post('/api/bom')
@require_login
@require_any_role('superadmin')
def create_bom():
"""创建BOM条目"""
data = request.get_json() or {}
product_code = (data.get('product_code') or '').strip()
product_name = (data.get('product_name') or '').strip()
material_code = (data.get('material_code') or '').strip()
material_name = (data.get('material_name') or '').strip()
unit_qty = data.get('unit_qty', 1)
unit = (data.get('unit') or 'pcs').strip()
min_package = data.get('min_package', 1)
supplier = (data.get('supplier') or '').strip()
remark = (data.get('remark') or '').strip()
if not product_code or not product_name or not material_code or not material_name:
return jsonify({'error': '产品编码、产品名称、物料编码、物料名称为必填项'}), 400
try:
unit_qty = float(unit_qty)
min_package = int(min_package) if min_package else 1
except (ValueError, TypeError):
return jsonify({'error': '单机用量和最小包装必须是有效数字'}), 400
if unit_qty <= 0:
return jsonify({'error': '单机用量必须大于0'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
c.execute('''INSERT INTO bom(
product_code, product_name, material_code, material_name, unit_qty,
unit, min_package, supplier, remark, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)''', (
product_code, product_name, material_code, material_name, unit_qty,
unit, min_package, supplier, remark, username, now, now
))
bom_id = c.lastrowid
conn.commit()
conn.close()
log('create_bom', f'产品: {product_code}, 物料: {material_code}')
return jsonify({'ok': True, 'id': bom_id, 'message': 'BOM创建成功'})
@app.put('/api/bom/<int:bom_id>')
@require_login
@require_any_role('superadmin')
def update_bom(bom_id):
"""更新BOM条目"""
data = request.get_json() or {}
product_code = (data.get('product_code') or '').strip()
product_name = (data.get('product_name') or '').strip()
material_code = (data.get('material_code') or '').strip()
material_name = (data.get('material_name') or '').strip()
unit_qty = data.get('unit_qty', 1)
unit = (data.get('unit') or 'pcs').strip()
min_package = data.get('min_package', 1)
supplier = (data.get('supplier') or '').strip()
remark = (data.get('remark') or '').strip()
if not product_code or not product_name or not material_code or not material_name:
return jsonify({'error': '产品编码、产品名称、物料编码、物料名称为必填项'}), 400
try:
unit_qty = float(unit_qty)
min_package = int(min_package) if min_package else 1
except (ValueError, TypeError):
return jsonify({'error': '单机用量和最小包装必须是有效数字'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM bom WHERE id=?', (bom_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': 'BOM不存在'}), 404
now = get_beijing_time()
c.execute('''UPDATE bom SET
product_code=?, product_name=?, material_code=?, material_name=?, unit_qty=?,
unit=?, min_package=?, supplier=?, remark=?, updated_at=?
WHERE id=?''', (
product_code, product_name, material_code, material_name, unit_qty,
unit, min_package, supplier, remark, now, bom_id
))
conn.commit()
conn.close()
log('update_bom', f'BOM ID: {bom_id}')
return jsonify({'ok': True, 'message': 'BOM更新成功'})
@app.delete('/api/bom/<int:bom_id>')
@require_login
@require_any_role('superadmin')
def delete_bom(bom_id):
"""删除BOM条目"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT product_code, material_code FROM bom WHERE id=?', (bom_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': 'BOM不存在'}), 404
c.execute('DELETE FROM bom WHERE id=?', (bom_id,))
conn.commit()
conn.close()
log('delete_bom', f'BOM ID: {bom_id}, 产品: {row["product_code"]}, 物料: {row["material_code"]}')
return jsonify({'ok': True, 'message': 'BOM删除成功'})
@app.post('/api/bom/batch-delete')
@require_login
@require_any_role('superadmin')
def batch_delete_bom():
"""批量删除BOM"""
data = request.get_json() or {}
ids = data.get('ids', [])
if not ids:
return jsonify({'error': '请选择要删除的BOM'}), 400
conn = get_db()
c = conn.cursor()
placeholders = ','.join('?' * len(ids))
c.execute(f'DELETE FROM bom WHERE id IN ({placeholders})', ids)
count = c.rowcount
conn.commit()
conn.close()
log('batch_delete_bom', f'批量删除 {count} 条BOM')
return jsonify({'ok': True, 'count': count, 'message': f'成功删除 {count} 条BOM'})
@app.post('/api/bom/import')
@require_login
@require_any_role('superadmin')
def import_bom():
"""从Excel导入BOM"""
f = request.files.get('file')
if not f:
return jsonify({'error': '请上传文件'}), 400
filename = f.filename.lower()
if not (filename.endswith('.xlsx') or filename.endswith('.xls') or filename.endswith('.csv')):
return jsonify({'error': '请上传Excel或CSV文件'}), 400
try:
import pandas as pd
if filename.endswith('.csv'):
df = pd.read_csv(f)
else:
df = pd.read_excel(f)
# 列名映射(支持中英文)
column_map = {
'产品编码': 'product_code', 'product_code': 'product_code',
'产品名称': 'product_name', 'product_name': 'product_name',
'物料编码': 'material_code', 'material_code': 'material_code',
'物料名称': 'material_name', 'material_name': 'material_name',
'单机用量': 'unit_qty', 'unit_qty': 'unit_qty',
'单位': 'unit', 'unit': 'unit',
'最小包装': 'min_package', 'min_package': 'min_package',
'供应商': 'supplier', 'supplier': 'supplier',
'备注': 'remark', 'remark': 'remark'
}
df = df.rename(columns=column_map)
# 检查必填列
required = ['product_code', 'product_name', 'material_code', 'material_name']
missing = [col for col in required if col not in df.columns]
if missing:
return jsonify({'error': f'缺少必填列: {", ".join(missing)}'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
success_count = 0
for _, row in df.iterrows():
product_code = str(row.get('product_code', '')).strip()
product_name = str(row.get('product_name', '')).strip()
material_code = str(row.get('material_code', '')).strip()
material_name = str(row.get('material_name', '')).strip()
if not product_code or not material_code or product_code == 'nan' or material_code == 'nan':
continue
unit_qty = 1
try:
unit_qty = float(row.get('unit_qty', 1))
except:
pass
min_package = 1
try:
min_package = int(float(row.get('min_package', 1)))
except:
pass
unit = str(row.get('unit', 'pcs')).strip()
if unit == 'nan':
unit = 'pcs'
supplier = str(row.get('supplier', '')).strip()
if supplier == 'nan':
supplier = ''
remark = str(row.get('remark', '')).strip()
if remark == 'nan':
remark = ''
c.execute('''INSERT INTO bom(
product_code, product_name, material_code, material_name, unit_qty,
unit, min_package, supplier, remark, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)''', (
product_code, product_name, material_code, material_name, unit_qty,
unit, min_package, supplier, remark, username, now, now
))
success_count += 1
conn.commit()
conn.close()
log('import_bom', f'导入 {success_count} 条BOM')
return jsonify({'ok': True, 'count': success_count, 'message': f'成功导入 {success_count} 条BOM'})
except Exception as e:
return jsonify({'error': f'导入失败: {str(e)}'}), 500
# ==================== 期初库存 API ====================
@app.get('/api/initial-stock')
@require_login
@require_any_role('superadmin', 'admin')
def get_initial_stock_list():
"""获取期初库存列表"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT * FROM initial_stock ORDER BY material_code')
rows = c.fetchall()
conn.close()
return jsonify({'list': [dict(r) for r in rows]})
@app.post('/api/initial-stock')
@require_login
@require_any_role('superadmin')
def create_initial_stock():
"""创建期初库存"""
data = request.get_json() or {}
material_code = (data.get('material_code') or '').strip()
material_name = (data.get('material_name') or '').strip()
stock_qty = data.get('stock_qty', 0)
unit = (data.get('unit') or 'pcs').strip()
min_package = data.get('min_package', 1)
supplier = (data.get('supplier') or '').strip()
remark = (data.get('remark') or '').strip()
if not material_code or not material_name:
return jsonify({'error': '物料编码和物料名称为必填项'}), 400
try:
stock_qty = int(stock_qty)
min_package = int(min_package) if min_package else 1
except (ValueError, TypeError):
return jsonify({'error': '库存数量和最小包装必须是有效整数'}), 400
if stock_qty < 0:
return jsonify({'error': '库存数量不能为负数'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
# 检查物料编码是否已存在
c.execute('SELECT id FROM initial_stock WHERE material_code=?', (material_code,))
if c.fetchone():
conn.close()
return jsonify({'error': '该物料编码已存在,请使用更新功能'}), 400
c.execute('''INSERT INTO initial_stock(
material_code, material_name, stock_qty, unit, min_package,
supplier, remark, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?)''', (
material_code, material_name, stock_qty, unit, min_package,
supplier, remark, username, now, now
))
stock_id = c.lastrowid
conn.commit()
conn.close()
log('create_initial_stock', f'物料: {material_code}, 库存: {stock_qty}')
return jsonify({'ok': True, 'id': stock_id, 'message': '期初库存创建成功'})
@app.put('/api/initial-stock/<int:stock_id>')
@require_login
@require_any_role('superadmin')
def update_initial_stock(stock_id):
"""更新期初库存"""
data = request.get_json() or {}
material_code = (data.get('material_code') or '').strip()
material_name = (data.get('material_name') or '').strip()
stock_qty = data.get('stock_qty', 0)
unit = (data.get('unit') or 'pcs').strip()
min_package = data.get('min_package', 1)
supplier = (data.get('supplier') or '').strip()
remark = (data.get('remark') or '').strip()
if not material_code or not material_name:
return jsonify({'error': '物料编码和物料名称为必填项'}), 400
try:
stock_qty = int(stock_qty)
min_package = int(min_package) if min_package else 1
except (ValueError, TypeError):
return jsonify({'error': '库存数量和最小包装必须是有效整数'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM initial_stock WHERE id=?', (stock_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '期初库存不存在'}), 404
now = get_beijing_time()
c.execute('''UPDATE initial_stock SET
material_code=?, material_name=?, stock_qty=?, unit=?, min_package=?,
supplier=?, remark=?, updated_at=?
WHERE id=?''', (
material_code, material_name, stock_qty, unit, min_package,
supplier, remark, now, stock_id
))
conn.commit()
conn.close()
log('update_initial_stock', f'库存 ID: {stock_id}')
return jsonify({'ok': True, 'message': '期初库存更新成功'})
@app.delete('/api/initial-stock/<int:stock_id>')
@require_login
@require_any_role('superadmin')
def delete_initial_stock(stock_id):
"""删除期初库存"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT material_code FROM initial_stock WHERE id=?', (stock_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '期初库存不存在'}), 404
c.execute('DELETE FROM initial_stock WHERE id=?', (stock_id,))
conn.commit()
conn.close()
log('delete_initial_stock', f'库存 ID: {stock_id}, 物料: {row["material_code"]}')
return jsonify({'ok': True, 'message': '期初库存删除成功'})
@app.post('/api/initial-stock/batch-delete')
@require_login
@require_any_role('superadmin')
def batch_delete_initial_stock():
"""批量删除期初库存"""
data = request.get_json() or {}
ids = data.get('ids', [])
if not ids:
return jsonify({'error': '请选择要删除的库存'}), 400
conn = get_db()
c = conn.cursor()
placeholders = ','.join('?' * len(ids))
c.execute(f'DELETE FROM initial_stock WHERE id IN ({placeholders})', ids)
count = c.rowcount
conn.commit()
conn.close()
log('batch_delete_initial_stock', f'批量删除 {count} 条期初库存')
return jsonify({'ok': True, 'count': count, 'message': f'成功删除 {count} 条期初库存'})
@app.post('/api/initial-stock/import')
@require_login
@require_any_role('superadmin')
def import_initial_stock():
"""从Excel导入期初库存"""
f = request.files.get('file')
if not f:
return jsonify({'error': '请上传文件'}), 400
filename = f.filename.lower()
if not (filename.endswith('.xlsx') or filename.endswith('.xls') or filename.endswith('.csv')):
return jsonify({'error': '请上传Excel或CSV文件'}), 400
try:
import pandas as pd
if filename.endswith('.csv'):
df = pd.read_csv(f)
else:
df = pd.read_excel(f)
# 列名映射(支持中英文)
column_map = {
'物料编码': 'material_code', 'material_code': 'material_code',
'物料名称': 'material_name', 'material_name': 'material_name',
'库存数量': 'stock_qty', 'stock_qty': 'stock_qty',
'单位': 'unit', 'unit': 'unit',
'最小包装': 'min_package', 'min_package': 'min_package',
'供应商': 'supplier', 'supplier': 'supplier',
'备注': 'remark', 'remark': 'remark'
}
df = df.rename(columns=column_map)
# 检查必填列
required = ['material_code', 'material_name']
missing = [col for col in required if col not in df.columns]
if missing:
return jsonify({'error': f'缺少必填列: {", ".join(missing)}'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
success_count = 0
update_count = 0
for _, row in df.iterrows():
material_code = str(row.get('material_code', '')).strip()
material_name = str(row.get('material_name', '')).strip()
if not material_code or material_code == 'nan':
continue
stock_qty = 0
try:
stock_qty = int(float(row.get('stock_qty', 0)))
except:
pass
min_package = 1
try:
min_package = int(float(row.get('min_package', 1)))
except:
pass
unit = str(row.get('unit', 'pcs')).strip()
if unit == 'nan':
unit = 'pcs'
supplier = str(row.get('supplier', '')).strip()
if supplier == 'nan':
supplier = ''
remark = str(row.get('remark', '')).strip()
if remark == 'nan':
remark = ''
# 检查是否已存在,存在则更新
c.execute('SELECT id FROM initial_stock WHERE material_code=?', (material_code,))
existing = c.fetchone()
if existing:
c.execute('''UPDATE initial_stock SET
material_name=?, stock_qty=?, unit=?, min_package=?,
supplier=?, remark=?, updated_at=?
WHERE material_code=?''', (
material_name, stock_qty, unit, min_package,
supplier, remark, now, material_code
))
update_count += 1
else:
c.execute('''INSERT INTO initial_stock(
material_code, material_name, stock_qty, unit, min_package,
supplier, remark, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?)''', (
material_code, material_name, stock_qty, unit, min_package,
supplier, remark, username, now, now
))
success_count += 1
conn.commit()
conn.close()
log('import_initial_stock', f'新增 {success_count} 条, 更新 {update_count} 条期初库存')
return jsonify({
'ok': True,
'count': success_count + update_count,
'message': f'成功导入 {success_count} 条新记录,更新 {update_count} 条已有记录'
})
except Exception as e:
return jsonify({'error': f'导入失败: {str(e)}'}), 500
# ==================== 采购需求清单 API ====================
@app.get('/api/purchase-demand')
@require_login
@require_any_role('superadmin', 'admin')
def get_purchase_demand_list():
"""获取采购需求清单"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT * FROM purchase_demand ORDER BY created_at DESC')
rows = c.fetchall()
conn.close()
return jsonify({'list': [dict(r) for r in rows]})
@app.post('/api/purchase-demand/calculate')
@require_login
@require_any_role('superadmin')
def calculate_purchase_demand():
"""计算采购需求
公式: 客户订单数量 * BOM单机数量 - 期初库存 = 净需求
按照最小包装得出实际采购数量
"""
data = request.get_json() or {}
product_code = (data.get('product_code') or '').strip()
order_qty = data.get('order_qty', 0)
if not product_code:
return jsonify({'error': '请选择产品'}), 400
try:
order_qty = int(order_qty)
except (ValueError, TypeError):
return jsonify({'error': '订单数量必须是有效整数'}), 400
if order_qty <= 0:
return jsonify({'error': '订单数量必须大于0'}), 400
conn = get_db()
c = conn.cursor()
# 获取产品的BOM清单
c.execute('SELECT * FROM bom WHERE product_code=?', (product_code,))
bom_list = c.fetchall()
if not bom_list:
conn.close()
return jsonify({'error': f'未找到产品 {product_code} 的BOM清单'}), 404
# 生成需求编号
import math
now = get_beijing_time()
demand_no = 'PD' + datetime.now().strftime('%Y%m%d%H%M%S')
username = session.get('username', '')
results = []
for bom in bom_list:
material_code = bom['material_code']
material_name = bom['material_name']
unit_qty = bom['unit_qty']
min_package = bom['min_package'] or 1
unit = bom['unit']
supplier = bom['supplier']
# 计算总需求 = 订单数量 * 单机用量
total_demand = int(math.ceil(order_qty * unit_qty))
# 获取期初库存
c.execute('SELECT stock_qty, min_package as stock_min_package FROM initial_stock WHERE material_code=?', (material_code,))
stock_row = c.fetchone()
initial_stock = stock_row['stock_qty'] if stock_row else 0
# 如果BOM没有设置最小包装但库存表有使用库存表的
if min_package <= 1 and stock_row and stock_row['stock_min_package']:
min_package = stock_row['stock_min_package']
# 计算净需求 = 总需求 - 期初库存
net_demand = total_demand - initial_stock
if net_demand < 0:
net_demand = 0
# 按最小包装计算实际采购数量
if net_demand > 0 and min_package > 0:
actual_purchase_qty = math.ceil(net_demand / min_package) * min_package
else:
actual_purchase_qty = 0
# 插入采购需求记录
c.execute('''INSERT INTO purchase_demand(
demand_no, material_code, material_name, order_qty, bom_unit_qty,
total_demand, initial_stock, net_demand, min_package, actual_purchase_qty,
unit, supplier, status, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', (
demand_no, material_code, material_name, order_qty, unit_qty,
total_demand, initial_stock, net_demand, min_package, actual_purchase_qty,
unit, supplier, 'pending', username, now, now
))
results.append({
'material_code': material_code,
'material_name': material_name,
'order_qty': order_qty,
'bom_unit_qty': unit_qty,
'total_demand': total_demand,
'initial_stock': initial_stock,
'net_demand': net_demand,
'min_package': min_package,
'actual_purchase_qty': actual_purchase_qty,
'unit': unit,
'supplier': supplier
})
conn.commit()
conn.close()
log('calculate_purchase_demand', f'产品: {product_code}, 订单数量: {order_qty}, 生成 {len(results)} 条采购需求')
return jsonify({
'ok': True,
'demand_no': demand_no,
'list': results,
'message': f'成功生成 {len(results)} 条采购需求'
})
@app.post('/api/purchase-demand/calculate-from-orders')
@require_login
@require_any_role('superadmin')
def calculate_purchase_demand_from_orders():
"""从客户订单自动计算采购需求
遍历客户订单根据物料匹配BOM计算采购需求
"""
data = request.get_json() or {}
order_ids = data.get('order_ids', []) # 可选指定订单ID列表
conn = get_db()
c = conn.cursor()
# 获取客户订单
if order_ids:
placeholders = ','.join('?' * len(order_ids))
c.execute(f'SELECT * FROM customer_orders WHERE id IN ({placeholders})', order_ids)
else:
c.execute('SELECT * FROM customer_orders')
orders = c.fetchall()
if not orders:
conn.close()
return jsonify({'error': '未找到客户订单'}), 404
import math
now = get_beijing_time()
demand_no = 'PD' + datetime.now().strftime('%Y%m%d%H%M%S')
username = session.get('username', '')
# 汇总物料需求
material_demands = {} # {material_code: {'total_demand': x, 'material_name': '', ...}}
for order in orders:
material = order['material']
order_qty = order['quantity']
# 尝试通过物料名称匹配BOM中的产品
c.execute('''SELECT * FROM bom WHERE product_name LIKE ? OR product_code LIKE ?''',
(f'%{material}%', f'%{material}%'))
bom_list = c.fetchall()
if bom_list:
# 找到BOM按BOM展开
for bom in bom_list:
mat_code = bom['material_code']
mat_name = bom['material_name']
unit_qty = bom['unit_qty']
min_pkg = bom['min_package'] or 1
unit = bom['unit']
supplier = bom['supplier']
demand = int(math.ceil(order_qty * unit_qty))
if mat_code in material_demands:
material_demands[mat_code]['total_demand'] += demand
else:
material_demands[mat_code] = {
'material_name': mat_name,
'total_demand': demand,
'min_package': min_pkg,
'unit': unit,
'supplier': supplier,
'bom_unit_qty': unit_qty,
'order_qty': order_qty
}
else:
# 没找到BOM直接按物料处理假设1:1
if material in material_demands:
material_demands[material]['total_demand'] += order_qty
else:
material_demands[material] = {
'material_name': material,
'total_demand': order_qty,
'min_package': 1,
'unit': 'pcs',
'supplier': '',
'bom_unit_qty': 1,
'order_qty': order_qty
}
# 计算净需求和实际采购数量
results = []
for mat_code, info in material_demands.items():
# 获取期初库存
c.execute('SELECT stock_qty, min_package FROM initial_stock WHERE material_code=?', (mat_code,))
stock_row = c.fetchone()
initial_stock = stock_row['stock_qty'] if stock_row else 0
min_package = info['min_package']
if min_package <= 1 and stock_row and stock_row['min_package']:
min_package = stock_row['min_package']
total_demand = info['total_demand']
net_demand = total_demand - initial_stock
if net_demand < 0:
net_demand = 0
if net_demand > 0 and min_package > 0:
actual_purchase_qty = math.ceil(net_demand / min_package) * min_package
else:
actual_purchase_qty = 0
# 插入采购需求记录
c.execute('''INSERT INTO purchase_demand(
demand_no, material_code, material_name, order_qty, bom_unit_qty,
total_demand, initial_stock, net_demand, min_package, actual_purchase_qty,
unit, supplier, status, created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', (
demand_no, mat_code, info['material_name'], info['order_qty'], info['bom_unit_qty'],
total_demand, initial_stock, net_demand, min_package, actual_purchase_qty,
info['unit'], info['supplier'], 'pending', username, now, now
))
results.append({
'material_code': mat_code,
'material_name': info['material_name'],
'total_demand': total_demand,
'initial_stock': initial_stock,
'net_demand': net_demand,
'min_package': min_package,
'actual_purchase_qty': actual_purchase_qty
})
conn.commit()
conn.close()
log('calculate_purchase_demand_from_orders', f'需求编号: {demand_no}, 生成 {len(results)} 条采购需求')
return jsonify({
'ok': True,
'demand_no': demand_no,
'list': results,
'message': f'成功生成 {len(results)} 条采购需求'
})
@app.put('/api/purchase-demand/<int:demand_id>')
@require_login
@require_any_role('superadmin')
def update_purchase_demand(demand_id):
"""更新采购需求状态"""
data = request.get_json() or {}
status = (data.get('status') or '').strip()
remark = (data.get('remark') or '').strip()
if status and status not in ['pending', 'ordered', 'received', 'completed', 'cancelled']:
return jsonify({'error': '无效的状态'}), 400
conn = get_db()
c = conn.cursor()
c.execute('SELECT id FROM purchase_demand WHERE id=?', (demand_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '采购需求不存在'}), 404
now = get_beijing_time()
updates = ['updated_at=?']
params = [now]
if status:
updates.append('status=?')
params.append(status)
if remark:
updates.append('remark=?')
params.append(remark)
params.append(demand_id)
c.execute(f'UPDATE purchase_demand SET {", ".join(updates)} WHERE id=?', params)
conn.commit()
conn.close()
log('update_purchase_demand', f'需求 ID: {demand_id}')
return jsonify({'ok': True, 'message': '采购需求更新成功'})
@app.delete('/api/purchase-demand/<int:demand_id>')
@require_login
@require_any_role('superadmin')
def delete_purchase_demand(demand_id):
"""删除采购需求"""
conn = get_db()
c = conn.cursor()
c.execute('SELECT demand_no, material_code FROM purchase_demand WHERE id=?', (demand_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '采购需求不存在'}), 404
c.execute('DELETE FROM purchase_demand WHERE id=?', (demand_id,))
conn.commit()
conn.close()
log('delete_purchase_demand', f'需求 ID: {demand_id}')
return jsonify({'ok': True, 'message': '采购需求删除成功'})
@app.post('/api/purchase-demand/batch-delete')
@require_login
@require_any_role('superadmin')
def batch_delete_purchase_demand():
"""批量删除采购需求"""
data = request.get_json() or {}
ids = data.get('ids', [])
if not ids:
return jsonify({'error': '请选择要删除的采购需求'}), 400
conn = get_db()
c = conn.cursor()
placeholders = ','.join('?' * len(ids))
c.execute(f'DELETE FROM purchase_demand WHERE id IN ({placeholders})', ids)
count = c.rowcount
conn.commit()
conn.close()
log('batch_delete_purchase_demand', f'批量删除 {count} 条采购需求')
return jsonify({'ok': True, 'count': count, 'message': f'成功删除 {count} 条采购需求'})
@app.errorhandler(404)
def not_found(e):
# 如果请求的是 HTML 页面(通过 Accept header 判断),返回 index.html
# 这样可以支持前端路由的直接访问
if request.path and not request.path.startswith('/api/'):
accept = request.headers.get('Accept', '')
if 'text/html' in accept:
return send_from_directory(FRONTEND_DIR, 'index.html')
return jsonify({'error': 'not found'}), 404
init_db()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', '5000')))