Spaces:
Running
Running
from bson import ObjectId | |
import json | |
import requests | |
from pymongo import MongoClient | |
from password import * | |
from streaksManagement import streaks_manager | |
def google_search(query, api_key, cx, timeout=10):
    """
    Run a query against the Google Custom Search JSON API.

    Parameters:
    query (str): Search terms, sent as the `q` query parameter.
    api_key (str): API key, sent as the `key` query parameter.
    cx (str): Search engine identifier, sent as the `cx` query parameter.
    timeout (float): Seconds to wait for the server before `requests`
        raises a timeout error. Defaults to 10.

    Returns:
    dict or None: The decoded JSON body when the response status is 200,
    otherwise None (the status code is printed).
    """
    # Hand the values to `params` so requests percent-encodes them.
    # Interpolating them into the URL corrupts queries that contain
    # characters such as '&', '#' or '+'.
    response = requests.get(
        "https://www.googleapis.com/customsearch/v1",
        params={"q": query, "key": api_key, "cx": cx},
        timeout=timeout,  # without a timeout the call can block forever
    )
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return None
    search_results = response.json()
    print(search_results)
    return search_results
def generate_embedding_for_user_resume(data, user_id):
    """
    Embed resume text and wrap each embedding in a Pinecone `Vector`.

    Parameters:
    data: Texts to embed; passed as-is to `SentenceTransformer.encode`.
        Presumably a list of text chunks - verify against the caller.
    user_id: Stored under the "user_id" key of every vector's metadata.

    Returns:
    list: One `Vector` per (embedding, text) pair. The vector id is the
    pair's position as a string; metadata holds the text and `user_id`.
    """
    from sentence_transformers import SentenceTransformer
    encoder = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)
    from pinecone import Vector

    embeddings = encoder.encode(data, precision="float32")
    return [
        Vector(
            id=str(position),
            values=embedding.tolist(),
            metadata={"text": chunk, "user_id": user_id},
        )
        for position, (embedding, chunk) in enumerate(zip(embeddings, data))
    ]
def insert_embeddings_into_pinecone_database(doc, api_key, name_space):
    """
    Upsert vectors into the "resumes" Pinecone index.

    Parameters:
    doc: Vectors to upsert; forwarded as the `vectors` argument.
    api_key (str): Pinecone API key.
    name_space (str): Namespace inside the index to write to.

    Returns:
    The response object returned by the index's `upsert` call.
    """
    from pinecone import Pinecone
    resumes_index = Pinecone(api_key=api_key).Index("resumes")
    return resumes_index.upsert(namespace=name_space, vectors=doc)
def query_vector_database(query, api_key, name_space):
    """
    Return the stored texts of the five closest matches for `query`.

    Parameters:
    query: Text to embed with the nomic-ai/nomic-embed-text-v1 model.
    api_key (str): Pinecone API key.
    name_space (str): Namespace of the "resumes" index to search.

    Returns:
    list: The `metadata['text']` value of each returned match, in the
    order the index returned them.
    """
    from pinecone import Pinecone
    from sentence_transformers import SentenceTransformer
    encoder = SentenceTransformer("nomic-ai/nomic-embed-text-v1", trust_remote_code=True)
    resumes_index = Pinecone(api_key=api_key).Index("resumes")

    # Embed the query as float32 and send it as a plain list.
    query_vector = encoder.encode(query, precision="float32")
    search_result = resumes_index.query(
        namespace=name_space,
        vector=query_vector.tolist(),
        top_k=5,
        include_metadata=True,
    )
    return [match['metadata']['text'] for match in search_result['matches']]
def delete_vector_namespace(name_space, api_key):
    """
    Delete every vector in one namespace of the "resumes" Pinecone index.

    Parameters:
    name_space (str): Namespace whose vectors are removed.
    api_key (str): Pinecone API key.

    Returns:
    The response object returned by the index's `delete` call.
    """
    from pinecone import Pinecone
    resumes_index = Pinecone(api_key=api_key).Index("resumes")
    return resumes_index.delete(delete_all=True, namespace=name_space)
def split_text_into_chunks(text, chunk_size=400):
    """
    Break text into consecutive chunks of at most `chunk_size` words.

    Parameters:
    text (str): Text to split; words are separated on any whitespace.
    chunk_size (int): Maximum number of words per chunk.

    Returns:
    list: Chunks with their words re-joined by single spaces. The last
    chunk may be shorter; text with no words yields an empty list.
    """
    tokens = text.split()
    pieces = []
    # Step through the word list one chunk-sized window at a time.
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(' '.join(window))
    return pieces
def create_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Register a new user unless one with the same email already exists.

    `document['password']` is replaced in place with the value returned by
    `hash_password`, so the caller's dict is modified even when no user is
    created. After a successful insert, `streaks_manager` is called with
    the new user's id.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): The user document; read for 'email' and 'password'.

    Returns:
    str or bool: The ID of the inserted document as a string, or False
    when a user with the same email already exists.
    """
    client = MongoClient(db_uri)
    # try/finally releases the connection on every path, including the
    # successful insert and any exception raised along the way.
    try:
        collection = client[db_name][collection_name]
        existing_user = collection.find_one({"email": document.get('email')})
        document['password'] = hash_password(document.get('password'))
        if existing_user is not None:
            return False
        result = collection.insert_one(document)
        user_id = str(result.inserted_id)
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        return user_id
    finally:
        client.close()
def create_questionaire(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Store a questionnaire, replacing any existing one for the same user.

    Looks for a document whose "userId" equals `document.get("userId")`.
    If one exists it is replaced with `document`; otherwise `document` is
    inserted as a new record.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): The questionnaire document to store.

    Returns:
    str: The ID of the stored document: the replaced document's `_id`
    when one existed, otherwise the ID of the newly inserted document.
    """
    client = MongoClient(db_uri)
    # try/finally releases the connection on both the replace and the
    # insert path, and when an operation raises.
    try:
        collection = client[db_name][collection_name]
        # By default this returns the document as it was before the
        # replacement, or None when nothing matched the filter.
        previous = collection.find_one_and_replace(
            filter={"userId": document.get("userId")},
            replacement=document,
        )
        print(previous)
        if previous is not None:
            # A replacement keeps the matched document's _id, so this is
            # the ID of the stored questionnaire, not a dump of the old
            # document.
            return str(previous["_id"])
        result = collection.insert_one(document)
        print(result)
        return str(result.inserted_id)
    finally:
        client.close()
def login_user(db_uri: str, db_name: str, collection_name: str, document: dict) -> str:
    """
    Check a user's credentials and return the user's ID on success.

    Looks up the user by `document["email"]` and verifies
    `document['password']` against the stored hash with `check_password`.
    On success, `streaks_manager` is called with the user's id.

    Parameters:
    db_uri (str): MongoDB connection URI.
    db_name (str): Name of the database.
    collection_name (str): Name of the collection.
    document (dict): Credentials; must contain 'email', and 'password'
        when a user with that email exists.

    Returns:
    str or bool: The user's ID as a string when the password matches;
    False when no user has that email or the password check fails.

    Raises:
    KeyError: If `document` has no 'email' key, or no 'password' key
    when the user exists.
    """
    client = MongoClient(db_uri)
    # try/finally releases the connection on every return path and when
    # a lookup or the password check raises.
    try:
        collection = client[db_name][collection_name]
        user = collection.find_one({"email": document["email"]})
        # Log the email only: the stored record holds the password hash
        # and must not be written to stdout.
        print(document.get('email'))
        if user is None:
            return False
        if not check_password(password=document['password'], hashed_password=user['password']):
            return False
        user_id = str(user["_id"])
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        return user_id
    finally:
        client.close()
from pymongo import MongoClient | |
from bson.objectid import ObjectId | |
from typing import Dict, Optional | |
def user_details_func(db_uri: str, document: Dict) -> Optional[Dict]:
    """
    Retrieve and process user details from MongoDB collections.

    Reads the "users", "Streaks" and "Questionaire" collections of the
    "crayonics" database and calls `streaks_manager` for the user.

    Args:
        db_uri (str): MongoDB connection URI
        document (dict): Document containing user_id

    Returns:
        dict: The user document without '_id' and 'password', with
        'userId' (string id) added, plus 'streak_dates' when a streaks
        record exists and 'career_questions' when a questionnaire exists.
        None if the user is not found.
    """
    client = MongoClient(db_uri)
    # try/finally releases the connection on the early "not found"
    # return and when any query raises.
    try:
        db = client["crayonics"]
        users_collection = db["users"]
        streaks_collection = db["Streaks"]
        questionaire_collection = db["Questionaire"]

        # Find user document
        user_id = document.get("user_id")
        user_doc = users_collection.find_one({"_id": ObjectId(user_id)})
        if not user_doc:
            return None

        # Expose the id as a string and never return the password field.
        user_doc['userId'] = str(user_doc.pop('_id'))
        user_doc.pop('password', None)

        # NOTE(review): the streaks record is read before streaks_manager
        # runs, so the returned dates presumably do not include whatever
        # that call records - confirm this ordering is intended.
        streaks_collection_doc = streaks_collection.find_one({"user_id": user_id})
        streaks_manager(db_uri=db_uri, document={'user_id': user_id})
        if streaks_collection_doc:
            user_doc['streak_dates'] = streaks_collection_doc.get('streak_dates', [])

        # Attach questionnaire answers, minus their identifying keys.
        questionaire_doc = questionaire_collection.find_one({"userId": user_id})
        if questionaire_doc:
            print("in questionaire retrieval:")
            questionaire_doc.pop("_id", None)
            questionaire_doc.pop("userId", None)
            user_doc['career_questions'] = questionaire_doc
        return user_doc
    finally:
        client.close()