from bson import ObjectId import json import requests from pymongo import MongoClient from password import * from streaksManagement import streaks_manager def google_search(query, api_key, cx): url = f"https://www.googleapis.com/customsearch/v1?q={query}&key={api_key}&cx={cx}" response = requests.get(url) if response.status_code == 200: search_results = response.json() print(search_results) return search_results else: print(f"Error: {response.status_code}") return None def generate_embedding_for_user_resume(data,user_id): from sentence_transformers import SentenceTransformer model = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True) def get_embedding(data, precision="float32"): return model.encode(data, precision=precision) from pinecone import Vector def create_docs_with_vector_embeddings(bson_float32, data): docs = [] for i, (bson_f32_emb, text) in enumerate(zip(bson_float32, data)): doc =Vector( id=f"{i}", values= bson_f32_emb.tolist(), metadata={"text":text,"user_id":user_id}, ) docs.append(doc) return docs float32_embeddings = get_embedding(data, "float32") docs = create_docs_with_vector_embeddings(float32_embeddings, data) return docs def insert_embeddings_into_pinecone_database(doc,api_key,name_space): from pinecone import Pinecone pc = Pinecone(api_key=api_key) index_name = "resumes" index = pc.Index(index_name) upsert_response = index.upsert(namespace=name_space,vectors=doc) return upsert_response def query_vector_database(query,api_key,name_space): from pinecone import Pinecone from sentence_transformers import SentenceTransformer model = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True) ret=[] pc = Pinecone(api_key=api_key) index_name = "resumes" index = pc.Index(index_name) # Define a function to generate embeddings in multiple precisions def get_embedding(data, precision="float32"): return model.encode(data, precision=precision) query_embedding = get_embedding(query, precision="float32") response = index.query( namespace=name_space, vector=query_embedding.tolist(), top_k=5, include_metadata=True ) for doc in response['matches']: ret.append(doc['metadata']['text']) return ret def delete_vector_namespace(name_space,api_key): from pinecone import Pinecone pc = Pinecone(api_key=api_key) index_name = "resumes" index = pc.Index(index_name) response = index.delete(delete_all=True,namespace=name_space) return response def split_text_into_chunks(text, chunk_size=400): # Split the text into words using whitespace. words = text.split() # Group the words into chunks of size 'chunk_size'. chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)] return chunks def create_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str: """ Inserts a new document into the specified MongoDB collection. Parameters: db_uri (str): MongoDB connection URI. db_name (str): Name of the database. collection_name (str): Name of the collection. document (dict): The document to insert. Returns: str: The ID of the inserted document. """ # Connect to MongoDB client = MongoClient(db_uri) db = client[db_name] collection = db[collection_name] # Insert the document s = collection.find_one({"email":document.get('email')}) password = hash_password(document.get('password')) document['password']= password if s==None: result = collection.insert_one(document) streaks_doc={} streaks_doc['user_id'] = str(result.inserted_id) streaks_manager(db_uri=db_uri,document=streaks_doc) return str(result.inserted_id) else: client.close() return False # Close the connection def create_questionaire(db_uri: str, db_name: str, collection_name: str, document: dict) -> str: """ Inserts a new document into the specified MongoDB collection. Parameters: db_uri (str): MongoDB connection URI. db_name (str): Name of the database. collection_name (str): Name of the collection. document (dict): The document to insert. Returns: str: The ID of the inserted document. """ # Connect to MongoDB client = MongoClient(db_uri) db = client[db_name] collection = db[collection_name] # Insert the document result= collection.find_one_and_replace(filter={"userId":document.get("userId")},replacement=document) print(result) if result==None: result = collection.insert_one(document) print(result) return str(result.inserted_id) client.close() return str(result) # Close the connection def login_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str: streaks_doc={} """ Inserts a new document into the specified MongoDB collection. Parameters: db_uri (str): MongoDB connection URI. db_name (str): Name of the database. collection_name (str): Name of the collection. document (dict): The document to insert. Returns: str: The ID of the inserted document. """ # Connect to MongoDB client = MongoClient(db_uri) db = client[db_name] collection = db[collection_name] # Insert the document s = collection.find_one({"email":document["email"]}) print(s) print(document.get('email')) if s==None: return False else: if check_password(password=document['password'],hashed_password=s['password']): streaks_doc['user_id'] = str(s["_id"]) streaks_manager(db_uri=db_uri,document=streaks_doc) return str(s['_id']) else: return False # Close the connection from pymongo import MongoClient from bson.objectid import ObjectId from typing import Dict, Optional def user_details_func(db_uri: str, document: Dict) -> Optional[Dict]: """ Retrieve and process user details from MongoDB collections. Args: db_uri (str): MongoDB connection URI document (dict): Document containing user_id Returns: dict: Processed user details or None if user not found """ streaks_doc = {} # Connect to MongoDB client = MongoClient(db_uri) db = client["crayonics"] # Define collections users_collection = db["users"] streaks_collection = db["Streaks"] questionaire_collection = db["Questionaire"] # Find user document user_id = document.get("user_id") user_doc = users_collection.find_one({"_id": ObjectId(user_id)}) if not user_doc: return None # Prepare base user document user_doc['userId'] = str(user_doc['_id']) user_doc.pop('_id') user_doc.pop('password', None) # Use default None in case password doesn't exist # Get streaks data streaks_collection_doc = streaks_collection.find_one({"user_id": user_id}) streaks_doc['user_id'] = user_id # Call streaks_manager (assuming this function exists elsewhere) streaks_manager(db_uri=db_uri, document=streaks_doc) if streaks_collection_doc: streaks_collection_doc.pop("_id", None) streaks_collection_doc.pop("user_id", None) user_doc['streak_dates'] = streaks_collection_doc.get('streak_dates', []) # Try to get questionnaire data questionaire_doc = questionaire_collection.find_one({"userId": user_id}) if questionaire_doc: print(f"in questionaire retrieval:") try: questionaire_doc.pop("_id", None) questionaire_doc.pop("userId", None) user_doc['career_questions'] = questionaire_doc except Exception as e: # If questionnaire fails, continue with what we have print(f"Error in questionaire retrieval: {str(e)}") print(questionaire_doc) pass client.close() return user_doc