#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI分析服务
支持多种AI接口:OpenAI、通义千问、文心一言、本地模型等
"""

import asyncio
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union

import aiohttp


@dataclass
class AIConfig:
    """Connection and sampling settings for one AI backend.

    Only ``provider`` and ``api_key`` must be supplied; every other field
    falls back to the default declared below.
    """

    # Backend identifier: "openai", "qwen", "wenxin" or "local".
    provider: str
    # Credential for the backend; may be an empty string (e.g. a local model).
    api_key: str
    # Optional endpoint override; None lets the service use its built-in URL.
    base_url: Optional[str] = None
    # Model name sent to the backend.
    model: str = "gpt-3.5-turbo"
    # Sampling temperature sent with every request.
    temperature: float = 0.7
    # Generation-length limit sent to the backend.
    max_tokens: int = 2000


class AIService:
    """AI-backed production-data analysis service.

    Use it as an async context manager; an ``aiohttp.ClientSession`` is opened
    on entry and closed on exit::

        async with AIService(config) as service:
            result = await service.analyze_production_data(data)

    ``config.provider`` selects the backend: ``"openai"``, ``"qwen"`` (OpenAI
    compatible endpoint), ``"wenxin"`` (access-token retrieval is not
    implemented yet, so calls currently raise ``NotImplementedError``) or
    ``"local"`` (e.g. an Ollama ``/api/generate`` endpoint).
    """

    # Timestamp format expected in each production record's ``ts_cn`` field.
    _TS_FORMAT = '%Y-%m-%d %H:%M:%S'
    # System prompt shared by the chat-style providers (OpenAI / Qwen).
    _SYSTEM_PROMPT = "你是一个专业的生产管理数据分析专家。"
    _logger = logging.getLogger(__name__)

    def __init__(self, config: "AIConfig"):
        """Store *config*; the HTTP session is only created in ``__aenter__``."""
        self.config = config
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            # Detach the closed session so later misuse fails with the clear
            # error from _require_session instead of a closed-connector error.
            self.session = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def analyze_production_data(self, data: Dict) -> Dict:
        """Analyse production *data* with the configured model.

        Args:
            data: Aggregated statistics; see ``_build_analysis_prompt`` for
                the keys that are read.

        Returns:
            The model's JSON answer as a dict, or a placeholder structure
            when the answer cannot be parsed as a JSON object.
        """
        prompt = self._build_analysis_prompt(data)
        response = await self._call_ai(prompt)
        return self._parse_ai_response(response)

    async def generate_thinking_stream(self, data: Dict) -> str:
        """Ask the model for a step-by-step "thinking" narrative about *data*.

        Returns:
            The model output starting at the first "第一步:" marker, or the
            full output when that marker is absent.
        """
        pdd_records = data.get('pdd') or []
        yt_records = data.get('yt') or []
        thinking_prompt = f"""
请分析以下生产数据,并详细展示你的思考过程。请按照以下步骤进行分析:

数据情况:
- 拼多多数据:{len(pdd_records)} 条记录
- 圆通数据:{len(yt_records)} 条记录
- 分析时间:{data.get('analysis_time', '未知')}

请按照以下格式输出思考过程:
第一步:数据概览 - 描述看到的基础数据情况
第二步:规律发现 - 分析数据中的模式和趋势
第三步:原因推断 - 解释为什么会出现这些规律
第四步:结论形成 - 总结关键发现和建议

请详细描述每个步骤的思考内容,让用户了解AI是如何分析数据的。
"""
        response = await self._call_ai(thinking_prompt)
        # Drop any preamble the model writes before the first step marker.
        start_idx = response.find("第一步:")
        return response[start_idx:] if start_idx != -1 else response

    # ------------------------------------------------------------------
    # Prompt construction
    # ------------------------------------------------------------------

    @classmethod
    def _parse_ts(cls, record: Dict) -> Optional[datetime]:
        """Parse a record's ``ts_cn`` timestamp; None if missing or malformed."""
        raw = record.get('ts_cn')
        if not raw:
            return None
        try:
            return datetime.strptime(raw, cls._TS_FORMAT)
        except (TypeError, ValueError):
            return None

    @classmethod
    def _count_windows(cls, records: List[Dict], seven_days_ago: datetime,
                       fourteen_days_ago: datetime) -> tuple:
        """Return ``(recent, previous)`` record counts.

        ``recent`` counts timestamps after *seven_days_ago*; ``previous``
        counts timestamps in ``(fourteen_days_ago, seven_days_ago]``. Records
        without a parsable ``ts_cn`` are skipped rather than aborting the
        whole analysis.
        """
        recent = previous = 0
        for record in records:
            ts = cls._parse_ts(record)
            if ts is None:
                continue
            if ts > seven_days_ago:
                recent += 1
            elif ts > fourteen_days_ago:
                previous += 1
        return recent, previous

    def _build_analysis_prompt(self, data: Dict) -> str:
        """Build the analysis prompt from aggregated production *data*.

        Keys read from *data* (all optional; missing or None values count as
        empty):

        * ``pdd`` / ``yt``: lists of record dicts; ``ts_cn`` (a
          ``'%Y-%m-%d %H:%M:%S'`` timestamp) and ``note`` are read from each.
        * ``shipments``: ``total``, ``by_platform``.
        * ``bom``: ``count``, ``products``.
        * ``inventory``: ``count``, ``total_qty``.
        * ``purchase_demand``: ``count``, ``total_required``.
        * ``customer_orders``: ``count``, ``total_qty``, ``completed``.
        * ``reconciliations``: ``count``, ``total_qty``.

        NOTE(review): the prompt labels the period "最近30天" but no date
        filtering happens here; presumably the caller already passes only the
        last 30 days of records - confirm.
        """
        today = datetime.now()
        seven_days_ago = today - timedelta(days=7)
        fourteen_days_ago = today - timedelta(days=14)

        pdd_data = list(data.get('pdd') or [])
        yt_data = list(data.get('yt') or [])

        def section(key: str) -> Dict:
            # Tolerate both a missing key and an explicit None value.
            return data.get(key) or {}

        shipment_stats = section('shipments')
        bom_stats = section('bom')
        inventory_stats = section('inventory')
        purchase_demand_stats = section('purchase_demand')
        customer_order_stats = section('customer_orders')
        reconciliation_stats = section('reconciliations')

        # Key metrics
        total_pdd = len(pdd_data)
        total_yt = len(yt_data)
        total_production = total_pdd + total_yt

        # Shipments
        total_shipments = shipment_stats.get('total', 0) or 0
        shipment_by_platform = shipment_stats.get('by_platform') or {}

        # Planning data
        bom_count = bom_stats.get('count', 0) or 0
        bom_products = bom_stats.get('products', 0) or 0
        inventory_count = inventory_stats.get('count', 0) or 0
        inventory_qty = inventory_stats.get('total_qty', 0) or 0
        purchase_demand_count = purchase_demand_stats.get('count', 0) or 0
        purchase_required = purchase_demand_stats.get('total_required', 0) or 0
        customer_order_count = customer_order_stats.get('count', 0) or 0
        customer_order_qty = customer_order_stats.get('total_qty', 0) or 0
        customer_order_completed = customer_order_stats.get('completed', 0) or 0
        reconciliation_count = reconciliation_stats.get('count', 0) or 0
        reconciliation_qty = reconciliation_stats.get('total_qty', 0) or 0

        order_completion_rate = (
            customer_order_completed / customer_order_count * 100
            if customer_order_count > 0 else 0
        )

        # Good rate over the last 100 records of the combined list; a record
        # is counted as defective when its note contains "不良".
        recent_data = (pdd_data + yt_data)[-100:]
        notes = [str(item.get('note') or '') for item in recent_data]
        good_count = sum(1 for note in notes if '不良' not in note)
        good_rate = (good_count / len(recent_data) * 100) if recent_data else 0

        # Trend: last 7 days vs the 7 days before that.
        recent_pdd, prev_pdd = self._count_windows(pdd_data, seven_days_ago, fourteen_days_ago)
        recent_yt, prev_yt = self._count_windows(yt_data, seven_days_ago, fourteen_days_ago)
        recent_total = recent_pdd + recent_yt
        prev_total = prev_pdd + prev_yt
        if recent_total > prev_total:
            trend = "上升"
        elif recent_total < prev_total:
            trend = "下降"
        else:
            # Equal counts (including no data at all) are flat, not falling.
            trend = "持平"

        # Defect types: the text after the last ':' in a "不良" note.
        defect_types: Dict[str, int] = {}
        for note in notes:
            if '不良' in note:
                defect_type = note.split(':')[-1]
                defect_types[defect_type] = defect_types.get(defect_type, 0) + 1
        top_defects = sorted(defect_types.items(), key=lambda x: x[1], reverse=True)[:3]
        if top_defects:
            defect_lines = "\n".join(f"- {name}:{count} 次" for name, count in top_defects)
        else:
            defect_lines = "- 暂无不良记录"

        # Week-over-week change per platform (0 when there is no baseline).
        pdd_trend = ((recent_pdd - prev_pdd) / prev_pdd * 100) if prev_pdd > 0 else 0
        yt_trend = ((recent_yt - prev_yt) / prev_yt * 100) if prev_yt > 0 else 0

        pdd_percentage = (total_pdd / total_production * 100) if total_production > 0 else 0
        yt_percentage = (total_yt / total_production * 100) if total_production > 0 else 0

        # Production vs shipment gap
        production_shipment_gap = total_production - total_shipments
        gap_percentage = (production_shipment_gap / total_production * 100) if total_production > 0 else 0

        prompt = f"""
作为生产管理专家,请分析以下生产数据并提供专业洞察:

【基础数据】
- 统计周期:最近30天
- 总产量:{total_production} 台
- 良品率:{good_rate:.1f}%
- 产量趋势:{trend}

【平台分布】
- 拼多多:{total_pdd} 台 ({pdd_percentage:.1f}%)
- 圆通:{total_yt} 台 ({yt_percentage:.1f}%)

【发货统计】
- 总发货量:{total_shipments} 台
- 发货平台分布:{shipment_by_platform}
- 生产与发货差异:{production_shipment_gap} 台 ({gap_percentage:.1f}%)

【计划管理】
- BOM物料清单:{bom_count} 条记录,涉及 {bom_products} 个产品
- 期初库存:{inventory_count} 种物料,总库存量 {inventory_qty}
- 采购需求:{purchase_demand_count} 条需求,总需求量 {purchase_required}
- 客户订单:{customer_order_count} 个订单,总订单量 {customer_order_qty} 台,完成率 {order_completion_rate:.1f}%
- 对账单:{reconciliation_count} 条记录,总对账量 {reconciliation_qty} 台

【质量分析】
主要不良问题:
{defect_lines}

【最近7天产量】
- 拼多多:{recent_pdd} 台
- 圆通:{recent_yt} 台

请以JSON格式返回分析结果,包含以下字段。**重要提示:对于需要特别注意的问题,请在insights中使用"⚠️"标记**:
{{
"thinking": "第一步:数据概览 - 描述看到的基础数据情况\\n第二步:规律发现 - 分析数据中的模式和趋势\\n第三步:原因推断 - 解释为什么会出现这些规律\\n第四步:结论形成 - 总结关键发现和建议",
"summary": {{
"totalProduction": {total_production},
"goodRate": "{good_rate:.1f}%",
"trend": "{trend}",
"insights": [
"洞察1:具体分析产量变化原因(如有问题用⚠️标记)",
"洞察2:质量管控建议(如有问题用⚠️标记)",
"洞察3:生产优化建议"
]
}},
"platforms": {{
"pdd": {{
"count": {total_pdd},
"percentage": {pdd_percentage:.1f},
"trend": "{pdd_trend:+.1f}%"
}},
"yt": {{
"count": {total_yt},
"percentage": {yt_percentage:.1f},
"trend": "{yt_trend:+.1f}%"
}}
}},
"quality": {{
"topIssues": [
{{"issue": "问题1", "count": 数量, "percentage": "百分比"}}
]
}},
"prediction": {{
"tomorrow": 预测明日产量,
"weekRange": "本周预测范围",
"confidence": "置信度百分比"
}}
}}

重要提示:
1. thinking字段必须简洁,控制在200字以内,用分步骤的方式展示
2. 每一步都要简明扼要,突出重点
3. 先描述数据特征,再分析问题,最后给出建议
4. 洞察要具体、实用,基于实际数据
5. 预测要基于历史趋势
6. 返回标准JSON格式,不要包含其他文字
"""
        return prompt

    # ------------------------------------------------------------------
    # Provider calls
    # ------------------------------------------------------------------

    async def _call_ai(self, prompt: str) -> str:
        """Dispatch *prompt* to the configured provider and return its text.

        Raises:
            ValueError: if ``config.provider`` is not a supported provider.
        """
        handlers = {
            "openai": self._call_openai,
            "qwen": self._call_qwen,
            "wenxin": self._call_wenxin,
            "local": self._call_local,
        }
        handler = handlers.get(self.config.provider)
        if handler is None:
            raise ValueError(f"不支持的AI提供商: {self.config.provider}")
        return await handler(prompt)

    def _require_session(self):
        """Return the open HTTP session, or fail clearly if there is none."""
        if self.session is None:
            raise RuntimeError("AIService会话未初始化,请使用 'async with AIService(config) as service' 调用")
        return self.session

    async def _post_json(self, url: str, headers: Dict, payload: Dict, label: str) -> Dict:
        """POST *payload* as JSON and return the decoded JSON response.

        Raises:
            Exception: on any non-200 status; the message includes the
                status code and the response body for diagnosis.
        """
        session = self._require_session()
        async with session.post(url, headers=headers, json=payload) as response:
            if response.status != 200:
                error_text = await response.text()
                raise Exception(f"{label}调用失败: {response.status} - {error_text}")
            return await response.json()

    @staticmethod
    def _chat_completions_url(base_url: Optional[str], default_base: str) -> str:
        """Resolve an OpenAI-style ``/chat/completions`` URL.

        *base_url* may be either the full endpoint or an API prefix such as
        ``https://host/v1``; the suffix is appended only when missing.
        """
        base = (base_url or default_base).rstrip('/')
        if base.endswith('/chat/completions'):
            return base
        return base + '/chat/completions'

    def _chat_messages(self, prompt: str) -> List[Dict]:
        """System + user message list for chat-completion style APIs."""
        return [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]

    async def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI chat-completions API."""
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.config.model,
            "messages": self._chat_messages(prompt),
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens,
        }
        url = self._chat_completions_url(self.config.base_url, "https://api.openai.com/v1")
        result = await self._post_json(url, headers, payload, "OpenAI API")
        return result["choices"][0]["message"]["content"]

    async def _call_qwen(self, prompt: str) -> str:
        """Call Qwen (Tongyi Qianwen) through its OpenAI-compatible endpoint."""
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.config.model or "qwen-turbo",
            "messages": self._chat_messages(prompt),
            "temperature": self.config.temperature,
            # Replies are deliberately capped at 1000 tokens to keep them
            # short; a smaller configured limit is still honoured.
            "max_tokens": min(self.config.max_tokens or 1000, 1000),
        }
        url = self._chat_completions_url(
            self.config.base_url, "https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        result = await self._post_json(url, headers, payload, "通义千问API")
        return result["choices"][0]["message"]["content"]

    async def _call_wenxin(self, prompt: str) -> str:
        """Call the Wenxin (ERNIE Bot) chat API.

        Currently raises ``NotImplementedError`` because access-token
        retrieval is not implemented.
        """
        access_token = await self._get_wenxin_access_token()
        headers = {"Content-Type": "application/json"}
        payload = {
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.config.temperature,
            "max_output_tokens": self.config.max_tokens,
        }
        url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token={access_token}"
        result = await self._post_json(url, headers, payload, "文心一言API")
        if "result" not in result:
            # Presumably an error payload delivered with HTTP 200; surface it
            # instead of failing with a bare KeyError.
            raise Exception(f"文心一言API调用失败: {result.get('error_code')} - {result.get('error_msg')}")
        return result["result"]

    async def _call_local(self, prompt: str) -> str:
        """Call a local model server (e.g. Ollama ``/api/generate``)."""
        headers = {"Content-Type": "application/json"}
        payload = {
            "model": self.config.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": self.config.temperature,
                "num_predict": self.config.max_tokens,
            },
        }
        url = self.config.base_url or "http://localhost:11434/api/generate"
        result = await self._post_json(url, headers, payload, "本地模型API")
        return result["response"]

    async def _get_wenxin_access_token(self) -> str:
        """Obtain a Wenxin access token.

        Not implemented: previously this silently returned None, which
        produced requests with ``access_token=None``. It now fails loudly.
        """
        raise NotImplementedError("文心一言access token获取尚未实现,请参考百度AI开放平台文档")

    # ------------------------------------------------------------------
    # Response parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_json_text(response: str) -> str:
        """Return the most likely JSON-object substring of a model reply.

        Strips a Markdown code fence (```json ... ``` or a bare ``` ... ```),
        tolerating a missing closing fence, then trims any prose around the
        outermost ``{ ... }`` pair.
        """
        text = response.strip()
        fence = text.find("```")
        if fence != -1:
            start = fence + 3
            if text.startswith("json", start):
                start += 4
            end = text.find("```", start)
            text = text[start:] if end == -1 else text[start:end]
        first = text.find("{")
        last = text.rfind("}")
        if first != -1 and last > first:
            text = text[first:last + 1]
        return text.strip()

    @staticmethod
    def _default_analysis() -> Dict:
        """Placeholder result returned when the model's reply is unusable."""
        return {
            "summary": {
                "totalProduction": 0,
                "goodRate": "0%",
                "trend": "stable",
                "insights": ["AI分析暂时不可用,请稍后重试"]
            },
            "platforms": {
                "pdd": {"count": 0, "percentage": 0, "trend": "+0%"},
                "yt": {"count": 0, "percentage": 0, "trend": "+0%"},
            },
            "quality": {"topIssues": []},
            "prediction": {"tomorrow": 0, "weekRange": "0-0", "confidence": "0%"}
        }

    def _parse_ai_response(self, response: str) -> Dict:
        """Parse the model reply into a dict.

        Falls back to ``_default_analysis()`` (and logs a warning) when the
        reply does not contain a JSON object.
        """
        try:
            parsed = json.loads(self._extract_json_text(response or ""))
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        self._logger.warning("AI响应无法解析为JSON对象,返回默认结构: %.200s", response)
        return self._default_analysis()


# Configuration example
def get_ai_config() -> AIConfig:
    """Build an ``AIConfig`` from environment variables.

    ``AI_PROVIDER`` selects the backend. It defaults to ``openai``, including
    when the variable is set but empty. ``qwen``, ``wenxin`` and ``local`` are
    also accepted. Matching ignores case and surrounding whitespace.

    Provider-specific variables:

    * openai: ``OPENAI_API_KEY``, ``OPENAI_BASE_URL``
    * qwen: ``QWEN_API_KEY``, ``QWEN_BASE_URL`` (defaults to the DashScope
      compatible-mode URL)
    * wenxin: ``WENXIN_API_KEY``
    * local: ``LOCAL_AI_URL`` (defaults to ``http://localhost:11434/api/generate``)

    ``AI_MODEL``, when set and non-empty, overrides the provider's default
    model name.

    Raises:
        ValueError: if ``AI_PROVIDER`` names an unsupported provider.
    """
    raw_provider = os.getenv("AI_PROVIDER") or "openai"
    provider = raw_provider.strip().lower()

    if provider == "openai":
        config = AIConfig(
            provider="openai",
            api_key=os.getenv("OPENAI_API_KEY", ""),
            base_url=os.getenv("OPENAI_BASE_URL"),
            model="gpt-3.5-turbo"
        )
    elif provider == "qwen":
        config = AIConfig(
            provider="qwen",
            api_key=os.getenv("QWEN_API_KEY", ""),
            base_url=os.getenv("QWEN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
            model="qwen-turbo"
        )
    elif provider == "wenxin":
        config = AIConfig(
            provider="wenxin",
            api_key=os.getenv("WENXIN_API_KEY", ""),
            model="ERNIE-Bot"
        )
    elif provider == "local":
        config = AIConfig(
            provider="local",
            api_key="",
            base_url=os.getenv("LOCAL_AI_URL", "http://localhost:11434/api/generate"),
            model="llama2"
        )
    else:
        raise ValueError(f"不支持的AI提供商: {raw_provider}")

    # Optional model override; an unset or empty AI_MODEL keeps the default.
    model_override = os.getenv("AI_MODEL")
    if model_override:
        config.model = model_override
    return config


# Usage example
async def analyze_production(data: Dict) -> Dict:
    """Analyse *data* with an AIService built from the environment config.

    A fresh service (and HTTP session) is opened for this single call and
    closed again before the result dict is returned.
    """
    async with AIService(get_ai_config()) as service:
        return await service.analyze_production_data(data)