Coverage for backend/app/oauth2.py: 91%

32 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-22 15:38 +0000

1"""This module handles authentication and authorisation functionality for the application, including the creation, 

2verification, and usage of JWT access tokens.""" 

3 

4from datetime import datetime, timedelta, timezone 

5 

6from fastapi import Depends, HTTPException, status 

7from fastapi.security import OAuth2PasswordBearer 

8from jose import jwt 

9from sqlalchemy.orm import Session 

10 

11from app import models, database, schemas 

12from app.config import settings 

13 

14 

# JWT signing configuration, read once from the application settings when this module is imported.
SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

# Bearer-token scheme used as a dependency by get_current_user; its token URL is "login".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

20 

21 

def create_access_token(data: dict) -> str:
    """Build a signed JWT access token from the given claims.

    The caller's mapping is left untouched: a copy of it is extended with an ``exp`` claim set
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after the current UTC time, then encoded with
    ``SECRET_KEY`` and ``ALGORITHM``.

    :param data: The claims to be encoded into the JWT access token.
    :returns: The encoded JWT access token.
    """
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

32 

33 

def verify_access_token(
    token: str,
    credentials_exception: Exception,
) -> schemas.TokenData:
    """Verify a JWT access token and extract the user ID it carries.

    :param token: JWT access token to be verified.
    :param credentials_exception: The exception to be raised if the token cannot be decoded or
        does not contain a ``user_id`` claim.
    :returns: Object containing the user ID (converted to a string) extracted from the token.
    :raises Exception: ``credentials_exception`` when decoding fails with ``jwt.JWTError`` or the
        ``user_id`` claim is missing.
    """
    # Keep the try body limited to the decode call so only decoding errors are translated.
    # noinspection PyUnresolvedReferences
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.JWTError as err:
        raise credentials_exception from err

    # Test for the missing claim BEFORE converting to str: str(None) is the non-None string
    # "None", which would make this guard unreachable and let tokens without a user_id through.
    user_id = payload.get("user_id")
    if user_id is None:
        raise credentials_exception

    return schemas.TokenData(id=str(user_id))

57 

58 

def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(database.get_db),
) -> models.User | None:
    """Resolve the user identified by a JWT access token.

    The token is checked with ``verify_access_token``, which raises an HTTP 401 error with the
    detail "Could not validate credentials" when the token is rejected.

    :param token: The JWT access token.
    :param db: The database session.
    :returns: The first user whose ID matches the ID taken from the token, or None if no user
        matches.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )
    token_data = verify_access_token(token, unauthorized)
    matching_users = db.query(models.User).filter(token_data.id == models.User.id)
    return matching_users.first()