Coverage for backend/app/oauth2.py: 91%
32 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
1"""This module handles authentication and authorisation functionality for the application, including the creation,
2verification, and usage of JWT access tokens."""
4from datetime import datetime, timedelta, timezone
6from fastapi import Depends, HTTPException, status
7from fastapi.security import OAuth2PasswordBearer
8from jose import jwt
9from sqlalchemy.orm import Session
11from app import models, database, schemas
12from app.config import settings
15SECRET_KEY = settings.secret_key
16ALGORITHM = settings.algorithm
17ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
19oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
22def create_access_token(data: dict) -> str:
23 """Create a JWT access token.
24 :param data: The data to be encoded into the JWT access token.
25 :returns: The JWT access token."""
27 to_encode = data.copy()
28 expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
29 to_encode.update({"exp": expire})
30 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
31 return encoded_jwt
34def verify_access_token(
35 token: str,
36 credentials_exception: Exception,
37) -> schemas.TokenData:
38 """Verify the JWT access token.
39 :param token: JWT access token to be verified.
40 :param credentials_exception: The exception to be raised if the token is invalid or the user ID is not found.
41 :returns: object containing the user ID extracted from the token."""
43 # noinspection PyUnresolvedReferences
44 try:
45 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
46 user_id: str = str(payload.get("user_id"))
48 if user_id is None:
49 raise credentials_exception
51 token_data = schemas.TokenData(id=user_id)
53 except jwt.JWTError:
54 raise credentials_exception
56 return token_data
59def get_current_user(
60 token: str = Depends(oauth2_scheme),
61 db: Session = Depends(database.get_db),
62) -> models.User | None:
63 """Get the current user from the token and check if the token has expired.
64 :param token: The JWT access token.
65 :param db: The database session.
66 :returns: The current user or None"""
68 credentials_exception = HTTPException(
69 status_code=status.HTTP_401_UNAUTHORIZED,
70 detail="Could not validate credentials",
71 )
72 token = verify_access_token(token, credentials_exception)
73 user = db.query(models.User).filter(token.id == models.User.id).first()
74 return user