Coverage for backend / app / service_runner / service_runner.py: 31%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Module to run a generic service periodically in a separate thread.""" 

2 

3import threading 

4import time 

5from typing import Callable 

6 

7from app.utils import AppLogger 

8from app.utils import get_last_log_line 

9 

10 

11class ServiceRunner: 

12 """Class to run a generic service periodically in a separate thread""" 

13 

14 def __init__( 

15 self, 

16 service_name: str, 

17 service_function: Callable, 

18 service_kwargs: dict | None = None, 

19 period_hours: int | float = 3, 

20 ) -> None: 

21 """Initialise the service runner instance. 

22 :param service_name: name of the service 

23 :param service_kwargs: keyword arguments for service function 

24 :param service_function: function to run periodically 

25 :param period_hours: max hours between runs""" 

26 

27 self.service_name = service_name 

28 self.service_kwargs = {} if not service_kwargs else service_kwargs 

29 self.service_function = service_function 

30 self.period_hours = period_hours 

31 self.service_runner_name = self.service_name + "_runner" 

32 self.service_runner_thread = None 

33 self.stop_event = threading.Event() 

34 self.service_runner_thread_status = "stopped" 

35 self.service_running = False 

36 self.sleep_until = None 

37 self.sleep_start = None 

38 self.logger = AppLogger.create_service_logger(self.service_runner_name, "INFO") 

39 self.logger.info(self.service_runner_name + " initialised") 

40 

41 def start_runner(self, period_hours: float = 3.0, **kwargs) -> None: 

42 """Start the service runner 

43 :param period_hours: Maximum hours between each service run 

44 :param kwargs: keyword arguments passed to the service_function""" 

45 

46 if self.service_runner_thread_status in ("started", "starting", "stopping"): 

47 self.logger.warning(f"Cannot start service runner - current status: {self.service_runner_thread_status}") 

48 return 

49 

50 self.logger.info(f"Starting service runner (period: {period_hours}h)") 

51 self.service_runner_thread_status = "starting" 

52 

53 # Store parameters 

54 self.period_hours = period_hours 

55 for kwarg in kwargs: 

56 self.service_kwargs[kwarg] = kwargs[kwarg] 

57 

58 # Clear the stop event 

59 self.stop_event.clear() 

60 

61 # Start the service in a separate thread 

62 self.service_runner_thread = threading.Thread( 

63 target=self._run_service, 

64 args=(period_hours,), 

65 ) 

66 self.service_runner_thread.daemon = True 

67 self.service_runner_thread.start() 

68 

69 def stop_runner(self) -> None: 

70 """Stop the scraping service""" 

71 

72 if self.service_runner_thread_status in ("stopped", "starting", "stopping"): 

73 self.logger.warning(f"Cannot stop service - current status: {self.service_runner_thread_status}") 

74 return 

75 

76 self.logger.info("Stopping service") 

77 self.service_runner_thread_status = "stopping" 

78 self.stop_event.set() 

79 

80 def _run_service(self, period_hours: float) -> None: 

81 """Internal method that runs the service 

82 :param period_hours: Hours between each scraping run""" 

83 

84 try: 

85 self.service_runner_thread_status = "started" 

86 self.logger.info("Service runner thread started successfully") 

87 

88 while not self.stop_event.is_set(): 

89 try: 

90 # Run the scraping 

91 self.logger.info(f"Starting service ({self.service_kwargs})") 

92 self.service_running = True 

93 result = self.service_function(**self.service_kwargs) 

94 self.service_running = False 

95 

96 self.logger.info(f"Service completed - duration: {result.run_duration:.2f}s") 

97 

98 duration = result.run_duration 

99 sleep_time = max([0, period_hours * 3600 - duration]) 

100 

101 # Track sleep timing 

102 self.sleep_start = time.time() 

103 self.sleep_until = self.sleep_start + sleep_time 

104 

105 self.logger.info(f"Sleeping for {sleep_time:.2f}s until next run") 

106 

107 if self.stop_event.wait(timeout=sleep_time): 

108 self.logger.info("Stop event received during sleep") 

109 break 

110 

111 # Clear sleep tracking after waking 

112 self.sleep_start = None 

113 self.sleep_until = None 

114 

115 except Exception as e: 

116 self.logger.exception(f"Error during service runner: {e}") 

117 self.service_running = False 

118 

119 self.sleep_start = time.time() 

120 self.sleep_until = self.sleep_start + 300 

121 

122 self.logger.info("Waiting 5 minutes before retry after error") 

123 

124 if self.stop_event.wait(timeout=300): # 5 minutes 

125 self.logger.info("Stop event received during error recovery") 

126 break 

127 

128 self.sleep_start = None 

129 self.sleep_until = None 

130 finally: 

131 self.logger.info("Service runner ended") 

132 self.service_runner_thread_status = "stopped" 

133 self.sleep_start = None 

134 self.sleep_until = None 

135 

136 def status(self) -> dict: 

137 """Get the current status of the service""" 

138 

139 return { 

140 "service_runner_status": self.service_runner_thread_status, 

141 "service_running": self.service_running, 

142 "service_kwargs": self.service_kwargs, 

143 "period_hours": self.period_hours, 

144 "sleep_until": self.sleep_until, 

145 "last_log": get_last_log_line(self.service_name), 

146 }