Coverage for backend / app / core / routers / user.py: 88%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""User route""" 

2 

3import datetime as dt 

4 

5from fastapi import APIRouter, Depends, HTTPException, status 

6from sqlalchemy.orm import Session 

7 

8from app import base_schemas 

9from app import utils, models, database 

10from app.core import oauth2, schemas 

11from app.core.utils import send_email_change_email 

12from app.emails.email_service import email_service 

13from app.emails.release_data import get_release_slides 

14from app.payments import stripe 

15from app.routers.utility import generate_data_table_crud_router, assert_admin 

16from app.utils import AppLogger 

17 

18 

19# -------------------------------------------------------- USERS ------------------------------------------------------- 

20 

21 

def transform_user_data(data: dict, db: Session, entry_data: dict | None = None) -> dict:
    """Compute the field overrides to apply before a user is created or updated.
    :param data: The incoming user data.
    :param db: The database session (accepted for the transform signature, not used here).
    :param entry_data: optional original data of the entry (not used here).
    :returns: A dict holding the hashed password when ``data`` has a "password" key, otherwise an empty dict."""

    # Both arguments are part of the signature but intentionally unused
    _ = (db, entry_data)

    if "password" not in data:
        return {}

    hashed = utils.hash_password(data["password"])
    return {"password": hashed}

34 

35 

# Admin-only router for the User table, built by the shared data-table router factory.
# transform_user_data is handed over as the transform hook (it hashes submitted passwords).
user_router = generate_data_table_crud_router(
    endpoint="users",
    table_model=models.User,
    create_schema=schemas.UserCreate,
    update_schema=schemas.UserUpdate,
    out_schema=schemas.UserOut,
    transform=transform_user_data,
    admin_only=True,
    not_found_msg="User not found",
)

46 

47 

@user_router.post("/invalidate-all-sessions", response_model=base_schemas.GenericResponse)
def invalidate_all_sessions(
    db: Session = Depends(database.get_db),
    current_user: models.User = Depends(oauth2.get_current_user),
) -> dict[str, str | bool]:
    """Bump token_version on every user row in a single bulk update.
    According to the endpoint's intent this forces all users to log in again.
    :param db: The database session.
    :param current_user: The current authenticated user; checked with assert_admin.
    :returns: A message indicating the result of the operation."""

    assert_admin(current_user)

    # One UPDATE over the whole table: token_version = token_version + 1
    version_column = models.User.token_version
    db.query(models.User).update({version_column: version_column + 1})
    db.commit()

    return {"message": "All user sessions have been invalidated.", "success": True}

66 

67 

@user_router.post("/send-release-email/{version}", response_model=base_schemas.GenericResponse)
def send_release_email(
    version: str,
    db: Session = Depends(database.get_db),
    current_user: models.User = Depends(oauth2.get_current_user),
) -> dict[str, str | bool]:
    """Announce a release by email to every active, verified, non-demo user.
    :param version: The version string to announce (e.g. "1.2.0").
    :param db: The database session.
    :param current_user: The current authenticated user; checked with assert_admin.
    :returns: A message with the number of emails sent out of the number of recipients.
    :raises HTTPException: 404 when no release data exists for the version."""

    assert_admin(current_user)

    slides = get_release_slides(version)
    if slides is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No release data found for version {version}",
        )

    logger = AppLogger.create_service_logger("release_email", "INFO")

    recipient_criteria = (
        models.User.is_active,
        models.User.is_verified,
        models.User.is_demo.is_(False),
    )
    recipients = db.query(models.User).filter(*recipient_criteria).all()

    delivered = 0
    for recipient in recipients:
        # Best effort: one failing address is logged and must not stop the remaining sends
        try:
            email_service.send_new_version_email(recipient.email, version, slides)
        except Exception as exc:
            logger.error("Failed to send release email to user %s: %s", recipient.id, str(exc))
        else:
            delivered += 1

    summary = f"Release email for v{version} sent to {delivered}/{len(recipients)} users."
    return {"message": summary, "success": True}

109 

110 

111# ------------------------------------------------- USER QUALIFICATIONS ------------------------------------------------ 

112 

113 

# Router for the qualification endpoints defined below
user_qualification_router = APIRouter(
    prefix="/user-qualifications",
    tags=["user-qualifications"],
)

115 

116 

@user_qualification_router.get("/latest", response_model=schemas.UserQualificationOut)
def get_latest_user_qualification(
    db: Session = Depends(database.get_db),
    user: models.User = Depends(oauth2.get_current_user),
) -> models.UserQualification:
    """Return the most recently modified qualification owned by the current user.
    :param db: The database session.
    :param user: The current authenticated user.
    :returns: The qualification with the newest modified_at value.
    :raises HTTPException: 404 when the user has no qualification."""

    owned = db.query(models.UserQualification).filter(models.UserQualification.owner_id == user.id)
    latest = owned.order_by(models.UserQualification.modified_at.desc()).first()

    if latest:
        return latest

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User Qualification not found",
    )

136 

137 

@user_qualification_router.post("/", response_model=schemas.UserQualificationOut)
def upsert_user_qualification(
    qualification: schemas.UserQualificationUpsert,
    db: Session = Depends(database.get_db),
    user: models.User = Depends(oauth2.get_current_user),
) -> models.UserQualification:
    """Create or update a user qualification.
    An existing qualification (matched on id and owner) is updated in place only while it has no job ratings;
    otherwise a new qualification row is created for the user.
    :param qualification: The user qualification data.
    :param db: The database session
    :param user: The current authenticated user.
    :returns: The created or updated user qualification."""

    # `exclude` is documented as a set/mapping, so pass a set rather than a list.
    # The id is left out in every path: it only selects the row and is never a value to write.
    payload = qualification.model_dump(exclude_unset=True, exclude={"id"})

    entry = (
        db.query(models.UserQualification)
        .filter(models.UserQualification.owner_id == user.id)
        .filter(models.UserQualification.id == qualification.id)
        .first()
    )

    if entry and not len(entry.job_ratings):
        # Never used to rate jobs: safe to overwrite the submitted fields in place
        for field, value in payload.items():
            setattr(entry, field, value)
    else:
        # Either nothing matched, or the match was already used to rate jobs and is left untouched
        # noinspection PyArgumentList
        entry = models.UserQualification(**payload, owner_id=user.id)
        db.add(entry)

    db.commit()
    db.refresh(entry)
    return entry

176 

177 

178# ---------------------------------------------------- CURRENT USER ---------------------------------------------------- 

179 

180 

# Router for the endpoints that act on the authenticated user's own account
current_user_router = APIRouter(
    prefix="/current-user",
    tags=["current-user"],
)

182 

183 

@current_user_router.get("/", response_model=schemas.UserOut)
def get_current_user_profile(
    current_user: models.User = Depends(oauth2.get_current_user),
) -> models.User:
    """Return the authenticated user as resolved by the auth dependency.
    :param current_user: The current authenticated user.
    :returns: The same user object, serialised through schemas.UserOut."""

    profile = current_user
    return profile

193 

194 

@current_user_router.post("/heartbeat", response_model=base_schemas.GenericResponse)
def heartbeat(
    current_user: models.User = Depends(oauth2.get_current_user),
    db: Session = Depends(database.get_db),
) -> dict[str, str | bool]:
    """Record an app access: shift last_login into previous_login and stamp the current UTC time.
    :param current_user: The current authenticated user.
    :param db: The database session.
    :returns: A success message."""

    accessed_at = dt.datetime.now(dt.timezone.utc)

    # Keep the former timestamp before it is overwritten
    current_user.previous_login, current_user.last_login = current_user.last_login, accessed_at
    db.commit()

    return {"message": "Last login updated.", "success": True}

210 

211 

@current_user_router.put("/", response_model=schemas.CurrentUserUpdateResponse)
def update_account(
    user_update: schemas.CurrentUserUpdate,
    current_user: models.User = Depends(oauth2.get_current_user),
    db: Session = Depends(database.get_db),
) -> dict:
    """Update the current user's profile.
    Changing the password or the email requires the correct current password and is refused for demo users.
    A new email is not written directly; a verification email is sent instead.
    A password change increments token_version and marks the result as logged out.
    :param user_update: The user update data.
    :param current_user: The current authenticated user.
    :param db: The database session.
    :returns: A dictionary with the result of the update operation.
    :raises HTTPException: 403 for demo users changing password/email, 401 for a wrong current password,
        400 when the new email belongs to another user, or the error code reported by the email sender."""

    result = {"success": True, "message": "User has been successfully updated"}
    user_update_dict = user_update.model_dump(exclude_unset=True)

    # Hash password if it's being updated
    user_update_dict.update(transform_user_data(user_update_dict, db))

    # The current password is only a credential for this request. Take it out of the update data so the
    # generic field loop below never copies the plain-text value onto the user object.
    current_password = user_update_dict.pop("current_password", "")

    # Determine if the user is updating the password or email
    password_changed = "password" in user_update_dict
    requires_password_check = password_changed or "email" in user_update_dict

    # Prevent test users from changing password or email
    if current_user.is_demo and requires_password_check:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Test users cannot change their password or email address.",
        )

    # Sensitive changes must be confirmed with the current password
    if requires_password_check and not utils.verify_password(current_password, current_user.password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="The current password is incorrect.",
        )

    # Handle email change separately
    if "email" in user_update_dict and user_update_dict["email"] != current_user.email:
        new_email = user_update_dict.pop("email")  # Remove from dict to handle separately

        # Validate email is not already associated with another user
        other_users = (
            db.query(models.User)
            .filter(models.User.id != current_user.id)
            .filter(models.User.email == new_email)
            .first()
        )
        if other_users:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

        # Send the verification email; a failed send aborts the whole update
        email_result = send_email_change_email(current_user, new_email, db)
        if not email_result.success:
            raise HTTPException(
                status_code=email_result.error_code,
                detail=email_result.message,
            )
        result["message"] = email_result.message

    # Update other fields normally
    for field, value in user_update_dict.items():
        if isinstance(value, dict):
            # Nested data is applied key by key onto the attribute of the same name
            for k, v in value.items():
                setattr(getattr(current_user, field), k, v)
        else:
            setattr(current_user, field, value)

    # Increment token version if password was changed
    if password_changed:
        current_user.token_version += 1
        result["message"] = "Account updated successfully. Please log in again."
        result["logged_out"] = True

    db.commit()
    db.refresh(current_user)
    return result

299 

300 

@current_user_router.get("/verify-email/{token}", response_model=base_schemas.GenericResponse)
def verify_email_change(
    token: str,
    db: Session = Depends(database.get_db),
) -> dict[str, str | bool]:
    """Complete a pending email change identified by a verification token.
    On success the user's email is replaced, token_version is incremented, the token is deleted
    and a change notification is sent.
    :param token: The email change verification token.
    :param db: The database session.
    :returns: A message indicating the result of the email change verification.
    :raises HTTPException: 403 for an unknown or expired token, a missing user or a demo user;
        400 when the pending email already belongs to another user."""

    # Tokens are looked up by their hash
    token_hash = utils.hash_token(token)
    change_request = (
        db.query(models.UserToken)
        .filter(
            models.UserToken.token == token_hash,
            models.UserToken.token_type == "email_change",
        )
        .first()
    )

    if not change_request:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token. Please request a new one by logging in and changing your email address.",
        )

    # An expired token is removed before the request is rejected
    if not change_request.is_valid:
        db.delete(change_request)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email change token has expired. Please request a new one by logging in and changing your email address.",
        )

    user = db.query(models.User).filter(models.User.id == change_request.owner_id).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found.")

    if user.is_demo:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Test users cannot change their email address.",
        )

    # Reject the change when the address is registered to a different account
    pending_email = change_request.pending_email
    conflicting_user = (
        db.query(models.User)
        .filter(models.User.id != user.id, models.User.email == pending_email)
        .first()
    )
    if conflicting_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Swap the address and bump token_version in the same transaction that consumes the token
    previous_email = user.email
    user.email = pending_email
    user.token_version += 1
    db.delete(change_request)
    db.commit()

    email_service.send_email_change_notification(user.email, previous_email)

    return {"message": "Email address changed successfully. You can now log in with your new email.", "success": True}

374 

375 

@current_user_router.get("/check-pending-email", response_model=schemas.CheckPendingEmailResponse)
def check_email_pending(
    current_user: models.User = Depends(oauth2.get_current_user),
    db: Session = Depends(database.get_db),
) -> dict[str, bool | str | None]:
    """Report whether the current user has a still-valid email change waiting for verification.
    A token that is no longer valid is deleted as a side effect.
    :param current_user: The current authenticated user.
    :param db: The database session.
    :returns: Dictionary with pending status and email if exists."""

    pending_token = (
        db.query(models.UserToken)
        .filter(
            models.UserToken.owner_id == current_user.id,
            models.UserToken.token_type == "email_change",
        )
        .first()
    )

    if pending_token and pending_token.is_valid:
        return {"has_pending_email": True, "pending_email": pending_token.pending_email}

    if pending_token:
        # Found but no longer valid: clean it up
        db.delete(pending_token)
        db.commit()

    return {"has_pending_email": False, "pending_email": None}

406 

407 

@current_user_router.delete("/", response_model=base_schemas.GenericResponse)
def delete_account(
    delete_request: schemas.AccountDeleteRequest,
    current_user: models.User = Depends(oauth2.get_current_user),
    db: Session = Depends(database.get_db),
) -> dict[str, str | bool]:
    """Delete the current user's account permanently.
    :param delete_request: The account deletion request with password.
    :param current_user: The current authenticated user.
    :param db: The database session.
    :returns: A message indicating the result of the account deletion.
    :raises HTTPException: 403 for demo users, 401 when the password is incorrect."""

    # Prevent demo users from deleting their account
    if current_user.is_demo:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Test users cannot delete their account.",
        )

    # Verify password before deletion
    if not utils.verify_password(delete_request.password, current_user.password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Failed to delete account. Password is incorrect.",
        )

    # Cancel Stripe subscription if active
    stripe_details = current_user.stripe_details
    if stripe_details and stripe_details.subscription_id:
        try:
            stripe.Subscription.delete(stripe_details.subscription_id)
        except Exception as e:
            # Best effort: a failed cancellation must not block the deletion, but it is recorded
            # through the application logger instead of being printed to stdout.
            logger = AppLogger.create_service_logger("account_deletion", "INFO")
            logger.error("Failed to cancel Stripe subscription for user %s: %s", current_user.id, str(e))

    # Delete the user (related data is expected to be removed by cascading deletes)
    db.delete(current_user)
    db.commit()

    return {"message": "Account deleted successfully.", "success": True}