Spaces:
Sleeping
Sleeping
import logging | |
from fastapi import Depends, HTTPException | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
import jwt | |
from jwt import PyJWKClient | |
from config import JWKS_URL | |
# Bearer-token security scheme; token_required receives its output through
# Depends(security) as an HTTPAuthorizationCredentials object.
security = HTTPBearer()
# Shared JWKS client, created on first use. PyJWKClient keeps its fetched key
# set cached on the instance, so building a new client for every call would
# throw that cache away and hit JWKS_URL again on each request.
_jwks_client = None


def _get_jwks_client():
    """Return the shared PyJWKClient for JWKS_URL, creating it on first use."""
    global _jwks_client
    if _jwks_client is None:
        # Two concurrent first callers may each build a client; the last
        # assignment wins and the spare one is simply discarded.
        _jwks_client = PyJWKClient(JWKS_URL)
    return _jwks_client


def get_public_key(token: str):
    """Return the signing key that matches the given JWT.

    The key is looked up through a PyJWKClient bound to JWKS_URL, using
    ``get_signing_key_from_jwt`` on the raw token.

    Args:
        token: The encoded JWT whose signing key should be resolved.

    Returns:
        The ``key`` attribute of the signing key returned by the JWKS client.

    Raises:
        Exception: Any error raised while resolving the key is logged and
            re-raised unchanged so the caller decides how to respond.
    """
    try:
        signing_key = _get_jwks_client().get_signing_key_from_jwt(token)
        return signing_key.key
    except Exception as e:
        logging.error("Error fetching public key: %s", e)
        raise
def token_required(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer JWT and return the caller's identifiers.

    The token is verified with the key from ``get_public_key``, restricted to
    the RS256 algorithm and the expected issuer. The ``org_id`` and ``sub``
    claims are then read from the decoded payload.

    Args:
        credentials: Bearer credentials supplied by the ``security``
            dependency; ``credentials.credentials`` is the raw token.

    Returns:
        A ``(customer_id, user_id)`` tuple taken from the ``org_id`` and
        ``sub`` claims. ``user_id`` is ``None`` when ``sub`` is absent.

    Raises:
        HTTPException: 401 when the token is expired, invalid, cannot be
            verified for any other reason, or carries no ``org_id`` claim.
    """
    token = credentials.credentials

    # Only key lookup and signature/claim verification live inside the try,
    # so the broad handler below cannot intercept the HTTPException raised
    # later for a missing org_id and overwrite its detail.
    try:
        public_key = get_public_key(token)
        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            issuer="https://assuring-lobster-64.clerk.accounts.dev",
        )
    except jwt.ExpiredSignatureError:
        logging.error("Token has expired")
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError as e:
        logging.error("Invalid token: %s", e)
        raise HTTPException(status_code=401, detail="Invalid token")
    except Exception as e:
        # Keep the underlying error in the server log only; returning str(e)
        # would expose internal failure text (e.g. key-fetch errors) to the
        # client.
        logging.error("Error decoding token: %s", e)
        raise HTTPException(status_code=401, detail="Token validation failed")

    customer_id = decoded.get('org_id')
    user_id = decoded.get('sub')
    logging.info("Customer/Org ID: %s, User ID: %s", customer_id, user_id)

    if not customer_id:
        logging.error("Customer ID is missing in the token!")
        raise HTTPException(status_code=401, detail="Customer ID is missing in the token!")

    return customer_id, user_id