import eventlet import pandas as pd import json from PIL import Image import numpy as np import os from pathlib import Path import torch import torch.nn.functional as F # from src.data.embs import ImageDataset from src.model.blip_embs import blip_embs from src.data.transforms import transform_test from transformers import StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer import gradio as gr # import spaces from langchain.chains import ConversationChain from langchain_community.chat_message_histories import ChatMessageHistory from langchain_core.runnables import RunnableWithMessageHistory from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_groq import ChatGroq from dotenv import load_dotenv from flask import Flask, request, render_template from flask_cors import CORS from flask_socketio import SocketIO, emit import json from openai import OpenAI # GROQ_API_KEY = os.getenv("GROQ_API_KEY") load_dotenv(".env") USER_AGENT = os.getenv("USER_AGENT") GROQ_API_KEY = os.getenv("GROQ_API_KEY") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") SECRET_KEY = os.getenv("SECRET_KEY") # Set environment variables os.environ['USER_AGENT'] = USER_AGENT os.environ["GROQ_API_KEY"] = GROQ_API_KEY os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY os.environ["TOKENIZERS_PARALLELISM"] = 'true' # Initialize Flask app and SocketIO with CORS app = Flask(__name__) CORS(app) app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 socketio = SocketIO(app, cors_allowed_origins="*", logger=True, max_http_buffer_size=1024 * 1024 * 1024) app.config['SECRET_KEY'] = SECRET_KEY # Initialize LLM llm = ChatGroq(model="llama-3.1-8b-instant", temperature=0, max_tokens=1024, max_retries=2) # Initialize Router router = ChatGroq(model="llama-3.2-3b-preview", temperature=0, max_tokens=1024, max_retries=2, model_kwargs={"response_format": {"type": "json_object"}}) # Initialize Router answer_formatter = ChatGroq(model="llama-3.1-8b-instant", temperature=0, max_tokens=1024, max_retries=2) # Initialized recommendation LLM client = OpenAI() class StoppingCriteriaSub(StoppingCriteria): def __init__(self, stops=[], encounters=1): super().__init__() self.stops = stops def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor): for stop in self.stops: if torch.all(input_ids[:, -len(stop):] == stop).item(): return True return False device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def get_blip_config(model="base"): config = dict() if model == "base": config[ "pretrained" ] = "https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_capfilt_large.pth " config["vit"] = "base" config["batch_size_train"] = 32 config["batch_size_test"] = 16 config["vit_grad_ckpt"] = True config["vit_ckpt_layer"] = 4 config["init_lr"] = 1e-5 elif model == "large": config[ "pretrained" ] = "https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_large_retrieval_coco.pth" config["vit"] = "large" config["batch_size_train"] = 16 config["batch_size_test"] = 32 config["vit_grad_ckpt"] = True config["vit_ckpt_layer"] = 12 config["init_lr"] = 5e-6 config["image_size"] = 384 config["queue_size"] = 57600 config["alpha"] = 0.4 config["k_test"] = 256 config["negative_all_rank"] = True return config print("Creating model") config = get_blip_config("large") model = blip_embs( pretrained=config["pretrained"], image_size=config["image_size"], vit=config["vit"], vit_grad_ckpt=config["vit_grad_ckpt"], vit_ckpt_layer=config["vit_ckpt_layer"], queue_size=config["queue_size"], negative_all_rank=config["negative_all_rank"], ) model = model.to(device) model.eval() transform = transform_test(384) df = pd.read_json("my_recipes.json") tar_img_feats = [] for _id in df["id_"].tolist(): tar_img_feats.append(torch.load("./datasets/sidechef/blip-embs-large/{:07d}.pth".format(_id)).unsqueeze(0)) tar_img_feats = torch.cat(tar_img_feats, dim=0) class Chat: def __init__(self, model, transform, dataframe, tar_img_feats, device='cuda:0', stopping_criteria=None): self.device = device self.model = model self.transform = transform self.df = dataframe self.tar_img_feats = tar_img_feats self.img_feats = None self.target_recipe = None self.messages = [] if stopping_criteria is not None: self.stopping_criteria = stopping_criteria else: stop_words_ids = [torch.tensor([2]).to(self.device)] self.stopping_criteria = StoppingCriteriaList([StoppingCriteriaSub(stops=stop_words_ids)]) def encode_image(self, image_path): img = Image.fromarray(image_path).convert("RGB") img = self.transform(img).unsqueeze(0) img = img.to(self.device) img_embs = model.visual_encoder(img) img_feats = F.normalize(model.vision_proj(img_embs[:, 0, :]), dim=-1).cpu() self.img_feats = img_feats self.get_target(self.img_feats, self.tar_img_feats) def get_target(self, img_feats, tar_img_feats) : score = (img_feats @ tar_img_feats.t()).squeeze(0).cpu().detach().numpy() index = np.argsort(score)[::-1][0] self.target_recipe = df.iloc[index] def ask(self): return json.dumps(self.target_recipe.to_json()) chat = Chat(model,transform,df,tar_img_feats, device) def answer_generator(formated_input, session_id): # QA system prompt and chain qa_system_prompt = """ You are an AI assistant developed by Nutrigenics AI, specializing in intelligent recipe information retrieval and recipe suggestions. Your purpose is to help users by recommending recipes, providing detailed nutritional values, listing ingredients, offering step-by-step cooking instructions, and filtering recipes based on provide context ans user query. Operational Guidelines: 1. Input Structure: - Context: You may receive contextual information related to recipes, such as specific data sets, user preferences, dietary restrictions, or previously selected dishes. - User Query: Users will pose questions or requests related to recipes, nutritional information, ingredient substitutions, cooking instructions, and more. 2. Response Strategy: - Utilize Provided Context: If the context contains relevant information that addresses the user's query, base your response on this provided data to ensure accuracy and relevance. - Respond to User Query Directly: If the context does not contain the necessary information to answer the user's query, kindly state that you do not have require information. Core Functionalities: - Nutritional Information: Accurately provide nutritional values for each recipe, including calories, macronutrients (proteins, fats, carbohydrates), and essential vitamins and minerals, using contextual data when available. - Ingredient Details: List all ingredients required for recipes, including substitute options for dietary restrictions or ingredient availability, utilizing context when relevant. - Step-by-Step Cooking Instructions: Deliver clear, easy-to-follow instructions for preparing and cooking meals, informed by any provided contextual data. - Recipe Recommendations: Suggest dishes based on user preferences, dietary restrictions, available ingredients, and contextual data if provided. Additional Instructions: - Precision and Personalization: Always aim to provide precise, personalized, and relevant information to users based on both the provided context and their specific queries. - Clarity and Coherence: Ensure that all responses are clear, well-structured, and easy to understand, facilitating a seamless user experience. - Substitute Suggestions: When suggesting ingredient substitutes, consider user preferences and dietary restrictions outlined in the context or user query. - Dynamic Adaptation: Adapt your responses dynamically based on whether the context is relevant to the user's current request, ensuring optimal use of available information. Don't mention about context in the response, format the answer in a natural and friendly way. Context: {context} """ qa_prompt = ChatPromptTemplate.from_messages( [ ("system", qa_system_prompt), ("human", "{input}") ] ) # Create the base chain base_chain = qa_prompt | llm | StrOutputParser() # Wrap the chain with message history question_answer_chain = RunnableWithMessageHistory( base_chain, lambda session_id: ChatMessageHistory(), # This creates a new history for each session input_messages_key="input", history_messages_key="chat_history" ) response = question_answer_chain.invoke(formated_input, config={"configurable": {"session_id": session_id}}) return response ### Router import json from langchain_core.messages import HumanMessage, SystemMessage def router_node(query): # Prompt router_instructions = """You are an expert at determining the appropriate task for a user’s question based on chat history and the current query context. You have two available tasks: 1. Retrieval: Fetch information based on user's chat history and current query. 2. Recommendation/Suggestion: Recommend recipes to users based on the query. Return a JSON response with a single key named “task” indicating either “retrieval” or “recommendation” based on your decision. """ response = router.invoke( [SystemMessage(content=router_instructions)] + [ HumanMessage( content=query ) ] ) res = json.loads(response.content) return res['task'] def recommendation_node(query): prompt = """ You are a helpful assistant that writes Python code to filter recipes from a JSON filr based o the user query. \n JSON file path = 'recipes.json' \n The JSON file is a list of recipes with the following structure: \n { "recipe_name": string, "recipe_time": integer, "recipe_yields": string, "recipe_ingredients": list of ingredients, "recipe_instructions": list of instruections, "recipe_image": string, "blogger": string, "recipe_nutrients": JSON object with key value pairs such as "protein: 10g", "tags": list of tags related to recipe } \n Here is the example of an recipe json object from the JSON data: \n { "recipe_name": "Asian Potato Salad with Seven Minute Egg", "recipe_time": 0, "recipe_yields": "4 servings", "recipe_ingredients": [ "2 1/2 cup Multi-Colored Fingerling Potato", "3/4 cup Celery", "1/4 cup Red Onion", "2 tablespoon Fresh Parsley", "1/3 cup Mayonnaise", "1 tablespoon Chili Garlic Sauce", "1 teaspoon Hoisin Sauce", "1 splash Soy Sauce", "to taste Salt", "to taste Ground Black Pepper", "4 Egg" ], "recipe_instructions": "Fill a large stock pot with water.\nAdd the Multi-Colored Fingerling Potato (2 1/2 cup) and bring water to a boil. Boil the potatoes for 20 minutes or until fork tender.\nDrain the potatoes and let them cool completely.\nMeanwhile, mix together in a small bowl Mayonnaise (1/3 cup), Chili Garlic Sauce (1 tablespoon), Hoisin Sauce (1 teaspoon), and Soy Sauce (1 splash).\nTo make the Egg (4), fill a stock pot with water and bring to a boil Gently add the eggs to the water and set a timer for seven minutes.\nThen move the eggs to an ice bath to cool completely. Once cooled, crack the egg slightly and remove the shell. Slice in half when ready to serve.\nNext, halve the cooled potatoes and place into a large serving bowl. Add the Ground Black Pepper (to taste), Celery (3/4 cup), Red Onion (1/4 cup), and mayo mixture. Toss to combine adding Salt (to taste) and Fresh Parsley (2 tablespoon).\nTop with seven minute eggs and serve. Enjoy!", "recipe_image": "https://www.sidechef.com/recipe/eeeeeceb-493e-493d-8273-66c800821b13.jpg?d=1408x1120", "blogger": "sidechef.com", "recipe_nutrients": { "calories": "80 calories", "proteinContent": "2.1 g", "fatContent": "6.2 g", "carbohydrateContent": "3.9 g", "fiberContent": "0.5 g", "sugarContent": "0.4 g", "sodiumContent": "108.0 mg", "saturatedFatContent": "1.2 g", "transFatContent": "0.0 g", "cholesterolContent": "47.4 mg", "unsaturatedFatContent": "3.8 g" }, "tags": [ "Salad", "Lunch", "Brunch", "Appetizers", "Side Dish", "Budget-Friendly", "Vegetarian", "Pescatarian", "Eggs", "Potatoes", "Dairy-Free", "Shellfish-Free" ] } \n Based on the user query, provide a Python function to filter the JSON data. The output of the function should be a list of json objects. \n Recipe filtering instructions: - If a user asked for the highest nutrient recipe such as "high protein or high calories" then filtered recipes should be the top highest recipes from all the recipes with high nutrient. - sort or rearrange recipes based which recipes are more appropriate for the user. Your output instructions: - The function name should be filter_recipes. The input to the function should be file name. - The length of output recipes should not be more than 6. - Only give me output function. Do not call the function. - Give the python function as a key named "code" in a json format. - Do not include any other text with the output, only give python code. - If you do not follow the above given instructions, the chat may be terminated. """ max_tries = 3 while True: try: # llm = ChatGroq(model="llama-3.1-8b-instant", temperature=0, max_tokens=1024, max_retries=2, model_kwargs={"response_format": {"type": "json_object"}}) response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": prompt}, { "role": "user", "content": query } ] ) content = response.choices[0].message.content res = json.loads(content) script = res['code'] exec(script, globals()) filtered_recipes = filter_recipes('recipes.json') if len(filtered_recipes) > 0: return filtered_recipes except Exception as e: print(e) if max_tries <= 0: return [] else: max_tries -= 1 return filtered_recipes def answer_formatter_node(question, context): prompt = f""" You are an highly clever question-answering assistant trained to provide clear and concise answers based on the user query and provided context. Your task is to generated answers for the user query based on the context provided. Instructions for your response: 1. Directly answer the user query using only the information provided in the context. 2. Ensure your response is clear and concise. 3. Mention only details related to the recipe, including the recipe name, instructions, nutrients, yield, ingredients, and image. 4. Do not include any information that is not related to the recipe context. Please format an answer based on the following user question and context provided: User Question: {question} Context: {context} """ response = answer_formatter.invoke( [SystemMessage(content=prompt)] ) res = response.content return res CURR_CONTEXT = '' # @spaces.GPU def get_answer(image=[], message='', sessionID='abc123'): global CURR_CONTEXT if len(image) > 0: try: # Process the image and message here device = torch.device("cuda" if torch.cuda.is_available() else "cpu") chat = Chat(model,transform,df,tar_img_feats, device) chat.encode_image(image) data = chat.ask() CURR_CONTEXT = data formated_input = { 'input': message, 'context': data } response = answer_generator(formated_input, session_id=sessionID) except Exception as e: print(e) response = {'content':"An error occurred while processing your request."} elif len(image) == 0 and message is not None: print("I am here") task = router_node(message) if task == 'retrieval': recipes = recommendation_node(message) print(recipes) if not recipes: response = {'content':"An error occurred while processing your request."} response = answer_formatter_node(message, recipes) else: formated_input = { 'input': message, 'context': CURR_CONTEXT } response = answer_generator(formated_input, session_id=sessionID) return response # Function to handle WebSocket connection @socketio.on('ping') def handle_connect(): emit('Ping-return', {'message': 'Connected'}, room=request.sid) # Function to handle WebSocket connection @socketio.on('connect') def handle_connect(): print(f"Client connected: {request.sid}") # Function to handle WebSocket disconnection @socketio.on('disconnect') def handle_disconnect(): print(f"Client disconnected: {request.sid}") import json import base64 from PIL import Image from io import BytesIO import torchvision.transforms as transforms # Dictionary to store incomplete image data by session session_store = {} @socketio.on('message') def handle_message(data): global session_store global CURR_CONTEXT context = "No data available" session_id = request.sid if session_id not in session_store: session_store[session_id] = {'image_data': b"", 'message': None, 'image_received': False} if 'message' in data: session_store[session_id]['message'] = data['message'] # Handle image chunk data if 'image' in data: try: # Append the incoming image chunk session_store[session_id]['image_data'] += data['image'] except Exception as e: print(f"Error processing image chunk: {str(e)}") emit('response', "An error occurred while receiving the image chunk.", room=session_id) return if session_store[session_id]['image_data'] or session_store[session_id]['message']: try: image_bytes = session_store[session_id]['image_data'] # print("checkpoint 2") if isinstance(image_bytes, str): image_bytes = base64.b64decode(image_bytes) image = Image.open(BytesIO(image_bytes)) image_array = np.array(image) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") chat = Chat(model, transform, df, tar_img_feats, device) chat.encode_image(image_array) context = chat.ask() CURR_CONTEXT = context message = data['message'] formated_input = { 'input': message, 'context': json.dumps(context) } # Invoke question_answer_chain and stream the response response = answer_generator(formated_input, session_id=session_id) emit('response', response, room=session_id) except Exception as e: print(f"Error processing image or message: {str(e)}") emit('response', "An error occurred while processing your request.", room=session_id) return finally: # Clear session data after processing session_store.pop(session_id, None) else: message = data['message'] task = router_node(message) print(task) if task == 'retrieval': formated_input = { 'input': message, 'context': json.dumps(CURR_CONTEXT) } response = answer_generator(formated_input, session_id=session_id) emit('response', response, room=session_id) else: response = recommendation_node(message) print(response) # response = answer_formatter_node(message, recipes) if response is None: response = {'content':"An error occurred while processing your request."} emit('json_response', response, room=session_id) session_store.pop(session_id, None) import requests from PIL import Image import numpy as np from io import BytesIO def download_image_to_numpy(url): print("Image URL: ", url) # Send a GET request to the URL to download the image response = requests.get(url) # Check if the request was successful if response.status_code == 200: # Open the image using PIL and convert it to RGB format image = Image.open(BytesIO(response.content)).convert('RGB') # Convert the image to a NumPy array image_array = np.array(image) return image_array else: raise Exception(f"Failed to download image. Status code: {response.status_code}") @socketio.on('example') def handle_message(data): img_url = data['img_url'] message = data['message'] session_id = request.sid image_array = download_image_to_numpy(img_url) response = get_answer(image=image_array, message=message, sessionID=request.sid) emit('response', response, room=session_id) return response # Home route @app.route("/") def index_view(): return render_template('chat.html') # Main function to run the app if __name__ == '__main__': socketio.run(app, debug=False)