Coverage for backend / app / core / oauth2.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""This module handles authentication and authorisation functionality for the application, including the creation,
2verification, and usage of JWT access tokens."""
4from datetime import datetime, timedelta, timezone
6import jwt
7from fastapi import Depends, HTTPException, status
8from fastapi.security import OAuth2PasswordBearer
9from sqlalchemy.orm import Session
11from app import models, database
12from app.config import settings
13from app.core import schemas
# JWT signing parameters, read once from the application settings at import time.
SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
# Lifetime, in minutes, added to the current time to form a token's "exp" claim.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

# Dependency that supplies the request's bearer token to get_current_user.
# NOTE(review): "login" is presumably the relative URL of the token-issuing route - confirm against the router.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
def create_access_token(data: dict, token_version: int = 0, is_demo: bool = False) -> str:
    """Build and sign a JWT access token carrying the given claims.
    The caller's dict is copied, not mutated. The copy receives an "exp" claim set
    ACCESS_TOKEN_EXPIRE_MINUTES after the current UTC time, plus "token_version" and "is_demo"
    claims; keys of the same name already present in ``data`` are overwritten.
    :param data: The claims to be encoded into the JWT access token.
    :param token_version: The user's current token version, embedded for invalidation tracking.
    :param is_demo: Whether this token is for a demo user.
    :returns: The encoded JWT, signed with SECRET_KEY using ALGORITHM."""

    claims = data.copy()
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims.update(exp=expires_at, token_version=token_version, is_demo=is_demo)
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(
    token: str,
    credentials_exception: Exception,
) -> schemas.TokenData:
    """Decode a JWT access token and extract the user id, token version and demo flag.
    :param token: JWT access token to be verified.
    :param credentials_exception: the exception to be raised if the token cannot be decoded or carries no
        "user_id" claim.
    :returns: object containing the user ID (as a string), the token version (0 when the claim is absent)
        and the demo flag (False when the claim is absent).
    :raises Exception: ``credentials_exception``, chained to the PyJWT error when decoding failed."""

    # Only the decode call can raise a PyJWT error, so keep the try body to that single line.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as err:
        # Chain explicitly so the underlying decode failure stays attached as __cause__.
        raise credentials_exception from err

    user_id = payload.get("user_id")
    if user_id is None:
        raise credentials_exception

    return schemas.TokenData(
        id=str(user_id),
        token_version=payload.get("token_version", 0),
        is_demo=payload.get("is_demo", False),
    )
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(database.get_db),
) -> models.User | None:
    """Resolve the bearer token to its user and reject tokens whose version is stale.
    :param token: The JWT access token.
    :param db: The database session.
    :returns: The user identified by the token. Every failure path below raises, so None is not actually
        returned even though the annotation allows it.
    :raises HTTPException: 401 when the token is invalid, when no matching user exists, or when the
        token's version differs from the user's current token version."""

    invalid_credentials = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = verify_access_token(token, invalid_credentials)

    # Flag demo mode before touching the database.
    # NOTE(review): presumably the context var influences how the query below is served - confirm in database.py.
    if claims.is_demo:
        database.demo_mode.set(True)

    user_query = db.query(models.User).filter(claims.id == models.User.id)
    current_user = user_query.first()

    if current_user is None:
        raise invalid_credentials

    # A version mismatch means the token was issued before the user's token version last changed.
    if current_user.token_version != claims.token_version:
        token_revoked = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked. Please log in again.",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise token_revoked

    return current_user