Coverage for backend / app / core / routers / user.py: 88%
156 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""User route"""
3import datetime as dt
5from fastapi import APIRouter, Depends, HTTPException, status
6from sqlalchemy.orm import Session
8from app import base_schemas
9from app import utils, models, database
10from app.core import oauth2, schemas
11from app.core.utils import send_email_change_email
12from app.emails.email_service import email_service
13from app.emails.release_data import get_release_slides
14from app.payments import stripe
15from app.routers.utility import generate_data_table_crud_router, assert_admin
16from app.utils import AppLogger
19# -------------------------------------------------------- USERS -------------------------------------------------------
22def transform_user_data(data: dict, db: Session, entry_data: dict | None = None) -> dict:
23 """Transform user data before creating or updating a user.
24 :param data: The user data to transform.
25 :param db: The database session
26 :param entry_data: optional original data of the entry
27 :returns: The transformed user data."""
29 _ = db, entry_data
30 if "password" in data:
31 return {"password": utils.hash_password(data["password"])}
32 else:
33 return {}
36user_router = generate_data_table_crud_router(
37 table_model=models.User,
38 create_schema=schemas.UserCreate,
39 update_schema=schemas.UserUpdate,
40 out_schema=schemas.UserOut,
41 endpoint="users",
42 not_found_msg="User not found",
43 admin_only=True,
44 transform=transform_user_data,
45)
48@user_router.post("/invalidate-all-sessions", response_model=base_schemas.GenericResponse)
49def invalidate_all_sessions(
50 db: Session = Depends(database.get_db),
51 current_user: models.User = Depends(oauth2.get_current_user),
52) -> dict[str, str | bool]:
53 """Invalidate all user sessions by incrementing token_version for all users.
54 This will force all users to log in again.
55 :param db: The database session.
56 :param current_user: The current authenticated admin user.
57 :returns: A message indicating the result of the operation."""
59 assert_admin(current_user)
61 # Increment token_version for all users
62 db.query(models.User).update({models.User.token_version: models.User.token_version + 1})
63 db.commit()
65 return {"message": "All user sessions have been invalidated.", "success": True}
68@user_router.post("/send-release-email/{version}", response_model=base_schemas.GenericResponse)
69def send_release_email(
70 version: str,
71 db: Session = Depends(database.get_db),
72 current_user: models.User = Depends(oauth2.get_current_user),
73) -> dict[str, str | bool]:
74 """Send a new version announcement email to all active, verified, non-demo users.
75 :param version: The version string to announce (e.g. "1.2.0").
76 :param db: The database session.
77 :param current_user: The current authenticated admin user.
78 :returns: A message with the count of emails sent."""
80 assert_admin(current_user)
82 features = get_release_slides(version)
83 if features is None:
84 raise HTTPException(
85 status_code=status.HTTP_404_NOT_FOUND,
86 detail=f"No release data found for version {version}",
87 )
89 logger = AppLogger.create_service_logger("release_email", "INFO")
91 users = (
92 db.query(models.User)
93 .filter(models.User.is_active, models.User.is_verified, models.User.is_demo.is_(False))
94 .all()
95 )
97 sent_count = 0
98 for user in users:
99 try:
100 email_service.send_new_version_email(user.email, version, features)
101 sent_count += 1
102 except Exception as e:
103 logger.error("Failed to send release email to user %s: %s", user.id, str(e))
105 return {
106 "message": f"Release email for v{version} sent to {sent_count}/{len(users)} users.",
107 "success": True,
108 }
111# ------------------------------------------------- USER QUALIFICATIONS ------------------------------------------------
114user_qualification_router = APIRouter(prefix="/user-qualifications", tags=["user-qualifications"])
117@user_qualification_router.get("/latest", response_model=schemas.UserQualificationOut)
118def get_latest_user_qualification(
119 db: Session = Depends(database.get_db),
120 user: models.User = Depends(oauth2.get_current_user),
121) -> models.UserQualification:
122 """Get the latest user qualification for the current user."""
124 entry = (
125 db.query(models.UserQualification)
126 .filter(models.UserQualification.owner_id == user.id)
127 .order_by(models.UserQualification.modified_at.desc())
128 .first()
129 )
130 if not entry:
131 raise HTTPException(
132 status_code=status.HTTP_404_NOT_FOUND,
133 detail="User Qualification not found",
134 )
135 return entry
138@user_qualification_router.post("/", response_model=schemas.UserQualificationOut)
139def upsert_user_qualification(
140 qualification: schemas.UserQualificationUpsert,
141 db: Session = Depends(database.get_db),
142 user: models.User = Depends(oauth2.get_current_user),
143) -> models.UserQualification:
144 """Create or update a user qualification.
145 :param qualification: The user qualification data.
146 :param db: The database session
147 :param user: The current authenticated user.
148 :returns: The created or updated user qualification."""
150 entry = (
151 db.query(models.UserQualification)
152 .filter(models.UserQualification.owner_id == user.id)
153 .filter(models.UserQualification.id == qualification.id)
154 .first()
155 )
156 if entry:
157 # Determine if the qualification was used to rate jobs
158 if len(entry.job_ratings):
159 # noinspection PyArgumentList
160 entry = models.UserQualification(
161 **qualification.model_dump(exclude_unset=True, exclude=["id"]), owner_id=user.id
162 )
163 db.add(entry)
164 else:
165 for field, value in qualification.model_dump(exclude_unset=True).items():
166 setattr(entry, field, value)
167 else:
168 # noinspection PyArgumentList
169 entry = models.UserQualification(
170 **qualification.model_dump(exclude_unset=True, exclude=["id"]), owner_id=user.id
171 )
172 db.add(entry)
173 db.commit()
174 db.refresh(entry)
175 return entry
178# ---------------------------------------------------- CURRENT USER ----------------------------------------------------
181current_user_router = APIRouter(prefix="/current-user", tags=["current-user"])
184@current_user_router.get("/", response_model=schemas.UserOut)
185def get_current_user_profile(
186 current_user: models.User = Depends(oauth2.get_current_user),
187) -> models.User:
188 """Get the current user's profile.
189 :param current_user: The current authenticated user.
190 :returns: The current user."""
192 return current_user
195@current_user_router.post("/heartbeat", response_model=base_schemas.GenericResponse)
196def heartbeat(
197 current_user: models.User = Depends(oauth2.get_current_user),
198 db: Session = Depends(database.get_db),
199) -> dict[str, str | bool]:
200 """Record that the user has accessed the app.
201 :param current_user: The current authenticated user.
202 :param db: The database session.
203 :returns: A success message."""
205 current_user.previous_login = current_user.last_login
206 current_user.last_login = dt.datetime.now(dt.timezone.utc)
207 db.commit()
209 return {"message": "Last login updated.", "success": True}
212@current_user_router.put("/", response_model=schemas.CurrentUserUpdateResponse)
213def update_account(
214 user_update: schemas.CurrentUserUpdate,
215 current_user: models.User = Depends(oauth2.get_current_user),
216 db: Session = Depends(database.get_db),
217) -> dict:
218 """Update the current user's profile.
219 :param user_update: The user update data.
220 :param current_user: The current authenticated user.
221 :param db: The database session.
222 :returns: A dictionary with the result of the update operation."""
224 result = {"success": True, "message": "User has been successfully updated"}
225 user_update_dict = user_update.model_dump(exclude_unset=True)
227 # Track if password or email changed
228 password_changed = False
230 # Hash password if it's being updated
231 transformed_data = transform_user_data(user_update_dict, db)
232 user_update_dict.update(transformed_data)
234 # Determine if the user is updating the password or email
235 requires_password_check = "password" in user_update_dict or "email" in user_update_dict
237 # Prevent test users from changing password or email
238 if current_user.is_demo and requires_password_check:
239 raise HTTPException(
240 status_code=status.HTTP_403_FORBIDDEN,
241 detail="Test users cannot change their password or email address.",
242 )
244 # Update password/email
245 current_password = user_update_dict.get("current_password", "")
246 if requires_password_check and not utils.verify_password(current_password, current_user.password):
247 raise HTTPException(
248 status_code=status.HTTP_401_UNAUTHORIZED,
249 detail="The current password is incorrect.",
250 )
252 # Track password change
253 if "password" in user_update_dict:
254 password_changed = True
256 # Handle email change separately
257 if "email" in user_update_dict and user_update_dict["email"] != current_user.email:
258 new_email = user_update_dict.pop("email") # Remove from dict to handle separately
260 # Validate email is not already associated with another user
261 other_users = (
262 db.query(models.User)
263 .filter(models.User.id != current_user.id)
264 .filter(models.User.email == new_email)
265 .first()
266 )
267 if other_users:
268 raise HTTPException(
269 status_code=status.HTTP_400_BAD_REQUEST,
270 detail="Email already registered",
271 )
273 # Send verification email with rate limiting
274 email_result = send_email_change_email(current_user, new_email, db)
275 if not email_result.success:
276 raise HTTPException(
277 status_code=email_result.error_code,
278 detail=email_result.message,
279 )
280 result["message"] = email_result.message
282 # Update other fields normally
283 for field, value in user_update_dict.items():
284 if isinstance(value, dict):
285 for k, v in value.items():
286 setattr(getattr(current_user, field), k, v)
287 else:
288 setattr(current_user, field, value)
290 # Increment token version if password was changed
291 if password_changed:
292 current_user.token_version += 1
293 result["message"] = "Account updated successfully. Please log in again."
294 result["logged_out"] = True
296 db.commit()
297 db.refresh(current_user)
298 return result
301@current_user_router.get("/verify-email/{token}", response_model=base_schemas.GenericResponse)
302def verify_email_change(
303 token: str,
304 db: Session = Depends(database.get_db),
305) -> dict[str, str | bool]:
306 """Verify email change using the provided token
307 :param token: The email change verification token.
308 :param db: The database session.
309 :returns: A message indicating the result of the email change verification."""
311 hashed_token = utils.hash_token(token)
313 # Find the token entry
314 token_entry = (
315 db.query(models.UserToken)
316 .filter(
317 models.UserToken.token == hashed_token,
318 models.UserToken.token_type == "email_change",
319 )
320 .first()
321 )
323 if not token_entry:
324 raise HTTPException(
325 status_code=status.HTTP_403_FORBIDDEN,
326 detail="Invalid or expired token. Please request a new one by logging in and changing your email address.",
327 )
329 # Check if token is expired
330 if not token_entry.is_valid:
331 db.delete(token_entry)
332 db.commit()
333 raise HTTPException(
334 status_code=status.HTTP_403_FORBIDDEN,
335 detail="Email change token has expired. Please request a new one by logging in and changing your email address.",
336 )
338 # Get the user
339 user = db.query(models.User).filter(models.User.id == token_entry.owner_id).first()
340 if not user:
341 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found.")
343 # Check if demo user
344 if user.is_demo:
345 raise HTTPException(
346 status_code=status.HTTP_403_FORBIDDEN,
347 detail="Test users cannot change their email address.",
348 )
350 # Check if email is already taken
351 other_users = (
352 db.query(models.User)
353 .filter(models.User.id != user.id)
354 .filter(models.User.email == token_entry.pending_email)
355 .first()
356 )
357 if other_users:
358 raise HTTPException(
359 status_code=status.HTTP_400_BAD_REQUEST,
360 detail="Email already registered",
361 )
363 # Update email and invalidate all sessions
364 old_email = user.email
365 user.email = token_entry.pending_email
366 user.token_version += 1
368 # Delete the token after successful use
369 db.delete(token_entry)
370 db.commit()
371 email_service.send_email_change_notification(user.email, old_email)
373 return {"message": "Email address changed successfully. You can now log in with your new email.", "success": True}
376@current_user_router.get("/check-pending-email", response_model=schemas.CheckPendingEmailResponse)
377def check_email_pending(
378 current_user: models.User = Depends(oauth2.get_current_user),
379 db: Session = Depends(database.get_db),
380) -> dict[str, bool | str | None]:
381 """Check if the user has a pending email change.
382 :param current_user: The current authenticated user.
383 :param db: The database session.
384 :returns: Dictionary with pending status and email if exists."""
386 # Get the email change token for this user
387 token_entry = (
388 db.query(models.UserToken)
389 .filter(
390 models.UserToken.owner_id == current_user.id,
391 models.UserToken.token_type == "email_change",
392 )
393 .first()
394 )
396 if not token_entry:
397 return {"has_pending_email": False, "pending_email": None}
399 # Check if token is expired
400 if not token_entry.is_valid:
401 db.delete(token_entry)
402 db.commit()
403 return {"has_pending_email": False, "pending_email": None}
405 return {"has_pending_email": True, "pending_email": token_entry.pending_email}
408@current_user_router.delete("/", response_model=base_schemas.GenericResponse)
409def delete_account(
410 delete_request: schemas.AccountDeleteRequest,
411 current_user: models.User = Depends(oauth2.get_current_user),
412 db: Session = Depends(database.get_db),
413) -> dict[str, str | bool]:
414 """Delete the current user's account permanently.
415 :param delete_request: The account deletion request with password.
416 :param current_user: The current authenticated user.
417 :param db: The database session.
418 :returns: A message indicating the result of the account deletion."""
420 # Prevent demo users from deleting their account
421 if current_user.is_demo:
422 raise HTTPException(
423 status_code=status.HTTP_403_FORBIDDEN,
424 detail="Test users cannot delete their account.",
425 )
427 # Verify password before deletion
428 if not utils.verify_password(delete_request.password, current_user.password):
429 raise HTTPException(
430 status_code=status.HTTP_401_UNAUTHORIZED,
431 detail="Failed to delete account. Password is incorrect.",
432 )
434 # Cancel Stripe subscription if active
435 if current_user.stripe_details and current_user.stripe_details.subscription_id:
436 try:
437 stripe.Subscription.delete(current_user.stripe_details.subscription_id)
438 except Exception as e:
439 print(f"Failed to cancel Stripe subscription for user {current_user.id}: {e}")
441 # Delete the user (cascading deletes will handle related data)
442 db.delete(current_user)
443 db.commit()
445 return {"message": "Account deleted successfully.", "success": True}