Coverage for backend/app/routers/__init__.py: 80%
128 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 17:03 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 17:03 +0000
1"""CRUD router generator for data table operations.
3Provides a factory function to generate FastAPI routers with standard CRUD endpoints,
4including user ownership validation, query filtering, and many-to-many relationship handling."""
6from fastapi import APIRouter, Depends, HTTPException
7from sqlalchemy.orm import Session
8from starlette import status
9from starlette.requests import Request
11from app import database, models, oauth2
def generate_data_table_crud_router(
    *,
    table_model,  # SQLAlchemy model
    create_schema,  # Pydantic schema for creation
    update_schema,  # Pydantic schema for updates
    out_schema,  # Pydantic schema for output
    endpoint: str,  # e.g. "companies"
    not_found_msg: str = "Entry not found",
    many_to_many_fields: dict | None = None,
    router: APIRouter | None = None,
    admin_only: bool = False,
) -> APIRouter:
    """Generate a FastAPI router with standard CRUD endpoints for a given table.
    :param table_model: SQLAlchemy model class representing the database table.
    :param create_schema: Pydantic schema used for creating new entries.
    :param update_schema: Pydantic schema used for updating existing entries.
    :param out_schema: Pydantic schema used for serialising output.
    :param endpoint: Endpoint name (used as route prefix and tag).
    :param not_found_msg: Default message when an entry is not found.
    :param many_to_many_fields: Dict defining M2M relationships.
    Format: {
        'field_name': {
            'table': association_table,
            'local_key': 'local_foreign_key',
            'remote_key': 'remote_foreign_key'
        }
    }
    :param router: Optional router to which the endpoints will be added.
    :param admin_only: If True, restrict access to admin users only.
    :return: Configured APIRouter instance with CRUD endpoints."""

    if router is None:
        router = APIRouter(prefix=f"/{endpoint}", tags=[endpoint])

    # Normalise once so the helpers below can iterate without re-checking for None.
    m2m_fields = many_to_many_fields or {}

    def _forbidden() -> HTTPException:
        """Build the 403 error shared by every endpoint."""
        return HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised to perform requested action"
        )

    def _load_authorised(db: Session, entry_id: int, current_user):
        """Fetch an entry by ID and check that the current user may act on it.
        :param db: Database session
        :param entry_id: ID of the entry to look up
        :param current_user: Authenticated user
        :return: Tuple of (query selecting the entry, the entry itself)
        :raises: HTTPException with a 404 status code if the entry is not found.
        :raises: HTTPException with a 403 status code if not authorised to perform the requested action."""

        # noinspection PyTypeChecker
        query = db.query(table_model).filter(table_model.id == entry_id)
        entry = query.first()

        if not entry:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found_msg)

        # NOTE(review): ownership is required even for admins on admin_only routers, whereas
        # get_all lets admins list every entry - confirm this asymmetry is intended.
        if (entry.owner_id != current_user.id) or (admin_only and not current_user.is_admin):
            raise _forbidden()

        return query, entry

    def _split_m2m_fields(item_dict: dict) -> tuple[dict, dict]:
        """Separate many-to-many fields from the plain column data.
        :param item_dict: Dumped schema data
        :return: Tuple of (data for the main table, data for the association tables)"""

        main_data = dict(item_dict)
        m2m_data = {name: main_data.pop(name) for name in m2m_fields if name in main_data}
        return main_data, m2m_data

    def _insert_links(db: Session, m2m_config: dict, entry_id: int, values) -> None:
        """Insert association rows linking an entry to each of the given remote IDs.
        :param db: Database session
        :param m2m_config: Configuration of one many-to-many field
        :param entry_id: ID of the entry on the local side of the relationship
        :param values: Remote IDs; anything that is not a list (including None) is ignored"""

        if not isinstance(values, list):
            return

        local_key = m2m_config["local_key"]
        remote_key = m2m_config["remote_key"]
        rows = [{local_key: entry_id, remote_key: value_id} for value_id in values]

        # An empty parameter list must not reach execute(): it would be treated as "no parameters".
        if rows:
            db.execute(m2m_config["table"].insert(), rows)

    def _delete_links(db: Session, m2m_config: dict, entry_id: int) -> None:
        """Delete every association row that belongs to the given entry.
        :param db: Database session
        :param m2m_config: Configuration of one many-to-many field
        :param entry_id: ID of the entry whose relationships are removed"""

        association_table = m2m_config["table"]
        local_column = getattr(association_table.c, m2m_config["local_key"])
        db.execute(association_table.delete().where(local_column == entry_id))

    def _coerce_filter_value(column_type, raw_value: str):
        """Convert a query-string value to the Python type of a column where possible.
        :param column_type: SQLAlchemy type of the column being filtered
        :param raw_value: Raw value taken from the query string
        :return: The converted value, or the raw string if no conversion applies or it fails"""

        try:
            python_type = column_type.python_type
        except (AttributeError, NotImplementedError):
            # Types without a known Python equivalent raise NotImplementedError here.
            return raw_value

        try:
            if python_type is int:
                return int(raw_value)
            if python_type is float:
                return float(raw_value)
            if python_type is bool:
                return raw_value.lower() in ("true", "1", "yes", "on")
        except (ValueError, TypeError):
            # If conversion fails, fall back to a plain string comparison
            pass

        return raw_value

    def handle_many_to_many_create(
        db: Session,
        entry_id: int,
        item_data: dict,
    ):
        """Handle the creation of many-to-many relationships.
        :param db: Database session
        :param entry_id: ID of the entry to which the relationships are being added
        :param item_data: Data containing the relationships to be added"""

        for field_name, m2m_config in m2m_fields.items():
            _insert_links(db, m2m_config, entry_id, item_data.get(field_name))

    def handle_many_to_many_update(
        db: Session,
        entry_id: int,
        item_data: dict,
    ):
        """Handle updating of many-to-many relationships.
        Fields absent from item_data are left untouched; fields present have their existing
        relationships replaced (a value of None simply clears them).
        :param db: Database session
        :param entry_id: ID of the entry whose relationships are being replaced
        :param item_data: Data containing the new relationships"""

        for field_name, m2m_config in m2m_fields.items():
            if field_name not in item_data:
                continue

            # Delete existing relationships, then add the new ones if provided
            _delete_links(db, m2m_config, entry_id)
            _insert_links(db, m2m_config, entry_id, item_data[field_name])

    # noinspection PyTypeHints
    @router.get("/", response_model=list[out_schema])
    def get_all(
        request: Request,
        db: Session = Depends(database.get_db),
        current_user: models.User = Depends(oauth2.get_current_user),
        limit: int | None = None,
    ):
        """Retrieve all entries for the current user.
        Every query parameter other than 'limit' whose name matches a column attribute of the
        model is applied as an equality filter; the literal value "null" filters on NULL.
        :param request: FastAPI request object to access query parameters
        :param db: Database session.
        :param current_user: Authenticated user.
        :param limit: Maximum number of entries to return.
        :return: List of entries.
        :raises: HTTPException with a 403 status code if the router is admin-only and the user is not an admin."""

        # Start with base query
        if not admin_only:
            # noinspection PyTypeChecker
            query = db.query(table_model).filter(table_model.owner_id == current_user.id)
        elif current_user.is_admin:
            query = db.query(table_model)
        else:
            raise _forbidden()

        # Get all query parameters except 'limit'
        filter_params = dict(request.query_params)
        filter_params.pop("limit", None)  # Remove limit from filters

        for param_name, param_value in filter_params.items():
            column = getattr(table_model, param_name, None)
            column_type = getattr(column, "type", None)

            # Skip anything that is not a typed column attribute (methods, metadata, presumably
            # relationships too) so arbitrary query parameters cannot trigger an AttributeError.
            if column_type is None:
                continue

            # Handle null values - convert string "null" to actual None/NULL
            if param_value.lower() == "null":
                query = query.filter(column.is_(None))
                continue

            query = query.filter(column == _coerce_filter_value(column_type, param_value))

        return query.limit(limit).all()

    # noinspection PyTypeHints
    @router.get("/{entry_id}", response_model=out_schema)
    def get_one(
        entry_id: int,
        db: Session = Depends(database.get_db),
        current_user: models.User = Depends(oauth2.get_current_user),
    ):
        """Get an entry by ID.
        :param entry_id: The entry ID.
        :param db: The database session.
        :param current_user: The current user.
        :returns: The entry if found.
        :raises: HTTPException with a 404 status code if the entry is not found.
        :raises: HTTPException with a 403 status code if not authorised to perform the requested action."""

        _, entry = _load_authorised(db, entry_id, current_user)
        return entry

    # noinspection PyTypeHints
    @router.post("/", status_code=status.HTTP_201_CREATED, response_model=out_schema)
    def create(
        item: create_schema,
        db: Session = Depends(database.get_db),
        current_user: models.User = Depends(oauth2.get_current_user),
    ):
        """Create a new entry.
        The entry and its many-to-many relationships are written in a single transaction.
        :param item: Data for the new entry.
        :param db: Database session.
        :param current_user: Authenticated user.
        :return: The created entry.
        :raises: HTTPException with a 403 status code if the router is admin-only and the user is not an admin."""

        if admin_only and not current_user.is_admin:
            raise _forbidden()

        # Extract the item data and exclude many-to-many fields from main creation
        main_data, m2m_data = _split_m2m_fields(item.model_dump())

        # The owner is always the caller, even if the schema happens to expose owner_id.
        main_data["owner_id"] = current_user.id
        new_entry = table_model(**main_data)

        try:
            db.add(new_entry)
            # Flush to obtain the generated ID without committing, so a failing
            # relationship insert cannot leave a half-created entry behind.
            db.flush()

            if m2m_data:
                handle_many_to_many_create(db, new_entry.id, m2m_data)

            db.commit()
        except Exception:
            db.rollback()
            raise

        db.refresh(new_entry)
        return new_entry

    # noinspection PyTypeHints
    @router.put("/{entry_id}", response_model=out_schema)
    def update(
        entry_id: int,
        item: update_schema,
        db: Session = Depends(database.get_db),
        current_user: models.User = Depends(oauth2.get_current_user),
    ):
        """Update an entry by ID.
        :param entry_id: The entry ID.
        :param item: The updated data.
        :param db: The database session.
        :param current_user: The current user.
        :returns: The updated entry.
        :raises: HTTPException with a 404 status code if an entry is not found.
        :raises: HTTPException with a 403 status code if not authorised to perform the requested action.
        :raises: HTTPException with a 400 status code if no field is provided for the update."""

        query, _ = _load_authorised(db, entry_id, current_user)

        # Only fields explicitly sent by the client take part in the update
        item_dict = item.model_dump(exclude_unset=True)

        if not item_dict:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No fields provided for update")

        # Separate main fields from many-to-many fields
        main_data, m2m_data = _split_m2m_fields(item_dict)

        try:
            # Update main fields if any
            if main_data:
                query.update(main_data, synchronize_session=False)

            # Handle many-to-many relationships
            if m2m_data:
                handle_many_to_many_update(db, entry_id, m2m_data)

            db.commit()
        except Exception:
            db.rollback()
            raise

        # Return the updated entry
        return query.first()

    @router.delete("/{entry_id}", status_code=status.HTTP_204_NO_CONTENT)
    def delete(
        entry_id: int,
        db: Session = Depends(database.get_db),
        current_user: models.User = Depends(oauth2.get_current_user),
    ):
        """Delete an entry by ID.
        :param entry_id: The entry ID.
        :param db: The database session.
        :param current_user: The current user.
        :returns: None; the endpoint responds with 204 No Content.
        :raises: HTTPException with a 404 status code if an entry is not found.
        :raises: HTTPException with a 403 status code if not authorised to perform the requested action."""

        query, _ = _load_authorised(db, entry_id, current_user)

        try:
            # Delete many-to-many relationships first if they exist
            for m2m_config in m2m_fields.values():
                _delete_links(db, m2m_config, entry_id)

            query.delete(synchronize_session=False)
            db.commit()
        except Exception:
            db.rollback()
            raise

        # A 204 response carries no body, so there is nothing to return.
        return None

    return router