Coverage for backend / app / service_runner / routers.py: 36%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Routers for managing the service runner operations."""
3import datetime as dt
4import os
6from fastapi import HTTPException
7from sqlalchemy.orm import Session
8from starlette import status
10from app.config import settings
11from app.models import User
12from app.routers.utility import assert_admin
13from app.service_runner.service_runner import ServiceRunner
16def start_scraper(
17 service_runner: ServiceRunner,
18 current_user: User,
19 period_hours: float | int,
20 **kwargs,
21) -> dict:
22 """Start the service runner with the specified period.
23 :param service_runner: ServiceRunner instance
24 :param current_user: Current authenticated user
25 :param period_hours: period between runs in hours
26 :param kwargs: keyword arguments containing keyword arguments passed to the start_runner method of ServiceRunner"""
28 assert_admin(current_user)
29 try:
30 service_runner.start_runner(period_hours=period_hours, **kwargs)
31 except Exception as e:
32 raise HTTPException(
33 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
34 detail=f"Failed to start service runner: {str(e)}",
35 )
36 return {"detail": f"Service runner started (period_hours={period_hours})"}
39def stop_scraper(
40 service_runner: ServiceRunner,
41 current_user: User,
42) -> dict:
43 """Stop the service runner.
44 :param service_runner: ServiceRunner instance
45 :param current_user: Current authenticated user"""
47 assert_admin(current_user)
48 try:
49 service_runner.stop_runner()
50 except Exception as e:
51 raise HTTPException(
52 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
53 detail=f"Failed to stop service runner: {str(e)}",
54 )
55 return {"detail": "Service runner stopped"}
58def scraper_status(
59 service_runner: ServiceRunner,
60 current_user: User,
61) -> dict:
62 """Get the current status service runner.
63 :param service_runner: ServiceRunner instance
64 :param current_user: Current authenticated user"""
66 assert_admin(current_user)
67 return service_runner.status()
70def get_service_logs(
71 logger_name: str,
72 lines: int,
73 current_user: User,
74) -> dict:
75 """Get the last N lines from the service log file efficiently.
76 Uses reverse reading for large files to avoid loading the entire file into memory.
77 :param lines: Number of lines to retrieve
78 :param logger_name: Name of the logger / log file
79 :param current_user: Current authenticated user"""
81 assert_admin(current_user)
83 log_file_path = os.path.join(settings.log_directory, logger_name + ".log")
85 if not os.path.exists(log_file_path):
86 return {"lines": [], "total_lines": 0}
88 try:
89 file_size = os.path.getsize(log_file_path)
91 # For small files, just read the whole thing
92 if file_size < 1024 * 1024: # 1 MB threshold
93 with open(log_file_path, "r", encoding="utf-8") as f:
94 all_lines = f.readlines()
95 total_lines = len(all_lines)
96 log_lines = all_lines[-lines:] if lines < total_lines else all_lines
97 return {"lines": [line.rstrip() for line in log_lines], "total_lines": total_lines}
99 # For large files, read from the end in chunks
100 chunk_size = 8192
101 collected_lines = []
102 total_lines = 0
104 with open(log_file_path, "rb") as f:
105 # Count total lines efficiently (optional, can skip if not needed)
106 for _ in f:
107 total_lines += 1
109 # Now read from the end to get last N lines
110 f.seek(0, 2) # Seek to end
111 position = f.tell()
112 buffer = b""
114 while position > 0 and len(collected_lines) < lines:
115 read_size = min(chunk_size, position)
116 position -= read_size
117 f.seek(position)
118 chunk = f.read(read_size)
119 buffer = chunk + buffer
121 # Split into lines and keep collecting
122 buffer_lines = buffer.split(b"\n")
124 # If we haven't reached the start, the first element is incomplete
125 if position > 0:
126 buffer = buffer_lines[0]
127 new_lines = buffer_lines[1:]
128 else:
129 new_lines = buffer_lines
130 buffer = b""
132 # Prepend new lines (they're in reverse order relative to file)
133 collected_lines = new_lines + collected_lines
135 # Take only the last N lines
136 result_lines = collected_lines[-lines:] if len(collected_lines) > lines else collected_lines
138 # Decode and strip
139 decoded_lines = []
140 for line in result_lines:
141 try:
142 decoded_lines.append(line.decode("utf-8").rstrip())
143 except UnicodeDecodeError:
144 decoded_lines.append(line.decode("utf-8", errors="replace").rstrip())
146 # Filter out empty lines that result from splitting
147 decoded_lines = [line for line in decoded_lines if line]
149 return {"lines": decoded_lines, "total_lines": total_lines}
151 except Exception as e:
152 return {"lines": [f"Error reading log file: {str(e)}"], "total_lines": 0}
155def get_service_logs_by_date_range(
156 start_date: dt.datetime | None,
157 end_date: dt.datetime | None,
158 delta_days: int | None,
159 limit: int | None,
160 current_user: User,
161 db: Session,
162 table,
163):
164 """Get service logs within a specified date range. Admin access required.
165 :param start_date: Optional start date filter (inclusive)
166 :param end_date: Optional end date filter (inclusive)
167 :param limit: Optional limit for number of logs to return
168 :param delta_days: Optional number of days to go back in time
169 :param current_user: Current authenticated admin user
170 :param db: Database session
171 :param table: Database table model
172 :return: list of service logs within the date range ordered by run_datetime descending"""
174 assert_admin(current_user)
176 query = db.query(table).filter(table.run_duration.is_not(None))
178 # Apply date filters
179 if start_date:
180 query = query.filter(table.run_datetime >= start_date)
181 if end_date:
182 query = query.filter(table.run_datetime <= end_date)
183 if delta_days:
184 start_date = dt.datetime.now() - dt.timedelta(days=delta_days)
185 query = query.filter(table.run_datetime >= start_date)
187 # Order by run_datetime descending (most recent first)
188 query = query.order_by(table.run_datetime.desc())
190 # Apply limit if specified
191 if limit:
192 query = query.limit(limit)
194 return query.all()
197def get_latest(
198 current_user: User,
199 db: Session,
200 table,
201):
202 """Get the latest service log entry. Admin access required.
203 :param current_user: Current authenticated admin user
204 :param db: Database session
205 :param table: Database table model
206 :return: Latest service log entry"""
208 assert_admin(current_user)
210 latest_log = db.query(table).order_by(table.run_datetime.desc()).first()
211 if not latest_log:
212 raise HTTPException(
213 status_code=status.HTTP_404_NOT_FOUND,
214 detail="No service logs found",
215 )
216 return latest_log