Coverage for backend / app / core / routers / auth.py: 92%
144 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Authentication route"""
3import datetime as dt
4import uuid
6from fastapi import APIRouter, Depends, HTTPException, status
7from fastapi.security import OAuth2PasswordRequestForm
8from sqlalchemy.orm import Session
10from app import utils, models, database, base_schemas
11from app.config import settings
12from app.core import schemas, oauth2
13from app.core.models import get_setting_value
14from app.core.utils import send_email_verification_email, send_password_reset_email, get_token
15from app.demo.seed import seed_demo_data
16from app.emails.email_service import email_service
18# -------------------------------------------------------- LOGIN -------------------------------------------------------
def _assert_not_maintenance(db: Session) -> None:
    """Reject the request once a scheduled maintenance window has started.

    Reads the ``maintenance_scheduled_at`` setting. When it holds an ISO formatted timestamp that is
    now or in the past, the request is aborted. A missing/empty setting or a value that cannot be
    parsed leaves the request untouched.
    :param db: The database session used to read the setting
    :raises HTTPException with a 401 status code if the maintenance start time has been reached"""

    raw_schedule = get_setting_value(db, "maintenance_scheduled_at", None)
    if not raw_schedule:
        return

    try:
        starts_at = dt.datetime.fromisoformat(raw_schedule)
    except ValueError:
        # An unparsable timestamp is ignored rather than blocking every request
        return

    # Naive timestamps are taken as UTC so they can be compared with an aware "now"
    if starts_at.tzinfo is None:
        starts_at = starts_at.replace(tzinfo=dt.timezone.utc)

    if starts_at <= dt.datetime.now(dt.timezone.utc):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Service is currently under maintenance.",
        )
login_router = APIRouter(prefix="/login", tags=["Login"])


def _start_demo_session(demo_db: Session) -> dict[str, str]:
    """Create a throwaway demo user in the demo database, seed it and issue an access token for it.

    Any failure rolls back the demo session before the exception is re-raised.
    :param demo_db: The demo database session
    :returns: The access token dictionary for the newly created demo user"""

    try:
        temp_user = models.User(
            email=f"demo-{uuid.uuid4().hex[:12]}@demo.jam",
            # Random password: the temp account is only ever reached through the token issued below
            password=utils.hash_password(uuid.uuid4().hex),
            is_demo=True,
            is_active=True,
            is_verified=True,
            first_name="Demo",
            last_name="User",
            last_login=dt.datetime.now(dt.timezone.utc),
            premium={"is_active": True, "job_scraping_active": True, "job_rating_active": True},
        )
        demo_db.add(temp_user)
        demo_db.commit()
        demo_db.refresh(temp_user)

        seed_demo_data(demo_db, temp_user)

        demo_token = oauth2.create_access_token(
            data={"user_id": temp_user.id},
            token_version=temp_user.token_version,
            is_demo=True,
        )
        return {"access_token": demo_token, "token_type": "bearer"}
    except Exception:
        demo_db.rollback()
        raise


@login_router.post("/", status_code=status.HTTP_200_OK, response_model=schemas.Token)
def login(
    user_credentials: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(database.get_db),
    demo_db: Session = Depends(database.get_demo_db),
) -> dict[str, str]:
    """Authenticate a user and hand back a bearer token.

    A stored user flagged as demo is not password-checked: a fresh temporary demo user is created in
    the demo database and the token is issued for that temporary user instead.
    :param user_credentials: The submitted form (the username field carries the email)
    :param db: The database session
    :param demo_db: The demo database session
    :returns: The access token dictionary
    :raises HTTPException with a 403 status code if the user is unknown or the password does not match
    :raises HTTPException with a 401 status code if the user is not active or not verified
    :raises HTTPException with a 401 status code if maintenance is active (demo and non-admin users)
    :raises HTTPException with the status code reported by send_email_verification_email if sending fails
    :raises HTTPException with a 500 status code in test mode for the email crash@crash.com"""

    user_email = utils.clean_email(user_credentials.username)

    # Test-mode hook that deliberately produces a server error
    if settings.test_mode and user_email == "crash@crash.com":
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error")

    user = db.query(models.User).filter(models.User.email == user_email).first()

    # Demo login: work on a temporary user living in the demo schema
    if user is not None and user.is_demo:
        _assert_not_maintenance(db)
        return _start_demo_session(demo_db)

    # Unknown email and wrong password are answered identically
    credentials_ok = user is not None and utils.verify_password(user_credentials.password, user.password)
    if not credentials_ok:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid credentials.")

    if not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is not active.")

    # Unverified accounts get a new verification email; login is refused either way
    if not user.is_verified:
        outcome = send_email_verification_email(user, db)
        if not outcome.success:
            raise HTTPException(
                status_code=outcome.error_code,
                detail=outcome.message,
            )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"User account is not verified. A new verification email has been sent to {user.email}.",
        )

    # Admins may still log in while maintenance is active
    if not user.is_admin:
        _assert_not_maintenance(db)

    # Keep the former login time before recording the current one
    user.previous_login = user.last_login
    user.last_login = dt.datetime.now(dt.timezone.utc)
    db.commit()

    token = oauth2.create_access_token(
        data={"user_id": user.id},
        token_version=user.token_version,
    )
    return {"access_token": token, "token_type": "bearer"}
138# ------------------------------------------------------ REGISTER ------------------------------------------------------
register_router = APIRouter(prefix="/register", tags=["Register"])


@register_router.post("/", status_code=status.HTTP_201_CREATED, response_model=base_schemas.GenericResponse)
def create_user(
    user: schemas.UserRegister,
    db: Session = Depends(database.get_db),
) -> base_schemas.GenericResponse:
    """Register a new user and send the verification email.

    :param user: The registration payload
    :param db: The database session
    :returns: The result object returned by send_email_verification_email
    :raises HTTPException with a 401 status code if maintenance is active
    :raises HTTPException with a 401 status code if an allowlist is set and the email is not on it
    :raises HTTPException with a 401 status code if the email is registered but unverified (email is re-sent)
    :raises HTTPException with a 400 status code if the email is already registered and verified
    :raises HTTPException with the status code reported by send_email_verification_email if sending fails"""

    _assert_not_maintenance(db)

    # Sign-up is restricted to the comma separated allowlist when that setting exists
    raw_allowlist = models.get_setting_value(db, "allowlist", None)
    if raw_allowlist is not None:
        permitted = [utils.clean_email(entry) for entry in raw_allowlist.split(",")]
        if user.email not in permitted:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="You are not allowed to sign up for now.",
            )

    existing_user = db.query(models.User).filter(models.User.email == user.email).first()
    if existing_user:
        if existing_user.is_verified:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")

        # Registered but never verified: send a new verification email instead of creating a duplicate
        resend = send_email_verification_email(existing_user, db)
        if not resend.success:
            raise HTTPException(status_code=resend.error_code, detail=resend.message)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Email already registered but not verified. "
            f"A new verification email has been sent to {existing_user.email}.",
        )

    # Only the hash is stored
    user.password = utils.hash_password(user.password)

    new_user = models.User(**user.model_dump())  # noqa
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    delivery = send_email_verification_email(new_user, db)
    if delivery.success:
        return delivery

    # The account cannot be verified without the email, so remove it again
    db.delete(new_user)
    db.commit()
    raise HTTPException(status_code=delivery.error_code, detail=delivery.message)
@register_router.get("/verify-email/{token}", response_model=base_schemas.GenericResponse)
def verify_email(
    token: str,
    db: Session = Depends(database.get_db),
) -> dict[str, str | bool]:
    """Mark the owner of a verification token as verified.

    :param token: The verification token taken from the emailed link
    :param db: The database session
    :raises HTTPException with a 401 status code if maintenance is active
    :raises HTTPException with a 403 status code if no matching token is found
    :raises HTTPException with a 403 status code if the token is no longer valid (it is deleted)
    :raises HTTPException with a 403 status code if the token owner does not exist
    :return: Success message upon successful verification"""

    _assert_not_maintenance(db)

    # Look the token up by its hash
    token_entry = get_token(utils.hash_token(token), "verification", db)
    if not token_entry:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token. Please request a new one by logging in.",
        )

    if not token_entry.is_valid:
        # Remove the stale token before rejecting the request
        db.delete(token_entry)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Verification token has expired. Please request a new one by logging in.",
        )

    owner = db.query(models.User).filter(models.User.id == token_entry.owner_id).first()
    if not owner:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found.")

    # Verifying the user and consuming the token happen in the same commit
    owner.is_verified = True
    db.delete(token_entry)
    db.commit()

    return {"message": "Account verified successfully", "success": True}
250# --------------------------------------------------- PASSWORD RESET ---------------------------------------------------
password_router = APIRouter(prefix="/password", tags=["Password"])


@password_router.post("/forgot", status_code=status.HTTP_200_OK, response_model=base_schemas.GenericResponse)
def request_password_reset(
    email_data: schemas.PasswordResetRequest,
    db: Session = Depends(database.get_db),
) -> base_schemas.GenericResponse:
    """Send a password reset email to an existing, active, non-demo user.

    :param email_data: Schema containing the user's email address
    :param db: The database session
    :return: The result object returned by send_password_reset_email
    :raises HTTPException with a 401 status code if maintenance is active
    :raises HTTPException with a 404 status code if no user has this email
    :raises HTTPException with a 401 status code if the user account is not active
    :raises HTTPException with a 403 status code if the user is a demo user
    :raises HTTPException with the status code reported by send_password_reset_email if sending fails"""

    _assert_not_maintenance(db)

    account = db.query(models.User).filter(models.User.email == email_data.email).first()

    if not account:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User with this email does not exist.")

    if not account.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is not active.")

    # Demo accounts must not be able to change their password
    if account.is_demo:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test users cannot reset their password.")

    outcome = send_password_reset_email(account, db)
    if outcome.success:
        return outcome
    raise HTTPException(status_code=outcome.error_code, detail=outcome.message)
@password_router.post("/reset", status_code=status.HTTP_200_OK, response_model=base_schemas.GenericResponse)
def reset_password(
    reset_data: schemas.PasswordReset,
    db: Session = Depends(database.get_db),
) -> dict[str, str | bool]:
    """Reset a user's password using a valid token.

    :param reset_data: Schema containing token and new password
    :param db: The database session
    :return: Success message
    :raises HTTPException with code 401 if maintenance is active
    :raises HTTPException with code 403 if token is invalid or expired
    :raises HTTPException with code 403 if the token owner does not exist
    :raises HTTPException with code 403 if user is a demo user
    :raises HTTPException with code 500 if sending the notification or saving the new password fails"""

    _assert_not_maintenance(db)

    # Look the token up by its hash
    password_reset_code = utils.hash_token(reset_data.token)
    token_entry = get_token(password_reset_code, "password_reset", db)

    if not token_entry or not token_entry.is_valid:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid or expired password reset token")

    # Get the user
    user = db.query(models.User).filter(models.User.id == token_entry.owner_id).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User not found")

    # Prevent demo users from resetting password
    if user.is_demo:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test users cannot reset their password.")

    # Hash the new password
    hashed_password = utils.hash_password(reset_data.new_password)

    # Update the user's password and consume the token in a single commit
    try:
        # NOTE(review): the notification goes out before the commit, so a failing commit leaves the user
        # with a "password changed" email for a change that was rolled back - confirm this order is intended
        email_service.send_password_changed_notification(user.email)
        user.password = hashed_password
        db.delete(token_entry)
        db.commit()
    except Exception as exc:
        # Only ordinary errors become a 500; KeyboardInterrupt/SystemExit must keep propagating
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error resetting password"
        ) from exc

    return {"success": True, "message": "Password has been reset successfully"}