Coverage for backend / app / core / routers / auth.py: 92%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Authentication route""" 

2 

3import datetime as dt 

4import uuid 

5 

6from fastapi import APIRouter, Depends, HTTPException, status 

7from fastapi.security import OAuth2PasswordRequestForm 

8from sqlalchemy.orm import Session 

9 

10from app import utils, models, database, base_schemas 

11from app.config import settings 

12from app.core import schemas, oauth2 

13from app.core.models import get_setting_value 

14from app.core.utils import send_email_verification_email, send_password_reset_email, get_token 

15from app.demo.seed import seed_demo_data 

16from app.emails.email_service import email_service 

17 

18# -------------------------------------------------------- LOGIN ------------------------------------------------------- 

19 

20 

21def _assert_not_maintenance(db: Session) -> None: 

22 """Raise 401 if maintenance mode is currently active.""" 

23 

24 maintenance_scheduled_at = get_setting_value(db, "maintenance_scheduled_at", None) 

25 if maintenance_scheduled_at: 

26 try: 

27 scheduled_time = dt.datetime.fromisoformat(maintenance_scheduled_at) 

28 if scheduled_time.tzinfo is None: 

29 scheduled_time = scheduled_time.replace(tzinfo=dt.timezone.utc) 

30 if scheduled_time <= dt.datetime.now(dt.timezone.utc): 

31 raise HTTPException( 

32 status_code=status.HTTP_401_UNAUTHORIZED, 

33 detail="Service is currently under maintenance.", 

34 ) 

35 except ValueError: 

36 pass 

37 

38 

39login_router = APIRouter(prefix="/login", tags=["Login"]) 

40 

41 

42@login_router.post("/", status_code=status.HTTP_200_OK, response_model=schemas.Token) 

43def login( 

44 user_credentials: OAuth2PasswordRequestForm = Depends(), 

45 db: Session = Depends(database.get_db), 

46 demo_db: Session = Depends(database.get_demo_db), 

47) -> dict[str, str]: 

48 """Log in a user. 

49 :param user_credentials: The user credentials (note: username is the email field) 

50 :param db: The database session 

51 :param demo_db: The demo database session 

52 :returns: The access token dictionary 

53 :raises HTTPException with a 403 status code if the credentials are invalid 

54 :raises HTTPException with a 401 status code if the user is not active or not verified 

55 :raises HTTPException with a 429 status code if verification email rate limit is exceeded""" 

56 

57 user_email = utils.clean_email(user_credentials.username) 

58 

59 if settings.test_mode and user_email == "crash@crash.com": 

60 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error") 

61 

62 # Find the user in the list based on the email provided 

63 user = db.query(models.User).filter(models.User.email == user_email).first() 

64 

65 # Handle demo user login: create a temp user in the demo schema with seeded data 

66 if user is not None and user.is_demo: 

67 _assert_not_maintenance(db) 

68 try: 

69 demo_email = f"demo-{uuid.uuid4().hex[:12]}@demo.jam" 

70 demo_user = models.User( 

71 email=demo_email, 

72 password=utils.hash_password(uuid.uuid4().hex), 

73 is_demo=True, 

74 is_active=True, 

75 is_verified=True, 

76 first_name="Demo", 

77 last_name="User", 

78 last_login=dt.datetime.now(dt.timezone.utc), 

79 premium={"is_active": True, "job_scraping_active": True, "job_rating_active": True}, 

80 ) 

81 demo_db.add(demo_user) 

82 demo_db.commit() 

83 demo_db.refresh(demo_user) 

84 

85 seed_demo_data(demo_db, demo_user) 

86 

87 access_token = oauth2.create_access_token( 

88 data={"user_id": demo_user.id}, 

89 token_version=demo_user.token_version, 

90 is_demo=True, 

91 ) 

92 return {"access_token": access_token, "token_type": "bearer"} 

93 except Exception: 

94 demo_db.rollback() 

95 raise 

96 

97 # Check that the user exists and verify the password 

98 if user is None or not utils.verify_password(user_credentials.password, user.password): 

99 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid credentials.") 

100 

101 # Check that the user is active 

102 if not user.is_active: 

103 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is not active.") 

104 

105 # Check that the user is verified 

106 if not user.is_verified: 

107 result = send_email_verification_email(user, db) 

108 

109 # Raise appropriate exception based on email sending result 

110 if result.success: 

111 raise HTTPException( 

112 status_code=status.HTTP_401_UNAUTHORIZED, 

113 detail=f"User account is not verified. A new verification email has been sent to {user.email}.", 

114 ) 

115 else: 

116 raise HTTPException( 

117 status_code=result.error_code, 

118 detail=result.message, 

119 ) 

120 

121 # Block non-admin users from logging in during maintenance 

122 if not user.is_admin: 

123 _assert_not_maintenance(db) 

124 

125 # Save the last login date and update the last login date 

126 user.previous_login = user.last_login 

127 user.last_login = dt.datetime.now(dt.timezone.utc) 

128 db.commit() 

129 

130 # Create an access token and return it 

131 access_token = oauth2.create_access_token( 

132 data={"user_id": user.id}, 

133 token_version=user.token_version, 

134 ) 

135 return {"access_token": access_token, "token_type": "bearer"} 

136 

137 

138# ------------------------------------------------------ REGISTER ------------------------------------------------------ 

139 

140 

141register_router = APIRouter(prefix="/register", tags=["Register"]) 

142 

143 

144@register_router.post("/", status_code=status.HTTP_201_CREATED, response_model=base_schemas.GenericResponse) 

145def create_user( 

146 user: schemas.UserRegister, 

147 db: Session = Depends(database.get_db), 

148) -> base_schemas.GenericResponse: 

149 """Create a new user 

150 :param user: The user data 

151 :param db: The database session 

152 :returns: Dictionary with success status and message 

153 :raises HTTPException with a 400 status code if the email is already registered 

154 :raises HTTPException with a 401 status code if the user is not allowed to sign up""" 

155 

156 _assert_not_maintenance(db) 

157 

158 # Check the user can be created 

159 allowlist = models.get_setting_value(db, "allowlist", None) 

160 if allowlist is not None: 

161 emails_allowed = [utils.clean_email(email) for email in allowlist.split(",")] 

162 if user.email not in emails_allowed: 

163 raise HTTPException( 

164 status_code=status.HTTP_401_UNAUTHORIZED, 

165 detail="You are not allowed to sign up for now.", 

166 ) 

167 

168 # Check if email already exists 

169 existing_user = db.query(models.User).filter(models.User.email == user.email).first() 

170 

171 if existing_user: 

172 # If user exists but is not verified, resend verification email 

173 if not existing_user.is_verified: 

174 result = send_email_verification_email(existing_user, db) 

175 if result.success: 

176 raise HTTPException( 

177 status_code=status.HTTP_401_UNAUTHORIZED, 

178 detail=f"Email already registered but not verified. " 

179 f"A new verification email has been sent to {existing_user.email}.", 

180 ) 

181 else: 

182 raise HTTPException(status_code=result.error_code, detail=result.message) 

183 else: 

184 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered") 

185 

186 # Hash the password and create the user 

187 user.password = utils.hash_password(user.password) 

188 

189 # Create user with related tables 

190 user_data = user.model_dump() 

191 new_user = models.User(**user_data) # noqa 

192 db.add(new_user) 

193 db.commit() 

194 db.refresh(new_user) 

195 

196 # Send verification email 

197 result = send_email_verification_email(new_user, db) 

198 if not result.success: 

199 # Rollback user creation if email fails 

200 db.delete(new_user) 

201 db.commit() 

202 raise HTTPException(status_code=result.error_code, detail=result.message) 

203 

204 return result 

205 

206 

207@register_router.get("/verify-email/{token}", response_model=base_schemas.GenericResponse) 

208def verify_email( 

209 token: str, 

210 db: Session = Depends(database.get_db), 

211) -> dict[str, str | bool]: 

212 """Verify a user's email address using the provided token 

213 :param token: The verification token from the email 

214 :param db: The database session 

215 :raises HTTPException with a 403 status code if the token does not exist 

216 :raises HTTPException with a 403 status code if the token has expired 

217 :return: Success message upon successful verification""" 

218 

219 _assert_not_maintenance(db) 

220 

221 verification_code = utils.hash_token(token) 

222 token_entry = get_token(verification_code, "verification", db) 

223 

224 if not token_entry: 

225 raise HTTPException(status_code=403, detail="Invalid or expired token. Please request a new one by logging in.") 

226 

227 # Check if token is expired 

228 if not token_entry.is_valid: 

229 # Delete expired token 

230 db.delete(token_entry) 

231 db.commit() 

232 raise HTTPException( 

233 status_code=status.HTTP_403_FORBIDDEN, 

234 detail="Verification token has expired. Please request a new one by logging in.", 

235 ) 

236 

237 # Get the user 

238 user = db.query(models.User).filter(models.User.id == token_entry.owner_id).first() 

239 if not user: 

240 raise HTTPException(status_code=403, detail="User not found.") 

241 

242 # Mark user as verified and delete the token 

243 user.is_verified = True 

244 db.delete(token_entry) 

245 db.commit() 

246 

247 return {"message": "Account verified successfully", "success": True} 

248 

249 

250# --------------------------------------------------- PASSWORD RESET --------------------------------------------------- 

251 

252 

253password_router = APIRouter(prefix="/password", tags=["Password"]) 

254 

255 

256@password_router.post("/forgot", status_code=status.HTTP_200_OK, response_model=base_schemas.GenericResponse) 

257def request_password_reset( 

258 email_data: schemas.PasswordResetRequest, 

259 db: Session = Depends(database.get_db), 

260) -> base_schemas.GenericResponse: 

261 """Request a password reset email. 

262 :param email_data: Schema containing the user's email address 

263 :param db: The database session 

264 :return: Success message 

265 :raises HTTPException with a 404 status code if user does not exist 

266 :raises HTTPException with a 401 status code if user account is not active 

267 :raises HTTPException with a 403 status code if user is a test user 

268 :raises HTTPException with send_password_reset_with_rate_limit error details""" 

269 

270 _assert_not_maintenance(db) 

271 

272 # Find user by email 

273 user = db.query(models.User).filter(models.User.email == email_data.email).first() 

274 

275 if not user: 

276 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User with this email does not exist.") 

277 

278 if not user.is_active: 

279 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is not active.") 

280 

281 # Prevent test users from resetting password 

282 if user.is_demo: 

283 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test users cannot reset their password.") 

284 

285 # Send password reset email with rate limiting 

286 result = send_password_reset_email(user, db) 

287 if not result.success: 

288 raise HTTPException(status_code=result.error_code, detail=result.message) 

289 else: 

290 return result 

291 

292 

293@password_router.post("/reset", status_code=status.HTTP_200_OK, response_model=base_schemas.GenericResponse) 

294def reset_password( 

295 reset_data: schemas.PasswordReset, 

296 db: Session = Depends(database.get_db), 

297) -> dict[str, str | bool]: 

298 """Reset a user's password using a valid token. 

299 :param reset_data: Schema containing token and new password 

300 :param db: The database session 

301 :return: Success message 

302 :raises HTTPException with code 403 if token is invalid or expired 

303 :raises HTTPException with code 403 if user is a test user 

304 :raises HTTPException with code 500 if there is an error resetting the password""" 

305 

306 _assert_not_maintenance(db) 

307 

308 # Hash the token to compare with stored hash 

309 password_reset_code = utils.hash_token(reset_data.token) 

310 

311 # Find token entry with matching hash 

312 token_entry = get_token(password_reset_code, "password_reset", db) 

313 

314 if not token_entry or not token_entry.is_valid: 

315 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid or expired password reset token") 

316 

317 # Get the user 

318 user = db.query(models.User).filter(models.User.id == token_entry.owner_id).first() 

319 if not user: 

320 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found") 

321 

322 # Prevent test users from resetting password 

323 if user.is_demo: 

324 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test users cannot reset their password.") 

325 

326 # Hash the new password 

327 hashed_password = utils.hash_password(reset_data.new_password) 

328 

329 # Update user's password and mark token as used 

330 try: 

331 email_service.send_password_changed_notification(user.email) 

332 user.password = hashed_password 

333 db.delete(token_entry) 

334 db.commit() 

335 except: 

336 db.rollback() 

337 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error resetting password") 

338 

339 return {"success": True, "message": "Password has been reset successfully"}