Coverage for backend/app/routers/user.py: 77%
91 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
1"""User route"""
3from fastapi import APIRouter, Depends, HTTPException, status, Request
4from sqlalchemy.orm import Session
6from app import utils, models, oauth2, database, schemas
8user_router = APIRouter(prefix="/users", tags=["users"])
11def assert_admin(user: models.User) -> None:
12 """Check if the user is an admin.
13 :param user: The user to check."""
15 if not user.is_admin:
16 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized to view this resource")
19@user_router.get("/", response_model=list[schemas.UserOut])
20def get_all_users(
21 request: Request,
22 db: Session = Depends(database.get_db),
23 current_user: models.User = Depends(oauth2.get_current_user),
24):
25 """Retrieve all users.
26 :param request: FastAPI request object to access query parameters
27 :param db: Database session.
28 :param current_user: Authenticated user.
29 :return: List of entries."""
31 assert_admin(current_user)
33 # noinspection PyTypeChecker
34 query = db.query(models.User)
36 # Get all query parameters
37 filter_params = dict(request.query_params)
39 # Apply filters for each parameter that matches a table column
40 for param_name, param_value in filter_params.items():
41 if hasattr(models.User, param_name):
42 column = getattr(models.User, param_name)
44 # Handle null values - convert string "null" to actual None/NULL
45 if param_value.lower() == "null":
46 query = query.filter(column.is_(None))
47 continue
49 # Handle different data types
50 try:
51 # Try to convert to appropriate type based on column type
52 if hasattr(column.type, "python_type"):
53 if column.type.python_type == int:
54 param_value = int(param_value)
55 elif column.type.python_type == float:
56 param_value = float(param_value)
57 elif column.type.python_type == bool:
58 param_value = param_value.lower() in ("true", "1", "yes", "on")
60 # Add filter to query
61 # noinspection PyTypeChecker
62 query = query.filter(column == param_value)
64 except (ValueError, TypeError):
65 # If conversion fails, treat as string comparison
66 # noinspection PyTypeChecker
67 query = query.filter(column == param_value)
69 return query.all()
72@user_router.get("/me", response_model=schemas.UserOut)
73def get_current_user_profile(current_user: models.User = Depends(oauth2.get_current_user)):
74 """Get the current user's profile.
75 :param current_user: The current authenticated user."""
77 return current_user
80@user_router.get("/{entry_id}", response_model=schemas.UserOut)
81def get_one_user(
82 entry_id: int | None,
83 current_user: models.User = Depends(oauth2.get_current_user),
84 db: Session = Depends(database.get_db),
85):
86 """Get a user by ID."""
88 assert_admin(current_user)
90 # noinspection PyTypeChecker
91 user = db.query(models.User).filter(models.User.id == entry_id).first()
92 if not user:
93 raise HTTPException(status_code=404, detail="User not found")
94 return user
97@user_router.put("/me", response_model=schemas.UserOut)
98def update_current_user_profile(
99 user_update: schemas.UserUpdate,
100 current_user: models.User = Depends(oauth2.get_current_user),
101 db: Session = Depends(database.get_db),
102):
103 """Update the current user's profile.
104 :param user_update: The user update data.
105 :param current_user: The current authenticated user.
106 :param db: The database session."""
108 user_update = user_update.model_dump(exclude_defaults=True)
110 # Hash password if it's being updated
111 if "password" in user_update:
112 user_update["password"] = utils.hash_password(user_update["password"])
114 # Determine if the user is updating the password or email
115 requires_password_check = "password" in user_update or "email" in user_update
117 # Get the user record to update
118 # noinspection PyTypeChecker
119 user_db = db.query(models.User).filter(models.User.id == current_user.id).first()
120 current_password = user_update.get("current_password", "")
122 # Update password/email
123 if requires_password_check and not utils.verify_password(current_password, user_db.password):
124 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="The current password is required")
126 # Validate email
127 # noinspection PyTypeChecker
128 other_users = db.query(models.User).filter(models.User.id != current_user.id).all()
129 emails = [u.email for u in other_users]
130 if "email" in user_update and user_update["email"] in emails:
131 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
133 # Update the user record
134 for field, value in user_update.items():
135 setattr(user_db, field, value)
137 db.commit()
138 db.refresh(user_db)
139 return user_db
142@user_router.put("/{entry_id}", response_model=schemas.UserOut)
143def update_user(
144 entry_id: int | None,
145 user_update: schemas.UserUpdate,
146 current_user: models.User = Depends(oauth2.get_current_user),
147 db: Session = Depends(database.get_db),
148):
149 """Update a user by ID."""
151 assert_admin(current_user)
153 user_update = user_update.model_dump(exclude_defaults=True)
155 # Hash password if it's being updated
156 if "password" in user_update:
157 user_update["password"] = utils.hash_password(user_update["password"])
159 # Get the user record to update
160 # noinspection PyTypeChecker
161 user_db = db.query(models.User).filter(models.User.id == entry_id).first()
163 # Validate email
164 # noinspection PyTypeChecker
165 other_users = db.query(models.User).filter(models.User.id != entry_id).all()
166 emails = [u.email for u in other_users]
167 if "email" in user_update and user_update["email"] in emails:
168 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
170 # Update the user record
171 for field, value in user_update.items():
172 setattr(user_db, field, value)
174 db.commit()
175 db.refresh(user_db)
176 return user_db
179@user_router.post("/", status_code=201, response_model=schemas.UserOut)
180def create_user(
181 user: schemas.UserCreate,
182 db: Session = Depends(database.get_db),
183):
184 """Create a new user.
185 :param user: The user data.
186 :param db: The database session."""
188 # noinspection PyTypeChecker
189 settings = db.query(models.Setting).filter(models.Setting.name == "allowlist").all()
190 if settings:
191 emails_allowed = settings[0].value.split(",")
192 if user.email not in emails_allowed:
193 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email not allowed")
195 # Get all users and check if the email is already registered
196 users = db.query(models.User).all()
197 emails = [u.email for u in users]
198 if user.email in emails:
199 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
201 # Hash the password and create the user
202 user.password = utils.hash_password(user.password)
203 # noinspection PyArgumentList
204 new_user = models.User(**user.model_dump())
205 db.add(new_user)
206 db.commit()
208 return new_user