# app.py
"""FastAPI service that embeds product images and optionally stores them in Pinecone.

For each product the image is downloaded, encoded in batches with
``FashionCLIPEncoder``, L2-normalised, and (when requested) upserted into the
Pinecone index configured through environment variables.
"""
import os
from typing import List, Optional

import numpy as np
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from PIL import Image, UnidentifiedImageError
from pinecone import Pinecone
from pydantic import BaseModel

from encoder import FashionCLIPEncoder

# Load environment variables
load_dotenv()

# Initialize FastAPI app
app = FastAPI()

# Constants
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME")
PINECONE_NAMESPACE = os.getenv("PINECONE_NAMESPACE")
REQUESTS_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Number of images handed to the encoder in one call.
BATCH_SIZE = 30
# Number of vectors sent per Pinecone upsert request; keeps each request
# small instead of sending every vector in a single call.
UPSERT_BATCH_SIZE = 100

# Initialize services
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)
encoder = FashionCLIPEncoder()


class ProductData(BaseModel):
    """A single product to process: its identifier and its image URL."""

    product_id: str
    url: str


class ProcessRequest(BaseModel):
    """Request body for ``POST /process``."""

    products: List[ProductData]
    upload_to_pinecone: bool = True


def download_image_as_pil(url: str, timeout: int = 10) -> Optional[Image.Image]:
    """Download *url* and return it as an RGB PIL image.

    Args:
        url: Address of the image to fetch.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The decoded image converted to RGB, or ``None`` when the response is
        not a 200 with an image Content-Type, the payload is not a
        recognisable image, or any other error occurs while downloading.
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(
            url, stream=True, headers=REQUESTS_HEADERS, timeout=timeout
        ) as response:
            content_type = response.headers.get('Content-Type', '')
            if response.status_code != 200 or 'image' not in content_type:
                return None

            # Have urllib3 undo gzip/deflate transfer encodings; the raw
            # stream otherwise yields the still-compressed bytes.
            response.raw.decode_content = True
            try:
                # convert() forces a full read, so the image is complete
                # before the connection is closed.
                return Image.open(response.raw).convert("RGB")
            except UnidentifiedImageError:
                print(f"Unidentified image file from URL: {url}")
                return None
    except Exception as e:
        # Best-effort download: report and signal failure with None.
        print(f"Error downloading image: {e}")
        return None


def process_batch(batch_products, batch_images, results):
    """Encode one batch of images and append one result per product to *results*.

    Args:
        batch_products: Dicts with ``"product_id"`` and ``"url"`` keys, in the
            same order as *batch_images*.
        batch_images: Images to pass to ``encoder.encode_images``.
        results: List that receives the per-product result dicts (mutated in
            place).

    On success an entry carries the normalised ``"embedding"``, a five-value
    ``"embedding_preview"`` and ``"success": True``. If the batch fails as a
    whole, every product in it gets a single ``"error"`` entry instead.
    """
    batch_results = []
    try:
        # Generate embeddings
        embeddings = list(encoder.encode_images(batch_images))
        if len(embeddings) != len(batch_products):
            # zip() below would otherwise silently drop products.
            raise ValueError(
                f"Encoder returned {len(embeddings)} embeddings "
                f"for {len(batch_products)} images"
            )

        for product, embedding in zip(batch_products, embeddings):
            norm = np.linalg.norm(embedding)
            if not np.isfinite(norm) or norm == 0:
                # Dividing would produce NaN/inf values, which cannot be
                # serialised to JSON or stored as a usable vector.
                batch_results.append({
                    "product_id": product["product_id"],
                    "image_url": product["url"],
                    "error": "Embedding has zero or non-finite norm"
                })
                continue

            # Normalize embedding
            embedding_normalized = embedding / norm
            batch_results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "embedding": embedding_normalized.tolist(),
                "embedding_preview": embedding_normalized[:5].tolist(),  # First 5 values for preview
                "success": True
            })
    except Exception as e:
        # Discard any partial successes so each product appears exactly once.
        batch_results = [
            {
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(e)
            }
            for product in batch_products
        ]
    results.extend(batch_results)


def batch_process_images(products):
    """Download and embed the images of *products* in batches of ``BATCH_SIZE``.

    Args:
        products: List of dicts with ``"product_id"`` and ``"url"`` keys.

    Returns:
        A list with one result dict per product (see ``process_batch``);
        products whose image could not be downloaded get an ``"error"``
        entry. When *products* is empty, a dict
        ``{"error": "No products provided."}`` is returned instead.
    """
    if not products:
        return {"error": "No products provided."}

    results = []
    batch_products, batch_images = [], []

    for product in products:
        try:
            # Download image
            image = download_image_as_pil(product["url"])
        except Exception as e:
            results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(e)
            })
            continue

        if image is None:
            results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": "Failed to download image"
            })
            continue

        batch_products.append(product)
        batch_images.append(image)

        # Process batch when reaching batch size
        if len(batch_images) == BATCH_SIZE:
            process_batch(batch_products, batch_images, results)
            batch_products, batch_images = [], []

    # Process remaining images in the last batch
    if batch_images:
        process_batch(batch_products, batch_images, results)

    return results


def upload_to_pinecone(processed_results):
    """Upload embeddings to Pinecone.

    Args:
        processed_results: Iterable of result dicts as produced by
            ``batch_process_images``. Only entries that have an
            ``"embedding"`` and no ``"error"`` are uploaded.

    Returns:
        ``{"uploaded_count": n}`` where ``n`` is the number of vectors sent.
    """
    vectors_to_upsert = [
        {
            'id': result['product_id'],
            'values': result['embedding'],
            'metadata': {
                'image_url': result['image_url']
            }
        }
        for result in processed_results
        if 'error' not in result and 'embedding' in result
    ]

    # Send in chunks so a large request does not become one oversized upsert.
    for start in range(0, len(vectors_to_upsert), UPSERT_BATCH_SIZE):
        index.upsert(
            vectors=vectors_to_upsert[start:start + UPSERT_BATCH_SIZE],
            namespace=PINECONE_NAMESPACE
        )

    return {"uploaded_count": len(vectors_to_upsert)}


@app.post("/process")
async def process_images(request: ProcessRequest):
    """
    Process product images and optionally upload their embeddings to Pinecone

    Parameters:
    - products: List of products with product_id and url
    - upload_to_pinecone: Boolean flag to determine if embeddings should be uploaded to Pinecone
    """
    # NOTE(review): the download/encode/upsert calls below are synchronous and
    # run inside this async handler, so they appear to block the event loop for
    # the duration of a request — confirm whether that is acceptable.

    # Convert products to list of dicts
    products_data = [{"product_id": p.product_id, "url": p.url} for p in request.products]

    # Process images
    results = batch_process_images(products_data)

    # Upload to Pinecone if requested
    if request.upload_to_pinecone:
        upload_result = upload_to_pinecone(results)
        return {
            "processing_results": results,
            "pinecone_upload": upload_result
        }

    return {"processing_results": results}


@app.get("/health")
async def health_check():
    """Report that the service is running."""
    return {"status": "healthy"}


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)