# ConsumeWise / app_api.py
# sonika1503
# Add application file
# 33b10b6
# raw
# history blame
# 25.7 kB
import streamlit as st
from openai import OpenAI
import json, os, httpx, asyncio
import requests, time
#from data_extractor import extract_data
#from rda import find_nutrition
from typing import Dict, Any
#from calc_cosine_similarity import find_relevant_file_paths
import pickle
from calc_consumption_context import get_consumption_context
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
@st.cache_resource
def get_openai_client():
    """Return the shared OpenAI client.

    ``st.cache_resource`` executes this function once and returns the same
    cached client object on every later call, so the client is not rebuilt
    needlessly.

    The API key is taken from the ``OPENAI_API_KEY`` environment variable.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    return OpenAI(api_key=api_key)
# Shared OpenAI client (cached by get_openai_client) used for assistant set-up.
client = get_openai_client()

# Base URL of the backend API that all analysis endpoints hang off.
# Defaults to the hosted deployment; set CONSUMEWISE_API_URL to point the app
# at another instance (e.g. a local backend) without editing code. Any
# trailing slash is stripped because every call appends "/<service>/api/...".
render_host_url = (
    os.getenv("CONSUMEWISE_API_URL") or "https://foodlabelanalyzer-api-2.onrender.com"
).rstrip("/")
@st.cache_resource
def create_assistant_and_embeddings():
    """Create the "Processing Level" file-search assistant and its vector store.

    Cached with ``st.cache_resource`` so the set-up runs once, not on every
    rerun. Steps:
      1. create a gpt-4o assistant with the ``file_search`` tool;
      2. create a vector store and upload ``./Processing_Level.docx`` into it,
         waiting for the batch via ``upload_and_poll``;
      3. attach the vector store to the assistant.

    Returns:
        The updated assistant object (callers use its ``id``).
    """
    assistant = client.beta.assistants.create(
        name="Processing Level",
        instructions="You are an expert dietician. Use your knowledge base to answer questions about the processing level of food product.",
        model="gpt-4o",
        tools=[{"type": "file_search"}],
        temperature=0,
        top_p=0.85,
    )

    vector_store = client.beta.vector_stores.create(name="Processing Level Vec")

    file_paths = ["./Processing_Level.docx"]
    file_streams = []
    try:
        for path in file_paths:
            file_streams.append(open(path, "rb"))
        # upload_and_poll uploads the files, adds them to the vector store and
        # blocks until the batch reaches a terminal status.
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )
    finally:
        # Release the file handles whether the upload succeeded or raised.
        for stream in file_streams:
            stream.close()

    print(file_batch.status)
    print(file_batch.file_counts)

    # Point the assistant's file_search tool at the populated vector store.
    return client.beta.assistants.update(
        assistant_id=assistant.id,
        tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
    )
# Module-level handle to the "Processing Level" assistant. Its ``id`` is passed
# to the ingredient-analysis backend by analyze_product(). The factory is
# decorated with st.cache_resource, so the same assistant is reused.
assistant_p = create_assistant_and_embeddings()
async def extract_data_from_product_image(image_links):
    """Ask the backend to extract product data from label image URLs.

    POSTs ``{"image_links": image_links}`` to
    ``{render_host_url}/data_extractor/api/extract-data`` (50 s timeout).

    Returns:
        The decoded JSON response, or ``None`` when the request fails for any
        reason: transport error, non-2xx status, or any other exception
        (such as a body that is not valid JSON). Failures are printed, not
        raised.
    """
    print(f"DEBUG - image links are {image_links}")
    async with httpx.AsyncClient() as http:
        try:
            resp = await http.post(
                f"{render_host_url}/data_extractor/api/extract-data",
                json={"image_links": image_links},
                headers={"Content-Type": "application/json"},
                timeout=50.0,
            )
            resp.raise_for_status()
            return resp.json()
        except httpx.RequestError as err:
            print(f"Request error occurred: {err.request.url} - {err}")
        except httpx.HTTPStatusError as err:
            print(f"HTTP error occurred: {err.response.status_code} - {err.response.text}")
        except Exception as err:
            print(f"An unexpected error occurred: {err}")
        # Every failure path ends up here.
        return None
async def get_product_list(product_name_by_user):
    """Search the backend for products whose names resemble the user's query.

    Calls ``GET {render_host_url}/data_extractor/api/find-product`` with
    ``product_name=<product_name_by_user>``.

    Returns:
        The decoded JSON response, or ``None`` on a transport error or
        timeout, a non-2xx HTTP status, or a body that is not valid JSON.
        chatbot_response() treats a falsy result as "product not found".
    """
    print("calling find-product api")
    async with httpx.AsyncClient() as client_api:
        try:
            response = await client_api.get(
                f"{render_host_url}/data_extractor/api/find-product",
                params={"product_name": product_name_by_user},
                timeout=httpx.Timeout(
                    connect=100.0,
                    read=500.0,
                    pool=50.0,
                    write=10.0
                )
            )
            # raise_for_status() raises HTTPStatusError, which is NOT a
            # subclass of RequestError, so it needs its own handler below.
            response.raise_for_status()
            return response.json()
        except httpx.RequestError as e:
            print(f"An error occurred: {e}")
            return None
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
            return None
        except ValueError as e:
            # response.json() raises json.JSONDecodeError (a ValueError) on a non-JSON body.
            print(f"Could not decode find-product response as JSON: {e}")
            return None
async def get_product(product_name):
    """Fetch the stored record for one product from the backend database.

    Calls ``GET {render_host_url}/data_extractor/api/get-product`` with
    ``product_name=<product_name>``.

    Returns:
        The decoded JSON response, or ``None`` on timeout, transport error,
        non-2xx HTTP status, or a body that is not valid JSON.
    """
    print("calling get-product api")
    async with httpx.AsyncClient() as client_api:
        try:
            response = await client_api.get(
                f"{render_host_url}/data_extractor/api/get-product",
                params={"product_name": product_name},
                timeout=httpx.Timeout(
                    connect=300.0,
                    read=700.0,
                    pool=50.0,
                    write=10.0
                )
            )
            # raise_for_status() raises HTTPStatusError, which is not a
            # RequestError subclass; it is handled separately below.
            response.raise_for_status()
            return response.json()
        except httpx.TimeoutException as e:
            # Listed before RequestError (its base class) to get a specific message.
            print(f"The request timed out : {e}")
            return None
        except httpx.RequestError as e:
            print(f"An error occurred: {e}")
            return None
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
            return None
        except ValueError as e:
            # response.json() raises json.JSONDecodeError (a ValueError) on a non-JSON body.
            print(f"Could not decode get-product response as JSON: {e}")
            return None
async def analyze_nutrition_using_icmr_rda(product_info_from_db):
    """Request the nutrient analysis for a product from the backend.

    POSTs ``{"product_info_from_db": product_info_from_db}`` to
    ``{render_host_url}/nutrient_analyzer/api/nutrient-analysis``
    (400 s read timeout) and prints the full JSON reply.

    Returns:
        The decoded JSON response, or ``None`` when it decodes to an empty
        (falsy) value.

    Raises:
        httpx.TimeoutException, httpx.RequestError, or any other exception
        (e.g. httpx.HTTPStatusError for a non-2xx reply). Each is printed and
        then re-raised unchanged, so failures propagate to the caller.
    """
    print(f"Calling analyze_nutrition_icmr_rda api - product_info_from_db : {type(product_info_from_db)}")
    async with httpx.AsyncClient() as http:
        try:
            resp = await http.post(
                f"{render_host_url}/nutrient_analyzer/api/nutrient-analysis",
                json={"product_info_from_db": product_info_from_db},
                timeout=httpx.Timeout(connect=50.0, read=400.0, write=10.0, pool=10.0),
                headers={"Content-Type": "application/json"},
            )
            resp.raise_for_status()

            analysis = resp.json()
            print(f"Full response JSON: {analysis}")
            if analysis:
                return analysis
            print("Received empty JSON response")
            return None
        except httpx.TimeoutException as err:
            print(f"Timeout error: {err}")
            raise
        except httpx.RequestError as err:
            print(f"Request error: {err}")
            raise
        except Exception as err:
            print(f"Unexpected error in API call: {err}")
            raise
async def generate_final_analysis(
    brand_name: str,
    product_name: str,
    nutritional_level: str,
    processing_level: str,
    all_ingredient_analysis: str,
    claims_analysis: str,
    refs: list
):
    """Combine the individual analyses into one report via the backend.

    POSTs every argument as a JSON field to
    ``{render_host_url}/cumulative_analysis/api/cumulative-analysis``
    (800 s read timeout).

    Returns:
        The response body as text, with each literal backslash-n pair turned
        into a real newline. Returns ``None`` if the request times out or
        fails in any other way, including a non-2xx status.
    """
    print(f"Calling cumulative-analysis API with refs : {refs}")
    body = {
        "brand_name": brand_name,
        "product_name": product_name,
        "nutritional_level": nutritional_level,
        "processing_level": processing_level,
        "all_ingredient_analysis": all_ingredient_analysis,
        "claims_analysis": claims_analysis,
        "refs": refs,
    }
    async with httpx.AsyncClient() as http:
        try:
            print(f"sending refs to API for product {product_name} by {brand_name} - {refs}")
            resp = await http.post(
                f"{render_host_url}/cumulative_analysis/api/cumulative-analysis",
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=httpx.Timeout(connect=10.0, read=800.0, write=10.0, pool=10.0),
            )
            resp.raise_for_status()
            # The body presumably carries escaped newlines; unescape them so the
            # report renders with real line breaks.
            return resp.text.replace('\\n', '\n')
        except httpx.TimeoutException as err:
            print(f"Request timed out: {err}")
            return None
        except Exception as err:
            print(f"An error occurred: {err}")
            return None
async def analyze_processing_level_and_ingredients(product_info_from_db, assistant_p_id, start_time):
    """Request processing-level and per-ingredient analysis from the backend.

    Sends the product record and the OpenAI assistant id to
    ``{render_host_url}/ingredient_analysis/api/processing_level-ingredient-analysis``
    (600 s read timeout). Prints elapsed-time checkpoints measured from
    ``start_time``, a ``time.time()`` value.

    Returns:
        The decoded JSON response, or ``None`` if the request times out,
        fails in transport, or returns a non-2xx status.
    """
    print("calling processing level and ingredient_analysis api")
    print(f"assistant_p_id is of type {type(assistant_p_id)}")
    body = dict(product_info_from_db=product_info_from_db, assistant_p_id=assistant_p_id)
    try:
        print(f"DEBUG - Inside Ingredient analysis API 1 {time.time() - start_time} sec")
        async with httpx.AsyncClient() as http:
            resp = await http.post(
                f"{render_host_url}/ingredient_analysis/api/processing_level-ingredient-analysis",
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=httpx.Timeout(connect=5.0, read=600.0, write=10.0, pool=10.0),
            )
            print(f"DEBUG - Inside Ingredient analysis API 2 {time.time() - start_time} sec")
            resp.raise_for_status()
            return resp.json()
    except httpx.TimeoutException as err:
        print(f"The request timed out : {err}")
        return None
    except (httpx.RequestError, httpx.HTTPStatusError) as err:
        print(f"API call error: {err}")
        return None
async def analyze_claims(product_info_from_db):
    """Ask the backend to assess the claims listed on a product.

    POSTs ``{"product_info_from_db": product_info_from_db}`` to
    ``{render_host_url}/claims_analysis/api/claims-analysis`` (150 s read timeout).

    Returns:
        The decoded JSON response, or ``None`` on a transport error (timeouts
        included), a non-2xx status, or a body that is not valid JSON.
        Claims are optional for the overall analysis, so failures are printed
        rather than raised.
    """
    print("calling claims analysis api")
    request_payload = {
        "product_info_from_db": product_info_from_db
    }
    try:
        async with httpx.AsyncClient() as client_api:
            response = await client_api.post(
                f"{render_host_url}/claims_analysis/api/claims-analysis",
                json=request_payload,
                headers={
                    "Content-Type": "application/json"
                },
                timeout=httpx.Timeout(
                    connect=10.0,
                    read=150.0,
                    write=10.0,
                    pool=10.0
                )
            )
            response.raise_for_status()
            return response.json()
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        print(f"API call error: {e}")
        return None
    except ValueError as e:
        # response.json() raises json.JSONDecodeError (a ValueError) on a non-JSON body.
        print(f"Could not decode claims-analysis response as JSON: {e}")
        return None
async def analyze_product(product_info_from_db):
    """Run every sub-analysis for one product and combine them into a report.

    The nutrition analysis and the processing-level/ingredient analysis are
    requested concurrently. A claims analysis is added only when the product
    record has a truthy ``claims`` entry. The results are then sent to
    generate_final_analysis().

    Args:
        product_info_from_db: product record (a dict); ``brandName``,
            ``productName`` and ``claims`` are read from it.

    Returns:
        The final analysis text, or ``None`` in any of these cases:
        ``product_info_from_db`` is empty; the nutrition or ingredient
        analysis returned no usable data; the final analysis call failed.

    Raises:
        Whatever analyze_nutrition_using_icmr_rda() re-raises;
        ``asyncio.gather`` propagates the first such exception.
    """
    if not product_info_from_db:
        return None

    brand_name = product_info_from_db.get("brandName", "")
    product_name = product_info_from_db.get("productName", "")
    start_time = time.time()

    coroutines = [
        analyze_nutrition_using_icmr_rda(product_info_from_db),
        analyze_processing_level_and_ingredients(product_info_from_db, assistant_p.id, start_time),
    ]
    has_claims = bool(product_info_from_db.get("claims"))
    if has_claims:
        coroutines.append(analyze_claims(product_info_from_db))

    # The sub-analyses are independent, so run them in parallel.
    results = await asyncio.gather(*coroutines)
    nutritional_level_json = results[0]
    refs_ingredient_analysis_json = results[1]
    claims_analysis_json = results[2] if has_claims else None

    # Both required helpers can hand back None (failed call / empty body) or
    # a payload missing the expected keys; bail out instead of crashing.
    try:
        nutritional_level = nutritional_level_json["nutrition_analysis"]
        refs = refs_ingredient_analysis_json["refs"]
        all_ingredient_analysis = refs_ingredient_analysis_json["all_ingredient_analysis"]
        processing_level = refs_ingredient_analysis_json["processing_level"]
    except (TypeError, KeyError) as e:
        print(f"DEBUG - Sub-analysis returned unusable data, skipping cumulative analysis: {e!r}")
        return None

    # Claims are optional: a missing or malformed claims result degrades to "".
    claims_analysis = ""
    if isinstance(claims_analysis_json, dict):
        claims_analysis = claims_analysis_json.get("claims_analysis", "")

    final_analysis = await generate_final_analysis(
        brand_name,
        product_name,
        nutritional_level,
        processing_level,
        all_ingredient_analysis,
        claims_analysis,
        refs
    )
    print(f"DEBUG - Cumulative analysis finished in {time.time() - start_time} seconds")
    return final_analysis
# Streamlit app
# Ensure the chat transcript list exists before anything below reads it.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
async def chatbot_response(image_urls_str, product_name_by_user, extract_info = True):
    """Route one request to the product-name or the image-URL analysis flow.

    Args:
        image_urls_str: a single image URL or several separated by commas.
            Used only when ``product_name_by_user`` is empty.
        product_name_by_user: product name typed or selected by the user.
            When non-empty it takes precedence over ``image_urls_str``.
        extract_info: used only with a product name. False asks the backend
            for a list of similar products; True fetches the product record
            and runs the full analysis.

    Returns:
        ``(similar_products, message)``. ``similar_products`` is non-empty
        only after a successful product-list lookup. ``message`` is either
        a status/error text or the final analysis, which may be ``None`` if
        analyze_product() could not produce one.
    """
    if product_name_by_user != "":
        if not extract_info:
            # Only this branch uses the similar-products search, so it is not
            # issued on the full-analysis path.
            similar_product_list_json = await get_product_list(product_name_by_user)
            if not similar_product_list_json:
                return [], "Product not found in our database."
            with st.spinner("Fetching product information from our database... This may take a moment."):
                print(f"similar_product_list_json : {similar_product_list_json}")
                if 'error' in similar_product_list_json:
                    return [], "Product list not found"
                return similar_product_list_json['products'], "Product list found from our database"

        with st.spinner("Analyzing product using data from 3,000+ peer-reviewed journal papers..."):
            st.caption("This may take a few minutes")
            product_info_raw = await get_product(product_name_by_user)
            print(f"DEBUG product_info_raw from name: {type(product_info_raw)} {product_info_raw}")
            if not product_info_raw:
                return [], "product not found because product information in the db is corrupt"
            if 'error' in product_info_raw:
                return [], f"Product information could not be extracted from our database because of {product_info_raw['error']}"
            final_analysis = await analyze_product(product_info_raw)
            return [], final_analysis

    lowered = image_urls_str.lower()
    if "http:/" in lowered or "https:/" in lowered:
        # Accept one URL or a comma-separated list. Strip whitespace around
        # each entry (e.g. "a.jpg, https://b.jpg") and keep only http(s) URLs.
        candidates = (part.strip() for part in image_urls_str.split(","))
        image_urls = [
            url for url in candidates
            if "http:/" in url.lower() or "https:/" in url.lower()
        ]
        with st.spinner("Analyzing the product... This may take a moment."):
            product_info_raw = await extract_data_from_product_image(image_urls)
            print(f"DEBUG product_info_raw from image : {product_info_raw}")
            if not product_info_raw:
                # extract_data_from_product_image() returns None when the call fails.
                return [], "Product information could not be extracted from the image because the data extraction service returned no data"
            if 'error' in product_info_raw:
                # product_info_raw is already a decoded dict; index it directly.
                return [], f"Product information could not be extracted from the image because of {product_info_raw['error']}"
            final_analysis = await analyze_product(product_info_raw)
            return [], final_analysis

    return [], "I'm here to analyze food products. Please provide an image URL (Example : http://example.com/image.jpg) or product name (Example : Harvest Gold Bread)"
class SessionState:
    """Single place that defines every session-state key the app uses and its default."""

    @staticmethod
    def initialize():
        """Add any missing session-state keys with their default values.

        Keys that already exist are never overwritten, so this is safe to
        call at the start of every run. The defaults mapping is rebuilt on
        each call, so the empty lists are fresh objects, not shared ones.
        """
        defaults = {
            "messages": [],
            "product_selected": False,
            "product_shared": False,
            "analyze_more": True,
            "welcome_shown": False,
            "yes_no_choice": None,
            "welcome_msg": "Welcome to ConsumeWise! What product would you like me to analyze today? Example : Noodles, Peanut Butter etc",
            "similar_products": [],
            "awaiting_selection": False,
            "current_user_input": "",
            "selected_product": None,
        }
        missing = {
            name: value
            for name, value in defaults.items()
            if name not in st.session_state
        }
        for name, value in missing.items():
            st.session_state[name] = value
class ProductSelector:
    """Handles the "choose one of the similar products" step of the chat."""

    @staticmethod
    async def handle_selection():
        """Render the similar-product picker and act on a confirmed choice.

        When "Confirm Selection" is clicked:
          * a listed product is analyzed by name via chatbot_response(). If
            the result is not the corrupt-DB message, the analysis is posted
            to the chat and session state is wiped except for the transcript
            and welcome text, ready for the next product;
          * "None of the above", or a product whose DB record is reported as
            corrupt, makes the bot ask for an image URL instead;
          * the script is then rerun.

        Returns:
            True while a product list is pending (the caller disables chat
            input), False when there is nothing to choose from.
        """
        if not st.session_state.similar_products:
            return False

        none_option = "None of the above"
        corrupt_db_msg = "product not found because product information in the db is corrupt"
        ask_for_image_msg = "Please provide the image URL of the product to analyze based on the latest information."

        with st.container():
            choice = st.radio(
                "Select a product:",
                st.session_state.similar_products + [none_option],
                key="product_choice"
            )
            # Nothing is processed until the user explicitly confirms.
            confirm_clicked = st.button("Confirm Selection")
            if confirm_clicked:
                st.session_state.awaiting_selection = False
                msg = ""
                if choice != none_option:
                    st.session_state.messages.append({"role": "assistant", "content": f"You selected {choice}"})
                    # Options presumably look like "<product> by <brand>"; only
                    # the part before " by " is used for the lookup.
                    _, msg = await chatbot_response("", choice.split(" by ")[0], extract_info=True)
                    if msg != corrupt_db_msg:
                        st.session_state.messages.append({"role": "assistant", "content": msg})
                        with st.chat_message("assistant"):
                            st.markdown(msg)
                        # Start the next product from a clean slate: drop every key
                        # except the transcript and welcome text, so
                        # SessionState.initialize() re-seeds the defaults on rerun.
                        keys_to_keep = ["messages", "welcome_msg"]
                        stale_keys = [key for key in st.session_state.keys() if key not in keys_to_keep]
                        for key in stale_keys:
                            del st.session_state[key]
                        st.session_state.welcome_msg = "What product would you like me to analyze next?"
                if choice == none_option or msg == corrupt_db_msg:
                    st.session_state.messages.append({"role": "assistant", "content": ask_for_image_msg})
                    with st.chat_message("assistant"):
                        st.markdown(ask_for_image_msg)
                st.rerun()
        # A selection is still pending, so chat input stays blocked.
        return True
class ChatManager:
    """Turns one free-text chat message into a ``(response, status)`` pair."""

    @staticmethod
    async def process_response(user_input):
        """Dispatch a chat message to the product-name or image-URL handler.

        Input containing ``http:/`` or ``https:/`` is treated as image URL(s);
        anything else is treated as a product name.

        Returns:
            ``(response_text, status)``; status is ``"success"`` only when a
            full product analysis was produced.
        """
        # Always dispatch, so every input yields a defined (response, status)
        # pair, even after a failed image attempt left product_selected set.
        if "http:/" not in user_input and "https:/" not in user_input:
            return await ChatManager._handle_product_name(user_input)
        return await ChatManager._handle_product_url(user_input)

    @staticmethod
    async def _handle_product_name(user_input):
        """Look up products similar to ``user_input`` and queue them for selection.

        Returns:
            A ``(message, "no success")`` pair. When matches are found they
            are stored in ``st.session_state.similar_products`` and
            ``awaiting_selection`` is set so the picker is shown next run.
        """
        st.session_state.product_shared = True
        st.session_state.current_user_input = user_input
        similar_products, _ = await chatbot_response(
            "", user_input, extract_info=False
        )
        if similar_products:
            st.session_state.similar_products = similar_products
            st.session_state.awaiting_selection = True
            return "Here are some similar products from our database. Please select:", "no success"
        return "Product not found in our database. Please provide the image URL of the product.", "no success"

    @staticmethod
    async def _handle_product_url(user_input):
        """Analyze a product from image URL(s) after a product name was shared.

        Only ``.jpg``/``.jpeg`` http(s) URLs are accepted.

        Returns:
            ``(response_text, status)``, where status is ``"success"`` when
            the analysis text was produced and ``"no success"`` otherwise.
        """
        if not st.session_state.product_shared:
            # Callers unpack two values, so always return a pair.
            return "Please provide the product name first", "no success"

        is_valid_url = (".jpeg" in user_input or ".jpg" in user_input) and \
            ("http:/" in user_input or "https:/" in user_input)
        if not is_valid_url:
            return "Please provide valid image URL of the product.", "no success"

        _, msg = await chatbot_response(
            user_input, "", extract_info=True
        )
        if not msg:
            # analyze_product()/generate_final_analysis() return None on failure.
            return "Product analysis could not be completed. Please re-try!!", "no success"
        if msg == "product not found because image is not clear":
            return msg + ". Please share clear image URLs!", "no success"
        if "Product information could not be extracted from the image" in msg:
            return msg + ".Please re-try!!", "no success"

        st.session_state.product_selected = True
        return msg, "success"
async def main():
    """Render the ConsumeWise chat page for the current script run.

    Seeds session state, greets the user once per conversation, replays the
    chat history, and then shows one of two inputs: the product picker
    (with chat input disabled) or the chat input. The "Clear Chat History"
    button is shown last.
    """
    SessionState.initialize()

    st.title("ConsumeWise - Your Food Product Analysis Assistant")

    # welcome_shown is among the keys dropped on reset, so the (possibly
    # updated) welcome_msg is posted again after each finished product.
    if not st.session_state.welcome_shown:
        st.session_state.messages.append({
            "role": "assistant",
            "content": st.session_state.welcome_msg
        })
        st.session_state.welcome_shown = True

    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    selection_in_progress = False
    if st.session_state.awaiting_selection:
        selection_in_progress = await ProductSelector.handle_selection()

    if selection_in_progress:
        # Block free-text input until the pending selection is confirmed.
        st.chat_input("Please confirm your selection above first...", disabled=True)
    else:
        user_input = st.chat_input("Enter your message:", key="user_input")
        if user_input:
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)

            response, status = await ChatManager.process_response(user_input)
            st.session_state.messages.append({"role": "assistant", "content": response})
            with st.chat_message("assistant"):
                st.markdown(response)

            if status == "success":
                # Reset for the next product: keep only the transcript and the
                # welcome text; SessionState.initialize() re-seeds the other
                # keys at the start of the next run.
                keys_to_keep = ["messages", "welcome_msg"]
                keys_to_delete = [key for key in st.session_state.keys() if key not in keys_to_keep]
                for key in keys_to_delete:
                    del st.session_state[key]
                st.session_state.welcome_msg = "What product would you like me to analyze next?"
            st.rerun()

    if st.button("Clear Chat History"):
        st.session_state.clear()
        st.rerun()
def run_main():
    """Synchronous entry point: drive the async ``main`` coroutine to completion."""
    coroutine = main()
    asyncio.run(coroutine)


# Script entry point.
if __name__ == "__main__":
    run_main()