Coverage for backend/app/routers/user.py: 77%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-22 15:38 +0000

1"""User route""" 

2 

3from fastapi import APIRouter, Depends, HTTPException, status, Request 

4from sqlalchemy.orm import Session 

5 

6from app import utils, models, oauth2, database, schemas 

7 

8user_router = APIRouter(prefix="/users", tags=["users"]) 

9 

10 

11def assert_admin(user: models.User) -> None: 

12 """Check if the user is an admin. 

13 :param user: The user to check.""" 

14 

15 if not user.is_admin: 

16 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized to view this resource") 

17 

18 

19@user_router.get("/", response_model=list[schemas.UserOut]) 

20def get_all_users( 

21 request: Request, 

22 db: Session = Depends(database.get_db), 

23 current_user: models.User = Depends(oauth2.get_current_user), 

24): 

25 """Retrieve all users. 

26 :param request: FastAPI request object to access query parameters 

27 :param db: Database session. 

28 :param current_user: Authenticated user. 

29 :return: List of entries.""" 

30 

31 assert_admin(current_user) 

32 

33 # noinspection PyTypeChecker 

34 query = db.query(models.User) 

35 

36 # Get all query parameters 

37 filter_params = dict(request.query_params) 

38 

39 # Apply filters for each parameter that matches a table column 

40 for param_name, param_value in filter_params.items(): 

41 if hasattr(models.User, param_name): 

42 column = getattr(models.User, param_name) 

43 

44 # Handle null values - convert string "null" to actual None/NULL 

45 if param_value.lower() == "null": 

46 query = query.filter(column.is_(None)) 

47 continue 

48 

49 # Handle different data types 

50 try: 

51 # Try to convert to appropriate type based on column type 

52 if hasattr(column.type, "python_type"): 

53 if column.type.python_type == int: 

54 param_value = int(param_value) 

55 elif column.type.python_type == float: 

56 param_value = float(param_value) 

57 elif column.type.python_type == bool: 

58 param_value = param_value.lower() in ("true", "1", "yes", "on") 

59 

60 # Add filter to query 

61 # noinspection PyTypeChecker 

62 query = query.filter(column == param_value) 

63 

64 except (ValueError, TypeError): 

65 # If conversion fails, treat as string comparison 

66 # noinspection PyTypeChecker 

67 query = query.filter(column == param_value) 

68 

69 return query.all() 

70 

71 

72@user_router.get("/me", response_model=schemas.UserOut) 

73def get_current_user_profile(current_user: models.User = Depends(oauth2.get_current_user)): 

74 """Get the current user's profile. 

75 :param current_user: The current authenticated user.""" 

76 

77 return current_user 

78 

79 

80@user_router.get("/{entry_id}", response_model=schemas.UserOut) 

81def get_one_user( 

82 entry_id: int | None, 

83 current_user: models.User = Depends(oauth2.get_current_user), 

84 db: Session = Depends(database.get_db), 

85): 

86 """Get a user by ID.""" 

87 

88 assert_admin(current_user) 

89 

90 # noinspection PyTypeChecker 

91 user = db.query(models.User).filter(models.User.id == entry_id).first() 

92 if not user: 

93 raise HTTPException(status_code=404, detail="User not found") 

94 return user 

95 

96 

97@user_router.put("/me", response_model=schemas.UserOut) 

98def update_current_user_profile( 

99 user_update: schemas.UserUpdate, 

100 current_user: models.User = Depends(oauth2.get_current_user), 

101 db: Session = Depends(database.get_db), 

102): 

103 """Update the current user's profile. 

104 :param user_update: The user update data. 

105 :param current_user: The current authenticated user. 

106 :param db: The database session.""" 

107 

108 user_update = user_update.model_dump(exclude_defaults=True) 

109 

110 # Hash password if it's being updated 

111 if "password" in user_update: 

112 user_update["password"] = utils.hash_password(user_update["password"]) 

113 

114 # Determine if the user is updating the password or email 

115 requires_password_check = "password" in user_update or "email" in user_update 

116 

117 # Get the user record to update 

118 # noinspection PyTypeChecker 

119 user_db = db.query(models.User).filter(models.User.id == current_user.id).first() 

120 current_password = user_update.get("current_password", "") 

121 

122 # Update password/email 

123 if requires_password_check and not utils.verify_password(current_password, user_db.password): 

124 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="The current password is required") 

125 

126 # Validate email 

127 # noinspection PyTypeChecker 

128 other_users = db.query(models.User).filter(models.User.id != current_user.id).all() 

129 emails = [u.email for u in other_users] 

130 if "email" in user_update and user_update["email"] in emails: 

131 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered") 

132 

133 # Update the user record 

134 for field, value in user_update.items(): 

135 setattr(user_db, field, value) 

136 

137 db.commit() 

138 db.refresh(user_db) 

139 return user_db 

140 

141 

142@user_router.put("/{entry_id}", response_model=schemas.UserOut) 

143def update_user( 

144 entry_id: int | None, 

145 user_update: schemas.UserUpdate, 

146 current_user: models.User = Depends(oauth2.get_current_user), 

147 db: Session = Depends(database.get_db), 

148): 

149 """Update a user by ID.""" 

150 

151 assert_admin(current_user) 

152 

153 user_update = user_update.model_dump(exclude_defaults=True) 

154 

155 # Hash password if it's being updated 

156 if "password" in user_update: 

157 user_update["password"] = utils.hash_password(user_update["password"]) 

158 

159 # Get the user record to update 

160 # noinspection PyTypeChecker 

161 user_db = db.query(models.User).filter(models.User.id == entry_id).first() 

162 

163 # Validate email 

164 # noinspection PyTypeChecker 

165 other_users = db.query(models.User).filter(models.User.id != entry_id).all() 

166 emails = [u.email for u in other_users] 

167 if "email" in user_update and user_update["email"] in emails: 

168 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered") 

169 

170 # Update the user record 

171 for field, value in user_update.items(): 

172 setattr(user_db, field, value) 

173 

174 db.commit() 

175 db.refresh(user_db) 

176 return user_db 

177 

178 

179@user_router.post("/", status_code=201, response_model=schemas.UserOut) 

180def create_user( 

181 user: schemas.UserCreate, 

182 db: Session = Depends(database.get_db), 

183): 

184 """Create a new user. 

185 :param user: The user data. 

186 :param db: The database session.""" 

187 

188 # noinspection PyTypeChecker 

189 settings = db.query(models.Setting).filter(models.Setting.name == "allowlist").all() 

190 if settings: 

191 emails_allowed = settings[0].value.split(",") 

192 if user.email not in emails_allowed: 

193 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Email not allowed") 

194 

195 # Get all users and check if the email is already registered 

196 users = db.query(models.User).all() 

197 emails = [u.email for u in users] 

198 if user.email in emails: 

199 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered") 

200 

201 # Hash the password and create the user 

202 user.password = utils.hash_password(user.password) 

203 # noinspection PyArgumentList 

204 new_user = models.User(**user.model_dump()) 

205 db.add(new_user) 

206 db.commit() 

207 

208 return new_user