Coverage for backend/app/routers/__init__.py: 80%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-22 17:03 +0000

1"""CRUD router generator for data table operations. 

2 

3Provides a factory function to generate FastAPI routers with standard CRUD endpoints, 

4including user ownership validation, query filtering, and many-to-many relationship handling.""" 

5 

6from fastapi import APIRouter, Depends, HTTPException 

7from sqlalchemy.orm import Session 

8from starlette import status 

9from starlette.requests import Request 

10 

11from app import database, models, oauth2 

12 

13 

14def generate_data_table_crud_router( 

15 *, 

16 table_model, # SQLAlchemy model 

17 create_schema, # Pydantic schema for creation 

18 update_schema, # Pydantic schema for updates 

19 out_schema, # Pydantic schema for output 

20 endpoint: str, # e.g. "companies" 

21 not_found_msg: str = "Entry not found", 

22 many_to_many_fields: dict = None, 

23 router: APIRouter | None = None, 

24 admin_only: bool = False, 

25) -> APIRouter: 

26 """Generate a FastAPI router with standard CRUD endpoints for a given table. 

27 :param table_model: SQLAlchemy model class representing the database table. 

28 :param create_schema: Pydantic schema used for creating new entries. 

29 :param update_schema: Pydantic schema used for updating existing entries. 

30 :param out_schema: Pydantic schema used for serialising output. 

31 :param endpoint: Endpoint name (used as route prefix and tag). 

32 :param not_found_msg: Default message when an entry is not found. 

33 :param many_to_many_fields: Dict defining M2M relationships. 

34 Format: { 

35 'field_name': { 

36 'table': association_table, 

37 'local_key': 'local_foreign_key', 

38 'remote_key': 'remote_foreign_key' 

39 } 

40 } 

41 :param router: Optional router to which the endpoints will be added. 

42 :param admin_only: If True, restrict access to admin users only. 

43 :return: Configured APIRouter instance with CRUD endpoints.""" 

44 

45 if router is None: 

46 router = APIRouter(prefix=f"/{endpoint}", tags=[endpoint]) 

47 

48 def handle_many_to_many_create( 

49 db: Session, 

50 entry_id: int, 

51 item_data: dict, 

52 ): 

53 """Handle the creation of many-to-many relationships. 

54 :param db: Database session 

55 :param entry_id: ID of the entry to which the relationships are being added 

56 :param item_data: Data containing the relationships to be added""" 

57 

58 if not many_to_many_fields: 

59 return 

60 

61 for field_name, m2m_config in many_to_many_fields.items(): 

62 

63 if field_name in item_data and item_data[field_name] is not None: 

64 values = item_data[field_name] 

65 if isinstance(values, list): 

66 association_table = m2m_config["table"] 

67 local_key = m2m_config["local_key"] 

68 remote_key = m2m_config["remote_key"] 

69 

70 # Insert the relationships 

71 for value_id in values: 

72 db.execute(association_table.insert().values(**{local_key: entry_id, remote_key: value_id})) 

73 

74 def handle_many_to_many_update( 

75 db: Session, 

76 entry_id: int, 

77 item_data: dict, 

78 ): 

79 """Handle updating of many-to-many relationships. 

80 :param db: Database session 

81 :param entry_id: ID of the entry to which the relationships are being added 

82 :param item_data: Data containing the relationships to be added""" 

83 

84 if not many_to_many_fields: 

85 return 

86 

87 for field_name, m2m_config in many_to_many_fields.items(): 

88 if field_name in item_data: 

89 association_table = m2m_config["table"] 

90 local_key = m2m_config["local_key"] 

91 remote_key = m2m_config["remote_key"] 

92 

93 # Delete existing relationships 

94 db.execute(association_table.delete().where(getattr(association_table.c, local_key) == entry_id)) 

95 

96 # Add new relationships if provided 

97 values = item_data[field_name] 

98 if values is not None and isinstance(values, list): 

99 for value_id in values: 

100 db.execute(association_table.insert().values(**{local_key: entry_id, remote_key: value_id})) 

101 

102 # noinspection PyTypeHints 

103 @router.get("/", response_model=list[out_schema]) 

104 def get_all( 

105 request: Request, 

106 db: Session = Depends(database.get_db), 

107 current_user: models.User = Depends(oauth2.get_current_user), 

108 limit: int | None = None, 

109 ): 

110 """Retrieve all entries for the current user. 

111 :param request: FastAPI request object to access query parameters 

112 :param db: Database session. 

113 :param current_user: Authenticated user. 

114 :param limit: Maximum number of entries to return. 

115 :return: List of entries.""" 

116 

117 # Start with base query 

118 if not admin_only: 

119 # noinspection PyTypeChecker 

120 query = db.query(table_model).filter(table_model.owner_id == current_user.id) 

121 elif current_user.is_admin: 

122 query = db.query(table_model) 

123 else: 

124 raise HTTPException( 

125 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised to perform requested action" 

126 ) 

127 

128 # Get all query parameters except 'limit' 

129 filter_params = dict(request.query_params) 

130 filter_params.pop("limit", None) # Remove limit from filters 

131 

132 # Apply filters for each parameter that matches a table column 

133 for param_name, param_value in filter_params.items(): 

134 if hasattr(table_model, param_name): 

135 column = getattr(table_model, param_name) 

136 

137 # Handle null values - convert string "null" to actual None/NULL 

138 if param_value.lower() == "null": 

139 query = query.filter(column.is_(None)) 

140 continue 

141 

142 # Handle different data types 

143 try: 

144 # Try to convert to appropriate type based on column type 

145 if hasattr(column.type, "python_type"): 

146 if column.type.python_type == int: 

147 param_value = int(param_value) 

148 elif column.type.python_type == float: 

149 param_value = float(param_value) 

150 elif column.type.python_type == bool: 

151 param_value = param_value.lower() in ("true", "1", "yes", "on") 

152 

153 # Add filter to query 

154 query = query.filter(column == param_value) 

155 

156 except (ValueError, TypeError): 

157 # If conversion fails, treat as string comparison 

158 query = query.filter(column == param_value) 

159 

160 return query.limit(limit).all() 

161 

162 # noinspection PyTypeHints 

163 @router.get("/{entry_id}", response_model=out_schema) 

164 def get_one( 

165 entry_id: int, 

166 db: Session = Depends(database.get_db), 

167 current_user: models.User = Depends(oauth2.get_current_user), 

168 ): 

169 """Get an entry by ID. 

170 :param entry_id: The entry ID. 

171 :param db: The database session. 

172 :param current_user: The current user. 

173 :returns: The entry if found. 

174 :raises: HTTPException with a 404 status code if the entry is not found. 

175 :raises: HTTPException with a 403 status code if not authorised to perform the requested action.""" 

176 

177 # noinspection PyTypeChecker 

178 entry = db.query(table_model).filter(table_model.id == entry_id).first() 

179 

180 if not entry: 

181 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found_msg) 

182 

183 if (entry.owner_id != current_user.id) or (admin_only and not current_user.is_admin): 

184 raise HTTPException( 

185 status_code=status.HTTP_403_FORBIDDEN, 

186 detail="Not authorised to perform requested action", 

187 ) 

188 

189 return entry 

190 

191 # noinspection PyTypeHints 

192 @router.post("/", status_code=status.HTTP_201_CREATED, response_model=out_schema) 

193 def create( 

194 item: create_schema, 

195 db: Session = Depends(database.get_db), 

196 current_user: models.User = Depends(oauth2.get_current_user), 

197 ): 

198 """Create a new entry. 

199 :param item: Data for the new entry. 

200 :param db: Database session. 

201 :param current_user: Authenticated user. 

202 :return: The created entry.""" 

203 

204 if admin_only and not current_user.is_admin: 

205 raise HTTPException( 

206 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised to perform requested action" 

207 ) 

208 

209 # Extract the item data and exclude many-to-many fields from main creation 

210 item_dict = item.model_dump() 

211 

212 # Remove many-to-many fields from main creation data 

213 main_data = item_dict.copy() 

214 m2m_data = {} 

215 

216 if many_to_many_fields: 

217 for field_name in many_to_many_fields.keys(): 

218 if field_name in main_data: 

219 m2m_data[field_name] = main_data.pop(field_name) 

220 

221 # Create the main entry 

222 new_entry = table_model(**main_data, owner_id=current_user.id) 

223 db.add(new_entry) 

224 db.commit() 

225 db.refresh(new_entry) 

226 

227 # Handle many-to-many relationships 

228 if m2m_data: 

229 handle_many_to_many_create(db, new_entry.id, m2m_data) 

230 db.commit() 

231 db.refresh(new_entry) 

232 

233 return new_entry 

234 

235 # noinspection PyTypeHints 

236 @router.put("/{entry_id}", response_model=out_schema) 

237 def update( 

238 entry_id: int, 

239 item: update_schema, 

240 db: Session = Depends(database.get_db), 

241 current_user: models.User = Depends(oauth2.get_current_user), 

242 ): 

243 """Update an entry by ID. 

244 :param entry_id: The entry ID. 

245 :param item: The updated data. 

246 :param db: The database session. 

247 :param current_user: The current user. 

248 :returns: The updated entry. 

249 :raises: HTTPException with a 404 status code if an entry is not found. 

250 :raises: HTTPException with a 403 status code if not authorised to perform the requested action. 

251 :raises: HTTPException with a 400 status code if no field is provided for the update.""" 

252 

253 # noinspection PyTypeChecker 

254 query = db.query(table_model).filter(table_model.id == entry_id) 

255 entry = query.first() 

256 

257 if not entry: 

258 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found_msg) 

259 

260 if (entry.owner_id != current_user.id) or (admin_only and not current_user.is_admin): 

261 raise HTTPException( 

262 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised to perform requested action" 

263 ) 

264 

265 # Extract the item data 

266 item_dict = item.model_dump(exclude_unset=True) 

267 

268 if not item_dict: 

269 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No fields provided for update") 

270 

271 # Separate main fields from many-to-many fields 

272 main_data = item_dict.copy() 

273 m2m_data = {} 

274 

275 if many_to_many_fields: 

276 for field_name in many_to_many_fields.keys(): 

277 if field_name in main_data: 

278 m2m_data[field_name] = main_data.pop(field_name) 

279 

280 # Update main fields if any 

281 if main_data: 

282 query.update(main_data, synchronize_session=False) 

283 

284 # Handle many-to-many relationships 

285 if m2m_data: 

286 handle_many_to_many_update(db, entry_id, m2m_data) 

287 

288 db.commit() 

289 

290 # Return the updated entry 

291 return query.first() 

292 

293 @router.delete("/{entry_id}", status_code=status.HTTP_204_NO_CONTENT) 

294 def delete( 

295 entry_id: int, 

296 db: Session = Depends(database.get_db), 

297 current_user: models.User = Depends(oauth2.get_current_user), 

298 ): 

299 """Delete an entry by ID. 

300 :param entry_id: The entry ID. 

301 :param db: The database session. 

302 :param current_user: The current user. 

303 :returns: Dict with a deletion status message. 

304 :raises: HTTPException with a 404 status code if an entry is not found. 

305 :raises: HTTPException with a 403 status code if not authorised to perform the requested action.""" 

306 

307 # noinspection PyTypeChecker 

308 query = db.query(table_model).filter(table_model.id == entry_id) 

309 entry = query.first() 

310 

311 if not entry: 

312 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found_msg) 

313 

314 if (entry.owner_id != current_user.id) or (admin_only and not current_user.is_admin): 

315 raise HTTPException( 

316 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised to perform requested action" 

317 ) 

318 

319 # Delete many-to-many relationships first if they exist 

320 if many_to_many_fields: 

321 for field_name, m2m_config in many_to_many_fields.items(): 

322 association_table = m2m_config["table"] 

323 local_key = m2m_config["local_key"] 

324 

325 db.execute(association_table.delete().where(getattr(association_table.c, local_key) == entry_id)) 

326 

327 query.delete(synchronize_session=False) 

328 db.commit() 

329 

330 return query 

331 

332 return router