Spaces:
Sleeping
Sleeping
# app.py | |
import asyncio
import io
import os
from typing import List, Optional

import numpy as np
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from PIL import Image, UnidentifiedImageError
from pinecone import Pinecone
from pydantic import BaseModel

from encoder import FashionCLIPEncoder
# Load environment variables first: the lookups below read os.environ.
load_dotenv()

# FastAPI application object for this service.
app = FastAPI()

# Pinecone settings taken from the environment; each is None when unset.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.environ.get("PINECONE_INDEX_NAME")
PINECONE_NAMESPACE = os.environ.get("PINECONE_NAMESPACE")

# Browser-like User-Agent sent with every image download.
REQUESTS_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# Number of images collected before they are encoded together.
BATCH_SIZE = 30

# Service clients shared by all requests, created once at import time.
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)
encoder = FashionCLIPEncoder()
class ProductData(BaseModel):
    """One product to process: its identifier and the URL of its image."""

    product_id: str
    url: str
class ProcessRequest(BaseModel):
    """Request body: the products to process and whether to upload results."""

    products: List[ProductData]
    # Uploading to Pinecone is the default; callers opt out with False.
    upload_to_pinecone: bool = True
def download_image_as_pil(url: str, timeout: int = 10) -> Optional[Image.Image]:
    """Download the image at *url* and return it as an RGB PIL image.

    Args:
        url: Address of the image to fetch.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The decoded image converted to RGB, or ``None`` when the request
        fails, the response is not a 200 with an image Content-Type, or the
        payload cannot be decoded as an image. Failures are printed, never
        raised.
    """
    try:
        # stream=True lets the status and headers be checked before the body
        # is downloaded; the context manager releases the connection on every
        # path, including the early return below.
        with requests.get(
            url, stream=True, headers=REQUESTS_HEADERS, timeout=timeout
        ) as response:
            content_type = response.headers.get('Content-Type', '')
            if response.status_code != 200 or 'image' not in content_type:
                return None
            # response.content applies any Content-Encoding (gzip/deflate),
            # which reading response.raw directly does not do by default.
            payload = io.BytesIO(response.content)
        try:
            return Image.open(payload).convert("RGB")
        except UnidentifiedImageError:
            print(f"Unidentified image file from URL: {url}")
            return None
    except Exception as e:
        # Best effort: any network or decoding problem yields None.
        print(f"Error downloading image: {e}")
        return None
def process_batch(batch_products, batch_images, results):
    """Encode one batch of images and append one entry per product to *results*.

    Args:
        batch_products: Dicts with "product_id" and "url" keys, aligned by
            position with *batch_images*.
        batch_images: Images passed to ``encoder.encode_images``.
        results: List extended in place; the function returns ``None``.

    Each product gets exactly one entry. A success entry holds the
    L2-normalized embedding, a five-value preview and ``"success": True``.
    An error entry holds an ``"error"`` message instead. If encoding the
    batch fails as a whole, every product in the batch gets an error entry
    and no success entries are kept.
    """
    # Entries are staged locally so that a failure part-way through cannot
    # leave a product with both a success entry and an error entry.
    batch_results = []
    try:
        # Generate embeddings
        embeddings = list(encoder.encode_images(batch_images))
        if len(embeddings) != len(batch_products):
            # zip() would silently drop the unmatched products.
            raise ValueError(
                f"Encoder returned {len(embeddings)} embeddings "
                f"for {len(batch_products)} products"
            )
        for product, embedding in zip(batch_products, embeddings):
            norm = np.linalg.norm(embedding)
            if norm == 0 or not np.isfinite(norm):
                # Dividing by such a norm would produce NaN/inf values.
                batch_results.append({
                    "product_id": product["product_id"],
                    "image_url": product["url"],
                    "error": "Embedding has zero or non-finite norm"
                })
                continue
            # Normalize embedding
            embedding_normalized = embedding / norm
            batch_results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "embedding": embedding_normalized.tolist(),
                "embedding_preview": embedding_normalized[:5].tolist(),  # First 5 values for preview
                "success": True
            })
    except Exception as e:
        # Discard any staged entries: the whole batch is reported as failed.
        batch_results = [
            {
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(e)
            }
            for product in batch_products
        ]
    results.extend(batch_results)
def batch_process_images(products):
    """Download every product image and encode the images in batches.

    Returns a dict with an "error" key when *products* is empty. Otherwise
    returns a list holding one entry per product: either an error entry
    written here or whatever ``process_batch`` appended for it.
    """
    if not products:
        return {"error": "No products provided."}

    results = []
    pending_products = []
    pending_images = []

    for product in products:
        try:
            image = download_image_as_pil(product["url"])
            if image:
                pending_products.append(product)
                pending_images.append(image)
                # Flush as soon as a full batch has been collected.
                if len(pending_images) == BATCH_SIZE:
                    process_batch(pending_products, pending_images, results)
                    pending_products, pending_images = [], []
            else:
                results.append({
                    "product_id": product["product_id"],
                    "image_url": product["url"],
                    "error": "Failed to download image"
                })
        except Exception as exc:
            results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(exc)
            })

    # Encode whatever is left over after the loop.
    if pending_images:
        process_batch(pending_products, pending_images, results)

    return results
def upload_to_pinecone(processed_results, batch_size: int = 100):
    """Upload the embeddings of successfully processed products to Pinecone.

    Args:
        processed_results: Iterable of result dicts. Only dicts that have an
            "embedding" key and no "error" key are uploaded; anything else
            is skipped.
        batch_size: Maximum number of vectors sent per upsert request.

    Returns:
        ``{"uploaded_count": n}`` where *n* is the number of vectors sent.

    Raises:
        ValueError: If *batch_size* is smaller than 1.

    Exceptions raised by ``index.upsert`` propagate to the caller; chunks
    sent before the failing one remain uploaded.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    vectors_to_upsert = [
        {
            'id': result['product_id'],
            'values': result['embedding'],
            'metadata': {
                'image_url': result['image_url']
            }
        }
        for result in processed_results
        # The isinstance check skips non-dict items, e.g. the string keys
        # seen when an error dict is passed in place of a result list.
        if isinstance(result, dict)
        and 'error' not in result
        and 'embedding' in result
    ]

    # Send in chunks so the request size does not grow with the number of
    # products. NOTE(review): Pinecone documents per-request upsert limits;
    # confirm the default of 100 suits this index's vector dimension.
    for start in range(0, len(vectors_to_upsert), batch_size):
        index.upsert(
            vectors=vectors_to_upsert[start:start + batch_size],
            namespace=PINECONE_NAMESPACE,
        )

    return {"uploaded_count": len(vectors_to_upsert)}
# NOTE(review): no route decorator is visible on this handler — confirm it is
# registered on `app`.
async def process_images(request: ProcessRequest):
    """
    Process product images and optionally upload their embeddings to Pinecone.

    Parameters:
    - products: List of products with product_id and url
    - upload_to_pinecone: Boolean flag to determine if embeddings should be uploaded to Pinecone

    Returns a dict with "processing_results" and, when the upload was
    requested, "pinecone_upload".
    """
    # Convert products to list of dicts
    products_data = [{"product_id": p.product_id, "url": p.url} for p in request.products]

    # Downloading, encoding and upserting are blocking calls. Running them in
    # the default executor keeps the event loop free for other requests.
    loop = asyncio.get_running_loop()

    # Process images
    results = await loop.run_in_executor(None, batch_process_images, products_data)

    if not request.upload_to_pinecone:
        return {"processing_results": results}

    # Upload to Pinecone if requested
    upload_result = await loop.run_in_executor(None, upload_to_pinecone, results)
    return {
        "processing_results": results,
        "pinecone_upload": upload_result
    }
async def health_check():
    """Return a fixed payload reporting that the service is up."""
    payload = {"status": "healthy"}
    return payload
# if __name__ == "__main__": | |
# import uvicorn | |
# uvicorn.run(app, host="0.0.0.0", port=8000) |