Spaces:
Sleeping
Sleeping
File size: 7,315 Bytes
67793ba 7364bee e0834c3 fa795f1 2c9cbe2 fa48d29 b132330 cf9fdca e0834c3 92b3530 e0834c3 7364bee cf9fdca e0834c3 cf9fdca 38b2ac5 67793ba 4bdfabe 38b2ac5 7364bee cf9fdca ab0b4df 7364bee ab0b4df 7364bee ab0b4df 7364bee ab0b4df 67793ba ab0b4df 67793ba 80da867 62fac61 2c9cbe2 6a35009 23dddca 62fac61 23dddca 2c9cbe2 23dddca 7ac41bb 23dddca 7ac41bb 62fac61 80da867 7364bee 67793ba 2c9cbe2 38b2ac5 cf9fdca 38b2ac5 ab0b4df 27dc5d5 92b3530 80da867 e0834c3 80da867 e0834c3 80da867 62fac61 92b3530 cf9fdca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
from flask import Flask, request, jsonify
import requests
from dotenv import load_dotenv
import os
import re
import json
from helper.openai_api import chat_completion
# Load environment variables from .env file
load_dotenv()

app = Flask(__name__)

# Retrieve environment variables
VERIFY_TOKEN = os.getenv('VERIFY_TOKEN')
PAGE_ACCESS_TOKEN = os.getenv('PAGE_ACCESS_TOKEN')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# Startup diagnostics: report only WHETHER each secret is configured.
# The values themselves are never printed, because stdout normally ends up in
# container/platform logs where tokens and API keys must not appear.
print(f"VERIFY_TOKEN: {'set' if VERIFY_TOKEN else 'MISSING'}")
print(f"PAGE_ACCESS_TOKEN: {'set' if PAGE_ACCESS_TOKEN else 'MISSING'}")
print(f"OPENAI_API_KEY: {'set' if OPENAI_API_KEY else 'MISSING'}")
@app.route('/', methods=['GET'])
def home():
    """Landing route: respond to GET / with a fixed greeting string."""
    greeting = "Welcome to the chatbot!"
    return greeting
@app.route('/facebook', methods=['GET', 'POST'])
def webhook():
    """Facebook Messenger webhook endpoint.

    GET
        Subscription handshake. Echoes ``hub.challenge`` when
        ``hub.verify_token`` equals the configured ``VERIFY_TOKEN``;
        otherwise answers with HTTP 403.
    POST
        Event delivery. Every messaging event of every entry in the payload
        that carries a ``message`` is answered through the chatbot. The
        request is always acknowledged with ``{'status': 'ok'}``, even when
        handling an individual event fails.
    """
    if request.method == 'GET':
        return _verify_subscription()

    # silent=True: a missing or malformed JSON body yields None instead of
    # aborting the request, so bad input is acknowledged and ignored.
    data = request.get_json(silent=True)
    print(f"POST request received: {data}")
    if not isinstance(data, dict):
        data = {}

    for entry in data.get('entry') or []:
        if not isinstance(entry, dict):
            continue
        # One delivery may batch several events; handle each of them rather
        # than only the first.
        for messaging_event in entry.get('messaging') or []:
            print(f"Messaging event: {messaging_event}")
            if not isinstance(messaging_event, dict) or 'message' not in messaging_event:
                continue
            try:
                _handle_message_event(messaging_event)
            except Exception as e:
                # Boundary handler: one bad event must not turn the whole
                # delivery into an HTTP 500 (NOTE(review): Meta documents that
                # non-200 webhook responses are redelivered - confirm).
                print(f"Exception occurred: {e}")

    return jsonify({'status': 'ok'})


def _verify_subscription():
    """Answer the GET verification handshake (challenge on match, else 403)."""
    verify_token = request.args.get('hub.verify_token')
    challenge = request.args.get('hub.challenge')
    print(f"GET request received: verify_token={verify_token}, challenge={challenge}")
    # Require a configured token: otherwise an unset VERIFY_TOKEN (None) would
    # "match" a request that simply omits hub.verify_token.
    if VERIFY_TOKEN and verify_token == VERIFY_TOKEN:
        print("Verification token matches. Returning challenge.")
        # A view must not return None; fall back to an empty body.
        return challenge or ''
    print("Error: wrong validation token.")
    return 'Error, wrong validation token', 403


def _handle_message_event(messaging_event):
    """Reply to one inbound message event and update the chat indicators."""
    message = messaging_event['message']
    if not isinstance(message, dict):
        return
    if message.get('is_echo'):
        # Echo of a message the page itself sent; answering it would make the
        # bot talk to itself.
        return

    sender_id = messaging_event['sender']['id']
    message_text = message.get('text', '')
    print(f"Received message from {sender_id}: {message_text}")

    try:
        # Set typing on
        set_typing_on(sender_id)

        # Get response with possible image URLs
        response_data = chat_completion(message_text, sender_id)
        print("ChatBot Response:\n", response_data)

        response_text, image_urls = _parse_bot_response(response_data)
        print(f"ChatBot Response Text: {response_text}")
        print(f"ChatBot Response Image URLs: {image_urls}")

        # Send text and/or images
        if response_text:
            send_message(sender_id, response_text)
        for url in image_urls:
            send_image(sender_id, url)
    finally:
        # Always clear the indicators, whether or not the reply succeeded.
        # Mark message as seen
        mark_seen(sender_id)
        # Set typing off after responding
        set_typing_off(sender_id)


def _parse_bot_response(response_data):
    """Normalise a chatbot reply into ``(text, image_urls)``.

    Accepts a dict with optional ``msg`` / ``image_urls`` keys, a JSON string
    encoding such a dict, or a plain-text string (URLs are then pulled out of
    the text with ``extract_text_and_urls``). Any other value is stringified
    and returned as text with no images.
    """
    if isinstance(response_data, str):
        try:
            decoded = json.loads(response_data)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            decoded = None
        if not isinstance(decoded, dict):
            # Plain text (or JSON that is not an object): treat as free text.
            return extract_text_and_urls(response_data)
        response_data = decoded

    if isinstance(response_data, dict):
        text = response_data.get('msg') or ''
        if not isinstance(text, str):
            text = str(text)
        urls = response_data.get('image_urls') or []
        if isinstance(urls, str):
            # A single URL given as a bare string rather than a list.
            urls = [urls]
        return text, list(urls)

    print("Error: ChatBot response is not a dictionary.")
    return ('' if response_data is None else str(response_data)), []
# Matches http(s) URLs and bare "www." hosts, each running to the next whitespace.
_URL_PATTERN = re.compile(r'https?://\S+|www\.\S+')
# Sentence punctuation that is never meant to be the end of a URL.
_TRAILING_PUNCTUATION = '.,;:!?\'"'
# Closing brackets and their openers; a trailing closer is only dropped when
# the URL itself does not contain the matching opener.
_BRACKET_PAIRS = {')': '(', ']': '[', '}': '{', '>': '<'}


def _trim_url(url):
    """Strip trailing punctuation/unbalanced closers swallowed by ``\\S+``."""
    while url:
        last = url[-1]
        if last in _TRAILING_PUNCTUATION:
            url = url[:-1]
        elif last in _BRACKET_PAIRS and url.count(last) > url.count(_BRACKET_PAIRS[last]):
            # e.g. the ")" of a markdown link "(https://host/img.png)"; a
            # balanced URL such as ".../wiki/A_(b)" is left intact.
            url = url[:-1]
        else:
            break
    return url


def extract_text_and_urls(input_string):
    """Split free text into its non-URL text and the URLs it contains.

    Args:
        input_string: Text that may contain ``http(s)://`` or ``www.`` URLs.
            ``None`` or an empty string is accepted.

    Returns:
        A ``(text, urls)`` tuple. ``text`` is the input with every matched URL
        span removed and outer whitespace stripped; ``urls`` lists the matches
        in order of appearance, with trailing sentence punctuation and
        unbalanced closing brackets trimmed off.
    """
    if not input_string:
        return '', []

    # Find all URLs, cleaning up characters that belong to the surrounding
    # sentence rather than to the link.
    urls = [url for url in map(_trim_url, _URL_PATTERN.findall(input_string)) if url]

    # Remove the full matched spans from the input to get the remaining text.
    text = _URL_PATTERN.sub('', input_string).strip()

    return text, urls
# Messenger Send API endpoint used by every outbound call below.
_SEND_API_URL = 'https://graph.facebook.com/v12.0/me/messages'
# Upper bound in seconds for one Send API call, so a stalled connection cannot
# hang the webhook worker indefinitely (requests has no default timeout).
_SEND_API_TIMEOUT = 10
# The Send API documents a 2000-character limit for message text.
_MAX_TEXT_LENGTH = 2000


def _post_to_send_api(payload, request_label, response_label):
    """POST one payload to the Send API and log the outcome.

    Args:
        payload: JSON-serialisable request body.
        request_label: Wording used in the "Sending ... request" log line.
        response_label: Wording used in the "... response" log line.

    Returns:
        True when the API answered with a non-error status code, False when it
        answered with an error status or the request itself failed.
    """
    print(f"Sending {request_label} request: {payload}")
    try:
        response = requests.post(
            _SEND_API_URL,
            # Passed as a query parameter so requests handles URL encoding.
            params={'access_token': PAGE_ACCESS_TOKEN},
            headers={'Content-Type': 'application/json'},
            json=payload,
            timeout=_SEND_API_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Log only the exception type: the message may embed the request URL,
        # which carries the page access token.
        print(f"{response_label} failed: {type(exc).__name__}")
        return False
    print(f"{response_label} response: {response.status_code}, {response.text}")
    return response.ok


def _send_sender_action(recipient_id, action, request_label, response_label):
    """Send a ``sender_action`` (typing indicator / seen marker) to a user."""
    payload = {
        'recipient': {'id': recipient_id},
        'sender_action': action
    }
    return _post_to_send_api(payload, request_label, response_label)


def send_message(recipient_id, message_text):
    """Send a text message to a user, split into chunks if it is too long.

    Args:
        recipient_id: Messenger ID of the recipient.
        message_text: Text to send; non-string values are stringified.

    Returns:
        True if every chunk was accepted, False if any failed or there was no
        text to send.
    """
    if message_text is None:
        text = ''
    elif isinstance(message_text, str):
        text = message_text
    else:
        text = str(message_text)

    if not text:
        print("Skipping message send: empty text.")
        return False

    delivered = True
    for start in range(0, len(text), _MAX_TEXT_LENGTH):
        payload = {
            'recipient': {'id': recipient_id},
            'message': {'text': text[start:start + _MAX_TEXT_LENGTH]}
        }
        # Keep sending the remaining chunks even if one fails.
        delivered = _post_to_send_api(payload, 'message', 'Message send') and delivered
    return delivered


def send_image(recipient_id, image_url):
    """Send an image attachment, referenced by URL, to a user.

    Returns:
        True if the Send API accepted the request, otherwise False.
    """
    payload = {
        'recipient': {'id': recipient_id},
        'message': {
            'attachment': {
                'type': 'image',
                'payload': {
                    'url': image_url,
                    'is_reusable': True
                }
            }
        }
    }
    return _post_to_send_api(payload, 'image', 'Image send')


def set_typing_on(recipient_id):
    """Show the typing indicator in the conversation with the user."""
    return _send_sender_action(recipient_id, 'typing_on', 'typing on', 'Typing on')


def set_typing_off(recipient_id):
    """Hide the typing indicator in the conversation with the user."""
    return _send_sender_action(recipient_id, 'typing_off', 'typing off', 'Typing off')


def mark_seen(recipient_id):
    """Mark the user's latest message as seen."""
    return _send_sender_action(recipient_id, 'mark_seen', 'mark seen', 'Mark seen')
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger (which
    # allows arbitrary code execution from the browser) must be off by default.
    # Opt in explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)
|