# #!/usr/bin/env python # # -*- coding: utf-8 -*- # from flask import Flask, request, make_response # import hashlib # import time # import xml.etree.ElementTree as ET # import os # import json # from openai import OpenAI # from dotenv import load_dotenv # from markdown import markdown # import re # import threading # import logging # from datetime import datetime # # 配置日志记录 # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler('wechat_service.log'), # logging.StreamHandler() # ] # ) # # 加载环境变量 # load_dotenv() # app = Flask(__name__) # # 基础配置 # TOKEN = os.getenv('TOKEN') # API_KEY = os.getenv("API_KEY") # BASE_URL = os.getenv("OPENAI_BASE_URL") # client = OpenAI(api_key=API_KEY, base_url=BASE_URL) # class UserSession: # def __init__(self): # self.messages = [{"role": "system", "content": "You are a helpful assistant."}] # self.pending_parts = [] # self.last_active = time.time() # class SessionManager: # def __init__(self): # self.sessions = {} # self.session_timeout = 3600 # 1小时会话超时 # self._lock = threading.Lock() # def get_session(self, user_id): # with self._lock: # if user_id not in self.sessions: # self.sessions[user_id] = UserSession() # session = self.sessions[user_id] # session.last_active = time.time() # return session # def clear_session(self, user_id): # with self._lock: # if user_id in self.sessions: # self.sessions[user_id] = UserSession() # def cleanup_expired_sessions(self): # with self._lock: # current_time = time.time() # expired_users = [ # user_id for user_id, session in self.sessions.items() # if current_time - session.last_active > self.session_timeout # ] # for user_id in expired_users: # del self.sessions[user_id] # logging.info(f"已清理过期会话: {user_id}") # session_manager = SessionManager() # def convert_markdown_to_wechat(md_text): # """将Markdown转换为微信友好的文本格式""" # md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # md_text = re.sub(r'```(.*?)```', r'【代码】\n\1\n【代码结束】', md_text, flags=re.DOTALL) # md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE) # return md_text # def verify_wechat(request): # """验证微信服务器请求""" # data = request.args # signature = data.get('signature') # timestamp = data.get('timestamp') # nonce = data.get('nonce') # echostr = data.get('echostr') # temp = [timestamp, nonce, TOKEN] # temp.sort() # temp = ''.join(temp) # if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature: # return echostr # return 'error', 403 # def parse_xml_message(xml_content): # """解析微信XML消息""" # root = ET.fromstring(xml_content) # return { # 'content': root.find('Content').text, # 'from_user': root.find('FromUserName').text, # 'to_user': root.find('ToUserName').text, # 'msg_id': root.find('MsgId').text, # 'create_time': root.find('CreateTime').text # } # def generate_response_xml(to_user, from_user, content): # """生成回复的XML消息""" # response_content = convert_markdown_to_wechat(content) # xml_template = ''' # # # # %s # # # # ''' # response = make_response( # xml_template % (to_user, from_user, str(int(time.time())), response_content) # ) # response.content_type = 'application/xml' # return response # def get_openai_response(messages, timeout=30): # """获取OpenAI API响应""" # try: # response = client.chat.completions.create( # model="gpt-4o-mini", # messages=messages, # timeout=timeout # ) # return response.choices[0].message.content # except Exception as e: # logging.error(f"OpenAI API错误: {str(e)}") # return "抱歉,我暂时无法回答,请稍后再试。" # def split_message(message, max_length=500): # """将长消息分段""" # return [message[i:i+max_length] for i in range(0, len(message), max_length)] # def append_status_message(content, has_pending_parts=False): # """添加状态提示信息""" # status_message = "\n\n-------------------\n" # if has_pending_parts: # status_message += "当前消息已截断,发送’继续‘查看后续内容\n" # status_message += "发送’新对话‘开始新的对话" # return content + status_message # @app.route('/api/wx', methods=['GET', 'POST']) # def wechatai(): # if request.method == 'GET': # return verify_wechat(request) # try: # message_data = parse_xml_message(request.data) # user_content = message_data['content'] # from_user = message_data['from_user'] # to_user = message_data['to_user'] # logging.info(f"收到用户({from_user})消息: {user_content}") # session = session_manager.get_session(from_user) # if user_content.strip() == '新对话': # session_manager.clear_session(from_user) # return generate_response_xml( # from_user, # to_user, # append_status_message("已开始新的对话。请描述您的问题。") # ) # if user_content.strip() == '继续': # if session.pending_parts: # next_part = session.pending_parts.pop(0) # has_more = bool(session.pending_parts) # return generate_response_xml( # from_user, # to_user, # append_status_message(next_part, has_more) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message("没有更多内容了。请继续您的问题。") # ) # session.messages.append({"role": "user", "content": user_content}) # response = get_openai_response(session.messages) # session.messages.append({"role": "assistant", "content": response}) # if len(response) > 500: # parts = split_message(response) # first_part = parts.pop(0) # session.pending_parts = parts # return generate_response_xml( # from_user, # to_user, # append_status_message(first_part, True) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message(response) # ) # except Exception as e: # logging.error(f"处理请求时出错: {str(e)}") # return generate_response_xml( # message_data['from_user'], # message_data['to_user'], # append_status_message("抱歉,系统暂时出现问题,请稍后重试。") # ) # def cleanup_sessions(): # """定期清理过期会话""" # while True: # time.sleep(3600) # session_manager.cleanup_expired_sessions() # if __name__ == '__main__': # # 启动清理线程 # cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True) # cleanup_thread.start() # app.run(host='0.0.0.0', port=7860, debug=True) #!/usr/bin/env python # -*- coding: utf-8 -*- # from flask import Flask, request, make_response # import hashlib # import time # import xml.etree.ElementTree as ET # import os # import json # from openai import OpenAI # from dotenv import load_dotenv # from markdown import markdown # import re # import threading # import logging # from datetime import datetime # import asyncio # from concurrent.futures import ThreadPoolExecutor # import queue # import uuid # # 配置日志记录 # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler('wechat_service.log'), # logging.StreamHandler() # ] # ) # # 加载环境变量 # load_dotenv() # app = Flask(__name__) # # 基础配置 # TOKEN = os.getenv('TOKEN') # API_KEY = os.getenv("API_KEY") # BASE_URL = os.getenv("OPENAI_BASE_URL") # client = OpenAI(api_key=API_KEY, base_url=BASE_URL) # # 创建线程池 # executor = ThreadPoolExecutor(max_workers=10) # def convert_markdown_to_wechat(md_text): # """将Markdown转换为微信友好的文本格式""" # if not md_text: # return md_text # # 处理标题 # md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE) # # 处理强调语法 # md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体 # md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体 # md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码 # # 处理列表 # md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表 # md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表 # # 处理代码块 # md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL) # # 处理引用 # md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE) # # 处理分隔线 # md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE) # # 处理链接 # md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text) # # 净化处理:去除多余的空行 # md_text = re.sub(r'\n{3,}', '\n\n', md_text) # return md_text # class AsyncResponse: # def __init__(self): # self.status = "processing" # processing, completed, failed # self.result = None # self.error = None # class UserSession: # def __init__(self): # self.messages = [{"role": "system", "content": "You are a helpful assistant."}] # self.pending_parts = [] # self.last_active = time.time() # self.current_task = None # 存储当前正在处理的任务ID # self.response_queue = {} # 存储异步响应 # class SessionManager: # def __init__(self): # self.sessions = {} # self.session_timeout = 3600 # 1小时会话超时 # self._lock = threading.Lock() # def get_session(self, user_id): # with self._lock: # if user_id not in self.sessions: # self.sessions[user_id] = UserSession() # session = self.sessions[user_id] # session.last_active = time.time() # return session # def clear_session(self, user_id): # with self._lock: # if user_id in self.sessions: # self.sessions[user_id] = UserSession() # def cleanup_expired_sessions(self): # with self._lock: # current_time = time.time() # expired_users = [ # user_id for user_id, session in self.sessions.items() # if current_time - session.last_active > self.session_timeout # ] # for user_id in expired_users: # del self.sessions[user_id] # logging.info(f"已清理过期会话: {user_id}") # session_manager = SessionManager() # def verify_wechat(request): # """验证微信服务器请求""" # data = request.args # signature = data.get('signature') # timestamp = data.get('timestamp') # nonce = data.get('nonce') # echostr = data.get('echostr') # temp = [timestamp, nonce, TOKEN] # temp.sort() # temp = ''.join(temp) # if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature: # return echostr # return 'error', 403 # def parse_xml_message(xml_content): # """解析微信XML消息""" # root = ET.fromstring(xml_content) # return { # 'content': root.find('Content').text, # 'from_user': root.find('FromUserName').text, # 'to_user': root.find('ToUserName').text, # 'msg_id': root.find('MsgId').text, # 'create_time': root.find('CreateTime').text # } # def generate_response_xml(to_user, from_user, content): # """生成回复的XML消息""" # formatted_content = convert_markdown_to_wechat(content) # xml_template = ''' # # # # %s # # # # ''' # response = make_response( # xml_template % (to_user, from_user, str(int(time.time())), formatted_content) # ) # response.content_type = 'application/xml' # return response # def process_long_running_task(messages): # """处理耗时任务""" # try: # response = client.chat.completions.create( # model="gpt-4o-mini", # messages=messages, # timeout=60 # ) # return response.choices[0].message.content # except Exception as e: # logging.error(f"API调用错误: {str(e)}") # raise # def handle_async_task(session, task_id, messages): # """异步任务处理函数""" # try: # result = process_long_running_task(messages) # session.response_queue[task_id].status = "completed" # session.response_queue[task_id].result = result # except Exception as e: # session.response_queue[task_id].status = "failed" # session.response_queue[task_id].error = str(e) # def generate_initial_response(): # """生成初始响应消息""" # return "您的请求正在处理中,请稍后回复’查询‘获取结果。" # def split_message(message, max_length=500): # """将长消息分段""" # return [message[i:i+max_length] for i in range(0, len(message), max_length)] # def append_status_message(content, has_pending_parts=False, is_processing=False): # """添加状态提示信息""" # status_message = "\n\n-------------------\n" # if is_processing: # status_message += "您的请求正在处理中,请回复’查询‘获取结果\n" # elif has_pending_parts: # status_message += "当前消息已截断,发送’继续‘查看后续内容\n" # status_message += "发送‘新对话’开始新的对话" # return content + status_message # @app.route('/api/wx', methods=['GET', 'POST']) # def wechatai(): # if request.method == 'GET': # return verify_wechat(request) # try: # message_data = parse_xml_message(request.data) # user_content = message_data['content'].strip() # from_user = message_data['from_user'] # to_user = message_data['to_user'] # logging.info(f"收到用户({from_user})消息: {user_content}") # session = session_manager.get_session(from_user) # # 处理特殊命令 # if user_content == '新对话': # session_manager.clear_session(from_user) # return generate_response_xml( # from_user, # to_user, # append_status_message("已开始新的对话。请描述您的问题。") # ) # if user_content == '继续': # if session.pending_parts: # next_part = session.pending_parts.pop(0) # has_more = bool(session.pending_parts) # return generate_response_xml( # from_user, # to_user, # append_status_message(next_part, has_more) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message("没有更多内容了。请继续您的问题。") # ) # if user_content == '查询': # if session.current_task: # task_response = session.response_queue.get(session.current_task) # if task_response: # if task_response.status == "completed": # response = task_response.result # # 清理完成的任务 # del session.response_queue[session.current_task] # session.current_task = None # # 处理长消息 # if len(response) > 500: # parts = split_message(response) # first_part = parts.pop(0) # session.pending_parts = parts # return generate_response_xml( # from_user, # to_user, # append_status_message(first_part, True) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message(response) # ) # elif task_response.status == "failed": # error_message = "处理过程中出现错误,请重新提问。" # # 清理失败的任务 # del session.response_queue[session.current_task] # session.current_task = None # return generate_response_xml( # from_user, # to_user, # append_status_message(error_message) # ) # else: # return generate_response_xml( # from_user, # to_user, # append_status_message("正在处理中,请稍后再次查询。", is_processing=True) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message("没有正在处理的请求。") # ) # # 处理新的用户消息 # session.messages.append({"role": "user", "content": user_content}) # # 创建新的异步任务 # task_id = str(uuid.uuid4()) # session.current_task = task_id # session.response_queue[task_id] = AsyncResponse() # # 启动异步处理 # executor.submit(handle_async_task, session, task_id, session.messages.copy()) # # 返回初始响应 # return generate_response_xml( # from_user, # to_user, # append_status_message(generate_initial_response(), is_processing=True) # ) # except Exception as e: # logging.error(f"处理请求时出错: {str(e)}") # return generate_response_xml( # message_data['from_user'], # message_data['to_user'], # append_status_message("抱歉,系统暂时出现问题,请稍后重试。") # ) # def cleanup_sessions(): # """定期清理过期会话""" # while True: # time.sleep(3600) # 每小时清理一次 # try: # session_manager.cleanup_expired_sessions() # except Exception as e: # logging.error(f"清理会话时出错: {str(e)}") # if __name__ == '__main__': # # 启动清理线程 # cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True) # cleanup_thread.start() # # 启动Flask应用 # app.run(host='0.0.0.0', port=7860, debug=True) # #!/usr/bin/env python # # -*- coding: utf-8 -*- # from flask import Flask, request, make_response # import hashlib # import time # import xml.etree.ElementTree as ET # import os # import json # from openai import OpenAI # from dotenv import load_dotenv # from markdown import markdown # import re # import threading # import logging # from datetime import datetime # import asyncio # from concurrent.futures import ThreadPoolExecutor # import queue # import uuid # # 配置日志记录 # logging.basicConfig( # level=logging.INFO, # format='%(asctime)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler('wechat_service.log'), # logging.StreamHandler() # ] # ) # # 加载环境变量 # load_dotenv() # app = Flask(__name__) # # 基础配置 # TOKEN = os.getenv('TOKEN') # API_KEY = os.getenv("API_KEY") # BASE_URL = os.getenv("OPENAI_BASE_URL") # client = OpenAI(api_key=API_KEY, base_url=BASE_URL) # # 创建线程池 # executor = ThreadPoolExecutor(max_workers=10) # def convert_markdown_to_wechat(md_text): # """将Markdown转换为微信友好的文本格式""" # if not md_text: # return md_text # # 处理标题 # md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE) # md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE) # # 处理强调语法 # md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体 # md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体 # md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码 # # 处理列表 # md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表 # md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表 # # 处理代码块 # md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL) # # 处理引用 # md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE) # # 处理分隔线 # md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE) # # 处理链接 # md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text) # # 净化处理:去除多余的空行 # md_text = re.sub(r'\n{3,}', '\n\n', md_text) # return md_text # class AsyncResponse: # def __init__(self): # self.status = "processing" # processing, completed, failed # self.result = None # self.error = None # class UserSession: # def __init__(self): # self.messages = [{"role": "system", "content": "You are a helpful assistant."}] # self.pending_parts = [] # self.last_active = time.time() # self.current_task = None # self.response_queue = {} # class SessionManager: # def __init__(self): # self.sessions = {} # self.session_timeout = 3600 # self._lock = threading.Lock() # def get_session(self, user_id): # with self._lock: # if user_id not in self.sessions: # self.sessions[user_id] = UserSession() # session = self.sessions[user_id] # session.last_active = time.time() # return session # def clear_session(self, user_id): # with self._lock: # if user_id in self.sessions: # self.sessions[user_id] = UserSession() # def cleanup_expired_sessions(self): # with self._lock: # current_time = time.time() # expired_users = [ # user_id for user_id, session in self.sessions.items() # if current_time - session.last_active > self.session_timeout # ] # for user_id in expired_users: # del self.sessions[user_id] # logging.info(f"已清理过期会话: {user_id}") # session_manager = SessionManager() # def verify_wechat(request): # """验证微信服务器请求""" # data = request.args # signature = data.get('signature') # timestamp = data.get('timestamp') # nonce = data.get('nonce') # echostr = data.get('echostr') # temp = [timestamp, nonce, TOKEN] # temp.sort() # temp = ''.join(temp) # if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature: # return echostr # return 'error', 403 # def parse_xml_message(xml_content): # """解析微信XML消息""" # root = ET.fromstring(xml_content) # return { # 'content': root.find('Content').text, # 'from_user': root.find('FromUserName').text, # 'to_user': root.find('ToUserName').text, # 'msg_id': root.find('MsgId').text, # 'create_time': root.find('CreateTime').text # } # def generate_response_xml(to_user, from_user, content): # """生成回复的XML消息""" # formatted_content = convert_markdown_to_wechat(content) # xml_template = ''' # # # # %s # # # # ''' # response = make_response( # xml_template % (to_user, from_user, str(int(time.time())), formatted_content) # ) # response.content_type = 'application/xml' # return response # def process_long_running_task(messages): # """处理耗时任务""" # try: # response = client.chat.completions.create( # model="gpt-4o-mini", # messages=messages, # timeout=60 # ) # return response.choices[0].message.content # except Exception as e: # logging.error(f"API调用错误: {str(e)}") # raise # def handle_async_task(session, task_id, messages): # """异步任务处理函数""" # try: # result = process_long_running_task(messages) # session.response_queue[task_id].status = "completed" # session.response_queue[task_id].result = result # except Exception as e: # session.response_queue[task_id].status = "failed" # session.response_queue[task_id].error = str(e) # def generate_initial_response(): # """生成初始响应消息""" # return "您的请求正在处理中" # def split_message(message, max_length=500): # """将长消息分段""" # return [message[i:i+max_length] for i in range(0, len(message), max_length)] # def append_status_message(content, has_pending_parts=False, is_processing=False): # """添加状态提示信息""" # status_message = "\n\n-------------------" # if is_processing: # status_message += "\n请回复'查询'获取结果" # elif has_pending_parts: # status_message += "\n当前消息已截断,发送'继续'查看后续内容" # status_message += "\n发送'新对话'开始新的对话" # return content + status_message # @app.route('/api/wx', methods=['GET', 'POST']) # def wechatai(): # if request.method == 'GET': # return verify_wechat(request) # try: # message_data = parse_xml_message(request.data) # user_content = message_data['content'].strip() # from_user = message_data['from_user'] # to_user = message_data['to_user'] # logging.info(f"收到用户({from_user})消息: {user_content}") # session = session_manager.get_session(from_user) # if user_content == '新对话': # session_manager.clear_session(from_user) # return generate_response_xml( # from_user, # to_user, # append_status_message('已开始新的对话。请描述您的问题。') # ) # if user_content == '继续': # if session.pending_parts: # next_part = session.pending_parts.pop(0) # has_more = bool(session.pending_parts) # return generate_response_xml( # from_user, # to_user, # append_status_message(next_part, has_more) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message('没有更多内容了。请继续您的问题。') # ) # if user_content == '查询': # if session.current_task: # task_response = session.response_queue.get(session.current_task) # if task_response: # if task_response.status == "completed": # response = task_response.result # del session.response_queue[session.current_task] # session.current_task = None # if len(response) > 500: # parts = split_message(response) # first_part = parts.pop(0) # session.pending_parts = parts # return generate_response_xml( # from_user, # to_user, # append_status_message(first_part, True) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message(response) # ) # elif task_response.status == "failed": # error_message = '处理过程中出现错误,请重新提问。' # del session.response_queue[session.current_task] # session.current_task = None # return generate_response_xml( # from_user, # to_user, # append_status_message(error_message) # ) # else: # return generate_response_xml( # from_user, # to_user, # append_status_message('正在处理中,请稍后再次查询。', is_processing=True) # ) # return generate_response_xml( # from_user, # to_user, # append_status_message('没有正在处理的请求。') # ) # session.messages.append({"role": "user", "content": user_content}) # task_id = str(uuid.uuid4()) # session.current_task = task_id # session.response_queue[task_id] = AsyncResponse() # executor.submit(handle_async_task, session, task_id, session.messages.copy()) # return generate_response_xml( # from_user, # to_user, # append_status_message(generate_initial_response(), is_processing=True) # ) # except Exception as e: # logging.error(f"处理请求时出错: {str(e)}") # return generate_response_xml( # message_data['from_user'], # message_data['to_user'], # append_status_message('抱歉,系统暂时出现问题,请稍后重试。') # ) # def cleanup_sessions(): # """定期清理过期会话""" # while True: # time.sleep(3600) # try: # session_manager.cleanup_expired_sessions() # except Exception as e: # logging.error(f"清理会话时出错: {str(e)}") # if __name__ == '__main__': # cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True) # cleanup_thread.start() # app.run(host='0.0.0.0', port=7860, debug=True) #下面这个代码解决的是响应查询时重复回应的情况 #!/usr/bin/env python # -*- coding: utf-8 -*- # #下面这个代码是告诉用户超时后就需要重新回答了 #!/usr/bin/env python # -*- coding: utf-8 -*- from flask import Flask, request, make_response import hashlib import time import xml.etree.ElementTree as ET import os import json from openai import OpenAI from dotenv import load_dotenv from markdown import markdown import re import threading import logging from datetime import datetime import asyncio from concurrent.futures import ThreadPoolExecutor import queue import uuid # 配置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('wechat_service.log'), logging.StreamHandler() ] ) # 加载环境变量 load_dotenv() app = Flask(__name__) # 基础配置 TOKEN = os.getenv('TOKEN') API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("OPENAI_BASE_URL") client = OpenAI(api_key=API_KEY, base_url=BASE_URL) # 创建线程池 executor = ThreadPoolExecutor(max_workers=10) class AsyncResponse: def __init__(self): self.status = "processing" # processing, completed, failed self.result = None self.error = None self.create_time = time.time() # 任务创建时间 self.timeout = 3600 # 任务超时时间(1小时) def is_expired(self): """检查任务是否过期""" return time.time() - self.create_time > self.timeout class UserSession: def __init__(self): self.messages = [{"role": "system", "content": "You are a helpful assistant."}] self.pending_parts = [] self.last_active = time.time() self.current_task = None self.response_queue = {} self.session_timeout = 3600 # 会话超时时间(1小时) def is_expired(self): """检查会话是否过期""" return time.time() - self.last_active > self.session_timeout def cleanup_expired_tasks(self): """清理过期的任务""" expired_tasks = [ task_id for task_id, response in self.response_queue.items() if response.is_expired() ] for task_id in expired_tasks: del self.response_queue[task_id] if self.current_task == task_id: self.current_task = None class SessionManager: def __init__(self): self.sessions = {} self._lock = threading.Lock() def get_session(self, user_id): with self._lock: current_time = time.time() if user_id in self.sessions: session = self.sessions[user_id] if session.is_expired(): session = UserSession() # 创建新会话 else: session.cleanup_expired_tasks() # 清理过期任务 else: session = UserSession() session.last_active = current_time self.sessions[user_id] = session return session def clear_session(self, user_id): with self._lock: if user_id in self.sessions: self.sessions[user_id] = UserSession() def cleanup_expired_sessions(self): with self._lock: current_time = time.time() expired_users = [ user_id for user_id, session in self.sessions.items() if session.is_expired() ] for user_id in expired_users: del self.sessions[user_id] logging.info(f"已清理过期会话: {user_id}") session_manager = SessionManager() def convert_markdown_to_wechat(md_text): """将Markdown转换为微信友好的文本格式""" if not md_text: return md_text # 处理标题 md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE) md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE) md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE) # 处理强调语法 md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体 md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体 md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码 # 处理列表 md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表 md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表 # 处理代码块 md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL) # 处理引用 md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE) # 处理分隔线 md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE) # 处理链接 md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text) # 净化处理:去除多余的空行 md_text = re.sub(r'\n{3,}', '\n\n', md_text) return md_text def verify_wechat(request): """验证微信服务器请求""" data = request.args signature = data.get('signature') timestamp = data.get('timestamp') nonce = data.get('nonce') echostr = data.get('echostr') temp = [timestamp, nonce, TOKEN] temp.sort() temp = ''.join(temp) if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature: return echostr return 'error', 403 def parse_xml_message(xml_content): """解析微信XML消息""" root = ET.fromstring(xml_content) return { 'content': root.find('Content').text, 'from_user': root.find('FromUserName').text, 'to_user': root.find('ToUserName').text, 'msg_id': root.find('MsgId').text, 'create_time': root.find('CreateTime').text } def generate_response_xml(to_user, from_user, content): """生成回复的XML消息""" formatted_content = convert_markdown_to_wechat(content) xml_template = ''' %s ''' response = make_response( xml_template % (to_user, from_user, str(int(time.time())), formatted_content) ) response.content_type = 'application/xml' return response def process_long_running_task(messages): """处理耗时任务""" try: response = client.chat.completions.create( model="gpt-4o-mini", messages=messages, timeout=60 ) return response.choices[0].message.content except Exception as e: logging.error(f"API调用错误: {str(e)}") raise def handle_async_task(session, task_id, messages): """异步任务处理函数""" try: if task_id not in session.response_queue: return result = process_long_running_task(messages) if task_id in session.response_queue and not session.response_queue[task_id].is_expired(): session.response_queue[task_id].status = "completed" session.response_queue[task_id].result = result except Exception as e: if task_id in session.response_queue: session.response_queue[task_id].status = "failed" session.response_queue[task_id].error = str(e) def generate_initial_response(): """生成初始响应消息""" return "您的请求正在处理中,请回复'查询'获取结果" def split_message(message, max_length=500): """将长消息分段""" return [message[i:i+max_length] for i in range(0, len(message), max_length)] def append_status_message(content, has_pending_parts=False, is_processing=False): """添加状态提示信息""" if "您的请求正在处理中" in content: return content + "\n\n-------------------\n发送'新对话'开始新的对话" status_message = "\n\n-------------------" if is_processing: status_message += "\n请回复'查询'获取结果" elif has_pending_parts: status_message += "\n当前消息已截断,发送'继续'查看后续内容" status_message += "\n发送'新对话'开始新的对话" return content + status_message @app.route('/api/wx', methods=['GET', 'POST']) def wechatai(): if request.method == 'GET': return verify_wechat(request) try: message_data = parse_xml_message(request.data) user_content = message_data['content'].strip() from_user = message_data['from_user'] to_user = message_data['to_user'] logging.info(f"收到用户({from_user})消息: {user_content}") session = session_manager.get_session(from_user) if user_content == '新对话': session_manager.clear_session(from_user) return generate_response_xml( from_user, to_user, append_status_message('已开始新的对话。请描述您的问题。') ) if user_content == '继续': if session.pending_parts: next_part = session.pending_parts.pop(0) has_more = bool(session.pending_parts) return generate_response_xml( from_user, to_user, append_status_message(next_part, has_more) ) return generate_response_xml( from_user, to_user, append_status_message('没有更多内容了。请继续您的问题。') ) if user_content == '查询': if session.current_task: task_response = session.response_queue.get(session.current_task) if task_response: if task_response.is_expired(): # 处理过期任务 del session.response_queue[session.current_task] session.current_task = None return generate_response_xml( from_user, to_user, append_status_message('请求已过期,请重新提问。') ) if task_response.status == "completed": response = task_response.result del session.response_queue[session.current_task] session.current_task = None session.messages.append({"role": "assistant", "content": response}) if len(response) > 500: parts = split_message(response) first_part = parts.pop(0) session.pending_parts = parts return generate_response_xml( from_user, to_user, append_status_message(first_part, True) ) return generate_response_xml( from_user, to_user, append_status_message(response) ) elif task_response.status == "failed": error_message = '处理过程中出现错误,请重新提问。' del session.response_queue[session.current_task] session.current_task = None return generate_response_xml( from_user, to_user, append_status_message(error_message) ) else: return generate_response_xml( from_user, to_user, append_status_message('正在处理中,请稍后再次查询。', is_processing=True) ) return generate_response_xml( from_user, to_user, append_status_message('没有正在处理的请求。') ) session.messages.append({"role": "user", "content": user_content}) task_id = str(uuid.uuid4()) session.current_task = task_id session.response_queue[task_id] = AsyncResponse() executor.submit(handle_async_task, session, task_id, session.messages.copy()) return generate_response_xml( from_user, to_user, append_status_message(generate_initial_response(), is_processing=True) ) except Exception as e: logging.error(f"处理请求时出错: {str(e)}") return generate_response_xml( message_data['from_user'], message_data['to_user'], append_status_message('抱歉,系统暂时出现问题,请稍后重试。') ) def cleanup_sessions(): """定期清理过期会话""" while True: time.sleep(3600) # 每小时清理一次 try: session_manager.cleanup_expired_sessions() except Exception as e: logging.error(f"清理会话时出错: {str(e)}") if __name__ == '__main__': # 启动清理线程 cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True) cleanup_thread.start() # 启动Flask应用 app.run(host='0.0.0.0', port=7860, debug=True)