Coverage for backend / app / core / utils.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Utility functions for token management and email verification.""" 

2 

3import secrets 

4from typing import Callable 

5 

6from sqlalchemy.orm import Session 

7from starlette import status 

8 

9from app import utils, models, base_schemas 

10from app.config import settings 

11from app.emails.email_service import email_service 

12 

13 

14def get_token( 

15 token: str, 

16 token_type: str, 

17 db: Session, 

18) -> models.UserToken | None: 

19 """Retrieve a user token by token string and token type. 

20 :param token: The token string. 

21 :param token_type: The type of the token (e.g., 'verification', 'password_reset'). 

22 :param db: The database session. 

23 :return: The UserToken object if found, else None.""" 

24 

25 return ( 

26 db.query(models.UserToken) 

27 .filter( 

28 models.UserToken.token == token, 

29 models.UserToken.token_type == token_type, 

30 ) 

31 .first() 

32 ) 

33 

34 

35def generate_token( 

36 user_id: int, 

37 token_type: str, 

38 db: Session, 

39 pending_email: str | None = None, 

40) -> tuple[str, models.UserToken]: 

41 """Generate a secure random token and delete old tokens of the same type. 

42 :param user_id: ID of the user for whom the token is generated 

43 :param token_type: Type of the token (e.g., 'verification', 'password_reset', 'email_change') 

44 :param db: Database session 

45 :param pending_email: Optional pending email for email_change tokens 

46 :return: Tuple of (plain_token, UserToken object)""" 

47 

48 # Delete all existing tokens of this type for this user 

49 db.query(models.UserToken).filter( 

50 models.UserToken.owner_id == user_id, 

51 models.UserToken.token_type == token_type, 

52 ).delete() 

53 

54 # Generate new token 

55 plain_token = secrets.token_urlsafe(32) 

56 hashed_token = utils.hash_token(plain_token) 

57 

58 # noinspection PyArgumentList 

59 new_token = models.UserToken( 

60 owner_id=user_id, 

61 token=hashed_token, 

62 token_type=token_type, 

63 pending_email=pending_email, 

64 ) 

65 db.add(new_token) 

66 db.commit() 

67 db.refresh(new_token) 

68 

69 return plain_token, new_token 

70 

71 

72def send_verification_with_rate_limit( 

73 token_type: str, 

74 user: models.User, 

75 db: Session, 

76 send_email_function: Callable, 

77 endpoint: str, 

78 name: str, 

79 pending_email: str | None = None, 

80) -> base_schemas.GenericResponse: 

81 """Send verification email with rate limiting. 

82 :param token_type: Type of the token (e.g., 'verification') 

83 :param user: user entry 

84 :param db: database session 

85 :param send_email_function: Function to send the email 

86 :param endpoint: Frontend endpoint for the verification link 

87 :param name: Name of the token type for messaging 

88 :param pending_email: Optional pending email for email_change tokens 

89 :return: Dictionary with success status, message and error code""" 

90 

91 # Check if enough time has passed since last token was generated 

92 existing_token = ( 

93 db.query(models.UserToken) 

94 .filter(models.UserToken.owner_id == user.id) 

95 .filter(models.UserToken.token_type == token_type) 

96 .order_by(models.UserToken.created_at.desc()) 

97 .first() 

98 ) 

99 if existing_token: 

100 seconds_remaining = existing_token.remaining_seconds 

101 if float(str(seconds_remaining)) > 0: 

102 return base_schemas.GenericResponse( 

103 success=False, 

104 message=f"Please wait {seconds_remaining} seconds before requesting another {name} email.", 

105 error_code=status.HTTP_429_TOO_MANY_REQUESTS, 

106 ) 

107 

108 # Generate new verification token and delete the old ones 

109 plain_token, hashed_token_obj = generate_token(user.id, token_type, db, pending_email) 

110 

111 try: 

112 # Send the email to the user with the plain token 

113 verification_url = f"{settings.frontend_url}/{endpoint}/?token={plain_token}" 

114 send_email_function(user.email, verification_url, user.first_name) 

115 return base_schemas.GenericResponse( 

116 success=True, 

117 message=f"{name} email sent successfully.", 

118 error_code=None, 

119 ) 

120 

121 except Exception as e: 

122 return base_schemas.GenericResponse( 

123 success=False, 

124 message=f"Error sending {token_type} email: {e}", 

125 error_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

126 ) 

127 

128 

129def send_email_verification_email( 

130 user: models.User, 

131 db: Session, 

132) -> base_schemas.GenericResponse: 

133 """Send email verification email with rate limiting. 

134 :param user: user entry 

135 :param db: database session 

136 :return: Dictionary with success status, message and error code""" 

137 

138 return send_verification_with_rate_limit( 

139 token_type="verification", 

140 user=user, 

141 db=db, 

142 send_email_function=email_service.send_verification_email, 

143 endpoint="verify-email", 

144 name="Verification", 

145 ) 

146 

147 

148def send_password_reset_email( 

149 user: models.User, 

150 db: Session, 

151) -> base_schemas.GenericResponse: 

152 """Send password reset email with rate limiting. 

153 :param user: user entry 

154 :param db: database session 

155 :return: Dictionary with success status, message and error code""" 

156 

157 return send_verification_with_rate_limit( 

158 token_type="password_reset", 

159 user=user, 

160 db=db, 

161 send_email_function=email_service.send_password_reset_email, 

162 endpoint="reset-password", 

163 name="Password reset", 

164 ) 

165 

166 

167def send_email_change_email( 

168 user: models.User, 

169 new_email: str, 

170 db: Session, 

171) -> base_schemas.GenericResponse: 

172 """Send new email verification email with rate limiting. 

173 :param user: user entry 

174 :param new_email: new email address to verify 

175 :param db: database session 

176 :return: Dictionary with success status, message and error code""" 

177 

178 return send_verification_with_rate_limit( 

179 token_type="email_change", 

180 user=user, 

181 db=db, 

182 send_email_function=lambda _, url, name: email_service.send_email_change_verification(new_email, url, name), 

183 endpoint="verify-new-email", 

184 pending_email=new_email, 

185 name="Email change verification", 

186 )