Coverage for backend / app / service_runner / routers.py: 36%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Routers for managing the service runner operations.""" 

2 

3import datetime as dt 

4import os 

5 

6from fastapi import HTTPException 

7from sqlalchemy.orm import Session 

8from starlette import status 

9 

10from app.config import settings 

11from app.models import User 

12from app.routers.utility import assert_admin 

13from app.service_runner.service_runner import ServiceRunner 

14 

15 

def start_scraper(
    service_runner: ServiceRunner,
    current_user: User,
    period_hours: float | int,
    **kwargs,
) -> dict:
    """Start the service runner with the specified period.
    :param service_runner: ServiceRunner instance
    :param current_user: Current authenticated user, checked with assert_admin
    :param period_hours: period between runs in hours
    :param kwargs: extra keyword arguments forwarded unchanged to ServiceRunner.start_runner
    :return: dict with a "detail" message confirming the start
    :raises HTTPException: 500 if start_runner raises; the original error is kept as the cause"""

    assert_admin(current_user)
    try:
        service_runner.start_runner(period_hours=period_hours, **kwargs)
    except Exception as e:
        # Chain the original exception so its traceback is still available
        # when the 500 response is logged.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to start service runner: {e}",
        ) from e
    return {"detail": f"Service runner started (period_hours={period_hours})"}

37 

38 

def stop_scraper(
    service_runner: ServiceRunner,
    current_user: User,
) -> dict:
    """Stop the service runner.
    :param service_runner: ServiceRunner instance
    :param current_user: Current authenticated user, checked with assert_admin
    :return: dict with a "detail" message confirming the stop
    :raises HTTPException: 500 if stop_runner raises; the original error is kept as the cause"""

    assert_admin(current_user)
    try:
        service_runner.stop_runner()
    except Exception as e:
        # Chain the original exception so its traceback is still available
        # when the 500 response is logged.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to stop service runner: {e}",
        ) from e
    return {"detail": "Service runner stopped"}

56 

57 

def scraper_status(
    service_runner: ServiceRunner,
    current_user: User,
) -> dict:
    """Report the current status of the service runner.
    :param service_runner: ServiceRunner instance
    :param current_user: Current authenticated user, checked with assert_admin
    :return: whatever service_runner.status() returns"""

    assert_admin(current_user)
    runner_status = service_runner.status()
    return runner_status

68 

69 

def _read_tail_lines(log_file, lines: int, chunk_size: int = 8192) -> list[bytes]:
    """Return up to the last ``lines`` lines of an open binary file.
    Reads fixed-size chunks backwards from the end, so only the tail of the file is held in memory.
    :param log_file: Seekable file object opened in binary mode
    :param lines: Maximum number of lines to return; zero or negative yields an empty list
    :param chunk_size: Number of bytes fetched per backwards read
    :return: The trailing lines in file order, as bytes without their newline terminators"""

    if lines <= 0:
        return []

    log_file.seek(0, os.SEEK_END)
    position = log_file.tell()
    chunks = []
    newlines_seen = 0

    # lines + 1 newlines are required before stopping: one may be the file's
    # terminating newline, and the one preceding the first wanted line proves
    # that line was read in full.
    while position > 0 and newlines_seen <= lines:
        read_size = min(chunk_size, position)
        position -= read_size
        log_file.seek(position)
        chunk = log_file.read(read_size)
        chunks.append(chunk)
        newlines_seen += chunk.count(b"\n")

    # Chunks were gathered back to front. Joining the raw bytes before any
    # decoding keeps multi-byte characters that straddle a chunk boundary intact.
    tail = b"".join(reversed(chunks)).split(b"\n")

    # A file ending in a newline leaves an empty final element that is not a line.
    if tail[-1] == b"":
        tail.pop()

    return tail[-lines:]


def get_service_logs(
    logger_name: str,
    lines: int,
    current_user: User,
) -> dict:
    """Get the last N lines from the service log file.
    Files under 1 MB are read whole; larger files are tailed by reading backwards from the end in
    chunks so the whole file is never held in memory. Both paths keep blank lines, replace
    undecodable bytes, and return no lines when ``lines`` is zero or negative.
    :param logger_name: Name of the logger / log file (".log" is appended)
    :param lines: Number of lines to retrieve
    :param current_user: Current authenticated user, checked with assert_admin
    :return: dict with "lines" (the trailing lines, right-stripped) and "total_lines" (line count of
        the whole file). A missing file gives an empty result; a read failure gives a single
        error-message line with total_lines 0."""

    assert_admin(current_user)

    # NOTE(review): logger_name is joined into the path without validation;
    # confirm callers restrict it to known logger names so it cannot point
    # outside the log directory.
    log_file_path = os.path.join(settings.log_directory, logger_name + ".log")

    if not os.path.exists(log_file_path):
        return {"lines": [], "total_lines": 0}

    try:
        # For small files, just read the whole thing
        if os.path.getsize(log_file_path) < 1024 * 1024:  # 1 MB threshold
            with open(log_file_path, "r", encoding="utf-8", errors="replace") as f:
                all_lines = f.readlines()
            # Guard lines <= 0 explicitly: all_lines[-0:] would be the whole file.
            log_lines = all_lines[-lines:] if lines > 0 else []
            return {
                "lines": [line.rstrip() for line in log_lines],
                "total_lines": len(all_lines),
            }

        # For large files, stream once to count lines, then read the tail from the end
        with open(log_file_path, "rb") as f:
            total_lines = sum(1 for _ in f)
            tail = _read_tail_lines(f, lines)

        return {
            "lines": [line.decode("utf-8", errors="replace").rstrip() for line in tail],
            "total_lines": total_lines,
        }

    except Exception as e:
        # Deliberate best-effort: report the failure as a log line instead of
        # failing the request.
        return {"lines": [f"Error reading log file: {e}"], "total_lines": 0}

153 

154 

def get_service_logs_by_date_range(
    start_date: dt.datetime | None,
    end_date: dt.datetime | None,
    delta_days: int | None,
    limit: int | None,
    current_user: User,
    db: Session,
    table,
):
    """Fetch service log rows, optionally narrowed to a date window. Admin access required.
    Only rows whose run_duration is not NULL are considered. Every filter that is supplied is
    applied, so start_date and delta_days together both constrain the lower bound.
    :param start_date: Optional lower bound on run_datetime (inclusive)
    :param end_date: Optional upper bound on run_datetime (inclusive)
    :param delta_days: Optional number of days to look back from the current local time
    :param limit: Optional maximum number of rows to return
    :param current_user: Current authenticated admin user
    :param db: Database session
    :param table: Database table model
    :return: list of matching service logs ordered by run_datetime descending"""

    assert_admin(current_user)

    # Gather the conditions first, then apply them one by one in the same order.
    criteria = [table.run_duration.is_not(None)]
    if start_date:
        criteria.append(table.run_datetime >= start_date)
    if end_date:
        criteria.append(table.run_datetime <= end_date)
    if delta_days:
        cutoff = dt.datetime.now() - dt.timedelta(days=delta_days)
        criteria.append(table.run_datetime >= cutoff)

    logs_query = db.query(table)
    for criterion in criteria:
        logs_query = logs_query.filter(criterion)

    # Most recent runs first
    logs_query = logs_query.order_by(table.run_datetime.desc())

    if limit:
        logs_query = logs_query.limit(limit)

    return logs_query.all()

195 

196 

def get_latest(
    current_user: User,
    db: Session,
    table,
):
    """Return the most recent service log entry. Admin access required.
    :param current_user: Current authenticated admin user
    :param db: Database session
    :param table: Database table model
    :return: The row with the greatest run_datetime
    :raises HTTPException: 404 when the query yields no entry"""

    assert_admin(current_user)

    newest_first = db.query(table).order_by(table.run_datetime.desc())
    newest_entry = newest_first.first()
    if newest_entry:
        return newest_entry

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="No service logs found",
    )

216 return latest_log