import logging

import numpy as np
import torch
from qdrant_client import QdrantClient
from qdrant_client.http.models import FieldCondition, Filter, MatchValue


class QdrantSearcher:
    """Thin wrapper around QdrantClient for per-user document search."""

    def __init__(self, qdrant_url, access_token):
        self.client = QdrantClient(url=qdrant_url, api_key=access_token)

    def search_documents(self, collection_name, query_embedding, user_id, limit=3):
        logging.info("Starting document search")

        # Ensure the query_embedding is a flat list of Python floats.
        if isinstance(query_embedding, torch.Tensor):
            # Move to CPU first so GPU tensors can be converted to numpy.
            query_embedding = query_embedding.detach().cpu().numpy().flatten().tolist()
        elif isinstance(query_embedding, np.ndarray):
            query_embedding = query_embedding.flatten().tolist()
        else:
            raise ValueError("query_embedding must be a torch.Tensor or numpy.ndarray")

        # Validate that all elements in the query vector are floats.
        if not all(isinstance(x, float) for x in query_embedding):
            raise ValueError("All elements in query_embedding must be of type float")

        # Only return points that belong to the requesting user.
        query_filter = Filter(must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))])

        try:
            hits = self.client.search(
                collection_name=collection_name,
                query_vector=query_embedding,
                limit=limit,
                query_filter=query_filter,
            )
        except Exception as e:
            logging.error(f"Error during Qdrant search: {e}")
            return None, str(e)

        if not hits:
            logging.info("No documents found for the given query")
            return None, "No documents found for the given query."

        # Flatten each hit into a plain dict of the payload fields the caller needs.
        hits_list = []
        for hit in hits:
            hit_info = {
                "id": hit.id,
                "score": hit.score,
                "file_id": hit.payload.get("file_id"),
                "organization_id": hit.payload.get("organization_id"),
                "chunk_index": hit.payload.get("chunk_index"),
                "chunk_text": hit.payload.get("chunk_text"),
                "s3_bucket_key": hit.payload.get("s3_bucket_key"),
            }
            hits_list.append(hit_info)

        logging.info(f"Document search completed with {len(hits_list)} hits")
        logging.info(f"Hits: {hits_list}")
        return hits_list, None
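

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): shows how QdrantSearcher
    # might be instantiated and queried. The URL, API key, collection name, user_id,
    # and the 384-dimensional random vector below are placeholder assumptions -- a real
    # caller would pass its own Qdrant endpoint and a model-generated embedding whose
    # size matches the collection's vector configuration.
    logging.basicConfig(level=logging.INFO)

    searcher = QdrantSearcher(
        qdrant_url="http://localhost:6333",  # placeholder Qdrant endpoint
        access_token="YOUR_QDRANT_API_KEY",  # placeholder API key
    )

    dummy_embedding = np.random.rand(384).astype(np.float32)  # assumed vector size
    hits, error = searcher.search_documents(
        collection_name="documents",  # placeholder collection name
        query_embedding=dummy_embedding,
        user_id="user-123",  # placeholder user id
        limit=3,
    )

    if error:
        print(f"Search returned no results or failed: {error}")
    else:
        for hit in hits:
            print(hit["score"], hit["file_id"], hit["chunk_index"])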