#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI analysis service.

Supports several AI back ends: OpenAI, Tongyi Qianwen (Qwen), Wenxin Yiyan
(ERNIE) and local models (e.g. Ollama).
"""

import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
import os

try:
    import aiohttp
except ImportError:  # pragma: no cover - depends on the environment
    # Kept optional at import time so the pure helpers (prompt building,
    # response parsing, configuration) stay usable without the HTTP client.
    # AIService.__aenter__ raises a clear error when it is actually needed.
    aiohttp = None


# Timestamp layout of the 'ts_cn' field on production records.
_TS_FORMAT = '%Y-%m-%d %H:%M:%S'
# Upper bound on completion tokens sent to the Qwen endpoint.
_QWEN_MAX_TOKENS = 1000
# Substring that marks a record note as describing a defect.
_DEFECT_MARK = '不良'


class AIServiceError(Exception):
    """Raised when an AI back end answers with an error."""


@dataclass
class AIConfig:
    """Connection and sampling settings for one AI provider."""
    provider: str  # one of: openai, qwen, wenxin, local
    api_key: str
    base_url: Optional[str] = None
    model: str = "gpt-3.5-turbo"
    temperature: float = 0.7
    max_tokens: int = 2000


def _parse_timestamps(records: List[Dict]) -> List[datetime]:
    """Return the parseable 'ts_cn' timestamps of *records*.

    Records whose timestamp is missing or malformed are skipped so that a
    single bad row cannot abort the whole analysis.
    """
    parsed = []
    for record in records:
        try:
            parsed.append(datetime.strptime(record['ts_cn'], _TS_FORMAT))
        except (KeyError, TypeError, ValueError):
            continue
    return parsed


def _count_between(timestamps: List[datetime], after: datetime,
                   until: Optional[datetime] = None) -> int:
    """Count timestamps with ``after < ts`` and, if given, ``ts <= until``."""
    return sum(
        1 for ts in timestamps
        if ts > after and (until is None or ts <= until)
    )


def _summarize_quality(records: List[Dict]):
    """Return ``(good_rate_percent, top_defects)`` for *records*.

    A record counts as defective when its note contains the defect marker.
    The defect type is the text after the last colon (ASCII or full-width)
    of the note, or the whole note when there is none. ``top_defects`` holds
    at most three ``(defect_type, count)`` pairs, most frequent first.
    """
    good_count = 0
    defect_counts: Dict[str, int] = {}
    for item in records:
        raw_note = item.get('note')
        note = '' if raw_note is None else str(raw_note)
        if _DEFECT_MARK not in note:
            good_count += 1
            continue
        defect_type = note.replace(':', ':').split(':')[-1].strip() or note
        defect_counts[defect_type] = defect_counts.get(defect_type, 0) + 1

    good_rate = (good_count / len(records) * 100) if records else 0
    top_defects = sorted(
        defect_counts.items(), key=lambda pair: pair[1], reverse=True
    )[:3]
    return good_rate, top_defects


def _percent(part: float, whole: float) -> float:
    """Return ``part / whole`` as a percentage, or 0 when *whole* is not positive."""
    return (part / whole * 100) if whole > 0 else 0


def _extract_json_text(response: str) -> str:
    """Strip an optional Markdown code fence from an AI answer.

    Handles both ```` ```json ```` and plain ```` ``` ```` fences and
    tolerates a missing closing fence.
    """
    text = response.strip()
    if "```json" in text:
        marker = "```json"
    elif "```" in text:
        marker = "```"
    else:
        return text
    start = text.find(marker) + len(marker)
    end = text.find("```", start)
    if end == -1:
        # No closing fence: take everything after the opening one.
        end = len(text)
    return text[start:end].strip()


def _default_analysis() -> Dict:
    """Return the placeholder result used when the AI answer is unusable."""
    return {
        "summary": {
            "totalProduction": 0,
            "goodRate": "0%",
            "trend": "stable",
            "insights": ["AI分析暂时不可用,请稍后重试"]
        },
        "platforms": {
            "pdd": {"count": 0, "percentage": 0, "trend": "+0%"},
            "yt": {"count": 0, "percentage": 0, "trend": "+0%"}
        },
        "quality": {"topIssues": []},
        "prediction": {"tomorrow": 0, "weekRange": "0-0", "confidence": "0%"}
    }


class AIService:
    """Production-data analysis backed by a configurable AI provider.

    Use it as an async context manager so that the HTTP session is opened
    and closed properly::

        async with AIService(config) as service:
            result = await service.analyze_production_data(data)
    """

    def __init__(self, config: AIConfig):
        self.config = config
        self.session = None

    async def __aenter__(self):
        if aiohttp is None:
            raise RuntimeError("未安装aiohttp,无法调用AI接口")
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            # Drop the closed session so later misuse fails loudly.
            self.session = None

    async def analyze_production_data(self, data: Dict) -> Dict:
        """Analyze production data and return the parsed AI result.

        Builds the analysis prompt from *data*, sends it to the configured
        provider and parses the answer. When the answer is not valid JSON
        the default placeholder structure is returned instead.
        """
        prompt = self._build_analysis_prompt(data)
        response = await self._call_ai(prompt)
        return self._parse_ai_response(response)

    async def generate_thinking_stream(self, data: Dict) -> str:
        """Return the AI's step-by-step reasoning text for *data*.

        The answer is trimmed so that it starts at the first step marker
        when that marker is present; otherwise it is returned unchanged.
        """
        pdd_count = len(data.get('pdd') or [])
        yt_count = len(data.get('yt') or [])
        thinking_prompt = f"""
请分析以下生产数据,并详细展示你的思考过程。请按照以下步骤进行分析:

数据情况:
- 拼多多数据:{pdd_count} 条记录
- 圆通数据:{yt_count} 条记录
- 分析时间:{data.get('analysis_time', '未知')}

请按照以下格式输出思考过程:

第一步:数据概览 - 描述看到的基础数据情况
第二步:规律发现 - 分析数据中的模式和趋势
第三步:原因推断 - 解释为什么会出现这些规律
第四步:结论形成 - 总结关键发现和建议

请详细描述每个步骤的思考内容,让用户了解AI是如何分析数据的。
"""
        response = await self._call_ai(thinking_prompt)

        first_step = response.find("第一步:")
        return response[first_step:] if first_step != -1 else response

    def _build_analysis_prompt(self, data: Dict) -> str:
        """Build the analysis prompt from raw production data.

        *data* may contain the record lists 'pdd' and 'yt' (each record a
        dict with optional 'ts_cn' and 'note') and the statistics dicts
        'shipments', 'bom', 'inventory', 'purchase_demand',
        'customer_orders' and 'reconciliations'. Missing or ``None``
        entries are treated as empty.
        """
        today = datetime.now()
        seven_days_ago = today - timedelta(days=7)
        fourteen_days_ago = today - timedelta(days=14)

        pdd_data = list(data.get('pdd') or [])
        yt_data = list(data.get('yt') or [])
        shipment_stats = data.get('shipments') or {}
        bom_stats = data.get('bom') or {}
        inventory_stats = data.get('inventory') or {}
        purchase_demand_stats = data.get('purchase_demand') or {}
        customer_order_stats = data.get('customer_orders') or {}
        reconciliation_stats = data.get('reconciliations') or {}

        # Key production figures.
        total_pdd = len(pdd_data)
        total_yt = len(yt_data)
        total_production = total_pdd + total_yt

        # Shipment figures.
        total_shipments = shipment_stats.get('total', 0) or 0
        shipment_by_platform = shipment_stats.get('by_platform') or {}

        # Planning figures; "or 0" also maps explicit None values to 0.
        bom_count = bom_stats.get('count', 0) or 0
        bom_products = bom_stats.get('products', 0) or 0
        inventory_count = inventory_stats.get('count', 0) or 0
        inventory_qty = inventory_stats.get('total_qty', 0) or 0
        purchase_demand_count = purchase_demand_stats.get('count', 0) or 0
        purchase_required = purchase_demand_stats.get('total_required', 0) or 0
        customer_order_count = customer_order_stats.get('count', 0) or 0
        customer_order_qty = customer_order_stats.get('total_qty', 0) or 0
        customer_order_completed = customer_order_stats.get('completed', 0) or 0
        reconciliation_count = reconciliation_stats.get('count', 0) or 0
        reconciliation_qty = reconciliation_stats.get('total_qty', 0) or 0

        order_completion_rate = _percent(customer_order_completed, customer_order_count)

        # Quality is judged on the last 100 records of the combined list.
        recent_data = (pdd_data + yt_data)[-100:]
        good_rate, top_defects = _summarize_quality(recent_data)
        if top_defects:
            defect_lines = "\n".join(
                f"- {name}:{count} 次" for name, count in top_defects
            )
        else:
            defect_lines = "- 暂无不良记录"

        # Trend: last 7 days versus the 7 days before that. Timestamps are
        # parsed once per record and reused for both windows.
        pdd_times = _parse_timestamps(pdd_data)
        yt_times = _parse_timestamps(yt_data)
        recent_pdd = _count_between(pdd_times, seven_days_ago)
        recent_yt = _count_between(yt_times, seven_days_ago)
        prev_pdd = _count_between(pdd_times, fourteen_days_ago, seven_days_ago)
        prev_yt = _count_between(yt_times, fourteen_days_ago, seven_days_ago)

        recent_total = recent_pdd + recent_yt
        prev_total = prev_pdd + prev_yt
        if recent_total > prev_total:
            trend = "上升"
        elif recent_total < prev_total:
            trend = "下降"
        else:
            # Equal output (including "no data at all") is not a decline.
            trend = "持平"

        pdd_trend = _percent(recent_pdd - prev_pdd, prev_pdd)
        yt_trend = _percent(recent_yt - prev_yt, prev_yt)

        pdd_percentage = _percent(total_pdd, total_production)
        yt_percentage = _percent(total_yt, total_production)

        # Gap between what was produced and what was shipped.
        production_shipment_gap = total_production - total_shipments
        gap_percentage = _percent(production_shipment_gap, total_production)

        prompt = f"""
作为生产管理专家,请分析以下生产数据并提供专业洞察:

【基础数据】
- 统计周期:最近30天
- 总产量:{total_production} 台
- 良品率:{good_rate:.1f}%
- 产量趋势:{trend}

【平台分布】
- 拼多多:{total_pdd} 台 ({pdd_percentage:.1f}%)
- 圆通:{total_yt} 台 ({yt_percentage:.1f}%)

【发货统计】
- 总发货量:{total_shipments} 台
- 发货平台分布:{shipment_by_platform}
- 生产与发货差异:{production_shipment_gap} 台 ({gap_percentage:.1f}%)

【计划管理】
- BOM物料清单:{bom_count} 条记录,涉及 {bom_products} 个产品
- 期初库存:{inventory_count} 种物料,总库存量 {inventory_qty}
- 采购需求:{purchase_demand_count} 条需求,总需求量 {purchase_required}
- 客户订单:{customer_order_count} 个订单,总订单量 {customer_order_qty} 台,完成率 {order_completion_rate:.1f}%
- 对账单:{reconciliation_count} 条记录,总对账量 {reconciliation_qty} 台

【质量分析】
主要不良问题:
{defect_lines}

【最近7天产量】
- 拼多多:{recent_pdd} 台
- 圆通:{recent_yt} 台

请以JSON格式返回分析结果,包含以下字段。**重要提示:对于需要特别注意的问题,请在insights中使用"⚠️"标记**:
{{
    "thinking": "第一步:数据概览 - 描述看到的基础数据情况\\n第二步:规律发现 - 分析数据中的模式和趋势\\n第三步:原因推断 - 解释为什么会出现这些规律\\n第四步:结论形成 - 总结关键发现和建议",
    "summary": {{
        "totalProduction": {total_production},
        "goodRate": "{good_rate:.1f}%",
        "trend": "{trend}",
        "insights": [
            "洞察1:具体分析产量变化原因(如有问题用⚠️标记)",
            "洞察2:质量管控建议(如有问题用⚠️标记)",
            "洞察3:生产优化建议"
        ]
    }},
    "platforms": {{
        "pdd": {{
            "count": {total_pdd},
            "percentage": {pdd_percentage:.1f},
            "trend": "{pdd_trend:+.1f}%"
        }},
        "yt": {{
            "count": {total_yt},
            "percentage": {yt_percentage:.1f},
            "trend": "{yt_trend:+.1f}%"
        }}
    }},
    "quality": {{
        "topIssues": [
            {{"issue": "问题1", "count": 数量, "percentage": "百分比"}}
        ]
    }},
    "prediction": {{
        "tomorrow": 预测明日产量,
        "weekRange": "本周预测范围",
        "confidence": "置信度百分比"
    }}
}}

重要提示:
1. thinking字段必须简洁,控制在200字以内,用分步骤的方式展示
2. 每一步都要简明扼要,突出重点
3. 先描述数据特征,再分析问题,最后给出建议
4. 洞察要具体、实用,基于实际数据
5. 预测要基于历史趋势
6. 返回标准JSON格式,不要包含其他文字
"""
        return prompt

    async def _call_ai(self, prompt: str) -> str:
        """Send *prompt* to the configured provider and return its text answer.

        Raises:
            ValueError: if ``config.provider`` is not a supported provider.
        """
        handlers = {
            "openai": self._call_openai,
            "qwen": self._call_qwen,
            "wenxin": self._call_wenxin,
            "local": self._call_local,
        }
        handler = handlers.get(self.config.provider)
        if handler is None:
            raise ValueError(f"不支持的AI提供商: {self.config.provider}")
        return await handler(prompt)

    async def _post_json(self, url: str, headers: Dict, payload: Dict, label: str) -> Dict:
        """POST *payload* as JSON to *url* and return the decoded JSON body.

        Raises:
            RuntimeError: if the service is used outside ``async with``.
            AIServiceError: if the server answers with a non-200 status; the
                message carries the status code and the response body.
        """
        if self.session is None:
            raise RuntimeError(
                "AIService会话未初始化,请通过 'async with AIService(config)' 使用"
            )
        async with self.session.post(url, headers=headers, json=payload) as response:
            if response.status == 200:
                return await response.json()
            error_text = await response.text()
            raise AIServiceError(f"{label}调用失败: {response.status} - {error_text}")

    async def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI chat-completions API.

        ``config.base_url``, when set, is used as the complete endpoint URL
        (nothing is appended to it).
        """
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "system", "content": "你是一个专业的生产管理数据分析专家。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens
        }
        url = self.config.base_url or "https://api.openai.com/v1/chat/completions"

        result = await self._post_json(url, headers, payload, "OpenAI API")
        return result["choices"][0]["message"]["content"]

    async def _call_qwen(self, prompt: str) -> str:
        """Call Tongyi Qianwen through its OpenAI-compatible endpoint.

        ``config.base_url`` may be either the compatible-mode base URL or the
        full ``.../chat/completions`` URL.
        """
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.config.model or "qwen-turbo",
            "messages": [
                {"role": "system", "content": "你是一个专业的生产管理数据分析专家。"},
                {"role": "user", "content": prompt}
            ],
            "temperature": self.config.temperature,
            # Keep answers short, but honour a smaller configured limit.
            "max_tokens": min(self.config.max_tokens, _QWEN_MAX_TOKENS)
        }

        base_url = self.config.base_url
        if base_url and base_url.endswith('/chat/completions'):
            url = base_url
        else:
            base = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
            url = base.rstrip('/') + "/chat/completions"

        result = await self._post_json(url, headers, payload, "通义千问API")
        return result["choices"][0]["message"]["content"]

    async def _call_wenxin(self, prompt: str) -> str:
        """Call the Wenxin Yiyan (ERNIE) chat API.

        Raises:
            NotImplementedError: until access-token retrieval is implemented.
            AIServiceError: if the API reports an error.
        """
        access_token = await self._get_wenxin_access_token()

        headers = {
            "Content-Type": "application/json"
        }
        payload = {
            "messages": [
                {"role": "user", "content": prompt}
            ],
            "temperature": self.config.temperature,
            "max_output_tokens": self.config.max_tokens
        }
        url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token={access_token}"

        result = await self._post_json(url, headers, payload, "文心一言API")
        if "error_code" in result:
            # The service can report failures inside an HTTP 200 body.
            raise AIServiceError(
                f"文心一言API调用失败: {result.get('error_code')} - {result.get('error_msg')}"
            )
        return result["result"]

    async def _call_local(self, prompt: str) -> str:
        """Call a local model server (e.g. Ollama's generate endpoint)."""
        headers = {
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.config.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": self.config.temperature,
                "num_predict": self.config.max_tokens
            }
        }
        url = self.config.base_url or "http://localhost:11434/api/generate"

        result = await self._post_json(url, headers, payload, "本地模型API")
        return result["response"]

    async def _get_wenxin_access_token(self) -> str:
        """Return a Wenxin access token.

        Not implemented yet. Failing explicitly avoids sending a request with
        ``access_token=None`` in the URL.

        Raises:
            NotImplementedError: always.
        """
        # TODO: implement token retrieval as described in the Baidu AI docs.
        raise NotImplementedError("文心一言access token获取逻辑尚未实现")

    def _parse_ai_response(self, response: str) -> Dict:
        """Parse the AI answer into a dict.

        Accepts bare JSON, JSON inside a Markdown code fence (with or without
        a closing fence) and JSON surrounded by explanatory text. When no JSON
        object can be recovered, the default placeholder structure is returned.
        """
        text = _extract_json_text(response or "")

        candidates = [text]
        first_brace = text.find("{")
        last_brace = text.rfind("}")
        if first_brace != -1 and last_brace > first_brace:
            # Fallback for answers that wrap the JSON object in prose.
            candidates.append(text[first_brace:last_brace + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed

        return _default_analysis()


# Example configuration
def get_ai_config() -> AIConfig:
    """Build the AI configuration from environment variables.

    ``AI_PROVIDER`` selects the provider (openai, qwen, wenxin or local;
    default openai, case-insensitive). API keys and base URLs are read from
    the provider-specific variables.

    Raises:
        ValueError: if ``AI_PROVIDER`` names an unsupported provider.
    """
    provider = os.getenv("AI_PROVIDER", "openai").strip().lower()

    if provider == "openai":
        return AIConfig(
            provider="openai",
            api_key=os.getenv("OPENAI_API_KEY", ""),
            base_url=os.getenv("OPENAI_BASE_URL"),
            model="gpt-3.5-turbo"
        )
    if provider == "qwen":
        return AIConfig(
            provider="qwen",
            api_key=os.getenv("QWEN_API_KEY", ""),
            base_url=os.getenv("QWEN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
            model="qwen-turbo"
        )
    if provider == "wenxin":
        return AIConfig(
            provider="wenxin",
            api_key=os.getenv("WENXIN_API_KEY", ""),
            model="ERNIE-Bot"
        )
    if provider == "local":
        return AIConfig(
            provider="local",
            api_key="",
            base_url=os.getenv("LOCAL_AI_URL", "http://localhost:11434/api/generate"),
            model="llama2"
        )
    raise ValueError(f"不支持的AI提供商: {provider}")


# Example usage
async def analyze_production(data: Dict) -> Dict:
    """Analyze production data with the provider configured in the environment."""
    config = get_ai_config()
    async with AIService(config) as ai_service:
        return await ai_service.analyze_production_data(data)