增加对账单优化细节

This commit is contained in:
zzh 2025-11-25 10:35:02 +08:00
parent 3751854815
commit a12f769f15
29 changed files with 3232 additions and 22 deletions

3
.gitignore vendored
View File

@ -58,3 +58,6 @@ dump.rdb
*.tmp
*.bak
*.cache
# Documentation
README/

Binary file not shown.

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
customer_orders 表添加 customer_name
"""
import sqlite3
import os
# 数据库路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'server', 'data.db')
def add_column():
"""添加 customer_name 列"""
if not os.path.exists(DB_PATH):
print(f"错误: 数据库文件不存在: {DB_PATH}")
return
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
try:
# 尝试添加列
c.execute('ALTER TABLE customer_orders ADD COLUMN customer_name TEXT')
conn.commit()
print("成功添加 customer_name 列")
except Exception as e:
if 'duplicate column name' in str(e).lower():
print("customer_name 列已存在")
else:
print(f"添加列失败: {e}")
conn.close()
if __name__ == '__main__':
add_column()

31
check_reconciliations.py Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""检查对账单数据"""
import sqlite3
conn = sqlite3.connect('server/data.db')
c = conn.cursor()
# 统计总数
c.execute('SELECT COUNT(*) FROM reconciliations')
total = c.fetchone()[0]
print(f'对账单总数: {total}')
# 查看最新10条记录
c.execute('''
SELECT id, order_date, contract_no, material_name, spec_model,
quantity, unit, unit_price, total_amount, delivery_date
FROM reconciliations
ORDER BY id DESC
LIMIT 10
''')
print('\n最新10条记录:')
print('-' * 120)
for row in c.fetchall():
print(f"ID: {row[0]}, 下单: {row[1]}, 合同: {row[2]}, 物料: {row[3]}, 规格: {row[4]}")
print(f" 数量: {row[5]} {row[6]}, 单价: {row[7]}, 金额: {row[8]}, 交货: {row[9]}")
print('-' * 120)
conn.close()

View File

@ -107,6 +107,10 @@
<span class="child-icon">📋</span>
<span>客户订单</span>
</a>
<a href="#/plan-mgmt/reconciliation" class="nav-child" data-route="plan-mgmt-reconciliation">
<span class="child-icon">💰</span>
<span>对账单</span>
</a>
</div>
</div>
</div>
@ -226,6 +230,7 @@
<script src="./js/components/work-order.js"></script>
<script src="./js/components/material-purchase.js"></script>
<script src="./js/components/customer-order.js"></script>
<script src="./js/components/reconciliation.js"></script>
<script src="./js/components/export.js"></script>
<script src="./js/components/settings.js"></script>
<script src="./js/components/notifications.js"></script>

View File

@ -0,0 +1,348 @@
// 客户订单管理
(() => {
Router.register('/plan-mgmt/customer-order', async () => {
// 先返回 HTML
const html = `
<div class="page-header">
<h1>客户订单管理</h1>
<div class="page-actions">
<button id="add-order-btn" class="btn btn-primary">新增订单</button>
</div>
</div>
<div class="card">
<div class="card-body">
<div class="table-container">
<table class="data-table">
<thead>
<tr>
<th>下单时间</th>
<th>订单编号</th>
<th>客户名称</th>
<th>物料</th>
<th>订单数量</th>
<th>单价</th>
<th>操作</th>
</tr>
</thead>
<tbody id="order-list">
<tr>
<td colspan="7" class="text-center">加载中...</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
<!-- 新增订单弹窗 -->
<div id="order-modal" class="modal" style="display:none;">
<div class="modal-content" style="max-width: 900px; max-height: 90vh; overflow-y: auto;">
<div class="modal-header">
<h2 id="modal-title">新增订单</h2>
<button class="modal-close" onclick="window.CustomerOrder.closeModal()">&times;</button>
</div>
<div class="modal-body">
<form id="order-form">
<!-- 订单基本信息 -->
<div style="background: var(--surface); padding: 20px; border-radius: 8px; margin-bottom: 20px; border: 1px solid var(--border);">
<h3 style="margin: 0 0 15px 0; font-size: 16px; color: var(--text); font-weight: 600;">订单基本信息</h3>
<div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 15px;">
<div class="field" style="margin: 0;">
<label>下单时间 <span style="color: var(--danger);">*</span></label>
<input type="date" id="order-date" class="input" required />
</div>
<div class="field" style="margin: 0;">
<label>订单编号 <span style="color: var(--danger);">*</span></label>
<input type="text" id="order-no" class="input" placeholder="如CGDD001695" required />
</div>
<div class="field" style="margin: 0;">
<label>客户名称 <span style="color: var(--danger);">*</span></label>
<input type="text" id="customer-name" class="input" placeholder="如:易泰勒" required />
</div>
</div>
</div>
<!-- 物料列表 -->
<div style="background: var(--surface); padding: 20px; border-radius: 8px; border: 1px solid var(--border);">
<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 15px;">
<h3 style="margin: 0; font-size: 16px; color: var(--text); font-weight: 600;">物料清单</h3>
<button type="button" class="btn btn-primary" onclick="window.CustomerOrder.addMaterialRow()">
添加物料
</button>
</div>
<div id="materials-container">
<!-- 物料行将动态添加到这里 -->
</div>
</div>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" onclick="window.CustomerOrder.closeModal()">取消</button>
<button class="btn btn-primary" onclick="window.CustomerOrder.saveOrder()">保存订单</button>
</div>
</div>
</div>
`;
// DOM 渲染后初始化
setTimeout(() => {
const addBtn = document.getElementById('add-order-btn');
if (addBtn) {
addBtn.addEventListener('click', () => {
openModal();
});
}
loadOrders();
}, 100);
return html;
});
async function loadOrders() {
try {
console.log('开始加载订单列表...');
const res = await fetch('/api/customer-orders');
console.log('API响应状态:', res.status);
const data = await res.json();
console.log('订单数据:', data);
const tbody = document.getElementById('order-list');
if (!tbody) {
console.error('找不到 order-list 元素');
return;
}
if (!data.list || data.list.length === 0) {
tbody.innerHTML = '<tr><td colspan="7" class="text-center">暂无数据</td></tr>';
return;
}
tbody.innerHTML = data.list.map(order => `
<tr>
<td>${order.order_date || '—'}</td>
<td>${order.order_no || '—'}</td>
<td>${order.customer_name || '—'}</td>
<td style="white-space: pre-wrap;">${order.material || '—'}</td>
<td>${order.quantity || 0}</td>
<td>${order.unit_price || 0}</td>
<td>
<button class="btn-text btn-danger" onclick="window.CustomerOrder.deleteOrder(${order.id})">删除</button>
</td>
</tr>
`).join('');
console.log('订单列表加载完成');
} catch (err) {
console.error('加载订单失败:', err);
const tbody = document.getElementById('order-list');
if (tbody) {
tbody.innerHTML = '<tr><td colspan="7" class="text-center" style="color: red;">加载失败,请刷新重试</td></tr>';
}
API.toast('加载订单失败', 'error');
}
}
let materialRowIndex = 0;
function addMaterialRow(material = '', quantity = '', unitPrice = '') {
const container = document.getElementById('materials-container');
const index = materialRowIndex++;
const row = document.createElement('div');
row.className = 'material-row';
row.id = `material-row-${index}`;
row.style.cssText = 'display: grid; grid-template-columns: 2fr 1fr 1fr auto; gap: 12px; margin-bottom: 12px; padding: 16px; background: var(--surface-2); border-radius: 8px; border: 1px solid var(--border);';
row.innerHTML = `
<div class="field" style="margin: 0;">
<label>物料名称 <span style="color: var(--danger);">*</span></label>
<input type="text" class="input material-name" placeholder="如ETAP05 基站-5.0" value="${material}" required />
</div>
<div class="field" style="margin: 0;">
<label>数量 <span style="color: var(--danger);">*</span></label>
<input type="number" class="input material-quantity" placeholder="数量" value="${quantity}" min="1" required />
</div>
<div class="field" style="margin: 0;">
<label>单价 <span style="color: var(--danger);">*</span></label>
<input type="number" class="input material-price" placeholder="单价" value="${unitPrice}" step="0.01" min="0" required />
</div>
<div style="display: flex; align-items: flex-end; padding-bottom: 2px;">
<button type="button" class="btn btn-danger" onclick="window.CustomerOrder.removeMaterialRow(${index})" style="white-space: nowrap;">
🗑 删除
</button>
</div>
`;
container.appendChild(row);
}
function removeMaterialRow(index) {
const row = document.getElementById(`material-row-${index}`);
if (row) {
row.remove();
}
// 如果没有物料行了,至少保留一行
const container = document.getElementById('materials-container');
if (container.children.length === 0) {
addMaterialRow();
}
}
function openModal(order = null) {
const modal = document.getElementById('order-modal');
const title = document.getElementById('modal-title');
title.textContent = '新增订单';
document.getElementById('order-form').reset();
// 设置默认日期为今天
const today = new Date().toISOString().split('T')[0];
document.getElementById('order-date').value = today;
// 清空物料列表并添加一行
const container = document.getElementById('materials-container');
container.innerHTML = '';
materialRowIndex = 0;
addMaterialRow();
modal.style.display = 'flex';
}
function closeModal() {
const modal = document.getElementById('order-modal');
modal.style.display = 'none';
document.getElementById('order-form').reset();
// 清空物料列表
const container = document.getElementById('materials-container');
if (container) {
container.innerHTML = '';
}
materialRowIndex = 0;
}
async function saveOrder() {
const orderDate = document.getElementById('order-date').value.trim();
const orderNo = document.getElementById('order-no').value.trim();
const customerName = document.getElementById('customer-name').value.trim();
if (!orderDate || !orderNo || !customerName) {
API.toast('请填写订单基本信息', 'error');
return;
}
// 收集所有物料信息
const materials = [];
const materialRows = document.querySelectorAll('.material-row');
if (materialRows.length === 0) {
API.toast('请至少添加一个物料', 'error');
return;
}
for (const row of materialRows) {
const materialName = row.querySelector('.material-name').value.trim();
const quantity = parseInt(row.querySelector('.material-quantity').value);
const unitPrice = parseFloat(row.querySelector('.material-price').value);
if (!materialName || !quantity || isNaN(unitPrice)) {
API.toast('请填写所有物料信息', 'error');
return;
}
if (quantity <= 0) {
API.toast('物料数量必须大于0', 'error');
return;
}
if (unitPrice < 0) {
API.toast('单价不能为负数', 'error');
return;
}
materials.push({
material: materialName,
quantity: quantity,
unit_price: unitPrice
});
}
try {
// 为每个物料创建一条订单记录
let successCount = 0;
for (const mat of materials) {
const res = await fetch('/api/customer-orders', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
order_date: orderDate,
order_no: orderNo,
customer_name: customerName,
material: mat.material,
quantity: mat.quantity,
unit_price: mat.unit_price
})
});
const data = await res.json();
if (res.ok && data.ok) {
successCount++;
} else {
console.error('保存物料失败:', mat.material, data.error);
}
}
if (successCount === materials.length) {
API.toast(`订单保存成功,共 ${successCount} 个物料`, 'success');
closeModal();
await loadOrders();
} else if (successCount > 0) {
API.toast(`部分保存成功(${successCount}/${materials.length}`, 'warning');
closeModal();
await loadOrders();
} else {
API.toast('保存失败', 'error');
}
} catch (err) {
console.error('保存订单失败:', err);
API.toast('保存失败', 'error');
}
}
async function deleteOrder(id) {
if (!confirm('确定要删除这条订单吗?')) {
return;
}
try {
const res = await fetch(`/api/customer-orders/${id}`, {
method: 'DELETE'
});
const data = await res.json();
if (res.ok && data.ok) {
API.toast('删除成功', 'success');
await loadOrders();
} else {
API.toast(data.error || '删除失败', 'error');
}
} catch (err) {
console.error('删除订单失败:', err);
API.toast('删除失败', 'error');
}
}
// 暴露给全局
window.CustomerOrder = {
openModal,
closeModal,
saveOrder,
deleteOrder,
addMaterialRow,
removeMaterialRow
};
})();

View File

@ -0,0 +1,559 @@
// 对账单管理
(() => {
Router.register('/plan-mgmt/reconciliation', async () => {
const html = `
<div class="page-header">
<h1>对账单管理</h1>
<div class="page-actions">
<button id="batch-delete-btn" class="btn btn-danger" style="margin-right: 10px; display: none;">批量删除</button>
<button id="export-reconciliation-btn" class="btn btn-secondary" style="margin-right: 10px;">导出对账单</button>
<button id="upload-shipment-btn" class="btn btn-secondary" style="margin-right: 10px;">上传发货单</button>
<button id="add-reconciliation-btn" class="btn btn-primary">新增对账单</button>
<input type="file" id="shipment-file-input" accept=".xls,.xlsx,application/vnd.ms-excel,application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" style="display: none;" />
</div>
</div>
<div class="card">
<div class="card-body">
<div class="table-container" style="max-height: calc(100vh - 200px); overflow-y: auto; padding-bottom: 5px;">
<table class="data-table">
<thead>
<tr>
<th style="width: 40px;">
<input type="checkbox" id="select-all-reconciliation" style="cursor: pointer;" />
</th>
<th>序号</th>
<th>下单时间</th>
<th>合同编号</th>
<th>物料名称</th>
<th>规格型号</th>
<th>运输单号</th>
<th>数量</th>
<th>单位</th>
<th>含税单价</th>
<th>含税金额</th>
<th>交货日期</th>
<th>出货日期</th>
<th>操作</th>
</tr>
</thead>
<tbody id="reconciliation-list">
<tr>
<td colspan="14" class="text-center">加载中...</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
<div id="reconciliation-modal" class="modal" style="display:none;">
<div class="modal-content" style="max-width: 800px;">
<div class="modal-header">
<h2 id="modal-title">新增对账单</h2>
<button class="modal-close" onclick="window.Reconciliation.closeModal()">&times;</button>
</div>
<div class="modal-body">
<form id="reconciliation-form">
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 15px;">
<div class="field">
<label>下单时间 <span style="color: var(--danger);">*</span></label>
<input type="date" id="order-date" class="input" required />
</div>
<div class="field">
<label>合同编号 <span style="color: var(--danger);">*</span></label>
<input type="text" id="contract-no" class="input" placeholder="如CGDD002876" required />
</div>
<div class="field">
<label>物料名称 <span style="color: var(--danger);">*</span></label>
<input type="text" id="material-name" class="input" placeholder="如:扩产-9988 红黑线" required />
</div>
<div class="field">
<label>规格型号 <span style="color: var(--danger);">*</span></label>
<input type="text" id="spec-model" class="input" placeholder="如PCXK0P0NSNT_1_2.54*1C14TE" required />
</div>
<div class="field">
<label>运输单号</label>
<input type="text" id="transport-no" class="input" placeholder="如:快递上门" />
</div>
<div class="field">
<label>数量 <span style="color: var(--danger);">*</span></label>
<input type="number" id="quantity" class="input" placeholder="数量" min="1" required />
</div>
<div class="field">
<label>单位 <span style="color: var(--danger);">*</span></label>
<input type="text" id="unit" class="input" placeholder="如pcs" value="pcs" required />
</div>
<div class="field">
<label>含税单价 <span style="color: var(--danger);">*</span></label>
<input type="number" id="unit-price" class="input" placeholder="单价" step="0.1" min="0" required />
</div>
<div class="field">
<label>含税金额</label>
<input type="number" id="total-amount" class="input" placeholder="自动计算" step="0.1" readonly />
</div>
<div class="field">
<label>交货日期</label>
<input type="date" id="delivery-date" class="input" />
</div>
<div class="field">
<label>出货日期</label>
<input type="date" id="shipment-date" class="input" />
</div>
</div>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" onclick="window.Reconciliation.closeModal()">取消</button>
<button class="btn btn-primary" onclick="window.Reconciliation.saveReconciliation()">保存</button>
</div>
</div>
</div>
`;
setTimeout(() => {
const addBtn = document.getElementById('add-reconciliation-btn');
if (addBtn) {
addBtn.addEventListener('click', () => {
openModal();
});
}
// 全选/取消全选
const selectAllCheckbox = document.getElementById('select-all-reconciliation');
if (selectAllCheckbox) {
selectAllCheckbox.addEventListener('change', (e) => {
const checkboxes = document.querySelectorAll('.reconciliation-checkbox');
checkboxes.forEach(cb => cb.checked = e.target.checked);
updateBatchDeleteButton();
});
}
// 批量删除按钮
const batchDeleteBtn = document.getElementById('batch-delete-btn');
if (batchDeleteBtn) {
batchDeleteBtn.addEventListener('click', async () => {
const selectedIds = getSelectedReconciliationIds();
if (selectedIds.length === 0) {
API.toast('请选择要删除的对账单', 'warning');
return;
}
if (!confirm(`确定要删除选中的 ${selectedIds.length} 条对账单吗?`)) {
return;
}
await batchDeleteReconciliations(selectedIds);
});
}
const exportBtn = document.getElementById('export-reconciliation-btn');
if (exportBtn) {
exportBtn.addEventListener('click', async () => {
try {
API.toast('正在导出对账单...', 'info');
const res = await fetch('/api/reconciliations/export');
if (!res.ok) {
const data = await res.json();
API.toast(data.error || '导出失败', 'error');
return;
}
// 下载文件
const blob = await res.blob();
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
// 从响应头获取文件名,如果没有则使用默认名称
const contentDisposition = res.headers.get('Content-Disposition');
let filename = '对账单.xlsx';
if (contentDisposition) {
const matches = /filename[^;=\n]*=((['"]).*?\2|[^;\n]*)/.exec(contentDisposition);
if (matches != null && matches[1]) {
filename = matches[1].replace(/['"]/g, '');
// 解码 URL 编码的文件名
filename = decodeURIComponent(filename);
}
}
a.download = filename;
document.body.appendChild(a);
a.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(a);
API.toast('导出成功', 'success');
} catch (err) {
console.error('导出对账单失败:', err);
API.toast('导出失败', 'error');
}
});
}
const uploadBtn = document.getElementById('upload-shipment-btn');
const fileInput = document.getElementById('shipment-file-input');
if (uploadBtn && fileInput) {
uploadBtn.addEventListener('click', () => {
fileInput.click();
});
fileInput.addEventListener('change', async (e) => {
const file = e.target.files[0];
if (!file) return;
// 验证文件类型
const fileName = file.name.toLowerCase();
if (!fileName.endsWith('.xls') && !fileName.endsWith('.xlsx')) {
API.toast('请上传 XLS 或 XLSX 格式的发货单', 'error');
fileInput.value = '';
return;
}
// 显示上传中提示
API.toast('正在解析发货单...', 'info');
const formData = new FormData();
formData.append('file', file);
try {
const res = await fetch('/api/reconciliations/upload-shipment', {
method: 'POST',
body: formData
});
const data = await res.json();
if (res.ok && data.ok) {
let message = data.message;
if (data.errors && data.errors.length > 0) {
message += '\n\n错误详情\n' + data.errors.join('\n');
API.toast(message, 'warning');
} else {
API.toast(message, 'success');
}
await loadReconciliations();
} else {
API.toast(data.error || '上传失败', 'error');
}
} catch (err) {
console.error('上传发货单失败:', err);
API.toast('上传失败', 'error');
} finally {
fileInput.value = '';
}
});
}
const quantityInput = document.getElementById('quantity');
const unitPriceInput = document.getElementById('unit-price');
const totalAmountInput = document.getElementById('total-amount');
const calculateTotal = () => {
const qty = parseFloat(quantityInput.value) || 0;
const price = parseFloat(unitPriceInput.value) || 0;
const total = qty * price;
totalAmountInput.value = total.toFixed(1);
};
if (quantityInput) quantityInput.addEventListener('input', calculateTotal);
if (unitPriceInput) unitPriceInput.addEventListener('input', calculateTotal);
loadReconciliations();
}, 100);
return html;
});
async function loadReconciliations() {
try {
const res = await fetch('/api/reconciliations');
const data = await res.json();
const tbody = document.getElementById('reconciliation-list');
if (!tbody) return;
if (!data.list || data.list.length === 0) {
tbody.innerHTML = '<tr><td colspan="14" class="text-center">暂无数据</td></tr>';
return;
}
tbody.innerHTML = data.list.map((item, index) => {
// 格式化数字:去掉不必要的小数点和尾随零
const formatNumber = (num) => {
if (!num && num !== 0) return '—';
const n = parseFloat(num);
// 先四舍五入到2位小数避免浮点数精度问题
const rounded = Math.round(n * 100) / 100;
// 如果是整数,直接返回整数
if (Number.isInteger(rounded)) return rounded.toString();
// 否则返回字符串自动去掉尾随的0
return rounded.toString();
};
return `
<tr>
<td>
<input type="checkbox" class="reconciliation-checkbox" data-id="${item.id}" style="cursor: pointer;" onchange="window.Reconciliation.updateBatchDeleteButton()" />
</td>
<td>${index + 1}</td>
<td>${item.order_date || '—'}</td>
<td>${item.contract_no || '—'}</td>
<td>${item.material_name || '—'}</td>
<td>${item.spec_model || '—'}</td>
<td>${item.transport_no || '—'}</td>
<td>${item.quantity || 0}</td>
<td>${item.unit || '—'}</td>
<td>${formatNumber(item.unit_price)}</td>
<td>${formatNumber(item.total_amount)}</td>
<td>${item.delivery_date || '—'}</td>
<td>${item.shipment_date || '—'}</td>
<td>
<button class="btn-text" onclick="window.Reconciliation.editReconciliation(${item.id})">编辑</button>
<button class="btn-text btn-danger" onclick="window.Reconciliation.deleteReconciliation(${item.id})">删除</button>
</td>
</tr>
`;
}).join('');
// 重置全选状态
const selectAllCheckbox = document.getElementById('select-all-reconciliation');
if (selectAllCheckbox) {
selectAllCheckbox.checked = false;
}
updateBatchDeleteButton();
} catch (err) {
console.error('加载对账单失败:', err);
const tbody = document.getElementById('reconciliation-list');
if (tbody) {
tbody.innerHTML = '<tr><td colspan="14" class="text-center" style="color: red;">加载失败,请刷新重试</td></tr>';
}
API.toast('加载对账单失败', 'error');
}
}
let currentEditId = null;
// 日期格式转换YYYY/MM/DD -> YYYY-MM-DD (用于input[type=date])
function formatDateForInput(dateStr) {
if (!dateStr) return '';
return dateStr.replace(/\//g, '-');
}
function openModal(item = null) {
const modal = document.getElementById('reconciliation-modal');
const title = document.getElementById('modal-title');
if (item) {
// 编辑模式
title.textContent = '编辑对账单';
currentEditId = item.id;
document.getElementById('order-date').value = formatDateForInput(item.order_date) || '';
document.getElementById('contract-no').value = item.contract_no || '';
document.getElementById('material-name').value = item.material_name || '';
document.getElementById('spec-model').value = item.spec_model || '';
document.getElementById('transport-no').value = item.transport_no || '';
document.getElementById('quantity').value = item.quantity || '';
document.getElementById('unit').value = item.unit || 'pcs';
document.getElementById('unit-price').value = item.unit_price || '';
document.getElementById('total-amount').value = item.total_amount || '';
document.getElementById('delivery-date').value = formatDateForInput(item.delivery_date) || '';
document.getElementById('shipment-date').value = formatDateForInput(item.shipment_date) || '';
} else {
// 新增模式
title.textContent = '新增对账单';
currentEditId = null;
document.getElementById('reconciliation-form').reset();
const today = new Date().toISOString().split('T')[0];
document.getElementById('order-date').value = today;
document.getElementById('unit').value = 'pcs';
}
modal.style.display = 'flex';
}
function closeModal() {
const modal = document.getElementById('reconciliation-modal');
modal.style.display = 'none';
document.getElementById('reconciliation-form').reset();
currentEditId = null;
}
async function saveReconciliation() {
const orderDate = document.getElementById('order-date').value.trim();
const contractNo = document.getElementById('contract-no').value.trim();
const materialName = document.getElementById('material-name').value.trim();
const specModel = document.getElementById('spec-model').value.trim();
const transportNo = document.getElementById('transport-no').value.trim();
const quantity = parseInt(document.getElementById('quantity').value);
const unit = document.getElementById('unit').value.trim();
const unitPrice = parseFloat(document.getElementById('unit-price').value);
const totalAmount = parseFloat(document.getElementById('total-amount').value);
const deliveryDate = document.getElementById('delivery-date').value.trim();
const shipmentDate = document.getElementById('shipment-date').value.trim();
if (!orderDate || !contractNo || !materialName || !specModel || !quantity || !unit || isNaN(unitPrice)) {
API.toast('请填写必填项', 'error');
return;
}
if (quantity <= 0) {
API.toast('数量必须大于0', 'error');
return;
}
if (unitPrice < 0) {
API.toast('单价不能为负数', 'error');
return;
}
const payload = {
order_date: orderDate,
contract_no: contractNo,
material_name: materialName,
spec_model: specModel,
transport_no: transportNo,
quantity: quantity,
unit: unit,
unit_price: unitPrice,
total_amount: totalAmount,
delivery_date: deliveryDate || null,
shipment_date: shipmentDate || null
};
try {
const url = currentEditId ? `/api/reconciliations/${currentEditId}` : '/api/reconciliations';
const method = currentEditId ? 'PUT' : 'POST';
const res = await fetch(url, {
method: method,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
const data = await res.json();
if (res.ok && data.ok) {
API.toast(currentEditId ? '更新成功' : '保存成功', 'success');
closeModal();
await loadReconciliations();
} else {
API.toast(data.error || '保存失败', 'error');
}
} catch (err) {
console.error('保存对账单失败:', err);
API.toast('保存失败', 'error');
}
}
async function editReconciliation(id) {
try {
const res = await fetch('/api/reconciliations');
const data = await res.json();
const item = data.list.find(r => r.id === id);
if (!item) {
API.toast('对账单不存在', 'error');
return;
}
openModal(item);
} catch (err) {
console.error('加载对账单失败:', err);
API.toast('加载失败', 'error');
}
}
async function deleteReconciliation(id) {
if (!confirm('确定要删除这条对账单吗?')) {
return;
}
try {
const res = await fetch(`/api/reconciliations/${id}`, {
method: 'DELETE'
});
const data = await res.json();
if (res.ok && data.ok) {
API.toast('删除成功', 'success');
await loadReconciliations();
} else {
API.toast(data.error || '删除失败', 'error');
}
} catch (err) {
console.error('删除对账单失败:', err);
API.toast('删除失败', 'error');
}
}
// 获取选中的对账单ID列表
function getSelectedReconciliationIds() {
const checkboxes = document.querySelectorAll('.reconciliation-checkbox:checked');
return Array.from(checkboxes).map(cb => parseInt(cb.dataset.id));
}
// 更新批量删除按钮的显示状态
function updateBatchDeleteButton() {
const selectedIds = getSelectedReconciliationIds();
const batchDeleteBtn = document.getElementById('batch-delete-btn');
if (batchDeleteBtn) {
if (selectedIds.length > 0) {
batchDeleteBtn.style.display = 'inline-block';
batchDeleteBtn.textContent = `批量删除 (${selectedIds.length})`;
} else {
batchDeleteBtn.style.display = 'none';
}
}
// 更新全选框状态
const selectAllCheckbox = document.getElementById('select-all-reconciliation');
const allCheckboxes = document.querySelectorAll('.reconciliation-checkbox');
if (selectAllCheckbox && allCheckboxes.length > 0) {
const allChecked = Array.from(allCheckboxes).every(cb => cb.checked);
const someChecked = Array.from(allCheckboxes).some(cb => cb.checked);
selectAllCheckbox.checked = allChecked;
selectAllCheckbox.indeterminate = someChecked && !allChecked;
}
}
// 批量删除对账单
async function batchDeleteReconciliations(ids) {
try {
API.toast('正在删除...', 'info');
const res = await fetch('/api/reconciliations/batch-delete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ ids })
});
const data = await res.json();
if (res.ok && data.ok) {
API.toast(`成功删除 ${ids.length} 条对账单`, 'success');
await loadReconciliations();
} else {
API.toast(data.error || '批量删除失败', 'error');
}
} catch (err) {
console.error('批量删除对账单失败:', err);
API.toast('批量删除失败', 'error');
}
}
window.Reconciliation = {
openModal,
closeModal,
saveReconciliation,
editReconciliation,
deleteReconciliation,
updateBatchDeleteButton
};
})();

View File

@ -86,6 +86,7 @@ const Router = (() => {
'plan-mgmt': '计划管理',
'material-purchase': '物料清单-采购',
'customer-order': '客户订单',
'reconciliation': '对账单',
export: '导出',
settings: '设置'
};

View File

@ -0,0 +1,144 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""从Excel导入对账单数据"""
import pandas as pd
import sqlite3
import os
from datetime import datetime, timezone, timedelta
# 数据库路径
DB_PATH = os.path.join(os.path.dirname(__file__), 'server', 'data.db')
EXCEL_FILE = '25年11月份对账单-易泰勒.xlsx'
def get_beijing_time():
"""获取北京时间UTC+8的ISO格式字符串"""
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(beijing_tz).isoformat()
def import_from_excel():
"""从Excel导入对账单数据"""
# 读取Excel文件
df = pd.read_excel(EXCEL_FILE)
# 打印前20行查看结构
print("Excel文件结构:")
print("=" * 80)
for i in range(min(20, len(df))):
print(f"{i}行: {df.iloc[i].tolist()}")
print("=" * 80)
# 查找表头行(包含"序号"的行)
header_row = None
for i in range(len(df)):
row_values = df.iloc[i].tolist()
if any(str(val).strip() == '序号' for val in row_values if pd.notna(val)):
header_row = i
print(f"\n找到表头行: 第{i}")
print(f"表头内容: {row_values}")
break
if header_row is None:
print("❌ 未找到表头行(包含'序号'的行)")
return
# 重新读取,跳过前面的行,使用找到的行作为表头
df = pd.read_excel(EXCEL_FILE, skiprows=header_row)
# 设置第一行为列名
df.columns = df.iloc[0]
df = df[1:] # 删除第一行(已经作为列名)
df = df.reset_index(drop=True)
print(f"\n数据形状: {df.shape}")
print(f"列名: {df.columns.tolist()}")
print(f"\n前5行数据:")
print(df.head())
# 连接数据库
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
now = get_beijing_time()
imported_count = 0
# 遍历数据行
for idx, row in df.iterrows():
# 跳过空行或无效行
if pd.isna(row.get('序号')):
continue
try:
# 提取数据
# 处理日期格式
def format_date(date_val):
if pd.isna(date_val):
return ''
if isinstance(date_val, datetime):
return date_val.strftime('%Y/%m/%d')
date_str = str(date_val).strip()
# 如果已经是 YYYY/MM/DD 格式,保持不变
if '/' in date_str:
return date_str
# 如果是 YYYY-MM-DD 格式,转换为 YYYY/MM/DD
if '-' in date_str:
return date_str.split()[0].replace('-', '/')
return date_str
order_date = format_date(row.get('下单时间'))
contract_no = str(row.get('合同编号', '')).strip() if pd.notna(row.get('合同编号')) else ''
material_name = str(row.get('物料名称', '')).strip() if pd.notna(row.get('物料名称')) else ''
spec_model = str(row.get('规格型号', '')).strip() if pd.notna(row.get('规格型号')) else ''
transport_no = str(row.get('运输单号', '')).strip() if pd.notna(row.get('运输单号')) else ''
quantity = int(row.get('数量', 0)) if pd.notna(row.get('数量')) else 0
unit = str(row.get('单位', 'pcs')).strip() if pd.notna(row.get('单位')) else 'pcs'
unit_price = float(row.get('含税单价', 0)) if pd.notna(row.get('含税单价')) else 0.0
total_amount = float(row.get('含税金额', 0)) if pd.notna(row.get('含税金额')) else 0.0
delivery_date = format_date(row.get('交货日期'))
shipment_date = format_date(row.get('出货日期'))
# 验证必填字段
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price]):
print(f"⚠️ 跳过第{idx+1}行: 缺少必填字段")
continue
# 插入数据库
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
order_date,
contract_no,
material_name,
spec_model,
transport_no,
quantity,
unit,
unit_price,
total_amount,
delivery_date,
shipment_date,
'admin',
now,
now
))
imported_count += 1
print(f"✅ 导入第{idx+1}行: {contract_no} - {material_name}")
except Exception as e:
print(f"❌ 导入第{idx+1}行失败: {e}")
continue
conn.commit()
conn.close()
print(f"\n{'='*80}")
print(f"✅ 成功导入 {imported_count} 条对账单数据")
print(f"{'='*80}")
if __name__ == '__main__':
import_from_excel()

99
init_customer_orders.py Normal file
View File

@ -0,0 +1,99 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
初始化客户订单数据
根据图片中的数据填充客户订单表
"""
import sqlite3
import os
from datetime import datetime, timezone, timedelta
# 数据库路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'server', 'data.db')
def get_beijing_time():
"""获取北京时间UTC+8的ISO格式字符串"""
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(beijing_tz).isoformat()
# 从图片中提取的订单数据 - 客户:易泰勒
orders_data = [
# 2025/4/28 - CGDD001695
{'order_date': '2025-04-28', 'order_no': 'CGDD001695', 'customer_name': '易泰勒', 'material': 'ETAP05\n基站-5.0\nETAP05', 'quantity': 950, 'unit_price': 315.19},
{'order_date': '2025-04-28', 'order_no': 'CGDD001695', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICATE', 'quantity': 950, 'unit_price': 1.3},
{'order_date': '2025-04-28', 'order_no': 'CGDD001695', 'customer_name': '易泰勒', 'material': 'WA0000000040\nPCBA', 'quantity': 4750, 'unit_price': 8.05},
{'order_date': '2025-04-28', 'order_no': 'CGDD001695', 'customer_name': '易泰勒', 'material': 'CH6121-MODULE-V14', 'quantity': 4750, 'unit_price': 8.05},
# 2025/9/4 - CGDD002429
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'ETAP05\n基站-5.0\nETAP05', 'quantity': 1500, 'unit_price': 315.19},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICATE', 'quantity': 1500, 'unit_price': 1.3},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'WA0000000040\nPCBA', 'quantity': 7500, 'unit_price': 8.05},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'CH6121-MODULE-V14', 'quantity': 7500, 'unit_price': 8.05},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'AP-ET010\n基站-5.0\nETAP05', 'quantity': 500, 'unit_price': 315.19},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICATE', 'quantity': 500, 'unit_price': 1.3},
{'order_date': '2025-09-04', 'order_no': 'CGDD002429', 'customer_name': '易泰勒', 'material': 'WA0000000040\nPCBA\nCH6121-MODULE-V14', 'quantity': 2500, 'unit_price': 8.05},
# 2025/10/23 - CGDD002878
{'order_date': '2025-10-23', 'order_no': 'CGDD002878', 'customer_name': '易泰勒', 'material': 'AP-DZ006\n智能灯条基站\nETAP05-D1', 'quantity': 4000, 'unit_price': 239.2},
{'order_date': '2025-10-23', 'order_no': 'CGDD002878', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICATE', 'quantity': 12000, 'unit_price': 1.1},
# 2025/11/13 - CGDD003037
{'order_date': '2025-11-13', 'order_no': 'CGDD003037', 'customer_name': '易泰勒', 'material': 'AP-DZ009\n智能灯条基站\nETAP05-D1', 'quantity': 500, 'unit_price': 229.61},
{'order_date': '2025-11-13', 'order_no': 'CGDD003037', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICAT', 'quantity': 1500, 'unit_price': 1.1},
{'order_date': '2025-11-13', 'order_no': 'CGDD003037', 'customer_name': '易泰勒', 'material': 'AP-DZ006\n智能灯条基站\nETAP05-D1', 'quantity': 4000, 'unit_price': 239.2},
{'order_date': '2025-11-13', 'order_no': 'CGDD003037', 'customer_name': '易泰勒', 'material': 'WD1MK0SMD0551\n蓝牙模块\nPCBCOMPONENT_1_-_DUPLICATE', 'quantity': 12000, 'unit_price': 1.1},
]
def init_orders():
"""初始化客户订单数据"""
if not os.path.exists(DB_PATH):
print(f"错误: 数据库文件不存在: {DB_PATH}")
return
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 检查表是否存在
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='customer_orders'")
if not c.fetchone():
print("错误: customer_orders 表不存在,请先运行服务器以创建表")
conn.close()
return
# 清空现有数据(可选)
c.execute('DELETE FROM customer_orders')
print("已清空现有订单数据")
# 插入新数据
now = get_beijing_time()
inserted_count = 0
for order in orders_data:
try:
c.execute('''INSERT INTO customer_orders(
order_date, order_no, customer_name, material, quantity, unit_price,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?)''', (
order['order_date'],
order['order_no'],
order['customer_name'],
order['material'],
order['quantity'],
order['unit_price'],
'admin', # 创建者
now,
now
))
inserted_count += 1
except Exception as e:
print(f"插入订单失败: {order['order_no']} - {e}")
conn.commit()
conn.close()
print(f"成功插入 {inserted_count} 条订单数据")
if __name__ == '__main__':
init_orders()

124
init_reconciliations.py Normal file
View File

@ -0,0 +1,124 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""初始化对账单示例数据"""
import sqlite3
import os
from datetime import datetime, timezone, timedelta
# 数据库路径
DB_PATH = os.path.join(os.path.dirname(__file__), 'server', 'data.db')
def get_beijing_time():
"""获取北京时间UTC+8的ISO格式字符串"""
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(beijing_tz).isoformat()
def init_reconciliations():
"""初始化对账单示例数据"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 示例数据(根据图片中的数据)
sample_data = [
{
'order_date': '2025/10/31',
'contract_no': 'CGDD002876',
'material_name': '扩产-9988 红黑线',
'spec_model': 'PCXK0P0NSNT_1_2.54*1C14TE',
'transport_no': '快递上门',
'quantity': 45,
'unit': 'pcs',
'unit_price': 239.2,
'total_amount': 10764,
'delivery_date': '2025/11/3',
'shipment_date': '2025/11/3'
},
{
'order_date': '2025/9/20',
'contract_no': 'CGDD004562',
'material_name': '扩产-9988 红黑线',
'spec_model': 'M1H0EM0N511 PCXK0P0NSNT_1_2.54*1C14TE',
'transport_no': '快递上门',
'quantity': 355,
'unit': 'pcs',
'unit_price': 1.1,
'total_amount': 390.5,
'delivery_date': '2025/11/3',
'shipment_date': '2025/11/3'
},
{
'order_date': '2025/9/20',
'contract_no': 'CGDD004562',
'material_name': '扩产-9988 红黑线',
'spec_model': 'ETAP05-01',
'transport_no': '快递上门',
'quantity': 2,
'unit': 'pcs',
'unit_price': 245.46,
'total_amount': 490.92,
'delivery_date': '2025/11/3',
'shipment_date': '2025/11/3'
},
{
'order_date': '2025/9/20',
'contract_no': 'CGDD004562',
'material_name': 'M1H0EM0N511 红黑线',
'spec_model': 'PCXK0P0NSNT_1_2.54*1C14TE',
'transport_no': '快递上门',
'quantity': 6,
'unit': 'pcs',
'unit_price': 1.1,
'total_amount': 6.6,
'delivery_date': '2025/11/3',
'shipment_date': '2025/11/3'
},
{
'order_date': '2025/10/11',
'contract_no': 'CGDD002717',
'material_name': '扩产-9988 红黑线',
'spec_model': 'ETAP05-01',
'transport_no': '快递上门',
'quantity': 500,
'unit': 'pcs',
'unit_price': 228.45,
'total_amount': 114225,
'delivery_date': '2025/11/3',
'shipment_date': '2025/11/3'
}
]
now = get_beijing_time()
for data in sample_data:
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
data['order_date'],
data['contract_no'],
data['material_name'],
data['spec_model'],
data['transport_no'],
data['quantity'],
data['unit'],
data['unit_price'],
data['total_amount'],
data['delivery_date'],
data['shipment_date'],
'admin',
now,
now
))
conn.commit()
count = len(sample_data)
conn.close()
print(f'✅ 成功初始化 {count} 条对账单示例数据')
if __name__ == '__main__':
init_reconciliations()

0
server/=2.0.0 Normal file
View File

View File

@ -200,6 +200,23 @@ def init_db():
created_at TEXT,
updated_at TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS reconciliations(
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_date TEXT NOT NULL,
contract_no TEXT NOT NULL,
material_name TEXT NOT NULL,
spec_model TEXT NOT NULL,
transport_no TEXT,
quantity INTEGER NOT NULL,
unit TEXT NOT NULL,
unit_price REAL NOT NULL,
total_amount REAL NOT NULL,
delivery_date TEXT,
shipment_date TEXT,
created_by TEXT,
created_at TEXT,
updated_at TEXT
)''')
# 为已存在的表添加列(如果不存在)
try:
c.execute('ALTER TABLE customer_orders ADD COLUMN customer_name TEXT')
@ -394,6 +411,19 @@ def get_beijing_time():
beijing_tz = timezone(timedelta(hours=8))
return datetime.now(beijing_tz).isoformat()
def format_date_to_slash(date_str):
"""将日期格式统一转换为 YYYY/MM/DD 格式空值返回None"""
if not date_str or str(date_str).strip() == '':
return None
date_str = str(date_str).strip()
# 去掉时间部分
if ' ' in date_str:
date_str = date_str.split()[0]
# 将 YYYY-MM-DD 转换为 YYYY/MM/DD
if '-' in date_str:
return date_str.replace('-', '/')
return date_str
def parse_audit_line(s):
if not s:
return {'ts_cn': None, 'batch': None, 'mac': None, 'note': None}
@ -2039,21 +2069,32 @@ def validate_mac_file():
mac_col = 'MAC' if has_mac else 'SN_MAC'
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
else:
import openpyxl
wb = openpyxl.load_workbook(f)
ws = wb.active
# 使用pandas读取Excel文件支持.xlsx和.xls格式
import pandas as pd
import io
if ws.max_row < 2:
wb.close()
# 将文件流保存到BytesIO对象
file_content = f.stream.read()
f.stream.seek(0) # 重置流位置
file_io = io.BytesIO(file_content)
try:
# 根据扩展名选择引擎
if ext == 'xls':
df = pd.read_excel(file_io, engine='xlrd')
else:
df = pd.read_excel(file_io)
except Exception as e:
return jsonify({'valid': False, 'message': f'读取Excel文件失败{str(e)}'}), 200
if len(df) == 0:
return jsonify({'valid': False, 'message': '文件为空,没有数据'}), 200
if ws.max_column != 2:
wb.close()
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{ws.max_column}'}), 200
if len(df.columns) != 2:
return jsonify({'valid': False, 'message': f'文件应该只包含2列数据当前有{len(df.columns)}'}), 200
# 检查表头
header_row = list(ws.iter_rows(min_row=1, max_row=1, values_only=True))[0]
header = [str(h).strip() if h else '' for h in header_row]
header = [str(h).strip() for h in df.columns]
# 记录表头用于调试
log('validate_mac_file', f'headers: {header}')
@ -2065,15 +2106,12 @@ def validate_mac_file():
has_batch = any('批次' in h or 'batch' in h for h in header_lower)
if not (has_mac or has_sn_mac):
wb.close()
return jsonify({'valid': False, 'message': f'缺少必需的列MAC 或 SN_MAC当前列{", ".join(header)}'}), 200
if not has_batch:
wb.close()
return jsonify({'valid': False, 'message': f'缺少必需的列:批次号(当前列:{", ".join(header)}'}), 200
data_rows = ws.max_row - 1
data_rows = len(df)
mac_col = 'MAC' if has_mac else 'SN_MAC'
wb.close()
return jsonify({'valid': True, 'message': f'文件格式正确,包含列:{mac_col} 和 批次号,共{data_rows}行数据'}), 200
except Exception as e:
@ -2101,18 +2139,33 @@ def upload_mac_file():
temp_dir = '/home/hyx/work/batch_import_xlsx'
os.makedirs(temp_dir, exist_ok=True)
# 根据类型确定文件名
if upload_type == 'yt':
temp_path = os.path.join(temp_dir, 'sn_test_yt.xlsx')
elif upload_type == 'pdd':
temp_path = os.path.join(temp_dir, 'sn_test_pdd.xlsx')
else:
temp_path = os.path.join(temp_dir, 'sn_test_tx.xlsx')
# 检测文件扩展名,保持原始格式
original_ext = '.xls' if name.lower().endswith('.xls') and not name.lower().endswith('.xlsx') else '.xlsx'
# 根据类型确定基础文件名
if upload_type == 'yt':
base_name = 'sn_test_yt'
elif upload_type == 'pdd':
base_name = 'sn_test_pdd'
else:
base_name = 'sn_test_tx'
# 删除旧文件(.xlsx 和 .xls 都删除,确保只保留最新的)
for old_ext in ['.xlsx', '.xls']:
old_path = os.path.join(temp_dir, f'{base_name}{old_ext}')
if os.path.exists(old_path):
try:
os.remove(old_path)
log('upload_mac_file', f'removed old file: {old_path}')
except Exception as e:
log('upload_mac_file_error', f'failed to remove old file: {e}')
# 保存新文件
temp_path = os.path.join(temp_dir, f'{base_name}{original_ext}')
f.save(temp_path)
# 调用batch_import.py脚本
script_path = '/home/hyx/work/生产管理系统/batch_import.py'
script_path = '/home/hyx/work/生产管理系统/test_py/batch_import.py'
python_path = '/home/hyx/work/.venv/bin/python'
try:
result = subprocess.run(
@ -3659,6 +3712,552 @@ def delete_customer_order(order_id):
return jsonify({'ok': True, 'message': '订单删除成功'})
# 对账单管理
@app.get('/api/reconciliations')
@require_login
def get_reconciliations():
"""获取对账单列表"""
conn = get_db()
c = conn.cursor()
c.execute('''
SELECT id, order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
FROM reconciliations
ORDER BY id ASC
''')
rows = [dict(r) for r in c.fetchall()]
conn.close()
return jsonify({'list': rows})
@app.post('/api/reconciliations')
@require_login
@require_any_role('admin', 'superadmin')
def create_reconciliation():
"""创建对账单"""
data = request.get_json() or {}
order_date = format_date_to_slash(data.get('order_date'))
contract_no = (data.get('contract_no') or '').strip()
material_name = (data.get('material_name') or '').strip()
spec_model = (data.get('spec_model') or '').strip()
transport_no = (data.get('transport_no') or '').strip()
quantity = data.get('quantity')
unit = (data.get('unit') or 'pcs').strip()
unit_price = data.get('unit_price')
total_amount = data.get('total_amount')
delivery_date = format_date_to_slash(data.get('delivery_date'))
shipment_date = format_date_to_slash(data.get('shipment_date'))
# 验证必填字段
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price is not None]):
return jsonify({'error': '请填写所有必填字段'}), 400
try:
quantity = int(quantity)
unit_price = float(unit_price)
total_amount = float(total_amount) if total_amount else quantity * unit_price
except (ValueError, TypeError):
return jsonify({'error': '数量、单价或金额格式不正确'}), 400
if quantity <= 0:
return jsonify({'error': '数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
conn = get_db()
c = conn.cursor()
now = get_beijing_time()
username = session.get('username', '')
try:
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
username, now, now
))
conn.commit()
reconciliation_id = c.lastrowid
conn.close()
log('create_reconciliation', f'合同号: {contract_no}, 物料: {material_name}, 数量: {quantity}')
notify_superadmin('新增对账单', f'合同号: {contract_no}, 物料: {material_name}')
return jsonify({'ok': True, 'id': reconciliation_id, 'message': '对账单创建成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'创建失败:{str(e)}'}), 500
@app.put('/api/reconciliations/<int:reconciliation_id>')
@require_login
@require_any_role('admin', 'superadmin')
def update_reconciliation(reconciliation_id):
"""更新对账单"""
data = request.get_json() or {}
order_date = format_date_to_slash(data.get('order_date'))
contract_no = (data.get('contract_no') or '').strip()
material_name = (data.get('material_name') or '').strip()
spec_model = (data.get('spec_model') or '').strip()
transport_no = (data.get('transport_no') or '').strip()
quantity = data.get('quantity')
unit = (data.get('unit') or 'pcs').strip()
unit_price = data.get('unit_price')
total_amount = data.get('total_amount')
delivery_date = format_date_to_slash(data.get('delivery_date'))
shipment_date = format_date_to_slash(data.get('shipment_date'))
# 验证必填字段
if not all([order_date, contract_no, material_name, spec_model, quantity, unit, unit_price is not None]):
return jsonify({'error': '请填写所有必填字段'}), 400
try:
quantity = int(quantity)
unit_price = float(unit_price)
total_amount = float(total_amount) if total_amount else quantity * unit_price
except (ValueError, TypeError):
return jsonify({'error': '数量、单价或金额格式不正确'}), 400
if quantity <= 0:
return jsonify({'error': '数量必须大于0'}), 400
if unit_price < 0:
return jsonify({'error': '单价不能为负数'}), 400
conn = get_db()
c = conn.cursor()
# 检查对账单是否存在
c.execute('SELECT id FROM reconciliations WHERE id=?', (reconciliation_id,))
if not c.fetchone():
conn.close()
return jsonify({'error': '对账单不存在'}), 404
now = get_beijing_time()
try:
c.execute('''
UPDATE reconciliations SET
order_date=?, contract_no=?, material_name=?, spec_model=?, transport_no=?,
quantity=?, unit=?, unit_price=?, total_amount=?, delivery_date=?, shipment_date=?,
updated_at=?
WHERE id=?
''', (
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
now, reconciliation_id
))
conn.commit()
conn.close()
log('update_reconciliation', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}, 物料: {material_name}')
notify_superadmin('更新对账单', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}')
return jsonify({'ok': True, 'message': '对账单更新成功'})
except Exception as e:
conn.close()
return jsonify({'error': f'更新失败:{str(e)}'}), 500
@app.delete('/api/reconciliations/<int:reconciliation_id>')
@require_login
@require_any_role('admin', 'superadmin')
def delete_reconciliation(reconciliation_id):
"""删除对账单"""
conn = get_db()
c = conn.cursor()
# 获取对账单信息用于日志
c.execute('SELECT contract_no, material_name FROM reconciliations WHERE id=?', (reconciliation_id,))
row = c.fetchone()
if not row:
conn.close()
return jsonify({'error': '对账单不存在'}), 404
contract_no = row['contract_no']
material_name = row['material_name']
c.execute('DELETE FROM reconciliations WHERE id=?', (reconciliation_id,))
conn.commit()
conn.close()
log('delete_reconciliation', f'对账单ID: {reconciliation_id}, 合同号: {contract_no}')
return jsonify({'ok': True, 'message': '对账单删除成功'})
@app.post('/api/reconciliations/batch-delete')
@require_login
@require_any_role('admin', 'superadmin')
def batch_delete_reconciliations():
"""批量删除对账单"""
data = request.get_json()
ids = data.get('ids', [])
if not ids or not isinstance(ids, list):
return jsonify({'error': '请提供要删除的对账单ID列表'}), 400
if len(ids) == 0:
return jsonify({'error': '请至少选择一条对账单'}), 400
conn = get_db()
c = conn.cursor()
try:
# 获取对账单信息用于日志
placeholders = ','.join('?' * len(ids))
c.execute(f'SELECT id, contract_no, material_name FROM reconciliations WHERE id IN ({placeholders})', ids)
rows = c.fetchall()
if len(rows) == 0:
conn.close()
return jsonify({'error': '未找到要删除的对账单'}), 404
# 批量删除
c.execute(f'DELETE FROM reconciliations WHERE id IN ({placeholders})', ids)
conn.commit()
# 记录日志
deleted_info = ', '.join([f"ID:{row['id']}({row['contract_no']})" for row in rows])
log('batch_delete_reconciliations', f'批量删除 {len(rows)} 条对账单: {deleted_info}')
conn.close()
return jsonify({'ok': True, 'message': f'成功删除 {len(rows)} 条对账单'})
except Exception as e:
conn.close()
print(f'批量删除对账单失败: {e}')
return jsonify({'error': f'批量删除失败: {str(e)}'}), 500
@app.get('/api/reconciliations/export')
@require_login
def export_reconciliations():
"""导出对账单为 xlsx 格式"""
try:
import pandas as pd
from io import BytesIO
from flask import send_file
conn = get_db()
c = conn.cursor()
c.execute('''
SELECT order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date
FROM reconciliations
ORDER BY id ASC
''')
rows = c.fetchall()
conn.close()
if not rows:
return jsonify({'error': '暂无数据可导出'}), 400
# 转换为 DataFrame
data = []
for idx, row in enumerate(rows, start=1):
data.append({
'序号': idx,
'下单时间': row['order_date'] or '',
'合同编号': row['contract_no'] or '',
'物料名称': row['material_name'] or '',
'规格型号': row['spec_model'] or '',
'运输单号': row['transport_no'] or '',
'数量': row['quantity'] or 0,
'单位': row['unit'] or '',
'含税单价': row['unit_price'] or 0,
'含税金额': row['total_amount'] or 0,
'交货日期': row['delivery_date'] or '',
'出货日期': row['shipment_date'] or ''
})
df = pd.DataFrame(data)
# 创建 Excel 文件
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='对账单')
# 获取工作表并设置列宽和行高
worksheet = writer.sheets['对账单']
# 导入样式
from openpyxl.styles import Alignment, Font
# 设置列宽按指定宽度Excel列宽需要稍微增加以达到实际显示效果
column_widths = {
'序号': 8.7,
'下单时间': 13.5,
'合同编号': 14.2,
'物料名称': 14,
'规格型号': 25,
'运输单号': 32,
'数量': 16.7,
'单位': 6.5,
'含税单价': 9.5,
'含税金额': 8.8,
'交货日期': 13.2,
'出货日期': 11.7
}
for idx, col in enumerate(df.columns):
col_letter = chr(65 + idx) # A, B, C, ...
if col in column_widths:
worksheet.column_dimensions[col_letter].width = column_widths[col]
else:
worksheet.column_dimensions[col_letter].width = 15 # 默认宽度
# 设置所有行的行高为39并设置单元格居中对齐、宋体字体和自动换行
for row_idx in range(1, len(df) + 2): # +2 因为包含表头行且从1开始
worksheet.row_dimensions[row_idx].height = 39
# 设置该行所有单元格居中对齐、宋体字体和自动换行
for col_idx in range(1, len(df.columns) + 1):
cell = worksheet.cell(row=row_idx, column=col_idx)
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
cell.font = Font(name='宋体', size=11)
output.seek(0)
# 生成文件名(包含当前日期)
from datetime import datetime
filename = f'对账单_{datetime.now().strftime("%Y%m%d_%H%M%S")}.xlsx'
log('export_reconciliations', f'导出对账单,共 {len(rows)} 条记录')
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
log('export_reconciliations_error', str(e))
return jsonify({'error': f'导出失败:{str(e)}'}), 500
@app.post('/api/reconciliations/upload-shipment')
@require_login
@require_any_role('admin', 'superadmin')
def upload_shipment():
"""上传发货单并解析生成对账单"""
if 'file' not in request.files:
return jsonify({'error': '未选择文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 验证文件类型
allowed_extensions = {'xls', 'xlsx'}
if '.' not in file.filename:
return jsonify({'error': '无效的文件格式'}), 400
ext = file.filename.rsplit('.', 1)[1].lower()
if ext not in allowed_extensions:
return jsonify({'error': '不支持的文件格式,请上传 XLS 或 XLSX 格式'}), 400
try:
import pandas as pd
import numpy as np
from io import BytesIO
# 读取Excel文件
file_content = file.read()
df = pd.read_excel(BytesIO(file_content), header=None)
# 日期格式化函数:统一转换为 YYYY/MM/DD 格式
def format_date(date_val):
if pd.isna(date_val):
return ''
if isinstance(date_val, pd.Timestamp):
return date_val.strftime('%Y/%m/%d')
date_str = str(date_val).strip()
# 去掉时间部分
if ' ' in date_str:
date_str = date_str.split()[0]
# 将 YYYY-MM-DD 转换为 YYYY/MM/DD
if '-' in date_str:
return date_str.replace('-', '/')
return date_str
# 提取头部信息
shipment_date = None
transport_method = None
# 解析发货日期第1行索引2
if len(df) > 1 and len(df.columns) > 2:
shipment_date_raw = df.iloc[1, 2]
if pd.notna(shipment_date_raw):
shipment_date = format_date(shipment_date_raw)
# 解析供货方式第2行索引2
if len(df) > 2 and len(df.columns) > 2:
transport_method_raw = df.iloc[2, 2]
if pd.notna(transport_method_raw):
transport_method = str(transport_method_raw)
# 找到表格数据的起始行(序号、采购单号、物料编码...
header_row = None
for i in range(len(df)):
if df.iloc[i, 0] == '序号':
header_row = i
break
if header_row is None:
return jsonify({'error': '无法识别发货单格式,未找到表格头部'}), 400
# 从表格起始行读取数据
data_df = pd.read_excel(BytesIO(file_content), header=header_row)
# 过滤掉合计行和备注行(只保留序号为数字的行)
valid_data = data_df[data_df['序号'].apply(lambda x: isinstance(x, (int, float)) and not pd.isna(x))]
if len(valid_data) == 0:
return jsonify({'error': '发货单中没有有效的数据行'}), 400
# 获取客户订单数据用于查找单价和下单时间
conn = get_db()
c = conn.cursor()
c.execute('SELECT order_no, order_date, material, unit_price FROM customer_orders')
customer_orders_list = c.fetchall()
# 构建字典(支持一个订单号对应多个物料)
customer_orders = {}
for row in customer_orders_list:
order_no = row['order_no']
if order_no not in customer_orders:
customer_orders[order_no] = []
customer_orders[order_no].append({
'order_date': row['order_date'],
'material': row['material'],
'unit_price': row['unit_price']
})
# 解析每一行数据并插入对账单
now = get_beijing_time()
username = session.get('username', '')
success_count = 0
error_rows = []
for idx, row in valid_data.iterrows():
try:
# 提取数据
contract_no = row.get('采购单号')
if pd.isna(contract_no):
# 如果采购单号为空,尝试使用上一行的采购单号
if success_count > 0:
contract_no = last_contract_no
else:
error_rows.append(f"{int(row['序号'])}行:采购单号为空")
continue
else:
contract_no = str(contract_no).strip()
last_contract_no = contract_no
material_code = row.get('物料编码')
if pd.isna(material_code):
error_rows.append(f"{int(row['序号'])}行:物料编码为空")
continue
material_code = str(material_code).strip().replace('\n', ' ')
spec_model = row.get('规格型号')
if pd.isna(spec_model):
spec_model = ''
else:
spec_model = str(spec_model).strip()
quantity = row.get('实送数量')
if pd.isna(quantity):
error_rows.append(f"{int(row['序号'])}行:实送数量为空")
continue
quantity = int(float(quantity))
# 单位统一设置为 pcs
unit = 'pcs'
# 使用发货单头部的供货方式作为运输单号(统一)
transport_no = transport_method or ''
# 从客户订单中查找单价和下单时间
unit_price = 0
order_date = shipment_date or ''
# 提取物料编码的第一部分(去掉换行符后的内容)
material_code_key = material_code.split('\n')[0].split()[0].strip() if material_code else ''
# 遍历客户订单查找匹配的物料
if contract_no in customer_orders:
for order_info in customer_orders[contract_no]:
# 提取订单中的物料编码(第一部分)
order_material = order_info['material'].split('\n')[0].split()[0].strip()
# 匹配物料编码
if material_code_key and order_material and material_code_key in order_material:
unit_price = order_info['unit_price']
# 格式化下单时间为 YYYY/MM/DD 格式
order_date = format_date(order_info['order_date']) or shipment_date
break
# 如果未找到匹配,检查是否是飞机盒,设置默认单价
if unit_price == 0 and '飞机盒' in material_code:
unit_price = 2
# 计算含税金额
total_amount = quantity * unit_price
# 插入对账单
c.execute('''
INSERT INTO reconciliations(
order_date, contract_no, material_name, spec_model, transport_no,
quantity, unit, unit_price, total_amount, delivery_date, shipment_date,
created_by, created_at, updated_at
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)
''', (
order_date, contract_no, material_code, spec_model, transport_no,
quantity, unit, unit_price, total_amount, shipment_date, shipment_date,
username, now, now
))
success_count += 1
except Exception as e:
error_rows.append(f"{int(row['序号'])}行:{str(e)}")
continue
conn.commit()
conn.close()
log('upload_shipment', f'上传发货单,成功导入 {success_count} 条记录')
notify_superadmin('上传发货单', f'成功导入 {success_count} 条对账单记录')
result = {
'ok': True,
'success_count': success_count,
'message': f'成功导入 {success_count} 条对账单记录'
}
if error_rows:
result['errors'] = error_rows
result['message'] += f'{len(error_rows)} 条记录失败'
return jsonify(result)
except Exception as e:
log('upload_shipment_error', str(e))
return jsonify({'error': f'解析发货单失败:{str(e)}'}), 500
@app.errorhandler(404)
def not_found(e):
# 如果请求的是 HTML 页面(通过 Accept header 判断),返回 index.html

View File

@ -1,6 +1,7 @@
Flask>=2.3.0
Werkzeug>=2.3.0
redis>=4.5.0
pandas>=2.0.0
openpyxl>=3.1.0
reportlab>=4.0.0
Pillow>=10.0.0

149
test_py/batch_import.py Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env python3
import pandas as pd
import redis
from tqdm import tqdm
import argparse
import os
# 连接Redis
parser = argparse.ArgumentParser()
parser.add_argument("type", choices=["pdd", "yt", "tx"], help="目标: pdd/yt/tx")
args = parser.parse_args()
r = redis.Redis(host='180.163.74.83', port=6379, password='Zzh08165511', decode_responses=True)
# 读取Excel文件
base_dir = '/home/hyx/work/batch_import_xlsx'
if args.type == "yt":
base_name = 'sn_test_yt'
pool = 'batch_sn_mapping_yt'
mac_col = 'MAC'
elif args.type == "pdd":
base_name = 'sn_test_pdd'
pool = 'batch_sn_mapping_pdd'
mac_col = 'MAC'
else:
base_name = 'sn_test_tx'
pool = 'batch_sn_mapping'
mac_col = 'SN_MAC'
# 自动检测文件扩展名(优先.xlsx其次.xls
excel_path = None
for ext in ['.xlsx', '.xls']:
test_path = os.path.join(base_dir, f'{base_name}{ext}')
if os.path.exists(test_path):
excel_path = test_path
break
if not excel_path:
print(f"错误: 找不到文件 {base_name}.xlsx 或 {base_name}.xls")
exit(1)
# 根据文件扩展名选择合适的引擎
if excel_path.endswith('.xls'):
df = pd.read_excel(excel_path, engine='xlrd')
else:
df = pd.read_excel(excel_path)
existing = r.hgetall(pool)
mac_to_batches = {}
for b, m in existing.items():
mac_to_batches.setdefault(m, []).append(b)
s = df[mac_col].astype(str).str.strip()
dup_keys = set(s[s.duplicated(keep=False)].unique())
# 批量导入数据
pipe = r.pipeline()
duplicates = []
inserted_count = 0
invalids = []
duplicates_current = {}
dup_current_count = 0
for index, row in tqdm(df.iterrows(), total=len(df)):
batch_no = str(row['批次号']).strip()
sn_mac = str(row[mac_col]).strip()
expected_len = 27 if args.type == 'tx' else 12
if len(sn_mac) != expected_len:
invalids.append((sn_mac, batch_no))
continue
if sn_mac in dup_keys:
s = duplicates_current.get(sn_mac, set())
s.add(batch_no)
duplicates_current[sn_mac] = s
dup_current_count += 1
continue
if sn_mac in mac_to_batches:
for b in mac_to_batches[sn_mac]:
duplicates.append((sn_mac, b))
continue
pipe.hset(pool, batch_no, sn_mac)
inserted_count += 1
if (index + 1) % 100 == 0:
pipe.execute()
pipe = r.pipeline()
pipe.execute()
print(f"成功导入 {inserted_count} 条数据,数据库重复跳过 {len(duplicates)} 条,当前批次重复跳过 {dup_current_count} 条,长度错误跳过 {len(invalids)}")
# 输出成功导入的数据JSON格式方便前端解析
if inserted_count > 0:
print("\n=== 成功导入的数据 ===")
import json
success_records = []
for index, row in df.iterrows():
batch_no = str(row['批次号']).strip()
sn_mac = str(row[mac_col]).strip()
expected_len = 27 if args.type == 'tx' else 12
# 只输出成功导入的记录
if len(sn_mac) == expected_len and sn_mac not in dup_keys and sn_mac not in mac_to_batches:
success_records.append({
'mac': sn_mac,
'batch': batch_no
})
# 移除数量限制,输出所有成功导入的记录
print(json.dumps(success_records, ensure_ascii=False))
print("=== 数据输出结束 ===")
if duplicates:
for mac, b in duplicates:
print(f"重复: {mac} 已存在于批次号 {b}")
dup_df = pd.DataFrame(duplicates, columns=[mac_col, '批次号'])
out_path = f"/home/hyx/work/batch_import_xlsx/duplicates_{args.type}.xlsx"
if os.path.exists(out_path):
old_df = pd.read_excel(out_path)
combined = pd.concat([old_df, dup_df], ignore_index=True)
combined.to_excel(out_path, index=False)
else:
dup_df.to_excel(out_path, index=False)
#print(f"重复数据已导出: {out_path}")
if duplicates_current:
for mac, bs in duplicates_current.items():
for b in bs:
print(f"重复: {mac} 当前批次号 {b}")
cur_rows = [(mac, b) for mac, bs in duplicates_current.items() for b in bs]
cur_dup_df = pd.DataFrame(cur_rows, columns=[mac_col, '批次号'])
out_path_cur = f"/home/hyx/work/batch_import_xlsx/duplicates_current_{args.type}.xlsx"
if os.path.exists(out_path_cur):
old_cur_df = pd.read_excel(out_path_cur)
combined_cur = pd.concat([old_cur_df, cur_dup_df], ignore_index=True)
combined_cur.to_excel(out_path_cur, index=False)
else:
cur_dup_df.to_excel(out_path_cur, index=False)
#print(f"当前批次重复数据已导出: {out_path_cur}")
if invalids:
for mac, b in invalids:
print(f"长度错误: {mac} 批次号 {b}")
inv_df = pd.DataFrame(invalids, columns=[mac_col, '批次号'])
out_path_inv = f"/home/hyx/work/batch_import_xlsx/invalid_{args.type}.xlsx"
if os.path.exists(out_path_inv):
old_inv_df = pd.read_excel(out_path_inv)
combined_inv = pd.concat([old_inv_df, inv_df], ignore_index=True)
combined_inv.to_excel(out_path_inv, index=False)
else:
inv_df.to_excel(out_path_inv, index=False)
#print(f"长度错误数据已导出: {out_path_inv}")

46
test_py/check_excel.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
import pandas as pd
import openpyxl
import warnings
# 过滤openpyxl的跨平台兼容性警告
warnings.filterwarnings('ignore', category=UserWarning, module='openpyxl')
file_path = '/home/hyx/work/batch_import_xlsx/sn_test_tx.xlsx'
print("检查Excel文件信息...")
try:
# 使用openpyxl检查工作表兼容Windows到Mac的Excel文件
wb = openpyxl.load_workbook(file_path, data_only=True)
print(f"工作表数量: {len(wb.sheetnames)}")
print(f"工作表名称: {wb.sheetnames}")
if wb.sheetnames:
ws = wb.active
print(f"活动工作表: {ws.title}")
print(f"最大行数: {ws.max_row}")
print(f"最大列数: {ws.max_column}")
# 显示前几行数据
print("\n前10行数据:")
for i, row in enumerate(ws.iter_rows(values_only=True), 1):
if i <= 10:
print(f"{i}行: {row}")
else:
break
wb.close() # 关闭工作簿释放资源
except Exception as e:
print(f"openpyxl错误: {e}")
print("提示: 这可能是Windows到Mac的Excel文件兼容性问题")
try:
# 使用pandas检查
print("\n使用pandas检查...")
xl_file = pd.ExcelFile(file_path)
print(f"pandas检测到的工作表: {xl_file.sheet_names}")
except Exception as e:
print(f"pandas错误: {e}")

222
test_py/check_login.py Executable file
View File

@ -0,0 +1,222 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
登录问题诊断脚本
"""
import sqlite3
import os
DB_PATH = 'server/data.db'
def check_database():
"""检查数据库和用户"""
print("=" * 60)
print("🔍 检查数据库...")
print("=" * 60)
if not os.path.exists(DB_PATH):
print("❌ 数据库文件不存在:", DB_PATH)
return False
print("✅ 数据库文件存在")
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 检查用户表
users = c.execute('SELECT username, role FROM users').fetchall()
if not users:
print("❌ 没有找到任何用户")
return False
print(f"\n✅ 找到 {len(users)} 个用户:")
print("-" * 60)
for username, role in users:
print(f" 👤 用户名: {username:15s} | 角色: {role}")
conn.close()
return True
except Exception as e:
print(f"❌ 数据库错误: {e}")
return False
def check_server():
"""检查服务器状态"""
print("\n" + "=" * 60)
print("🔍 检查服务器...")
print("=" * 60)
import subprocess
# 检查进程
try:
result = subprocess.run(
['ps', 'aux'],
capture_output=True,
text=True
)
if 'python' in result.stdout and 'app.py' in result.stdout:
print("✅ 服务器正在运行")
# 提取进程信息
for line in result.stdout.split('\n'):
if 'app.py' in line:
print(f" 📋 进程: {' '.join(line.split()[10:])}")
return True
else:
print("❌ 服务器未运行")
print("\n💡 启动服务器:")
print(" cd server && python3 app.py")
return False
except Exception as e:
print(f"⚠️ 无法检查进程: {e}")
return None
def check_files():
"""检查关键文件"""
print("\n" + "=" * 60)
print("🔍 检查关键文件...")
print("=" * 60)
files = {
'frontend/login.html': '登录页面',
'frontend/assets/login.css': '登录样式',
'frontend/js/api.js': 'API 接口',
'server/app.py': '后端服务',
}
all_exist = True
for path, desc in files.items():
if os.path.exists(path):
size = os.path.getsize(path)
print(f"{desc:20s} - {path} ({size} bytes)")
else:
print(f"{desc:20s} - {path} (不存在)")
all_exist = False
return all_exist
def show_instructions():
"""显示使用说明"""
print("\n" + "=" * 60)
print("📖 使用说明")
print("=" * 60)
print("\n1⃣ 启动后端服务:")
print(" cd server")
print(" python3 app.py")
print("\n2⃣ 访问登录页面:")
print(" http://localhost:5000/login.html")
print(" ⚠️ 注意:不是 login-preview.html")
print("\n3⃣ 使用以下账号登录:")
print(" - tz (超级管理员)")
print(" - 张正浩 (超级管理员)")
print(" - admin (管理员)")
print(" - 黄有想 (管理员)")
print("\n4⃣ 如果忘记密码,重置密码:")
print(" python3 reset_password.py <用户名> <新密码>")
print("\n5⃣ 清除浏览器缓存:")
print(" Chrome/Edge: Ctrl+Shift+Delete")
print(" 或者使用无痕模式: Ctrl+Shift+N")
def create_reset_script():
"""创建密码重置脚本"""
script = '''#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
密码重置脚本
用法: python3 reset_password.py <用户名> <新密码>
"""
import sys
import sqlite3
from werkzeug.security import generate_password_hash
if len(sys.argv) != 3:
print("用法: python3 reset_password.py <用户名> <新密码>")
sys.exit(1)
username = sys.argv[1]
new_password = sys.argv[2]
DB_PATH = 'server/data.db'
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 检查用户是否存在
user = c.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()
if not user:
print(f"❌ 用户 '{username}' 不存在")
print("\\n现有用户:")
users = c.execute('SELECT username FROM users').fetchall()
for u in users:
print(f" - {u[0]}")
sys.exit(1)
# 更新密码
password_hash = generate_password_hash(new_password)
c.execute('UPDATE users SET password_hash = ? WHERE username = ?', (password_hash, username))
conn.commit()
print(f"✅ 用户 '{username}' 的密码已重置")
print(f" 新密码: {new_password}")
conn.close()
except Exception as e:
print(f"❌ 错误: {e}")
sys.exit(1)
'''
with open('reset_password.py', 'w', encoding='utf-8') as f:
f.write(script)
os.chmod('reset_password.py', 0o755)
print("\n✅ 已创建密码重置脚本: reset_password.py")
def main():
print("\n" + "🔐 登录问题诊断工具".center(60, "="))
print()
db_ok = check_database()
files_ok = check_files()
server_ok = check_server()
print("\n" + "=" * 60)
print("📊 诊断结果")
print("=" * 60)
if db_ok and files_ok:
print("✅ 数据库和文件都正常")
if server_ok:
print("✅ 服务器正在运行")
print("\n💡 如果仍然无法登录,请尝试:")
print(" 1. 清除浏览器缓存")
print(" 2. 使用无痕模式")
print(" 3. 检查浏览器控制台的错误信息")
print(" 4. 确认访问的是 login.html 而不是 login-preview.html")
else:
print("❌ 服务器未运行,请先启动服务器")
else:
print("❌ 发现问题,请检查上述错误信息")
show_instructions()
create_reset_script()
print("\n" + "=" * 60)
print()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
创建发货记录 Excel 模板文件
"""
import pandas as pd
from datetime import datetime, timedelta
def create_template():
"""创建发货记录模板文件(带合并单元格)"""
import openpyxl
from openpyxl.styles import Alignment, Font, Border, Side
# 创建工作簿
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "发货记录"
# 创建表头
headers = ['出货日期', '箱号']
headers.extend([f'SN{i}' for i in range(1, 21)])
ws.append(headers)
# 设置表头样式
for cell in ws[1]:
cell.font = Font(bold=True)
cell.alignment = Alignment(horizontal='center', vertical='center')
# 创建示例数据
base_date = datetime.now()
row_num = 2
# 每个日期3个箱子
for day in range(3):
date = (base_date + timedelta(days=day)).strftime('%Y-%m-%d')
start_row = row_num
# 每天3个箱子
for box in range(3):
box_num = f"BOX{day*3+box+1:03d}"
# 第一列:日期(只在第一行写入,后面会合并)
if box == 0:
ws.cell(row=row_num, column=1, value=date)
# 第二列:箱号
ws.cell(row=row_num, column=2, value=box_num)
# SN1-SN20
for sn_idx in range(1, 21):
sn_value = f"SN{(day*3+box)*20+sn_idx:04d}" if sn_idx <= 15 else ''
ws.cell(row=row_num, column=2+sn_idx, value=sn_value)
row_num += 1
# 合并日期单元格
if start_row < row_num - 1:
ws.merge_cells(f'A{start_row}:A{row_num-1}')
# 设置合并单元格的对齐方式
ws.cell(row=start_row, column=1).alignment = Alignment(horizontal='center', vertical='center')
# 调整列宽
ws.column_dimensions['A'].width = 12
ws.column_dimensions['B'].width = 10
for i in range(3, 23):
ws.column_dimensions[openpyxl.utils.get_column_letter(i)].width = 10
# 保存文件
output_file = 'shipments_template.xlsx'
wb.save(output_file)
print(f"✓ 模板文件已创建:{output_file}")
print(f" - 包含 {row_num-2} 行示例数据")
print(f" - 列出货日期合并单元格、箱号、SN1-SN20")
print(f" - 每个日期包含 3 个箱子")
return output_file
def create_empty_template():
"""创建空白模板文件"""
# 创建列头
columns = ['出货日期', '箱号']
columns.extend([f'SN{i}' for i in range(1, 21)])
# 创建空 DataFrame
df = pd.DataFrame(columns=columns)
# 保存为 Excel
output_file = 'shipments_template_empty.xlsx'
df.to_excel(output_file, index=False, engine='openpyxl')
print(f"✓ 空白模板文件已创建:{output_file}")
return output_file
if __name__ == '__main__':
print("创建发货记录 Excel 模板...\n")
# 创建带示例数据的模板
create_template()
print()
# 创建空白模板
create_empty_template()
print()
print("完成!您可以使用这些模板文件进行测试。")

76
test_py/reset_all_passwords.py Executable file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量重置所有用户密码
"""
import sqlite3
from werkzeug.security import generate_password_hash
DB_PATH = 'server/data.db'
# 默认密码设置
DEFAULT_PASSWORDS = {
'tz': 'tz123',
'张正浩': 'zzh123',
'admin': 'admin123',
'黄有想': 'hyx123'
}
def reset_all_passwords():
"""重置所有用户密码"""
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
# 获取所有用户
users = c.execute('SELECT username FROM users').fetchall()
print("=" * 60)
print("🔐 批量重置用户密码")
print("=" * 60)
print()
for user in users:
username = user['username']
# 使用默认密码或统一密码
if username in DEFAULT_PASSWORDS:
new_password = DEFAULT_PASSWORDS[username]
else:
new_password = 'admin123' # 统一默认密码
# 生成新的密码哈希
password_hash = generate_password_hash(new_password)
# 更新密码
c.execute('UPDATE users SET password_hash = ? WHERE username = ?',
(password_hash, username))
print(f"{username:15s} | 新密码: {new_password}")
conn.commit()
conn.close()
print()
print("=" * 60)
print("✅ 所有密码已重置完成!")
print("=" * 60)
print()
print("📝 登录信息:")
print("-" * 60)
for username, password in DEFAULT_PASSWORDS.items():
print(f" 用户名: {username:15s} | 密码: {password}")
print()
print("💡 请使用上述账号密码登录")
print(" 登录地址: http://localhost:5000/login.html")
print()
except Exception as e:
print(f"❌ 错误: {e}")
return False
return True
if __name__ == '__main__':
reset_all_passwords()

47
test_py/reset_password.py Executable file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
密码重置脚本
用法: python3 reset_password.py <用户名> <新密码>
"""
import sys
import sqlite3
from werkzeug.security import generate_password_hash
if len(sys.argv) != 3:
print("用法: python3 reset_password.py <用户名> <新密码>")
sys.exit(1)
username = sys.argv[1]
new_password = sys.argv[2]
DB_PATH = 'server/data.db'
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 检查用户是否存在
user = c.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()
if not user:
print(f"❌ 用户 '{username}' 不存在")
print("\n现有用户:")
users = c.execute('SELECT username FROM users').fetchall()
for u in users:
print(f" - {u[0]}")
sys.exit(1)
# 更新密码
password_hash = generate_password_hash(new_password)
c.execute('UPDATE users SET password_hash = ? WHERE username = ?', (password_hash, username))
conn.commit()
print(f"✅ 用户 '{username}' 的密码已重置")
print(f" 新密码: {new_password}")
conn.close()
except Exception as e:
print(f"❌ 错误: {e}")
sys.exit(1)

71
test_py/test-theme.html Normal file
View File

@ -0,0 +1,71 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<title>主题测试</title>
<link rel="stylesheet" href="frontend/assets/styles.css" />
<style>
body { padding: 20px; }
.test-section { margin: 20px 0; padding: 20px; border: 2px solid var(--border); border-radius: 8px; }
.theme-toggle { margin-bottom: 20px; }
</style>
</head>
<body>
<div class="theme-toggle">
<button class="btn" onclick="toggleTheme()">切换主题</button>
<span id="current-theme"></span>
</div>
<div class="test-section">
<h3>上传日志测试</h3>
<div id="upload-log">
<div style="font-weight:600;margin-bottom:8px">上传日志</div>
<pre style="background:var(--surface);border:1px solid var(--border);border-radius:8px;padding:12px;max-height:300px;overflow-y:auto;font-size:12px;color:var(--text);white-space:pre-wrap">
[2024-01-01 10:00:00] 开始上传文件...
[2024-01-01 10:00:01] 验证文件格式...
[2024-01-01 10:00:02] 处理数据中...
[2024-01-01 10:00:03] 成功上传 100 条记录
[2024-01-01 10:00:04] 完成!
</pre>
</div>
</div>
<div class="test-section">
<h3>日期选择器测试</h3>
<input type="date" style="padding:4px 8px;background:var(--surface);border:1px solid var(--border);border-radius:6px;color:var(--text);font-size:13px;outline:none" />
</div>
<div class="test-section">
<h3>卡片测试</h3>
<div class="card">
<div style="font-weight:600;margin-bottom:8px">测试卡片</div>
<p>这是一个测试卡片,用于验证主题颜色是否正确。</p>
<ul class="list">
<li><span>项目1</span><span class="badge">标签</span></li>
<li><span>项目2</span><span class="badge">标签</span></li>
</ul>
</div>
</div>
<script>
function toggleTheme() {
const html = document.documentElement;
const current = html.getAttribute('data-theme');
const newTheme = current === 'light' ? 'dark' : 'light';
html.setAttribute('data-theme', newTheme);
localStorage.setItem('theme', newTheme);
updateThemeDisplay();
}
function updateThemeDisplay() {
const current = document.documentElement.getAttribute('data-theme') || 'dark';
document.getElementById('current-theme').textContent = `当前主题: ${current === 'light' ? '浅色' : '深色'}`;
}
// 初始化
const savedTheme = localStorage.getItem('theme') || 'dark';
document.documentElement.setAttribute('data-theme', savedTheme);
updateThemeDisplay();
</script>
</body>
</html>

BIN
test_py/test_captcha.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 KiB

88
test_py/test_captcha.py Normal file
View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""测试验证码生成功能"""
try:
from PIL import Image, ImageDraw, ImageFont
import random
import io
import base64
print("✓ Pillow 库已安装")
# 生成4位随机数字
code = ''.join([str(random.randint(0, 9)) for _ in range(4)])
print(f"✓ 生成验证码: {code}")
# 创建图片
width, height = 120, 40
image = Image.new('RGB', (width, height), color='#f0f4f8')
draw = ImageDraw.Draw(image)
print("✓ 创建图片成功")
# 尝试使用系统字体
font = None
font_paths = [
'/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf',
'/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf',
'/System/Library/Fonts/Helvetica.ttc',
'C:\\Windows\\Fonts\\arial.ttf'
]
for font_path in font_paths:
try:
font = ImageFont.truetype(font_path, 28)
print(f"✓ 使用字体: {font_path}")
break
except:
continue
if not font:
font = ImageFont.load_default()
print("⚠ 使用默认字体")
# 绘制干扰线
for _ in range(3):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line([(x1, y1), (x2, y2)], fill='#cbd5e1', width=1)
# 绘制验证码文字
colors = ['#3b82f6', '#2563eb', '#1e40af', '#1e3a8a']
for i, char in enumerate(code):
x = 20 + i * 25 + random.randint(-3, 3)
y = 5 + random.randint(-3, 3)
color = random.choice(colors)
draw.text((x, y), char, font=font, fill=color)
# 绘制干扰点
for _ in range(50):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill='#94a3b8')
print("✓ 绘制验证码成功")
# 转换为base64
buffer = io.BytesIO()
image.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode()
print(f"✓ 转换为base64成功 (长度: {len(img_base64)})")
print("\n验证码功能测试通过!")
# 保存测试图片
image.save('test_captcha.png')
print("✓ 测试图片已保存为 test_captcha.png")
except ImportError as e:
print(f"✗ 缺少依赖库: {e}")
print("\n请安装 Pillow 库:")
print(" pip install Pillow")
except Exception as e:
print(f"✗ 测试失败: {e}")
import traceback
traceback.print_exc()

59
test_py/test_import.py Normal file
View File

@ -0,0 +1,59 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, 'server')
# 测试导入函数的逻辑
def test_parse():
# 模拟辅助函数
def parse_percentage(value):
if value is None:
return 0
if isinstance(value, (int, float)):
return float(value)
value_str = str(value).strip()
if value_str.endswith('%'):
value_str = value_str[:-1]
try:
return float(value_str)
except:
return 0
def safe_int(value, default=0):
if value is None:
return default
try:
return int(float(value))
except:
return default
# 测试数据
test_cases = [
('62.28%', 62.28),
('62.28', 62.28),
(62.28, 62.28),
('', 0),
(None, 0),
]
print("测试 parse_percentage:")
for input_val, expected in test_cases:
result = parse_percentage(input_val)
status = "" if result == expected else ""
print(f" {status} parse_percentage({repr(input_val)}) = {result} (期望: {expected})")
print("\n测试 safe_int:")
test_int_cases = [
(100, 100),
(100.5, 100),
('100', 100),
('', 0),
(None, 0),
]
for input_val, expected in test_int_cases:
result = safe_int(input_val)
status = "" if result == expected else ""
print(f" {status} safe_int({repr(input_val)}) = {result} (期望: {expected})")
if __name__ == '__main__':
test_parse()

0
test_py/test_login.html Normal file
View File

173
test_py/test_sop_feature.py Executable file
View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SOP 功能测试脚本
用于验证 SOP 文件管理功能是否正常工作
"""
import os
import sys
import sqlite3
# 添加 server 目录到路径
sys.path.insert(0, 'server')
def test_database_table():
"""测试数据库表是否创建成功"""
print("测试 1: 检查数据库表...")
db_path = 'server/data.db'
if not os.path.exists(db_path):
print(" ❌ 数据库文件不存在")
return False
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 检查 sop_files 表是否存在
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sop_files'")
result = cursor.fetchone()
if result:
print(" ✅ sop_files 表已创建")
# 检查表结构
cursor.execute("PRAGMA table_info(sop_files)")
columns = cursor.fetchall()
print(f" ✅ 表结构包含 {len(columns)} 列:")
for col in columns:
print(f" - {col[1]} ({col[2]})")
conn.close()
return True
else:
print(" ❌ sop_files 表不存在")
conn.close()
return False
def test_sop_directory():
"""测试 SOP 文件存储目录"""
print("\n测试 2: 检查 SOP 文件目录...")
sop_dir = 'frontend/sop_files'
if os.path.exists(sop_dir):
print(f" ✅ 目录已存在: {sop_dir}")
# 检查目录权限
if os.access(sop_dir, os.W_OK):
print(" ✅ 目录可写")
else:
print(" ⚠️ 目录不可写,可能需要调整权限")
# 列出现有文件
files = os.listdir(sop_dir)
if files:
print(f" 📁 目录中已有 {len(files)} 个文件:")
for f in files[:5]: # 只显示前5个
print(f" - {f}")
else:
print(" 📁 目录为空")
return True
else:
print(f" ⚠️ 目录不存在: {sop_dir}")
print(" 💡 系统会在首次上传时自动创建")
return True
def test_api_routes():
"""测试 API 路由是否注册"""
print("\n测试 3: 检查 API 路由...")
try:
from app import app
# 获取所有路由
routes = []
for rule in app.url_map.iter_rules():
if 'sop' in rule.rule:
routes.append(f"{rule.rule} [{', '.join(rule.methods - {'HEAD', 'OPTIONS'})}]")
if routes:
print(" ✅ SOP API 路由已注册:")
for route in routes:
print(f" - {route}")
return True
else:
print(" ❌ 未找到 SOP API 路由")
return False
except Exception as e:
print(f" ❌ 导入失败: {e}")
return False
def test_frontend_files():
"""测试前端文件是否更新"""
print("\n测试 4: 检查前端文件...")
files_to_check = {
'frontend/js/api.js': ['listSopFiles', 'uploadSopFile', 'deleteSopFile'],
'frontend/js/components/upload.js': ['renderSop', 'bindSopEvents', 'loadSopList'],
'frontend/js/router.js': ['sop'],
'frontend/index.html': ['upload/sop']
}
all_ok = True
for filepath, keywords in files_to_check.items():
if not os.path.exists(filepath):
print(f" ❌ 文件不存在: {filepath}")
all_ok = False
continue
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
missing = [kw for kw in keywords if kw not in content]
if missing:
print(f" ⚠️ {filepath} 缺少关键字: {', '.join(missing)}")
all_ok = False
else:
print(f"{filepath} 已更新")
return all_ok
def main():
"""运行所有测试"""
print("=" * 60)
print("SOP 功能测试")
print("=" * 60)
results = []
# 运行测试
results.append(("数据库表", test_database_table()))
results.append(("文件目录", test_sop_directory()))
results.append(("API 路由", test_api_routes()))
results.append(("前端文件", test_frontend_files()))
# 汇总结果
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
for name, result in results:
status = "✅ 通过" if result else "❌ 失败"
print(f"{name}: {status}")
all_passed = all(r[1] for r in results)
print("\n" + "=" * 60)
if all_passed:
print("🎉 所有测试通过SOP 功能已准备就绪。")
print("\n下一步:")
print("1. 重启服务器")
print("2. 登录系统(使用管理员账号)")
print("3. 进入 '上传''SOP' 菜单")
print("4. 尝试上传测试文件: sop_template_example.csv")
else:
print("⚠️ 部分测试未通过,请检查上述错误信息。")
print("=" * 60)
return 0 if all_passed else 1
if __name__ == '__main__':
sys.exit(main())

59
test_py/validate_excel.py Normal file
View File

@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
验证Excel文件格式是否符合MAC与批次导入要求
"""
import sys
import pandas as pd
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='openpyxl')
def validate_excel(file_path):
"""
验证Excel文件格式
返回: (is_valid, error_message)
"""
try:
df = pd.read_excel(file_path)
if df.empty:
return False, "文件为空,没有数据"
columns = df.columns.tolist()
# 检查是否有批次号列
if '批次号' not in columns:
return False, "缺少必需的列:批次号"
# 检查是否有MAC或SN_MAC列
has_mac = 'MAC' in columns
has_sn_mac = 'SN_MAC' in columns
if not has_mac and not has_sn_mac:
return False, "缺少必需的列MAC 或 SN_MAC"
# 检查列数应该只有2列
if len(columns) != 2:
return False, f"文件应该只包含2列数据当前有{len(columns)}列:{', '.join(columns)}"
# 验证通过
mac_col = 'MAC' if has_mac else 'SN_MAC'
return True, f"文件格式正确,包含列:{mac_col} 和 批次号,共{len(df)}行数据"
except Exception as e:
return False, f"读取文件失败:{str(e)}"
if __name__ == '__main__':
if len(sys.argv) < 2:
print("用法: python validate_excel.py <excel文件路径>")
sys.exit(1)
file_path = sys.argv[1]
is_valid, message = validate_excel(file_path)
if is_valid:
print(f"{message}")
sys.exit(0)
else:
print(f"{message}")
sys.exit(1)

162
test_shipment_upload.py Normal file
View File

@ -0,0 +1,162 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试发货单上传解析功能
"""
import pandas as pd
import numpy as np
def test_parse_shipment():
"""测试解析发货单"""
file_path = '/home/hyx/work/生产管理系统/发货单-20251121.xls'
# 读取Excel文件
df = pd.read_excel(file_path, header=None)
print("=== 解析发货单头部信息 ===")
# 提取头部信息
shipment_date = None
transport_method = None
# 解析发货日期第1行索引2
if len(df) > 1 and len(df.columns) > 2:
shipment_date_raw = df.iloc[1, 2]
if pd.notna(shipment_date_raw):
if isinstance(shipment_date_raw, pd.Timestamp):
shipment_date = shipment_date_raw.strftime('%Y-%m-%d')
else:
shipment_date = str(shipment_date_raw)
print(f"发货日期: {shipment_date}")
# 解析供货方式第2行索引2
if len(df) > 2 and len(df.columns) > 2:
transport_method_raw = df.iloc[2, 2]
if pd.notna(transport_method_raw):
transport_method = str(transport_method_raw)
print(f"供货方式(运输单号): {transport_method}")
# 找到表格数据的起始行(序号、采购单号、物料编码...
header_row = None
for i in range(len(df)):
if df.iloc[i, 0] == '序号':
header_row = i
break
if header_row is None:
print("错误:无法识别发货单格式,未找到表格头部")
return
print(f"\n表格起始行: {header_row}")
# 从表格起始行读取数据
data_df = pd.read_excel(file_path, header=header_row)
print(f"表格列名: {data_df.columns.tolist()}")
# 过滤掉合计行和备注行(只保留序号为数字的行)
valid_data = data_df[data_df['序号'].apply(lambda x: isinstance(x, (int, float)) and not pd.isna(x))]
print(f"\n有效数据行数: {len(valid_data)}")
# 模拟客户订单数据
customer_orders = {
'CGDD002878': {
'order_date': '2025-11-15',
'material': 'AP-DZ006 灯条基站',
'unit_price': 150.5
},
'CGDD003082': {
'order_date': '2025-11-18',
'material': '飞机盒',
'unit_price': 5.0
}
}
print("\n=== 解析数据行 ===")
last_contract_no = None
for idx, row in valid_data.iterrows():
print(f"\n序号: {int(row['序号'])}")
# 提取数据
contract_no = row.get('采购单号')
if pd.isna(contract_no):
if last_contract_no:
contract_no = last_contract_no
print(f" 采购单号: {contract_no} (继承上一行)")
else:
print(f" 采购单号: 空 (错误)")
continue
else:
contract_no = str(contract_no).strip()
last_contract_no = contract_no
print(f" 采购单号(合同编号): {contract_no}")
material_code = row.get('物料编码')
if pd.isna(material_code):
print(f" 物料编码: 空 (错误)")
continue
material_code = str(material_code).strip().replace('\n', ' ')
print(f" 物料编码(物料名称): {material_code}")
spec_model = row.get('规格型号')
if pd.isna(spec_model):
spec_model = ''
else:
spec_model = str(spec_model).strip()
print(f" 规格型号: {spec_model}")
quantity = row.get('实送数量')
if pd.isna(quantity):
print(f" 实送数量: 空 (错误)")
continue
quantity = int(float(quantity))
print(f" 实送数量(数量): {quantity}")
unit = row.get('单位')
if pd.isna(unit):
unit = 'pcs'
else:
unit = str(unit).strip()
print(f" 单位: {unit}")
# 从备注中提取运输单号(如果有)
remark = row.get('备注')
transport_no = transport_method or ''
if pd.notna(remark):
remark_str = str(remark).strip()
if remark_str:
transport_no = remark_str
print(f" 运输单号: {transport_no}")
# 从客户订单中查找单价和下单时间
unit_price = 0
order_date = shipment_date or ''
if contract_no in customer_orders:
order_info = customer_orders[contract_no]
# 匹配物料名称
if material_code in order_info['material']:
unit_price = order_info['unit_price']
order_date = order_info['order_date']
print(f" 含税单价: {unit_price} (从客户订单查找)")
print(f" 下单时间: {order_date} (从客户订单查找)")
else:
print(f" 含税单价: {unit_price} (未找到匹配的客户订单)")
print(f" 下单时间: {order_date} (使用发货日期)")
else:
print(f" 含税单价: {unit_price} (未找到对应的采购单号)")
print(f" 下单时间: {order_date} (使用发货日期)")
# 计算含税金额
total_amount = quantity * unit_price
print(f" 含税金额: {total_amount}")
print(f" 交货日期: {shipment_date}")
print(f" 出货日期: {shipment_date}")
if __name__ == '__main__':
test_parse_shipment()

BIN
发货单-20251121.xls Normal file

Binary file not shown.