Spaces:
Sleeping
Sleeping
from flask import Flask, request, jsonify | |
import requests | |
from dotenv import load_dotenv | |
import os | |
import re | |
import json | |
from helper.cohere_api import chat_completion # Ensure this import is correct | |
# Load environment variables from .env file | |
load_dotenv() | |
app = Flask(__name__) | |
# Retrieve environment variables | |
VERIFY_TOKEN = os.getenv('VERIFY_TOKEN') | |
PAGE_ACCESS_TOKEN = os.getenv('PAGE_ACCESS_TOKEN') | |
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
# Check if environment variables are set | |
if not all([VERIFY_TOKEN, PAGE_ACCESS_TOKEN, OPENAI_API_KEY]): | |
raise EnvironmentError("Some environment variables are missing. Please check your .env file.") | |
# Debugging: Print the loaded environment variables (remove in production) | |
print(f"VERIFY_TOKEN: {VERIFY_TOKEN}") | |
print(f"PAGE_ACCESS_TOKEN: {PAGE_ACCESS_TOKEN}") | |
print(f"OPENAI_API_KEY: {OPENAI_API_KEY}") | |
def home(): | |
return "Welcome to the chatbot!" | |
def webhook(): | |
if request.method == 'GET': | |
# Webhook verification | |
verify_token = request.args.get('hub.verify_token') | |
challenge = request.args.get('hub.challenge') | |
print(f"GET request received: verify_token={verify_token}, challenge={challenge}") | |
if verify_token == VERIFY_TOKEN: | |
print("Verification token matches. Returning challenge.") | |
return challenge | |
else: | |
print("Error: wrong validation token.") | |
return 'Error, wrong validation token' | |
elif request.method == 'POST': | |
data = request.get_json() | |
print(f"POST request received: {data}") | |
if 'entry' in data and len(data['entry']) > 0 and 'messaging' in data['entry'][0]: | |
messaging_event = data['entry'][0]['messaging'][0] | |
print(f"Messaging event: {messaging_event}") | |
if 'message' in messaging_event: | |
# Ignore echo messages | |
if messaging_event['message'].get('is_echo'): | |
print("Echo message received. Ignoring.") | |
return jsonify({'status': 'ok'}) | |
sender_id = messaging_event['sender']['id'] | |
message_text = messaging_event['message'].get('text', '') | |
if not message_text: | |
print("Empty message text received. Ignoring.") | |
return jsonify({'status': 'ok'}) | |
print(f"Received message from {sender_id}: {message_text}") | |
# Set typing on | |
set_typing_on(sender_id) | |
try: | |
# Get response with possible image URLs | |
response_data = chat_completion(message_text, sender_id) | |
print("ChatBot Response:\n", response_data) | |
response_data = parse_response(response_data) | |
print("#"*10) | |
print("Parsed Response:\n", response_data) | |
print("#"*10) | |
response_text = response_data.get('msg', '') | |
image_urls = response_data.get('image_urls', []) | |
print(f"ChatBot Response Text: {response_text}") | |
print(f"ChatBot Response Image URLs: {image_urls}") | |
# Send text and/or images | |
if response_text: | |
send_message(sender_id, response_text) | |
if image_urls: | |
for url in image_urls: | |
send_image(sender_id, url) | |
except Exception as e: | |
print(f"Exception occurred: {e}") | |
send_message(sender_id, "Sorry, something went wrong. Please try again later.") | |
# Mark message as seen | |
mark_seen(sender_id) | |
# Set typing off after responding | |
set_typing_off(sender_id) | |
return jsonify({'status': 'ok'}) | |
def parse_response(input_string): | |
# Regex pattern to find URLs | |
url_pattern = re.compile(r'https?://\S+|www\.\S+') | |
urls = re.findall(url_pattern, input_string) | |
# Clean URLs by removing unwanted characters | |
cleaned_urls = [re.sub(r'[(){}"\']', '', url) for url in urls] | |
# Extract JSON-like dictionaries from the input string | |
json_pattern = re.compile(r'\{.*?\}') | |
json_matches = json_pattern.findall(input_string) | |
extracted_text = input_string | |
for json_str in json_matches: | |
try: | |
json_data = json.loads(json_str) | |
if isinstance(json_data, dict): | |
extracted_text = json_data.get('msg', extracted_text) | |
image_urls = json_data.get('image_url', []) | |
if isinstance(image_urls, str): | |
cleaned_urls.append(re.sub(r'[(){}"\']', '', image_urls)) | |
elif isinstance(image_urls, list): | |
cleaned_urls.extend([re.sub(r'[(){}"\']', '', url) for url in image_urls]) | |
except json.JSONDecodeError: | |
continue | |
# Remove URLs from the extracted text | |
text = re.sub(url_pattern, '', extracted_text).strip() | |
# Further clean the text from any leftover JSON artifacts and unwanted parts | |
text = re.sub(r'[\)\{\}\[\]\"\(\']', '', text).strip() | |
text = re.sub(r'msg:', '', text).strip() | |
text = re.sub(r'image_url:', '', text).strip() | |
text = text.replace('\n', '').strip() | |
text = text.replace(" ", " ") | |
return { | |
'msg': text, | |
'image_urls': list(set(cleaned_urls)) | |
} | |
def send_message(recipient_id, message_text): | |
url = f'https://graph.facebook.com/v12.0/me/messages?access_token={PAGE_ACCESS_TOKEN}' | |
headers = {'Content-Type': 'application/json'} | |
payload = { | |
'recipient': {'id': recipient_id}, | |
'message': {'text': message_text} | |
} | |
print(f"Sending message request: {payload}") | |
response = requests.post(url, headers=headers, json=payload) | |
print(f"Message send response: {response.status_code}, {response.text}") | |
def send_image(recipient_id, image_url): | |
if not verify_image_url(image_url): | |
print("Invalid or inaccessible image URL.") | |
return | |
url = f'https://graph.facebook.com/v12.0/me/messages?access_token={PAGE_ACCESS_TOKEN}' | |
headers = {'Content-Type': 'application/json'} | |
payload = { | |
'recipient': {'id': recipient_id}, | |
'message': { | |
'attachment': { | |
'type': 'image', | |
'payload': { | |
'url': image_url, | |
'is_reusable': True | |
} | |
} | |
} | |
} | |
print(f"Sending image request: {payload}") | |
response = requests.post(url, headers=headers, json=payload) | |
print(f"Image send response: {response.status_code}, {response.text}") | |
if response.status_code != 200: | |
print(f"Error sending image: {response.json()}") | |
def verify_image_url(image_url): | |
try: | |
response = requests.head(image_url) | |
return response.status_code == 200 and 'image' in response.headers['Content-Type'] | |
except Exception as e: | |
print(f"Error verifying image URL: {e}") | |
return False | |
def set_typing_on(recipient_id): | |
url = f'https://graph.facebook.com/v12.0/me/messages?access_token={PAGE_ACCESS_TOKEN}' | |
headers = {'Content-Type': 'application/json'} | |
payload = { | |
'recipient': {'id': recipient_id}, | |
'sender_action': 'typing_on' | |
} | |
print(f"Sending typing on request: {payload}") | |
response = requests.post(url, headers=headers, json=payload) | |
print(f"Typing on response: {response.status_code}, {response.text}") | |
def set_typing_off(recipient_id): | |
url = f'https://graph.facebook.com/v12.0/me/messages?access_token={PAGE_ACCESS_TOKEN}' | |
headers = {'Content-Type': 'application/json'} | |
payload = { | |
'recipient': {'id': recipient_id}, | |
'sender_action': 'typing_off' | |
} | |
print(f"Sending typing off request: {payload}") | |
response = requests.post(url, headers=headers, json=payload) | |
print(f"Typing off response: {response.status_code}, {response.text}") | |
def mark_seen(recipient_id): | |
url = f'https://graph.facebook.com/v12.0/me/messages?access_token={PAGE_ACCESS_TOKEN}' | |
headers = {'Content-Type': 'application/json'} | |
payload = { | |
'recipient': {'id': recipient_id}, | |
'sender_action': 'mark_seen' | |
} | |
print(f"Sending mark seen request: {payload}") | |
response = requests.post(url, headers=headers, json=payload) | |
print(f"Mark seen response: {response.status_code}, {response.text}") | |
if __name__ == '__main__': | |
app.run(host="0.0.0.0", port=7860, debug=True) | |