# #!/usr/bin/env python
# # -*- coding: utf-8 -*-
# from flask import Flask, request, make_response
# import hashlib
# import time
# import xml.etree.ElementTree as ET
# import os
# import json
# from openai import OpenAI
# from dotenv import load_dotenv
# from markdown import markdown
# import re
# import threading
# import logging
# from datetime import datetime
# # 配置日志记录
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('wechat_service.log'),
# logging.StreamHandler()
# ]
# )
# # 加载环境变量
# load_dotenv()
# app = Flask(__name__)
# # 基础配置
# TOKEN = os.getenv('TOKEN')
# API_KEY = os.getenv("API_KEY")
# BASE_URL = os.getenv("OPENAI_BASE_URL")
# client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# class UserSession:
# def __init__(self):
# self.messages = [{"role": "system", "content": "You are a helpful assistant."}]
# self.pending_parts = []
# self.last_active = time.time()
# class SessionManager:
# def __init__(self):
# self.sessions = {}
# self.session_timeout = 3600 # 1小时会话超时
# self._lock = threading.Lock()
# def get_session(self, user_id):
# with self._lock:
# if user_id not in self.sessions:
# self.sessions[user_id] = UserSession()
# session = self.sessions[user_id]
# session.last_active = time.time()
# return session
# def clear_session(self, user_id):
# with self._lock:
# if user_id in self.sessions:
# self.sessions[user_id] = UserSession()
# def cleanup_expired_sessions(self):
# with self._lock:
# current_time = time.time()
# expired_users = [
# user_id for user_id, session in self.sessions.items()
# if current_time - session.last_active > self.session_timeout
# ]
# for user_id in expired_users:
# del self.sessions[user_id]
# logging.info(f"已清理过期会话: {user_id}")
# session_manager = SessionManager()
# def convert_markdown_to_wechat(md_text):
# """将Markdown转换为微信友好的文本格式"""
# md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text)
# md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text)
# md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'```(.*?)```', r'【代码】\n\1\n【代码结束】', md_text, flags=re.DOTALL)
# md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE)
# return md_text
# def verify_wechat(request):
# """验证微信服务器请求"""
# data = request.args
# signature = data.get('signature')
# timestamp = data.get('timestamp')
# nonce = data.get('nonce')
# echostr = data.get('echostr')
# temp = [timestamp, nonce, TOKEN]
# temp.sort()
# temp = ''.join(temp)
# if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature:
# return echostr
# return 'error', 403
# def parse_xml_message(xml_content):
# """解析微信XML消息"""
# root = ET.fromstring(xml_content)
# return {
# 'content': root.find('Content').text,
# 'from_user': root.find('FromUserName').text,
# 'to_user': root.find('ToUserName').text,
# 'msg_id': root.find('MsgId').text,
# 'create_time': root.find('CreateTime').text
# }
# def generate_response_xml(to_user, from_user, content):
# """生成回复的XML消息"""
# response_content = convert_markdown_to_wechat(content)
# xml_template = '''
#
#
#
# %s
#
#
#
# '''
# response = make_response(
# xml_template % (to_user, from_user, str(int(time.time())), response_content)
# )
# response.content_type = 'application/xml'
# return response
# def get_openai_response(messages, timeout=30):
# """获取OpenAI API响应"""
# try:
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=messages,
# timeout=timeout
# )
# return response.choices[0].message.content
# except Exception as e:
# logging.error(f"OpenAI API错误: {str(e)}")
# return "抱歉,我暂时无法回答,请稍后再试。"
# def split_message(message, max_length=500):
# """将长消息分段"""
# return [message[i:i+max_length] for i in range(0, len(message), max_length)]
# def append_status_message(content, has_pending_parts=False):
# """添加状态提示信息"""
# status_message = "\n\n-------------------\n"
# if has_pending_parts:
# status_message += "当前消息已截断,发送’继续‘查看后续内容\n"
# status_message += "发送’新对话‘开始新的对话"
# return content + status_message
# @app.route('/api/wx', methods=['GET', 'POST'])
# def wechatai():
# if request.method == 'GET':
# return verify_wechat(request)
# try:
# message_data = parse_xml_message(request.data)
# user_content = message_data['content']
# from_user = message_data['from_user']
# to_user = message_data['to_user']
# logging.info(f"收到用户({from_user})消息: {user_content}")
# session = session_manager.get_session(from_user)
# if user_content.strip() == '新对话':
# session_manager.clear_session(from_user)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("已开始新的对话。请描述您的问题。")
# )
# if user_content.strip() == '继续':
# if session.pending_parts:
# next_part = session.pending_parts.pop(0)
# has_more = bool(session.pending_parts)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(next_part, has_more)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("没有更多内容了。请继续您的问题。")
# )
# session.messages.append({"role": "user", "content": user_content})
# response = get_openai_response(session.messages)
# session.messages.append({"role": "assistant", "content": response})
# if len(response) > 500:
# parts = split_message(response)
# first_part = parts.pop(0)
# session.pending_parts = parts
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(first_part, True)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(response)
# )
# except Exception as e:
# logging.error(f"处理请求时出错: {str(e)}")
# return generate_response_xml(
# message_data['from_user'],
# message_data['to_user'],
# append_status_message("抱歉,系统暂时出现问题,请稍后重试。")
# )
# def cleanup_sessions():
# """定期清理过期会话"""
# while True:
# time.sleep(3600)
# session_manager.cleanup_expired_sessions()
# if __name__ == '__main__':
# # 启动清理线程
# cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
# cleanup_thread.start()
# app.run(host='0.0.0.0', port=7860, debug=True)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# from flask import Flask, request, make_response
# import hashlib
# import time
# import xml.etree.ElementTree as ET
# import os
# import json
# from openai import OpenAI
# from dotenv import load_dotenv
# from markdown import markdown
# import re
# import threading
# import logging
# from datetime import datetime
# import asyncio
# from concurrent.futures import ThreadPoolExecutor
# import queue
# import uuid
# # 配置日志记录
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('wechat_service.log'),
# logging.StreamHandler()
# ]
# )
# # 加载环境变量
# load_dotenv()
# app = Flask(__name__)
# # 基础配置
# TOKEN = os.getenv('TOKEN')
# API_KEY = os.getenv("API_KEY")
# BASE_URL = os.getenv("OPENAI_BASE_URL")
# client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# # 创建线程池
# executor = ThreadPoolExecutor(max_workers=10)
# def convert_markdown_to_wechat(md_text):
# """将Markdown转换为微信友好的文本格式"""
# if not md_text:
# return md_text
# # 处理标题
# md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE)
# # 处理强调语法
# md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体
# md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体
# md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码
# # 处理列表
# md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表
# md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表
# # 处理代码块
# md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL)
# # 处理引用
# md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE)
# # 处理分隔线
# md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE)
# # 处理链接
# md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text)
# # 净化处理:去除多余的空行
# md_text = re.sub(r'\n{3,}', '\n\n', md_text)
# return md_text
# class AsyncResponse:
# def __init__(self):
# self.status = "processing" # processing, completed, failed
# self.result = None
# self.error = None
# class UserSession:
# def __init__(self):
# self.messages = [{"role": "system", "content": "You are a helpful assistant."}]
# self.pending_parts = []
# self.last_active = time.time()
# self.current_task = None # 存储当前正在处理的任务ID
# self.response_queue = {} # 存储异步响应
# class SessionManager:
# def __init__(self):
# self.sessions = {}
# self.session_timeout = 3600 # 1小时会话超时
# self._lock = threading.Lock()
# def get_session(self, user_id):
# with self._lock:
# if user_id not in self.sessions:
# self.sessions[user_id] = UserSession()
# session = self.sessions[user_id]
# session.last_active = time.time()
# return session
# def clear_session(self, user_id):
# with self._lock:
# if user_id in self.sessions:
# self.sessions[user_id] = UserSession()
# def cleanup_expired_sessions(self):
# with self._lock:
# current_time = time.time()
# expired_users = [
# user_id for user_id, session in self.sessions.items()
# if current_time - session.last_active > self.session_timeout
# ]
# for user_id in expired_users:
# del self.sessions[user_id]
# logging.info(f"已清理过期会话: {user_id}")
# session_manager = SessionManager()
# def verify_wechat(request):
# """验证微信服务器请求"""
# data = request.args
# signature = data.get('signature')
# timestamp = data.get('timestamp')
# nonce = data.get('nonce')
# echostr = data.get('echostr')
# temp = [timestamp, nonce, TOKEN]
# temp.sort()
# temp = ''.join(temp)
# if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature:
# return echostr
# return 'error', 403
# def parse_xml_message(xml_content):
# """解析微信XML消息"""
# root = ET.fromstring(xml_content)
# return {
# 'content': root.find('Content').text,
# 'from_user': root.find('FromUserName').text,
# 'to_user': root.find('ToUserName').text,
# 'msg_id': root.find('MsgId').text,
# 'create_time': root.find('CreateTime').text
# }
# def generate_response_xml(to_user, from_user, content):
# """生成回复的XML消息"""
# formatted_content = convert_markdown_to_wechat(content)
# xml_template = '''
#
#
#
# %s
#
#
#
# '''
# response = make_response(
# xml_template % (to_user, from_user, str(int(time.time())), formatted_content)
# )
# response.content_type = 'application/xml'
# return response
# def process_long_running_task(messages):
# """处理耗时任务"""
# try:
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=messages,
# timeout=60
# )
# return response.choices[0].message.content
# except Exception as e:
# logging.error(f"API调用错误: {str(e)}")
# raise
# def handle_async_task(session, task_id, messages):
# """异步任务处理函数"""
# try:
# result = process_long_running_task(messages)
# session.response_queue[task_id].status = "completed"
# session.response_queue[task_id].result = result
# except Exception as e:
# session.response_queue[task_id].status = "failed"
# session.response_queue[task_id].error = str(e)
# def generate_initial_response():
# """生成初始响应消息"""
# return "您的请求正在处理中,请稍后回复’查询‘获取结果。"
# def split_message(message, max_length=500):
# """将长消息分段"""
# return [message[i:i+max_length] for i in range(0, len(message), max_length)]
# def append_status_message(content, has_pending_parts=False, is_processing=False):
# """添加状态提示信息"""
# status_message = "\n\n-------------------\n"
# if is_processing:
# status_message += "您的请求正在处理中,请回复’查询‘获取结果\n"
# elif has_pending_parts:
# status_message += "当前消息已截断,发送’继续‘查看后续内容\n"
# status_message += "发送‘新对话’开始新的对话"
# return content + status_message
# @app.route('/api/wx', methods=['GET', 'POST'])
# def wechatai():
# if request.method == 'GET':
# return verify_wechat(request)
# try:
# message_data = parse_xml_message(request.data)
# user_content = message_data['content'].strip()
# from_user = message_data['from_user']
# to_user = message_data['to_user']
# logging.info(f"收到用户({from_user})消息: {user_content}")
# session = session_manager.get_session(from_user)
# # 处理特殊命令
# if user_content == '新对话':
# session_manager.clear_session(from_user)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("已开始新的对话。请描述您的问题。")
# )
# if user_content == '继续':
# if session.pending_parts:
# next_part = session.pending_parts.pop(0)
# has_more = bool(session.pending_parts)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(next_part, has_more)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("没有更多内容了。请继续您的问题。")
# )
# if user_content == '查询':
# if session.current_task:
# task_response = session.response_queue.get(session.current_task)
# if task_response:
# if task_response.status == "completed":
# response = task_response.result
# # 清理完成的任务
# del session.response_queue[session.current_task]
# session.current_task = None
# # 处理长消息
# if len(response) > 500:
# parts = split_message(response)
# first_part = parts.pop(0)
# session.pending_parts = parts
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(first_part, True)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(response)
# )
# elif task_response.status == "failed":
# error_message = "处理过程中出现错误,请重新提问。"
# # 清理失败的任务
# del session.response_queue[session.current_task]
# session.current_task = None
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(error_message)
# )
# else:
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("正在处理中,请稍后再次查询。", is_processing=True)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message("没有正在处理的请求。")
# )
# # 处理新的用户消息
# session.messages.append({"role": "user", "content": user_content})
# # 创建新的异步任务
# task_id = str(uuid.uuid4())
# session.current_task = task_id
# session.response_queue[task_id] = AsyncResponse()
# # 启动异步处理
# executor.submit(handle_async_task, session, task_id, session.messages.copy())
# # 返回初始响应
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(generate_initial_response(), is_processing=True)
# )
# except Exception as e:
# logging.error(f"处理请求时出错: {str(e)}")
# return generate_response_xml(
# message_data['from_user'],
# message_data['to_user'],
# append_status_message("抱歉,系统暂时出现问题,请稍后重试。")
# )
# def cleanup_sessions():
# """定期清理过期会话"""
# while True:
# time.sleep(3600) # 每小时清理一次
# try:
# session_manager.cleanup_expired_sessions()
# except Exception as e:
# logging.error(f"清理会话时出错: {str(e)}")
# if __name__ == '__main__':
# # 启动清理线程
# cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
# cleanup_thread.start()
# # 启动Flask应用
# app.run(host='0.0.0.0', port=7860, debug=True)
# #!/usr/bin/env python
# # -*- coding: utf-8 -*-
# from flask import Flask, request, make_response
# import hashlib
# import time
# import xml.etree.ElementTree as ET
# import os
# import json
# from openai import OpenAI
# from dotenv import load_dotenv
# from markdown import markdown
# import re
# import threading
# import logging
# from datetime import datetime
# import asyncio
# from concurrent.futures import ThreadPoolExecutor
# import queue
# import uuid
# # 配置日志记录
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('wechat_service.log'),
# logging.StreamHandler()
# ]
# )
# # 加载环境变量
# load_dotenv()
# app = Flask(__name__)
# # 基础配置
# TOKEN = os.getenv('TOKEN')
# API_KEY = os.getenv("API_KEY")
# BASE_URL = os.getenv("OPENAI_BASE_URL")
# client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# # 创建线程池
# executor = ThreadPoolExecutor(max_workers=10)
# def convert_markdown_to_wechat(md_text):
# """将Markdown转换为微信友好的文本格式"""
# if not md_text:
# return md_text
# # 处理标题
# md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE)
# md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE)
# # 处理强调语法
# md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体
# md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体
# md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码
# # 处理列表
# md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表
# md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表
# # 处理代码块
# md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL)
# # 处理引用
# md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE)
# # 处理分隔线
# md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE)
# # 处理链接
# md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text)
# # 净化处理:去除多余的空行
# md_text = re.sub(r'\n{3,}', '\n\n', md_text)
# return md_text
# class AsyncResponse:
# def __init__(self):
# self.status = "processing" # processing, completed, failed
# self.result = None
# self.error = None
# class UserSession:
# def __init__(self):
# self.messages = [{"role": "system", "content": "You are a helpful assistant."}]
# self.pending_parts = []
# self.last_active = time.time()
# self.current_task = None
# self.response_queue = {}
# class SessionManager:
# def __init__(self):
# self.sessions = {}
# self.session_timeout = 3600
# self._lock = threading.Lock()
# def get_session(self, user_id):
# with self._lock:
# if user_id not in self.sessions:
# self.sessions[user_id] = UserSession()
# session = self.sessions[user_id]
# session.last_active = time.time()
# return session
# def clear_session(self, user_id):
# with self._lock:
# if user_id in self.sessions:
# self.sessions[user_id] = UserSession()
# def cleanup_expired_sessions(self):
# with self._lock:
# current_time = time.time()
# expired_users = [
# user_id for user_id, session in self.sessions.items()
# if current_time - session.last_active > self.session_timeout
# ]
# for user_id in expired_users:
# del self.sessions[user_id]
# logging.info(f"已清理过期会话: {user_id}")
# session_manager = SessionManager()
# def verify_wechat(request):
# """验证微信服务器请求"""
# data = request.args
# signature = data.get('signature')
# timestamp = data.get('timestamp')
# nonce = data.get('nonce')
# echostr = data.get('echostr')
# temp = [timestamp, nonce, TOKEN]
# temp.sort()
# temp = ''.join(temp)
# if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature:
# return echostr
# return 'error', 403
# def parse_xml_message(xml_content):
# """解析微信XML消息"""
# root = ET.fromstring(xml_content)
# return {
# 'content': root.find('Content').text,
# 'from_user': root.find('FromUserName').text,
# 'to_user': root.find('ToUserName').text,
# 'msg_id': root.find('MsgId').text,
# 'create_time': root.find('CreateTime').text
# }
# def generate_response_xml(to_user, from_user, content):
# """生成回复的XML消息"""
# formatted_content = convert_markdown_to_wechat(content)
# xml_template = '''
#
#
#
# %s
#
#
#
# '''
# response = make_response(
# xml_template % (to_user, from_user, str(int(time.time())), formatted_content)
# )
# response.content_type = 'application/xml'
# return response
# def process_long_running_task(messages):
# """处理耗时任务"""
# try:
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# messages=messages,
# timeout=60
# )
# return response.choices[0].message.content
# except Exception as e:
# logging.error(f"API调用错误: {str(e)}")
# raise
# def handle_async_task(session, task_id, messages):
# """异步任务处理函数"""
# try:
# result = process_long_running_task(messages)
# session.response_queue[task_id].status = "completed"
# session.response_queue[task_id].result = result
# except Exception as e:
# session.response_queue[task_id].status = "failed"
# session.response_queue[task_id].error = str(e)
# def generate_initial_response():
# """生成初始响应消息"""
# return "您的请求正在处理中"
# def split_message(message, max_length=500):
# """将长消息分段"""
# return [message[i:i+max_length] for i in range(0, len(message), max_length)]
# def append_status_message(content, has_pending_parts=False, is_processing=False):
# """添加状态提示信息"""
# status_message = "\n\n-------------------"
# if is_processing:
# status_message += "\n请回复'查询'获取结果"
# elif has_pending_parts:
# status_message += "\n当前消息已截断,发送'继续'查看后续内容"
# status_message += "\n发送'新对话'开始新的对话"
# return content + status_message
# @app.route('/api/wx', methods=['GET', 'POST'])
# def wechatai():
# if request.method == 'GET':
# return verify_wechat(request)
# try:
# message_data = parse_xml_message(request.data)
# user_content = message_data['content'].strip()
# from_user = message_data['from_user']
# to_user = message_data['to_user']
# logging.info(f"收到用户({from_user})消息: {user_content}")
# session = session_manager.get_session(from_user)
# if user_content == '新对话':
# session_manager.clear_session(from_user)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message('已开始新的对话。请描述您的问题。')
# )
# if user_content == '继续':
# if session.pending_parts:
# next_part = session.pending_parts.pop(0)
# has_more = bool(session.pending_parts)
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(next_part, has_more)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message('没有更多内容了。请继续您的问题。')
# )
# if user_content == '查询':
# if session.current_task:
# task_response = session.response_queue.get(session.current_task)
# if task_response:
# if task_response.status == "completed":
# response = task_response.result
# del session.response_queue[session.current_task]
# session.current_task = None
# if len(response) > 500:
# parts = split_message(response)
# first_part = parts.pop(0)
# session.pending_parts = parts
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(first_part, True)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(response)
# )
# elif task_response.status == "failed":
# error_message = '处理过程中出现错误,请重新提问。'
# del session.response_queue[session.current_task]
# session.current_task = None
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(error_message)
# )
# else:
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message('正在处理中,请稍后再次查询。', is_processing=True)
# )
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message('没有正在处理的请求。')
# )
# session.messages.append({"role": "user", "content": user_content})
# task_id = str(uuid.uuid4())
# session.current_task = task_id
# session.response_queue[task_id] = AsyncResponse()
# executor.submit(handle_async_task, session, task_id, session.messages.copy())
# return generate_response_xml(
# from_user,
# to_user,
# append_status_message(generate_initial_response(), is_processing=True)
# )
# except Exception as e:
# logging.error(f"处理请求时出错: {str(e)}")
# return generate_response_xml(
# message_data['from_user'],
# message_data['to_user'],
# append_status_message('抱歉,系统暂时出现问题,请稍后重试。')
# )
# def cleanup_sessions():
# """定期清理过期会话"""
# while True:
# time.sleep(3600)
# try:
# session_manager.cleanup_expired_sessions()
# except Exception as e:
# logging.error(f"清理会话时出错: {str(e)}")
# if __name__ == '__main__':
# cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
# cleanup_thread.start()
# app.run(host='0.0.0.0', port=7860, debug=True)
#下面这个代码解决的是响应查询时重复回应的情况
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#下面这个代码是告诉用户超时后就需要重新回答了
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
import hashlib
import time
import xml.etree.ElementTree as ET
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from markdown import markdown
import re
import threading
import logging
from datetime import datetime
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
import uuid
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('wechat_service.log'),
logging.StreamHandler()
]
)
# 加载环境变量
load_dotenv()
app = Flask(__name__)
# 基础配置
TOKEN = os.getenv('TOKEN')
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
# 创建线程池
executor = ThreadPoolExecutor(max_workers=10)
class AsyncResponse:
def __init__(self):
self.status = "processing" # processing, completed, failed
self.result = None
self.error = None
self.create_time = time.time() # 任务创建时间
self.timeout = 3600 # 任务超时时间(1小时)
def is_expired(self):
"""检查任务是否过期"""
return time.time() - self.create_time > self.timeout
class UserSession:
def __init__(self):
self.messages = [{"role": "system", "content": "You are a helpful assistant."}]
self.pending_parts = []
self.last_active = time.time()
self.current_task = None
self.response_queue = {}
self.session_timeout = 3600 # 会话超时时间(1小时)
def is_expired(self):
"""检查会话是否过期"""
return time.time() - self.last_active > self.session_timeout
def cleanup_expired_tasks(self):
"""清理过期的任务"""
expired_tasks = [
task_id for task_id, response in self.response_queue.items()
if response.is_expired()
]
for task_id in expired_tasks:
del self.response_queue[task_id]
if self.current_task == task_id:
self.current_task = None
class SessionManager:
def __init__(self):
self.sessions = {}
self._lock = threading.Lock()
def get_session(self, user_id):
with self._lock:
current_time = time.time()
if user_id in self.sessions:
session = self.sessions[user_id]
if session.is_expired():
session = UserSession() # 创建新会话
else:
session.cleanup_expired_tasks() # 清理过期任务
else:
session = UserSession()
session.last_active = current_time
self.sessions[user_id] = session
return session
def clear_session(self, user_id):
with self._lock:
if user_id in self.sessions:
self.sessions[user_id] = UserSession()
def cleanup_expired_sessions(self):
with self._lock:
current_time = time.time()
expired_users = [
user_id for user_id, session in self.sessions.items()
if session.is_expired()
]
for user_id in expired_users:
del self.sessions[user_id]
logging.info(f"已清理过期会话: {user_id}")
session_manager = SessionManager()
def convert_markdown_to_wechat(md_text):
"""将Markdown转换为微信友好的文本格式"""
if not md_text:
return md_text
# 处理标题
md_text = re.sub(r'^# (.*?)$', r'【标题】\1', md_text, flags=re.MULTILINE)
md_text = re.sub(r'^## (.*?)$', r'【子标题】\1', md_text, flags=re.MULTILINE)
md_text = re.sub(r'^### (.*?)$', r'【小标题】\1', md_text, flags=re.MULTILINE)
# 处理强调语法
md_text = re.sub(r'\*\*(.*?)\*\*', r'『\1』', md_text) # 粗体
md_text = re.sub(r'\*(.*?)\*', r'「\1」', md_text) # 斜体
md_text = re.sub(r'`(.*?)`', r'「\1」', md_text) # 行内代码
# 处理列表
md_text = re.sub(r'^\- ', '• ', md_text, flags=re.MULTILINE) # 无序列表
md_text = re.sub(r'^\d\. ', '○ ', md_text, flags=re.MULTILINE) # 有序列表
# 处理代码块
md_text = re.sub(r'```[\w]*\n(.*?)```', r'【代码开始】\n\1\n【代码结束】', md_text, flags=re.DOTALL)
# 处理引用
md_text = re.sub(r'^> (.*?)$', r'▎\1', md_text, flags=re.MULTILINE)
# 处理分隔线
md_text = re.sub(r'^-{3,}$', r'—————————', md_text, flags=re.MULTILINE)
# 处理链接
md_text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1(\2)', md_text)
# 净化处理:去除多余的空行
md_text = re.sub(r'\n{3,}', '\n\n', md_text)
return md_text
def verify_wechat(request):
"""验证微信服务器请求"""
data = request.args
signature = data.get('signature')
timestamp = data.get('timestamp')
nonce = data.get('nonce')
echostr = data.get('echostr')
temp = [timestamp, nonce, TOKEN]
temp.sort()
temp = ''.join(temp)
if hashlib.sha1(temp.encode('utf8')).hexdigest() == signature:
return echostr
return 'error', 403
def parse_xml_message(xml_content):
"""解析微信XML消息"""
root = ET.fromstring(xml_content)
return {
'content': root.find('Content').text,
'from_user': root.find('FromUserName').text,
'to_user': root.find('ToUserName').text,
'msg_id': root.find('MsgId').text,
'create_time': root.find('CreateTime').text
}
def generate_response_xml(to_user, from_user, content):
"""生成回复的XML消息"""
formatted_content = convert_markdown_to_wechat(content)
xml_template = '''
%s
'''
response = make_response(
xml_template % (to_user, from_user, str(int(time.time())), formatted_content)
)
response.content_type = 'application/xml'
return response
def process_long_running_task(messages):
"""处理耗时任务"""
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
timeout=60
)
return response.choices[0].message.content
except Exception as e:
logging.error(f"API调用错误: {str(e)}")
raise
def handle_async_task(session, task_id, messages):
"""异步任务处理函数"""
try:
if task_id not in session.response_queue:
return
result = process_long_running_task(messages)
if task_id in session.response_queue and not session.response_queue[task_id].is_expired():
session.response_queue[task_id].status = "completed"
session.response_queue[task_id].result = result
except Exception as e:
if task_id in session.response_queue:
session.response_queue[task_id].status = "failed"
session.response_queue[task_id].error = str(e)
def generate_initial_response():
"""生成初始响应消息"""
return "您的请求正在处理中,请回复'查询'获取结果"
def split_message(message, max_length=500):
"""将长消息分段"""
return [message[i:i+max_length] for i in range(0, len(message), max_length)]
def append_status_message(content, has_pending_parts=False, is_processing=False):
"""添加状态提示信息"""
if "您的请求正在处理中" in content:
return content + "\n\n-------------------\n发送'新对话'开始新的对话"
status_message = "\n\n-------------------"
if is_processing:
status_message += "\n请回复'查询'获取结果"
elif has_pending_parts:
status_message += "\n当前消息已截断,发送'继续'查看后续内容"
status_message += "\n发送'新对话'开始新的对话"
return content + status_message
@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
if request.method == 'GET':
return verify_wechat(request)
try:
message_data = parse_xml_message(request.data)
user_content = message_data['content'].strip()
from_user = message_data['from_user']
to_user = message_data['to_user']
logging.info(f"收到用户({from_user})消息: {user_content}")
session = session_manager.get_session(from_user)
if user_content == '新对话':
session_manager.clear_session(from_user)
return generate_response_xml(
from_user,
to_user,
append_status_message('已开始新的对话。请描述您的问题。')
)
if user_content == '继续':
if session.pending_parts:
next_part = session.pending_parts.pop(0)
has_more = bool(session.pending_parts)
return generate_response_xml(
from_user,
to_user,
append_status_message(next_part, has_more)
)
return generate_response_xml(
from_user,
to_user,
append_status_message('没有更多内容了。请继续您的问题。')
)
if user_content == '查询':
if session.current_task:
task_response = session.response_queue.get(session.current_task)
if task_response:
if task_response.is_expired():
# 处理过期任务
del session.response_queue[session.current_task]
session.current_task = None
return generate_response_xml(
from_user,
to_user,
append_status_message('请求已过期,请重新提问。')
)
if task_response.status == "completed":
response = task_response.result
del session.response_queue[session.current_task]
session.current_task = None
session.messages.append({"role": "assistant", "content": response})
if len(response) > 500:
parts = split_message(response)
first_part = parts.pop(0)
session.pending_parts = parts
return generate_response_xml(
from_user,
to_user,
append_status_message(first_part, True)
)
return generate_response_xml(
from_user,
to_user,
append_status_message(response)
)
elif task_response.status == "failed":
error_message = '处理过程中出现错误,请重新提问。'
del session.response_queue[session.current_task]
session.current_task = None
return generate_response_xml(
from_user,
to_user,
append_status_message(error_message)
)
else:
return generate_response_xml(
from_user,
to_user,
append_status_message('正在处理中,请稍后再次查询。', is_processing=True)
)
return generate_response_xml(
from_user,
to_user,
append_status_message('没有正在处理的请求。')
)
session.messages.append({"role": "user", "content": user_content})
task_id = str(uuid.uuid4())
session.current_task = task_id
session.response_queue[task_id] = AsyncResponse()
executor.submit(handle_async_task, session, task_id, session.messages.copy())
return generate_response_xml(
from_user,
to_user,
append_status_message(generate_initial_response(), is_processing=True)
)
except Exception as e:
logging.error(f"处理请求时出错: {str(e)}")
return generate_response_xml(
message_data['from_user'],
message_data['to_user'],
append_status_message('抱歉,系统暂时出现问题,请稍后重试。')
)
def cleanup_sessions():
"""定期清理过期会话"""
while True:
time.sleep(3600) # 每小时清理一次
try:
session_manager.cleanup_expired_sessions()
except Exception as e:
logging.error(f"清理会话时出错: {str(e)}")
if __name__ == '__main__':
# 启动清理线程
cleanup_thread = threading.Thread(target=cleanup_sessions, daemon=True)
cleanup_thread.start()
# 启动Flask应用
app.run(host='0.0.0.0', port=7860, debug=True)