# Page header captured together with the file (not part of the program):
#   openFashionClip / app.py · im · ok · 0fcf1c4 · raw · history blame · 5.45 kB
# app.py
import os
from io import BytesIO
from typing import List, Optional

import numpy as np
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from PIL import Image, UnidentifiedImageError
from pinecone import Pinecone
from pydantic import BaseModel

from encoder import FashionCLIPEncoder
# Load environment variables (must run before the os.getenv calls below)
load_dotenv()
# Initialize FastAPI app; the route decorators further down register on it
app = FastAPI()
# Constants
# Pinecone settings read from the environment; each is None when the variable is unset
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME")
PINECONE_NAMESPACE = os.getenv("PINECONE_NAMESPACE")
# Headers sent with every image download request (browser-like User-Agent)
REQUESTS_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Number of downloaded images handed to the encoder per process_batch call
BATCH_SIZE = 30
# Initialize services once at import time: Pinecone client, the target index handle,
# and the image encoder shared by all requests
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(PINECONE_INDEX_NAME)
encoder = FashionCLIPEncoder()
# One product image to embed, as supplied by the caller of the /process endpoint.
class ProductData(BaseModel):
    # Caller-supplied identifier; echoed in results and used as the Pinecone vector id
    product_id: str
    # Location of the product image to download
    url: str
# Request body of the /process endpoint.
class ProcessRequest(BaseModel):
    # Products whose images should be downloaded and embedded
    products: List[ProductData]
    # When True (the default), successful embeddings are upserted to Pinecone
    upload_to_pinecone: bool = True
def download_image_as_pil(url: str, timeout: int = 10) -> Optional[Image.Image]:
    """Download an image and return it as an RGB PIL image.

    Args:
        url: Address of the image to fetch.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The decoded image converted to RGB, or ``None`` when the response is
        not a 200 with an image Content-Type, the payload cannot be decoded,
        or any other error occurs. Errors are printed, never raised.
    """
    # NOTE(security): `url` appears to be caller-supplied (see the /process
    # endpoint); consider restricting allowed hosts if this service can reach
    # internal networks — confirm against the deployment.
    try:
        # stream=True lets us inspect status and headers before pulling the body;
        # the context manager releases the connection on every exit path.
        with requests.get(
            url, stream=True, headers=REQUESTS_HEADERS, timeout=timeout
        ) as response:
            content_type = response.headers.get('Content-Type', '')
            if response.status_code != 200 or 'image' not in content_type:
                return None
            # response.content applies Content-Encoding decoding (gzip/deflate),
            # which reading response.raw directly does not.
            payload = response.content
        try:
            return Image.open(BytesIO(payload)).convert("RGB")
        except UnidentifiedImageError:
            print(f"Unidentified image file from URL: {url}")
            return None
    except Exception as e:
        # Best-effort download: report and signal failure with None.
        print(f"Error downloading image: {e}")
        return None
def _embedding_result(product, embedding):
    """Build the result entry for one product from its raw embedding."""
    norm = np.linalg.norm(embedding)
    # Dividing by a zero or non-finite norm would put NaN/inf into the output.
    if not np.isfinite(norm) or norm == 0:
        return {
            "product_id": product["product_id"],
            "image_url": product["url"],
            "error": "Embedding has zero or non-finite norm",
        }
    embedding_normalized = embedding / norm
    return {
        "product_id": product["product_id"],
        "image_url": product["url"],
        "embedding": embedding_normalized.tolist(),
        "embedding_preview": embedding_normalized[:5].tolist(),  # First 5 values for preview
        "success": True,
    }


def process_batch(batch_products, batch_images, results):
    """Encode one batch of images and append one result entry per product.

    Args:
        batch_products: Dicts with "product_id" and "url", aligned with
            ``batch_images``.
        batch_images: Images passed to ``encoder.encode_images``.
        results: List that receives the entries (mutated in place).

    Each product yields exactly one entry: a success entry holding the
    L2-normalized embedding, or an error entry. If encoding fails, the
    encoder returns a different number of embeddings than products, or an
    embedding cannot be normalized, every product in the batch is reported
    with that error. A zero-norm embedding only fails its own product.
    """
    try:
        # Generate embeddings
        embeddings = list(encoder.encode_images(batch_images))
        if len(embeddings) != len(batch_products):
            raise ValueError(
                f"Encoder returned {len(embeddings)} embeddings "
                f"for {len(batch_products)} images"
            )
        # Stage entries locally so a failure midway cannot leave partial
        # success entries behind in `results`.
        batch_results = [
            _embedding_result(product, embedding)
            for product, embedding in zip(batch_products, embeddings)
        ]
    except Exception as e:
        batch_results = [
            {
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(e),
            }
            for product in batch_products
        ]
    results.extend(batch_results)
def batch_process_images(products, batch_size=None):
    """Download each product image and embed the images in batches.

    Args:
        products: Sequence of dicts with "product_id" and "url".
        batch_size: Number of images per ``process_batch`` call; defaults to
            the module-level ``BATCH_SIZE`` when None.

    Returns:
        A list with one result entry per product. Download failures are
        appended as soon as they occur and embedded products when their batch
        is flushed, so entries are not guaranteed to follow input order.
        For empty input the dict ``{"error": "No products provided."}`` is
        returned instead of a list.

    Raises:
        ValueError: If the effective batch size is smaller than 1.
    """
    if not products:
        return {"error": "No products provided."}
    size = BATCH_SIZE if batch_size is None else batch_size
    if size < 1:
        raise ValueError("batch_size must be a positive integer")

    results = []
    batch_products, batch_images = [], []
    for product in products:
        # Guard only the download: once a product is queued it must not also
        # be reported as an error by an unrelated failure.
        try:
            image = download_image_as_pil(product["url"])
        except Exception as e:
            results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": str(e)
            })
            continue
        if image is None:
            results.append({
                "product_id": product["product_id"],
                "image_url": product["url"],
                "error": "Failed to download image"
            })
            continue
        batch_products.append(product)
        batch_images.append(image)
        # Process batch when reaching batch size
        if len(batch_images) >= size:
            process_batch(batch_products, batch_images, results)
            batch_products, batch_images = [], []
    # Process remaining images in the last batch
    if batch_images:
        process_batch(batch_products, batch_images, results)
    return results
def upload_to_pinecone(processed_results, batch_size: int = 100):
    """Upload embeddings to Pinecone.

    Args:
        processed_results: Iterable of result entries. Only dict entries that
            contain an "embedding" and no "error" key are uploaded; anything
            else is skipped.
        batch_size: Maximum number of vectors sent per upsert request, so a
            large result set is not sent as one oversized request.

    Returns:
        ``{"uploaded_count": n}`` where n is the number of vectors upserted.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Any exception raised by ``index.upsert`` propagates to the caller;
        chunks sent before the failure are not rolled back here.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    vectors_to_upsert = [
        {
            'id': result['product_id'],
            'values': result['embedding'],
            'metadata': {
                'image_url': result['image_url']
            },
        }
        for result in processed_results
        # isinstance guard: for empty input the processing step returns an
        # error dict, whose iteration yields plain strings.
        if isinstance(result, dict) and 'error' not in result and 'embedding' in result
    ]
    for start in range(0, len(vectors_to_upsert), batch_size):
        index.upsert(
            vectors=vectors_to_upsert[start:start + batch_size],
            namespace=PINECONE_NAMESPACE,
        )
    return {"uploaded_count": len(vectors_to_upsert)}
@app.post("/process")
async def process_images(request: ProcessRequest):
    """
    Process product images and optionally upload their embeddings to Pinecone

    Parameters:
    - products: List of products with product_id and url
    - upload_to_pinecone: Boolean flag to determine if embeddings should be uploaded to Pinecone

    Returns a dict with "processing_results" and, when the upload was
    requested, "pinecone_upload" holding the upload summary.
    """
    # Convert products to list of dicts
    products_data = [{"product_id": p.product_id, "url": p.url} for p in request.products]
    # Downloading, encoding and upserting are blocking calls; run them in the
    # threadpool so the event loop keeps serving other requests (e.g. /health).
    results = await run_in_threadpool(batch_process_images, products_data)
    if not request.upload_to_pinecone:
        return {"processing_results": results}
    # Upload to Pinecone if requested
    upload_result = await run_in_threadpool(upload_to_pinecone, results)
    return {
        "processing_results": results,
        "pinecone_upload": upload_result
    }
@app.get("/health")
async def health_check():
    """Liveness probe: always reports a healthy status."""
    status_payload = {"status": "healthy"}
    return status_payload
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)