import logging
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from jwt import PyJWKClient
from config import JWKS_URL
# Bearer-token scheme injected into token_required via Depends(). It only
# extracts the credential from the Authorization header; it does not verify
# the token's signature or claims.
security = HTTPBearer()
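def get_public_key(token: str):
    """Return the public key that should verify *token*'s signature.

    The key is looked up in the JWKS document at ``JWKS_URL`` through
    ``PyJWKClient.get_signing_key_from_jwt``, which picks the key using the
    token's still-unverified header. Nothing about the token is trusted at
    this point; signature and claim checks are the caller's job.

    A single ``PyJWKClient`` is created on first use and reused afterwards,
    so the client's built-in caching (where the installed PyJWT provides it)
    can take effect instead of every call building a fresh client and going
    back to the JWKS endpoint.

    Raises:
        Exception: whatever the JWKS client raised; it is logged and then
            re-raised unchanged.
    """
    try:
        jwks_client = getattr(get_public_key, "_jwks_client", None)
        if jwks_client is None:
            # Two concurrent first calls may each build a client; the last
            # assignment wins and the spare one is simply discarded.
            jwks_client = PyJWKClient(JWKS_URL)
            get_public_key._jwks_client = jwks_client
        return jwks_client.get_signing_key_from_jwt(token).key
    except Exception as e:
        logging.error(f"Error fetching public key: {e}")
        raise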
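def token_required(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency: verify the bearer JWT and return its identity claims.

    The token's signature is checked against the key returned by
    ``get_public_key``, with the algorithm pinned to RS256 and the issuer
    pinned to the value below. A token without an ``exp`` claim is rejected
    rather than accepted with no expiry.

    Returns:
        A ``(customer_id, user_id)`` tuple taken from the ``org_id`` and
        ``sub`` claims of the verified token.

    Raises:
        HTTPException: 401 when the token is expired, invalid, cannot be
            verified, or carries no ``org_id`` claim.
    """
    token = credentials.credentials
    try:
        public_key = get_public_key(token)
        decoded = jwt.decode(
            token,
            public_key,
            # Pinned list: the verifier, not the token header, decides the algorithm.
            algorithms=['RS256'],
            issuer="https://assuring-lobster-64.clerk.accounts.dev",
            # Without this, a token that omits "exp" skips the expiry check.
            options={"require": ["exp"]},
        )
        # NOTE(review): no audience / authorized-party check is made here.
        # Confirm whether tokens from this issuer can be minted for other
        # applications and, if so, validate the relevant claim.
    except jwt.ExpiredSignatureError:
        logging.error("Token has expired")
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError as e:
        logging.error(f"Invalid token: {e}")
        raise HTTPException(status_code=401, detail="Invalid token")
    except Exception as e:
        # Reached for non-token failures such as a JWKS lookup error. The
        # detail stays in the server log; the client gets a fixed message
        # so internal error text is not disclosed in the response.
        logging.error(f"Error decoding token: {e}")
        raise HTTPException(status_code=401, detail="Could not validate token")

    # The claim checks sit outside the try block so that the HTTPException
    # raised below is not caught by the generic handler above and re-wrapped
    # with a different log line and detail.
    customer_id = decoded.get('org_id')
    user_id = decoded.get('sub')
    logging.info(f"Customer/Org ID: {customer_id}, User ID: {user_id}")
    if not customer_id:
        logging.error("Customer ID is missing in the token!")
        raise HTTPException(status_code=401, detail="Customer ID is missing in the token!")
    return customer_id, user_id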