Spaces:
Running
Running
from pymongo import MongoClient | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Union | |
def get_current_date() -> str:
    """Return today's date (local clock) as a ``YYYY-MM-DD`` string."""
    today = datetime.now().date()
    return today.isoformat()
def get_another_date(day: int) -> str:
    """Return the date ``day`` days away from now as a ``YYYY-MM-DD`` string.

    ``day`` is added to the current local time, so ``0`` gives today's date
    and a negative value gives a date in the past.
    """
    offset = timedelta(days=day)
    shifted = datetime.now() + offset
    return shifted.date().isoformat()
def check_date_streak(start_date: str, end_date: str) -> bool:
    """Tell whether ``end_date`` continues a streak that last hit ``start_date``.

    Args:
        start_date: ISO-8601 date (or datetime) string of the last streak day.
        end_date: ISO-8601 date (or datetime) string of the day being checked.

    Returns:
        True when ``end_date`` falls on the same calendar day as ``start_date``
        or on the day immediately after it; False otherwise (including when
        ``end_date`` is earlier than ``start_date``).

    Raises:
        ValueError: If either string is not accepted by
            ``datetime.fromisoformat``.
    """
    # Compare calendar days only: a time-of-day component must not turn two
    # consecutive days into a "gap" just because they are not exactly 24h apart.
    first_day = datetime.fromisoformat(start_date).date()
    last_day = datetime.fromisoformat(end_date).date()
    return (last_day - first_day).days in (0, 1)
def save_in_streaks_history(db_uri, document):
    """Archive a finished streak in the ``crayonics.StreaksHistory`` collection.

    One history document is kept per ``user_id``. Its ``streaks_records``
    list receives one entry per reset, shaped as
    ``{"date_of_streak_reset": <today>, "streak_dates": [...]}``.

    Args:
        db_uri: MongoDB connection string.
        document: Streak document being archived. Must contain ``user_id``
            and ``streak_dates``. It is not modified by this function.

    Returns:
        True once the record has been written.

    Raises:
        KeyError: If ``document`` lacks ``user_id`` or ``streak_dates``.
        pymongo.errors.PyMongoError: Propagated from the database calls.
    """
    # Read the required keys before opening a connection, so a malformed
    # document fails without leaving a client behind.
    user_id = document["user_id"]
    # NOTE(review): the update path used to write "date_of_streaks_reset"
    # while the insert path wrote "date_of_streak_reset"; both paths now use
    # the latter. Documents already stored may still contain the old spelling.
    reset_record = {
        "date_of_streak_reset": get_current_date(),
        "streak_dates": document["streak_dates"],
    }

    client = MongoClient(db_uri)
    try:
        collection = client["crayonics"]["StreaksHistory"]

        if collection.find_one({"user_id": user_id}) is None:
            # First reset for this user: carry over every field except the
            # source collection's _id and the raw streak list, which now
            # lives inside the reset record.
            history_doc = {
                key: value
                for key, value in document.items()
                if key not in ("_id", "streak_dates")
            }
            history_doc["streaks_records"] = [reset_record]
            collection.insert_one(history_doc)
            print("added new streak reset record")
        else:
            # $push appends server-side instead of overwriting the whole list
            # with a possibly stale copy read a moment ago.
            collection.update_one(
                {"user_id": user_id},
                {"$push": {"streaks_records": reset_record}},
            )
            print("added another streak reset record")
        return True
    finally:
        client.close()
def streaks_manager(db_uri: str, document: Dict) -> Union[bool, str]:
    """Record today's activity for a user in the ``crayonics.Streaks`` collection.

    Behaviour by case:
      * No stored document for the user: ``document`` is inserted with
        ``streak_dates`` set to ``[today]``.
      * Last stored date is today: nothing is written.
      * Last stored date is yesterday: today is appended to ``streak_dates``.
      * Otherwise: the old streak is archived via ``save_in_streaks_history``
        and the stored document is replaced by ``document``.

    Note that ``document['streak_dates']`` is overwritten with ``[today]`` on
    the caller's dict.

    Args:
        db_uri: MongoDB connection string.
        document: Dictionary containing user data with a 'user_id' key.

    Returns:
        Union[bool, str]: True when the user was inserted or the streak
        continues; "User Already Exists" when the streak was broken and reset.

    Raises:
        Exception: Any error from the database or date parsing is printed
            and re-raised.
    """
    client = MongoClient(db_uri)
    try:
        collection = client["crayonics"]["Streaks"]
        user_filter = {"user_id": document.get("user_id")}

        found_user = collection.find_one(user_filter)
        current_date = get_current_date()
        document["streak_dates"] = [current_date]

        if found_user is None:
            # New user case
            collection.insert_one(document)
            return True

        previous_dates = found_user.get("streak_dates") or []
        if not previous_dates:
            # Stored user without any recorded day: there is nothing to
            # archive, so simply start the streak today.
            collection.update_one(
                user_filter,
                {"$set": {"streak_dates": [current_date]}},
            )
            return True

        last_date = previous_dates[-1]
        if last_date == current_date:
            # Already counted today; skip the redundant write.
            print("its a streak guys")
            return True

        if check_date_streak(start_date=last_date, end_date=current_date):
            print("its a streak guys")
            # $addToSet appends server-side and stays idempotent if two
            # requests for the same day race each other.
            collection.update_one(
                user_filter,
                {"$addToSet": {"streak_dates": current_date}},
            )
            return True

        # Streak broken: archive the old run, then restart from today.
        save_in_streaks_history(db_uri=db_uri, document=found_user)
        collection.find_one_and_replace(user_filter, document)
        return "User Already Exists"
    except Exception as e:
        print(f"Error: {str(e)}")
        raise
    finally:
        client.close()