Coverage for backend / app / core / oauth2.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""This module handles authentication and authorisation functionality for the application, including the creation, 

2verification, and usage of JWT access tokens.""" 

3 

4from datetime import datetime, timedelta, timezone 

5 

6import jwt 

7from fastapi import Depends, HTTPException, status 

8from fastapi.security import OAuth2PasswordBearer 

9from sqlalchemy.orm import Session 

10 

11from app import models, database 

12from app.config import settings 

13from app.core import schemas 

14 

# JWT signing key and algorithm, read from the application settings when this module is imported.
15SECRET_KEY = settings.secret_key 

16ALGORITHM = settings.algorithm 

# Lifetime in minutes; create_access_token adds it to the current UTC time to compute the "exp" claim.
17ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes 

18 

# Bearer-token dependency injected into get_current_user; "login" is passed as the token URL.
19oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") 

20 

21 

def create_access_token(data: dict, token_version: int = 0, is_demo: bool = False) -> str:
    """Build and sign a JWT access token carrying the caller's claims.

    The caller's mapping is not modified. The ``exp``, ``token_version`` and ``is_demo`` claims are
    always written by this function and replace any same-named keys present in ``data``.

    :param data: The claims to be encoded into the JWT access token.
    :param token_version: The user's current token version for invalidation tracking.
    :param is_demo: Whether this token is for a demo user.
    :returns: The encoded JWT access token.
    """
    # Expiry is anchored to timezone-aware UTC "now" plus the configured lifetime.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = {
        **data,
        "exp": expires_at,
        "token_version": token_version,
        "is_demo": is_demo,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

35 

36 

def verify_access_token(
    token: str,
    credentials_exception: Exception,
) -> schemas.TokenData:
    """Verify the JWT access token and extract the user id, token version and demo flag.

    :param token: JWT access token to be verified.
    :param credentials_exception: the exception to be raised if the token cannot be decoded or carries no
        ``user_id`` claim.
    :returns: object containing the user ID (converted to a string), the token version and the demo flag
        extracted from the token.
    :raises Exception: ``credentials_exception`` itself, when decoding fails or ``user_id`` is missing.
    """
    # Only the decode call can raise PyJWTError, so keep the try body limited to it.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:
        # Chain the decode failure so the underlying reason stays attached as the explicit cause.
        raise credentials_exception from exc

    user_id = payload.get("user_id")
    if user_id is None:
        raise credentials_exception

    # Tokens lacking these claims fall back to version 0 and non-demo.
    token_version: int = payload.get("token_version", 0)
    is_demo: bool = payload.get("is_demo", False)

    return schemas.TokenData(id=str(user_id), token_version=token_version, is_demo=is_demo)

62 

63 

def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(database.get_db),
) -> models.User | None:
    """Resolve the user identified by the bearer token and check the token has not been revoked.

    When the token is flagged as a demo token, ``database.demo_mode`` is set to ``True`` before the
    user lookup runs. Every path visible in this function either returns a user or raises.

    :param token: The JWT access token.
    :param db: The database session.
    :returns: The user whose id and token version match the token.
    :raises HTTPException: 401 if the token fails verification, no matching user exists, or the token
        version differs from the user's current token version.
    """
    bearer_headers = {"WWW-Authenticate": "Bearer"}
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=bearer_headers,
    )

    claims = verify_access_token(token, unauthorized)

    # Demo tokens switch the demo-mode context variable on before any query is issued.
    if claims.is_demo:
        database.demo_mode.set(True)

    user_query = db.query(models.User).filter(claims.id == models.User.id)
    current_user = user_query.first()
    if current_user is None:
        raise unauthorized

    # A version mismatch means the token was issued before the user's tokens were invalidated.
    if current_user.token_version != claims.token_version:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked. Please log in again.",
            headers=bearer_headers,
        )

    return current_user