Coverage for backend/app/utils.py: 86%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Module containing utility functions.""" 

2 

3import hashlib 

4import json 

5import logging 

6import os 

7from logging.handlers import RotatingFileHandler 

8from pathlib import Path 

9from typing import Optional 

10 

11import bcrypt 

12from pydantic import EmailStr 

13 

14from app.config import settings 

15 

16 

def hash_password(password: str, rounds: int = 12) -> str:
    """Hash a password for storing.

    :param password: password to hash
    :param rounds: number of bcrypt rounds (default: 12)
    :return: hashed password
    """
    salt = bcrypt.gensalt(rounds=rounds)
    digest = bcrypt.hashpw(password.encode("utf-8"), salt)
    return digest.decode("utf-8")

24 

25 

def verify_password(password: str, hashed: str) -> bool:
    """Verify a stored password against one provided by the user.

    :param password: raw password to check
    :param hashed: hashed password from the database
    :return: boolean indicating whether the passwords matched
    """
    candidate = password.encode("utf-8")
    stored = hashed.encode("utf-8")
    return bcrypt.checkpw(candidate, stored)

33 

34 

def hash_token(token: str) -> str:
    """Hash a token for secure storage.

    :param token: token to hash
    :return: hex-encoded SHA-256 digest of the token
    """
    digest = hashlib.sha256(token.encode())
    return digest.hexdigest()

41 

42 

def clean_email(email: EmailStr | str) -> str:
    """Normalise the email address by stripping whitespace and converting to lowercase.

    :param email: The email address to be cleaned
    :return: Cleaned email address
    """
    normalised = str(email)
    return normalised.strip().lower()

49 

50 

def open_json(filepath: str) -> list[dict]:
    """Open a file and return its content.

    The path is resolved relative to the parent of this module's directory.

    :param filepath: The json file to open
    :return: The contents of the file
    """
    base_dir = os.path.dirname(__file__)
    full_path = os.path.join(base_dir, "..", filepath)
    with open(full_path, "r", encoding="utf8") as handle:
        return json.load(handle)

60 

61 

def super_getattr(obj: object, attr: str) -> object:
    """Get nested attributes from an object using dot notation.

    :param obj: The object to get attributes from
    :param attr: The attribute path in dot notation
    :return: The value found at the end of the attribute path
    """
    current = obj
    for part in attr.split("."):
        current = getattr(current, part)
    return current

71 

72 

def super_hasattr(obj: object, attr: str) -> bool:
    """Check if nested attributes exist in an object using dot notation.

    :param obj: The object to check attributes from
    :param attr: The attribute path in dot notation
    :return: True when every attribute in the path exists, False otherwise
    """
    current = obj
    for part in attr.split("."):
        try:
            current = getattr(current, part)
        except AttributeError:
            return False
    return True

84 

85 

class AppLogger:
    """Centralised logging utility.

    Creates ``logging.Logger`` instances configured with a rotating file
    handler (and optionally a console handler), and caches them so repeated
    calls with the same name/file return the same logger.
    """

    # Cache for created loggers, keyed by "{name}_{log_dir}_{log_file}"
    _loggers: dict[str, logging.Logger] = {}

    @classmethod
    def get_logger(
        cls,
        name: str,
        log_file: Optional[str] = None,
        level: int = logging.INFO,
        max_file_size: int = 10 * 1024 * 1024,  # 10MB
        backup_count: int = 5,
        console_output: bool = True,
    ) -> logging.Logger:
        """Get or create a logger with the specified configuration

        :param name: Logger name (usually module name)
        :param log_file: Specific log file name (defaults to {name}.log)
        :param level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        :param max_file_size: Maximum size of log file before rotation
        :param backup_count: Number of backup files to keep
        :param console_output: Whether to output logs to console
        :return: Configured logger instance"""

        # Return cached logger if it exists
        log_dir = settings.log_directory
        cache_key = f"{name}_{log_dir}_{log_file}"
        if cache_key in cls._loggers:
            return cls._loggers[cache_key]

        # Create new logger
        logger = logging.getLogger(name)
        logger.setLevel(level)

        # Prevent duplicate handlers if logger already exists
        # (logging.getLogger returns a shared instance per name)
        if logger.handlers:
            cls._loggers[cache_key] = logger
            return logger

        # Create log directory; parents=True so a nested log_directory
        # (e.g. "var/log/app") does not raise FileNotFoundError
        log_path = Path(log_dir)
        log_path.mkdir(parents=True, exist_ok=True)

        # Set log file name
        if not log_file:
            log_file = f"{name}.log"

        full_log_path = log_path / log_file

        # Create formatters: detailed for the file, simple for the console
        detailed_formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        simple_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

        # File handler with rotation; DEBUG level so the file always
        # captures everything regardless of the logger's console level
        file_handler = RotatingFileHandler(
            full_log_path, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8"
        )
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(detailed_formatter)
        logger.addHandler(file_handler)

        # Console handler
        if console_output:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(level)
            console_handler.setFormatter(simple_formatter)
            logger.addHandler(console_handler)

        # Cache the logger
        cls._loggers[cache_key] = logger

        return logger

    @classmethod
    def create_service_logger(cls, service_name: str, log_level: str = "INFO") -> logging.Logger:
        """Create a standardised logger for a service

        :param service_name: Name of the service (e.g., 'gmail_scraper', 'job_scraper')
        :param log_level: String representation of log level
        :return: Configured logger"""

        level_map = {
            "DEBUG": logging.DEBUG,
            "INFO": logging.INFO,
            "WARNING": logging.WARNING,
            "ERROR": logging.ERROR,
            "CRITICAL": logging.CRITICAL,
        }

        # Unknown level strings fall back to INFO rather than raising
        level = level_map.get(log_level.upper(), logging.INFO)

        return cls.get_logger(
            name=service_name,
            log_file=f"{service_name}.log",
            level=level,
            max_file_size=10 * 1024 * 1024,  # 10MB
            backup_count=5,
            console_output=True,
        )

188 

189 

def get_last_log_line(logger_name: str) -> str | None:
    """Get the last line from the service log file efficiently.

    Reads backwards from the end of the file in fixed-size chunks to avoid
    loading the entire file.

    :param logger_name: Name of the logger / log file (".log" is appended)
    :return: The last non-empty line, ``None`` when the file is missing or
        contains no non-empty line, or an error description string when
        reading fails.
    """

    log_file_path = os.path.join(settings.log_directory, logger_name + ".log")

    if not os.path.exists(log_file_path):
        return None

    try:
        with open(log_file_path, "rb") as f:
            # Seek to end
            f.seek(0, os.SEEK_END)
            position = f.tell()

            if position == 0:
                return None

            # Read backwards to find the last non-empty line
            chunk_size = 1024
            buffer = b""

            while position > 0:
                read_size = min(chunk_size, position)
                position -= read_size
                f.seek(position)
                buffer = f.read(read_size) + buffer

                # Split and look for a complete line
                lines = buffer.split(b"\n")

                # Unless we have reached the start of the file, lines[0]
                # may be the tail of a longer line whose beginning has not
                # been read yet — never return it truncated. (The original
                # code returned a partial line when the last line exceeded
                # one chunk.)
                candidates = lines if position == 0 else lines[1:]

                # Find the last non-empty line
                for line in reversed(candidates):
                    stripped = line.strip()
                    if stripped:
                        # errors="replace" keeps behaviour on invalid UTF-8
                        return stripped.decode("utf-8", errors="replace")

            return None

    except Exception as e:
        # Best-effort API: surface the error as text rather than raising
        return f"Error reading log file: {str(e)}"