Spaces:
Running
Running
import datetime | |
from bson import ObjectId | |
from streaksManagement import streaks_manager | |
def isexpired(previous_date): | |
# Get the current date and time | |
current_date =datetime.datetime.now() | |
# Convert the previous date (which is a string) to a datetime object | |
# Compare the two dates | |
if current_date > previous_date: | |
return True | |
else: | |
return False | |
def create_accessToken(db_uri: str, user_id: str, refresh_token: str) -> str: | |
from pymongo import MongoClient | |
current_time = datetime.datetime.now() | |
expire_at = current_time + datetime.timedelta(minutes=130) | |
""" | |
Inserts a new document into the specified MongoDB collection. | |
Parameters: | |
db_uri (str): MongoDB connection URI. | |
db_name (str): Name of the database. | |
collection_name (str): Name of the collection. | |
document (dict): The document to insert. | |
Returns: | |
str: The ID of the inserted document. | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["AccessToken"] | |
collection.find_one_and_delete({"refresh_token":refresh_token}) | |
# Insert the document | |
result = collection.insert_one({"user_id":user_id,"refresh_token":refresh_token,"current_time":current_time,"expire_at":expire_at}) | |
client.close() | |
return str(result.inserted_id) | |
# Close the connection | |
def create_refreshToken(db_uri: str, user_id: str) -> str: | |
from pymongo import MongoClient | |
current_time = datetime.datetime.now() | |
expire_at = current_time + datetime.timedelta(days=30) | |
""" | |
Inserts a new document into the specified MongoDB collection. | |
Parameters: | |
db_uri (str): MongoDB connection URI. | |
user_id (str): id of user . | |
Returns: | |
str: The ID of the inserted document. | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["RefreshToken"] | |
# Insert the document | |
result = collection.insert_one({"user_id":user_id,"current_time":current_time,"expire_at":expire_at,"previous_access_token":"None"}) | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
client.close() | |
return str(result.inserted_id) | |
# Close the connection | |
def update_refreshTokenWithPreviouslyUsedAccessToken(db_uri: str, refresh_token: str,access_token:str) -> bool: | |
from pymongo import MongoClient | |
""" | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["RefreshToken"] | |
# Insert the document | |
try: | |
collection.update_one( | |
{"_id":ObjectId(oid=refresh_token) }, # Filter (find the document by user_id) | |
{"$set": {"previous_access_token": access_token}} # Add or update the field | |
) | |
client.close() | |
return True | |
except: | |
return False | |
from pymongo import MongoClient | |
def verify_access_token(db_uri: str, user_id: str, access_token: str) -> bool: | |
current_time = datetime.datetime.now() | |
expire_at = current_time + datetime.timedelta(minutes=15) | |
""" | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["AccessToken"] | |
docs = collection.find({"user_id":user_id}) | |
for doc in docs: | |
if doc==None: | |
return False | |
else: | |
if str(doc['_id']) == access_token: | |
if isexpired(doc['expire_at'])!=False: | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
pass | |
else: | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
return True | |
else: | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
pass | |
return False | |
def verify_refresh_access_token(db_uri: str, user_id: str, access_token: str,refresh_token:str) -> bool: | |
current_time = datetime.datetime.now() | |
""" | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["RefreshToken"] | |
docs = collection.find({"_id":ObjectId(refresh_token),"user_id":user_id,"previous_access_token":access_token}) | |
for doc in docs: | |
if doc==None: | |
return False | |
else: | |
if str(doc['previous_access_token']) == access_token: | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
return True | |
else: | |
streaks_doc={} | |
streaks_doc['user_id'] = str(user_id) | |
streaks_manager(db_uri=db_uri,document=streaks_doc) | |
pass | |
return False | |
def logout_func(db_uri: str, refresh_token: str) -> str: | |
from pymongo import MongoClient | |
current_time = datetime.datetime.now() | |
expire_at = current_time + datetime.timedelta(days=30) | |
""" | |
Inserts a new document into the specified MongoDB collection. | |
Parameters: | |
db_uri (str): MongoDB connection URI. | |
user_id (str): id of user . | |
Returns: | |
str: The ID of the inserted document. | |
""" | |
# Connect to MongoDB | |
client = MongoClient(db_uri) | |
db = client["crayonics"] | |
collection = db["RefreshToken"] | |
# Insert the document | |
result = collection.find_one_and_delete(filter={"_id":ObjectId(refresh_token)}) | |
print(result) | |
if result==None: | |
return result | |
return True | |
# Close the connection | |