Coverage for backend / app / service_runner / service_runner.py: 31%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Module to run a generic service periodically in a separate thread."""
3import threading
4import time
5from typing import Callable
7from app.utils import AppLogger
8from app.utils import get_last_log_line
11class ServiceRunner:
12 """Class to run a generic service periodically in a separate thread"""
14 def __init__(
15 self,
16 service_name: str,
17 service_function: Callable,
18 service_kwargs: dict | None = None,
19 period_hours: int | float = 3,
20 ) -> None:
21 """Initialise the service runner instance.
22 :param service_name: name of the service
23 :param service_kwargs: keyword arguments for service function
24 :param service_function: function to run periodically
25 :param period_hours: max hours between runs"""
27 self.service_name = service_name
28 self.service_kwargs = {} if not service_kwargs else service_kwargs
29 self.service_function = service_function
30 self.period_hours = period_hours
31 self.service_runner_name = self.service_name + "_runner"
32 self.service_runner_thread = None
33 self.stop_event = threading.Event()
34 self.service_runner_thread_status = "stopped"
35 self.service_running = False
36 self.sleep_until = None
37 self.sleep_start = None
38 self.logger = AppLogger.create_service_logger(self.service_runner_name, "INFO")
39 self.logger.info(self.service_runner_name + " initialised")
41 def start_runner(self, period_hours: float = 3.0, **kwargs) -> None:
42 """Start the service runner
43 :param period_hours: Maximum hours between each service run
44 :param kwargs: keyword arguments passed to the service_function"""
46 if self.service_runner_thread_status in ("started", "starting", "stopping"):
47 self.logger.warning(f"Cannot start service runner - current status: {self.service_runner_thread_status}")
48 return
50 self.logger.info(f"Starting service runner (period: {period_hours}h)")
51 self.service_runner_thread_status = "starting"
53 # Store parameters
54 self.period_hours = period_hours
55 for kwarg in kwargs:
56 self.service_kwargs[kwarg] = kwargs[kwarg]
58 # Clear the stop event
59 self.stop_event.clear()
61 # Start the service in a separate thread
62 self.service_runner_thread = threading.Thread(
63 target=self._run_service,
64 args=(period_hours,),
65 )
66 self.service_runner_thread.daemon = True
67 self.service_runner_thread.start()
69 def stop_runner(self) -> None:
70 """Stop the scraping service"""
72 if self.service_runner_thread_status in ("stopped", "starting", "stopping"):
73 self.logger.warning(f"Cannot stop service - current status: {self.service_runner_thread_status}")
74 return
76 self.logger.info("Stopping service")
77 self.service_runner_thread_status = "stopping"
78 self.stop_event.set()
80 def _run_service(self, period_hours: float) -> None:
81 """Internal method that runs the service
82 :param period_hours: Hours between each scraping run"""
84 try:
85 self.service_runner_thread_status = "started"
86 self.logger.info("Service runner thread started successfully")
88 while not self.stop_event.is_set():
89 try:
90 # Run the scraping
91 self.logger.info(f"Starting service ({self.service_kwargs})")
92 self.service_running = True
93 result = self.service_function(**self.service_kwargs)
94 self.service_running = False
96 self.logger.info(f"Service completed - duration: {result.run_duration:.2f}s")
98 duration = result.run_duration
99 sleep_time = max([0, period_hours * 3600 - duration])
101 # Track sleep timing
102 self.sleep_start = time.time()
103 self.sleep_until = self.sleep_start + sleep_time
105 self.logger.info(f"Sleeping for {sleep_time:.2f}s until next run")
107 if self.stop_event.wait(timeout=sleep_time):
108 self.logger.info("Stop event received during sleep")
109 break
111 # Clear sleep tracking after waking
112 self.sleep_start = None
113 self.sleep_until = None
115 except Exception as e:
116 self.logger.exception(f"Error during service runner: {e}")
117 self.service_running = False
119 self.sleep_start = time.time()
120 self.sleep_until = self.sleep_start + 300
122 self.logger.info("Waiting 5 minutes before retry after error")
124 if self.stop_event.wait(timeout=300): # 5 minutes
125 self.logger.info("Stop event received during error recovery")
126 break
128 self.sleep_start = None
129 self.sleep_until = None
130 finally:
131 self.logger.info("Service runner ended")
132 self.service_runner_thread_status = "stopped"
133 self.sleep_start = None
134 self.sleep_until = None
136 def status(self) -> dict:
137 """Get the current status of the service"""
139 return {
140 "service_runner_status": self.service_runner_thread_status,
141 "service_running": self.service_running,
142 "service_kwargs": self.service_kwargs,
143 "period_hours": self.period_hours,
144 "sleep_until": self.sleep_until,
145 "last_log": get_last_log_line(self.service_name),
146 }