resume-api / utils.py
Nattyboi's picture
updated util.py
d831147
from bson import ObjectId
import json
import requests
from pymongo import MongoClient
from password import *
from streaksManagement import streaks_manager
def google_search(query, api_key, cx):
    """
    Run a Google Custom Search request and return the decoded JSON response.

    Parameters:
    query: Search terms. Sent as a query parameter, so ``requests``
        URL-encodes it and characters such as '&' or '#' are safe.
    api_key: Google API key, sent as the ``key`` parameter.
    cx: Custom search engine identifier, sent as the ``cx`` parameter.

    Returns:
    The parsed JSON body when the response status is 200, otherwise None
    (the status code is printed in that case).

    Raises:
    requests.RequestException: On connection failure or timeout.
    """
    response = requests.get(
        "https://www.googleapis.com/customsearch/v1",
        # Let requests encode the values: interpolating them straight into
        # the URL corrupts the request when the query contains '&', '#', '+'.
        params={"q": query, "key": api_key, "cx": cx},
        # Without a timeout a stalled connection blocks the caller forever.
        timeout=10,
    )
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return None

    search_results = response.json()
    print(search_results)
    return search_results
def generate_embedding_for_user_resume(data, user_id):
    """
    Encode resume text and wrap each embedding in a Pinecone ``Vector``.

    Parameters:
    data: Items to embed; passed as-is to ``SentenceTransformer.encode`` and
        then paired one-to-one with the resulting embeddings.
        (presumably a list of text chunks; confirm against the caller)
    user_id: Stored in every vector's metadata under "user_id".

    Returns:
    list: One ``Vector`` per (embedding, item) pair. The vector id is the
    pair's position as a string, the values are the embedding converted with
    ``tolist()``, and the metadata holds the item under "text" plus the
    user id.
    """
    # Imports stay local so the heavy libraries load only when this is called.
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)

    from pinecone import Vector

    embeddings = encoder.encode(data, precision="float32")
    return [
        Vector(
            id=f"{position}",
            values=embedding.tolist(),
            metadata={"text": chunk, "user_id": user_id},
        )
        for position, (embedding, chunk) in enumerate(zip(embeddings, data))
    ]
def insert_embeddings_into_pinecone_database(doc, api_key, name_space):
    """
    Upsert vectors into the "resumes" Pinecone index.

    Parameters:
    doc: Vectors to upsert, forwarded as the ``vectors`` argument.
    api_key: Pinecone API key used to build the client.
    name_space: Namespace inside the index that receives the vectors.

    Returns:
    The value returned by ``index.upsert``.
    """
    from pinecone import Pinecone

    resumes_index = Pinecone(api_key=api_key).Index("resumes")
    return resumes_index.upsert(namespace=name_space, vectors=doc)
def query_vector_database(query, api_key, name_space):
    """
    Embed a query and fetch the text of its closest matches from Pinecone.

    Parameters:
    query: Input passed to ``SentenceTransformer.encode``.
        (presumably a single query string; confirm against the caller)
    api_key: Pinecone API key used to build the client.
    name_space: Namespace of the "resumes" index to search.

    Returns:
    list: The ``metadata['text']`` value of each of the up-to-5 matches, in
    the order the index returned them.
    """
    from pinecone import Pinecone
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)
    resumes_index = Pinecone(api_key=api_key).Index("resumes")

    # The embedding is converted with tolist() before being sent to the index.
    query_vector = encoder.encode(query, precision="float32").tolist()
    result = resumes_index.query(
        namespace=name_space,
        vector=query_vector,
        top_k=5,
        include_metadata=True,
    )
    return [match['metadata']['text'] for match in result['matches']]
def delete_vector_namespace(name_space, api_key):
    """
    Delete every vector stored in one namespace of the "resumes" index.

    Parameters:
    name_space: Namespace to clear.
    api_key: Pinecone API key used to build the client.

    Returns:
    The value returned by ``index.delete``.
    """
    from pinecone import Pinecone

    resumes_index = Pinecone(api_key=api_key).Index("resumes")
    return resumes_index.delete(delete_all=True, namespace=name_space)
def split_text_into_chunks(text, chunk_size=400):
    """
    Split text into chunks of at most ``chunk_size`` whitespace-separated words.

    Parameters:
    text (str): Text to split. Words are separated on any run of whitespace,
        so the original spacing and line breaks are not preserved.
    chunk_size (int): Maximum number of words per chunk. Defaults to 400.

    Returns:
    list: Chunks in order, each with its words joined by single spaces. The
    last chunk may be shorter. Empty or whitespace-only text gives [].

    Raises:
    ValueError: If chunk_size is zero or negative.
    """
    # A negative step would make range() empty and silently drop all text,
    # and a zero step fails with an unrelated-looking range() error.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    words = text.split()
    return [
        ' '.join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
def create_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Register a new user unless one with the same email already exists.

    ``document['password']`` is replaced in place by the result of
    ``hash_password`` on both outcomes, so the caller's dict is mutated.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): The user document; its "email" and "password" keys are read.

    Returns:
    str: The ID of the inserted document, or ``False`` when a document with
    the same email is already in the collection.
    """
    client = MongoClient(db_uri)
    try:
        collection = client[db_name][collection_name]

        existing_user = collection.find_one({"email": document.get('email')})
        # Swap the submitted password for its hash before anything is stored.
        document['password'] = hash_password(document.get('password'))

        if existing_user is not None:
            return False

        result = collection.insert_one(document)
        user_id = str(result.inserted_id)
        # Hand the new user's id to the streaks module.
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        return user_id
    finally:
        # Release the connection on every path, including when an exception
        # is raised, not only when the email is a duplicate.
        client.close()
def create_questionaire(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Store a user's questionnaire, replacing the one already saved for that user.

    The collection is searched for a document whose "userId" equals
    ``document['userId']``. A match is replaced by ``document``; when there
    is no match, ``document`` is inserted as a new entry.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): The questionnaire document; its "userId" key is read.

    Returns:
    str: The ID of the inserted document when a new one was created.
    Otherwise ``str()`` of the value returned by ``find_one_and_replace``
    (with pymongo's defaults, the document as it was before replacement).
    """
    client = MongoClient(db_uri)
    try:
        collection = client[db_name][collection_name]

        replaced = collection.find_one_and_replace(
            filter={"userId": document.get("userId")},
            replacement=document,
        )
        print(replaced)

        if replaced is None:
            # Nothing to replace: this is the user's first questionnaire.
            inserted = collection.insert_one(document)
            print(inserted)
            return str(inserted.inserted_id)

        return str(replaced)
    finally:
        # Release the connection on the insert path and on errors too.
        client.close()
def login_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Check a user's credentials against the stored record.

    The user is looked up by ``document['email']`` and the submitted password
    is verified with ``check_password`` against the stored hash. On success
    the user's id is passed to ``streaks_manager``.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): Credentials; must contain "email" and "password".

    Returns:
    str: The user's ID when the credentials match, or ``False`` when the
    email is unknown or the password does not match.

    Raises:
    KeyError: If "email" is missing from document, or "password" is missing
        when the user exists.
    """
    client = MongoClient(db_uri)
    try:
        collection = client[db_name][collection_name]

        user = collection.find_one({"email": document["email"]})
        # Only the email is printed: the user record carries the stored
        # password hash and must not end up in stdout or logs.
        print(document.get('email'))

        if user is None:
            return False
        if not check_password(password=document['password'], hashed_password=user['password']):
            return False

        user_id = str(user["_id"])
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        return user_id
    finally:
        # Release the connection on every path, including failed logins.
        client.close()
from pymongo import MongoClient
from bson.objectid import ObjectId
from typing import Dict, Optional
def user_details_func(db_uri: str, document: Dict) -> Optional[Dict]:
    """
    Retrieve and merge a user's details from the "crayonics" database.

    The user is read from the "users" collection, then enriched with
    ``streak_dates`` from "Streaks" and ``career_questions`` from
    "Questionaire" when those documents exist. ``streaks_manager`` is called
    for the user as part of the lookup.

    Args:
        db_uri (str): MongoDB connection URI
        document (dict): Document containing user_id

    Returns:
        dict: The user document without ``_id`` and ``password``, with the id
        exposed as the string ``userId``, or None if user not found

    Raises:
        bson.errors.InvalidId: If user_id is a string that is not a valid
            ObjectId.
    """
    client = MongoClient(db_uri)
    try:
        db = client["crayonics"]
        user_id = document.get("user_id")

        user_doc = db["users"].find_one({"_id": ObjectId(user_id)})
        if not user_doc:
            return None

        # Expose the id as a plain string and never return the password field.
        user_doc['userId'] = str(user_doc.pop('_id'))
        user_doc.pop('password', None)

        # The streak record is read before streaks_manager is invoked, so
        # streak_dates reflects what was stored prior to this call.
        # NOTE(review): confirm this ordering is intended.
        streaks_record = db["Streaks"].find_one({"user_id": user_id})
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        if streaks_record:
            user_doc['streak_dates'] = streaks_record.get('streak_dates', [])

        questionaire_doc = db["Questionaire"].find_one({"userId": user_id})
        if questionaire_doc:
            print("in questionaire retrieval:")
            # Drop the identifiers; only the answers are attached to the user.
            questionaire_doc.pop("_id", None)
            questionaire_doc.pop("userId", None)
            user_doc['career_questions'] = questionaire_doc

        return user_doc
    finally:
        # Release the connection on every path, including the not-found
        # early return and any exception raised above.
        client.close()