pro-search-api / services /qdrant_searcher.py
vhr1007
redeploy for prod
a6cce41
raw
history blame
2.39 kB
import logging
import torch
import numpy as np
from qdrant_client import QdrantClient
from qdrant_client.http.models import Filter, FieldCondition
class QdrantSearcher:
    """Run user-scoped vector similarity searches against a Qdrant collection."""

    def __init__(self, qdrant_url, access_token):
        """Create the underlying Qdrant client.

        Args:
            qdrant_url: URL of the Qdrant instance.
            access_token: API key passed to the client as ``api_key``.
        """
        self.client = QdrantClient(url=qdrant_url, api_key=access_token)

    @staticmethod
    def _to_flat_vector(query_embedding):
        """Convert a tensor/array embedding into a flat list of Python floats.

        Raises:
            ValueError: If the input is neither a ``torch.Tensor`` nor a
                ``numpy.ndarray``, or if any flattened element is not a float
                (e.g. an integer or boolean array).
        """
        if isinstance(query_embedding, torch.Tensor):
            # .cpu() is required before .numpy(): tensors living on an
            # accelerator cannot be converted directly. It is a no-op for
            # tensors that are already on the CPU.
            vector = query_embedding.detach().cpu().numpy().flatten().tolist()
        elif isinstance(query_embedding, np.ndarray):
            vector = query_embedding.flatten().tolist()
        else:
            raise ValueError("query_embedding must be a torch.Tensor or numpy.ndarray")

        if not all(isinstance(x, float) for x in vector):
            raise ValueError("All elements in query_embedding must be of type float")
        return vector

    @staticmethod
    def _hit_to_dict(hit):
        """Flatten one search hit into a plain dictionary.

        A hit without a payload yields ``None`` for every payload-derived
        field instead of raising.
        """
        payload = hit.payload or {}
        return {
            "id": hit.id,
            "score": hit.score,
            "file_id": payload.get('file_id'),
            "organization_id": payload.get('organization_id'),
            "chunk_index": payload.get('chunk_index'),
            "chunk_text": payload.get('chunk_text'),
            "s3_bucket_key": payload.get('s3_bucket_key'),
        }

    def search_documents(self, collection_name, query_embedding, user_id, limit=3):
        """Search ``collection_name`` for the points closest to ``query_embedding``.

        Only points whose ``user_id`` payload field matches ``user_id`` are
        considered.

        Args:
            collection_name: Name of the Qdrant collection to query.
            query_embedding: Query vector as a ``torch.Tensor`` or
                ``numpy.ndarray``; it is flattened before being sent.
            user_id: Value the ``user_id`` payload field must match.
            limit: Maximum number of hits to request (default 3).

        Returns:
            A ``(hits, error)`` pair. On success ``hits`` is a list of dicts
            with the keys ``id``, ``score``, ``file_id``, ``organization_id``,
            ``chunk_index``, ``chunk_text`` and ``s3_bucket_key``, and
            ``error`` is ``None``. If the search raises or returns nothing,
            ``hits`` is ``None`` and ``error`` is a message string.

        Raises:
            ValueError: If ``query_embedding`` has an unsupported type or
                contains non-float elements.
        """
        logging.info("Starting document search")

        query_vector = self._to_flat_vector(query_embedding)

        # Restrict results to points owned by the requesting user.
        query_filter = Filter(must=[FieldCondition(key="user_id", match={"value": user_id})])

        try:
            hits = self.client.search(
                collection_name=collection_name,
                query_vector=query_vector,
                limit=limit,
                query_filter=query_filter,
            )
        except Exception as e:
            # Service boundary: report the failure to the caller as a message
            # rather than propagating client/transport exceptions.
            logging.error("Error during Qdrant search: %s", e)
            return None, str(e)

        if not hits:
            logging.info("No documents found for the given query")
            return None, "No documents found for the given query."

        hits_list = [self._hit_to_dict(hit) for hit in hits]

        logging.info("Document search completed with %d hits", len(hits_list))
        # NOTE(review): this writes full chunk text to the INFO log; consider
        # lowering to DEBUG if log volume or content sensitivity is a concern.
        logging.info("Hits: %s", hits_list)
        return hits_list, None