from flask import Flask, request, make_response |
import hashlib |
import time |
import xml.etree.ElementTree as ET |
import os |
import json |
import smtplib |
from email.mime.text import MIMEText |
from email.mime.multipart import MIMEMultipart |
from openai import OpenAI |
from dotenv import load_dotenv |
load_dotenv() |
app = Flask(__name__) |
TOKEN = os.getenv('TOKEN') |
API_KEY = os.getenv("API_KEY") |
BASE_URL = os.getenv("OPENAI_BASE_URL") |
EMAIL_KEY = os.getenv("EMAIL_KEY") |
client = OpenAI(api_key=API_KEY, base_url=BASE_URL) |
user_sessions = {} |
{ |
"name": "send_email", |
"description": "发送电子邮件。", |
"parameters": { |
"type": "object", |
"properties": { |
"to": { |
"type": "string", |
"description": "收件人邮箱地址" |
}, |
"subject": { |
"type": "string", |
"description": "邮件主题" |
}, |
"content": { |
"type": "string", |
"description": "邮件内容" |
} |
}, |
"required": ["to", "subject", "content"] |
} |
} |
] |
def verify_wechat(request): |
data = request.args |
signature = data.get('signature') |
timestamp = data.get('timestamp') |
nonce = data.get('nonce') |
echostr = data.get('echostr') |
temp = [timestamp, nonce, TOKEN] |
temp.sort() |
temp = ''.join(temp) |
if (hashlib.sha1(temp.encode('utf8')).hexdigest() == signature): |
return echostr |
else: |
return 'error', 403 |
def getUserMessageContentFromXML(xml_content): |
root = ET.fromstring(xml_content) |
content = root.find('Content').text |
from_user_name = root.find('FromUserName').text |
to_user_name = root.find('ToUserName').text |
return content, from_user_name, to_user_name |
def generate_response_xml(from_user_name, to_user_name, output_content): |
output_xml = ''' |
<xml> |
<ToUserName><![CDATA[%s]]></ToUserName> |
<FromUserName><![CDATA[%s]]></FromUserName> |
<CreateTime>%s</CreateTime> |
<MsgType><![CDATA[text]]></MsgType> |
<Content><![CDATA[%s]]></Content> |
</xml>''' |
response = make_response(output_xml % (from_user_name, to_user_name, str(int(time.time())), output_content)) |
response.content_type = 'application/xml' |
return response |
def get_openai_response(messages, functions=None, function_call=None): |
try: |
response = client.chat.completions.create( |
model="gpt-4o-mini", |
messages=messages, |
functions=functions, |
function_call=function_call |
) |
return response.choices[0].message |
except Exception as e: |
print(f"调用OpenAI API时出错: {str(e)}") |
return None |
def send_email(to, subject, content): |
try: |
with smtplib.SMTP('', 8025) as smtp: |
smtp.login("jwt", EMAIL_KEY) |
message = MIMEMultipart() |
message['From'] = "Me <[email protected]>" |
message['To'] = to |
message['Subject'] = subject |
message.attach(MIMEText(content, 'html')) |
smtp.sendmail("[email protected]", to, message.as_string()) |
return True |
except Exception as e: |
print(f"发送邮件时出错: {str(e)}") |
return False |
def process_function_call(response_message): |
function_name = response_message.function_call.name |
function_args = json.loads(response_message.function_call.arguments) |
if function_name == "send_email": |
to = function_args.get('to', '') |
subject = function_args.get('subject', '') |
content = function_args.get('content', '') |
if not to or not subject or not content: |
return None |
success = send_email(to, subject, content) |
return { |
"success": success, |
"to": to, |
"subject": subject, |
"content": content |
} |
else: |
return None |
def split_message(message, max_length=500): |
return [message[i:i+max_length] for i in range(0, len(message), max_length)] |
@app.route('/api/wx', methods=['GET', 'POST']) |
def wechatai(): |
if request.method == 'GET': |
return verify_wechat(request) |
else: |
print("user request data: ", request.data) |
user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(request.data) |
print("user message content: ", user_message_content) |
if user_message_content.lower() == '继续': |
if from_user_name in user_sessions and user_sessions[from_user_name]['pending_response']: |
response_content = user_sessions[from_user_name]['pending_response'].pop(0) |
if user_sessions[from_user_name]['pending_response']: |
response_content += '\n\n回复"继续"获取下一部分。' |
else: |
response_content += '\n\n回复结束。' |
else: |
response_content = "没有待发送的消息。" |
else: |
if from_user_name not in user_sessions: |
user_sessions[from_user_name] = {'messages': [], 'pending_response': [], 'email_sent': False} |
session = user_sessions[from_user_name] |
session['messages'].append({"role": "user", "content": user_message_content}) |
decision_response = get_openai_response(session['messages'], functions=FUNCTIONS, function_call="auto") |
if decision_response.function_call and not session['email_sent']: |
function_result = process_function_call(decision_response) |
if function_result: |
session['email_sent'] = True |
if function_result['success']: |
response_content = f"邮件已成功发送到 {function_result['to']}。\n\n主题:{function_result['subject']}\n\n内容:\n{function_result['content']}" |
else: |
response_content = "邮件发送失败,请稍后再试。" |
else: |
response_content = "无法处理该请求,请重试。" |
else: |
gpt_response = get_openai_response(session['messages']) |
response_content = gpt_response.content if gpt_response else "抱歉,我遇到了一些问题,无法回答您的问题。" |
session['messages'].append({"role": "assistant", "content": response_content}) |
session['email_sent'] = False |
response_parts = split_message(response_content) |
if len(response_parts) > 1: |
response_content = response_parts[0] + '\n\n回复"继续"获取下一部分。' |
session['pending_response'] = response_parts[1:] |
else: |
response_content = response_parts[0] |
return generate_response_xml(from_user_name, to_user_name, response_content) |
if __name__ == '__main__': |
app.run(host='', port=7860, debug=True) |