Coverage for backend / app / core / utils.py: 100%
37 statements
« prev | ^ index | » next — coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Utility functions for token management and email verification."""
3import secrets
4from typing import Callable
6from sqlalchemy.orm import Session
7from starlette import status
9from app import utils, models, base_schemas
10from app.config import settings
11from app.emails.email_service import email_service
def get_token(
    token: str,
    token_type: str,
    db: Session,
) -> models.UserToken | None:
    """Look up a stored user token by its value and its type.

    :param token: Value compared for equality against ``UserToken.token``.
    :param token_type: Value compared for equality against ``UserToken.token_type``
        (e.g., 'verification', 'password_reset').
    :param db: The database session used to run the query.
    :return: The first matching UserToken row, or None when nothing matches.
    """
    # NOTE(review): generate_token() stores the *hashed* token in this column,
    # so callers presumably pass the hashed value here -- confirm at call sites.
    matches_value = models.UserToken.token == token
    matches_type = models.UserToken.token_type == token_type

    lookup = db.query(models.UserToken).filter(matches_value, matches_type)
    return lookup.first()
def generate_token(
    user_id: int,
    token_type: str,
    db: Session,
    pending_email: str | None = None,
) -> tuple[str, models.UserToken]:
    """Replace a user's tokens of one type with a freshly generated token.

    Existing tokens of ``token_type`` owned by ``user_id`` are deleted, a new
    random token is created, and only its hash (``utils.hash_token``) is
    stored. A single ``db.commit()`` follows both the delete and the add.

    :param user_id: ID of the user for whom the token is generated
    :param token_type: Type of the token (e.g., 'verification', 'password_reset', 'email_change')
    :param db: Database session
    :param pending_email: Optional pending email stored on the token row (used for email_change tokens)
    :return: Tuple of (plain_token, UserToken object); the plain token is not
        stored on the row, only its hash is.
    """
    # Remove every token of this type that the user already owns.
    stale_tokens = db.query(models.UserToken).filter(
        models.UserToken.owner_id == user_id,
        models.UserToken.token_type == token_type,
    )
    stale_tokens.delete()

    # The plain value goes back to the caller; the row keeps only the hash.
    raw_value = secrets.token_urlsafe(32)

    # noinspection PyArgumentList
    token_row = models.UserToken(
        owner_id=user_id,
        token=utils.hash_token(raw_value),
        token_type=token_type,
        pending_email=pending_email,
    )

    db.add(token_row)
    db.commit()
    # Reload the row after the commit so the returned object is up to date.
    db.refresh(token_row)

    return raw_value, token_row
def send_verification_with_rate_limit(
    token_type: str,
    user: models.User,
    db: Session,
    send_email_function: Callable,
    endpoint: str,
    name: str,
    pending_email: str | None = None,
) -> base_schemas.GenericResponse:
    """Issue a new token and email its link, unless the user must still wait.

    :param token_type: Type of the token (e.g., 'verification')
    :param user: User entry; ``id``, ``email`` and ``first_name`` are read
    :param db: Database session
    :param send_email_function: Callable invoked as
        ``send_email_function(user.email, verification_url, user.first_name)``
    :param endpoint: Frontend path segment placed between the frontend URL and the token query
    :param name: Human-readable name of the token type, used in response messages
    :param pending_email: Optional pending email forwarded to ``generate_token``
    :return: GenericResponse with success flag, message and error code
        (429 while rate limited, 500 when sending raised, None on success)
    """
    # Most recently created token of this type for the user, if there is one.
    latest_token = (
        db.query(models.UserToken)
        .filter(
            models.UserToken.owner_id == user.id,
            models.UserToken.token_type == token_type,
        )
        .order_by(models.UserToken.created_at.desc())
        .first()
    )

    if latest_token:
        # Read the value once: the same object feeds the check and the message.
        wait_seconds = latest_token.remaining_seconds
        if float(str(wait_seconds)) > 0:
            rate_limit_message = (
                f"Please wait {wait_seconds} seconds before requesting another {name} email."
            )
            return base_schemas.GenericResponse(
                success=False,
                message=rate_limit_message,
                error_code=status.HTTP_429_TOO_MANY_REQUESTS,
            )

    # generate_token() also deletes the user's older tokens of this type.
    # Only the plain token is needed here; the stored row is not used.
    plain_token, _token_row = generate_token(user.id, token_type, db, pending_email)

    # NOTE(review): when sending fails the new token stays stored, so the next
    # request may be rate limited although no email went out -- confirm intent.
    try:
        # The link carries the plain token, not the stored hash.
        link = f"{settings.frontend_url}/{endpoint}/?token={plain_token}"
        send_email_function(user.email, link, user.first_name)
        outcome = base_schemas.GenericResponse(
            success=True,
            message=f"{name} email sent successfully.",
            error_code=None,
        )
    except Exception as send_error:
        # NOTE(review): the raw exception text ends up in the response message;
        # confirm it is never surfaced to end users.
        outcome = base_schemas.GenericResponse(
            success=False,
            message=f"Error sending {token_type} email: {send_error}",
            error_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    return outcome
def send_email_verification_email(
    user: models.User,
    db: Session,
) -> base_schemas.GenericResponse:
    """Send the email-verification message, subject to rate limiting.

    Delegates to ``send_verification_with_rate_limit`` with the
    'verification' token type and the 'verify-email' frontend endpoint.

    :param user: user entry
    :param db: database session
    :return: GenericResponse returned by ``send_verification_with_rate_limit``
    """
    mailer = email_service.send_verification_email
    return send_verification_with_rate_limit(
        user=user,
        db=db,
        token_type="verification",
        endpoint="verify-email",
        name="Verification",
        send_email_function=mailer,
    )
def send_password_reset_email(
    user: models.User,
    db: Session,
) -> base_schemas.GenericResponse:
    """Send the password-reset message, subject to rate limiting.

    Delegates to ``send_verification_with_rate_limit`` with the
    'password_reset' token type and the 'reset-password' frontend endpoint.

    :param user: user entry
    :param db: database session
    :return: GenericResponse returned by ``send_verification_with_rate_limit``
    """
    mailer = email_service.send_password_reset_email
    return send_verification_with_rate_limit(
        user=user,
        db=db,
        token_type="password_reset",
        endpoint="reset-password",
        name="Password reset",
        send_email_function=mailer,
    )
def send_email_change_email(
    user: models.User,
    new_email: str,
    db: Session,
) -> base_schemas.GenericResponse:
    """Send the verification message for a new email address, subject to rate limiting.

    The message is addressed to ``new_email`` rather than the user's current
    address, and ``new_email`` is also passed on as the token's pending email.

    :param user: user entry
    :param new_email: new email address to verify
    :param db: database session
    :return: GenericResponse returned by ``send_verification_with_rate_limit``
    """

    def _send_to_new_address(_current_email, url, first_name):
        # The first argument (the user's current email) is deliberately ignored.
        return email_service.send_email_change_verification(new_email, url, first_name)

    return send_verification_with_rate_limit(
        user=user,
        db=db,
        token_type="email_change",
        endpoint="verify-new-email",
        name="Email change verification",
        pending_email=new_email,
        send_email_function=_send_to_new_address,
    )