Coverage for backend / app / utils.py: 86%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Module containing utility functions."""
3import hashlib
4import json
5import logging
6import os
7from logging.handlers import RotatingFileHandler
8from pathlib import Path
9from typing import Optional
11import bcrypt
12from pydantic import EmailStr
14from app.config import settings
17def hash_password(password: str, rounds: int = 12) -> str:
18 """Hash a password for storing.
19 :param password: password to hash
20 :param rounds: number of bcrypt rounds (default: 12)
21 :return: hashed password"""
23 return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=rounds)).decode("utf-8")
26def verify_password(password: str, hashed: str) -> bool:
27 """Verify a stored password against one provided by the user.
28 :param password: raw password to check
29 :param hashed: hashed password from the database
30 :return: boolean indicating whether the passwords matched"""
32 return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
35def hash_token(token: str) -> str:
36 """Hash a token for secure storage.
37 :param token: token to hash
38 :return: hashed token"""
40 return hashlib.sha256(token.encode()).hexdigest()
43def clean_email(email: EmailStr | str) -> str:
44 """Normalise the email address by stripping whitespace and converting to lowercase.
45 :param email: The email address to be cleaned
46 :return: Cleaned email address"""
48 return str(email).strip().lower()
51def open_json(filepath: str) -> list[dict]:
52 """Open a file and return its content
53 :param filepath: The json file to open
54 :return: The contents of the file"""
56 BASE_DIR = os.path.dirname(__file__)
57 path = os.path.join(BASE_DIR, "..", filepath)
58 with open(path, "r", encoding="utf8") as ofile:
59 return json.load(ofile)
62def super_getattr(obj: object, attr: str) -> object:
63 """Get nested attributes from an object using dot notation.
64 :param obj: The object to get attributes from
65 :param attr: The attribute path in dot notation"""
67 attrs = attr.split(".")
68 for a in attrs:
69 obj = getattr(obj, a)
70 return obj
73def super_hasattr(obj: object, attr: str) -> bool:
74 """Check if nested attributes exist in an object using dot notation.
75 :param obj: The object to check attributes from
76 :param attr: The attribute path in dot notation"""
78 attrs = attr.split(".")
79 for a in attrs:
80 if not hasattr(obj, a):
81 return False
82 obj = getattr(obj, a)
83 return True
86class AppLogger:
87 """Centralised logging utility"""
89 _loggers = {} # Cache for created loggers
91 @classmethod
92 def get_logger(
93 cls,
94 name: str,
95 log_file: Optional[str] = None,
96 level: int = logging.INFO,
97 max_file_size: int = 10 * 1024 * 1024, # 10MB
98 backup_count: int = 5,
99 console_output: bool = True,
100 ) -> logging.Logger:
101 """Get or create a logger with the specified configuration
102 :param name: Logger name (usually module name)
103 :param log_file: Specific log file name (defaults to {name}.log)
104 :param level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
105 :param max_file_size: Maximum size of log file before rotation
106 :param backup_count: Number of backup files to keep
107 :param console_output: Whether to output logs to console
108 :return: Configured logger instance"""
110 # Return cached logger if it exists
111 log_dir = settings.log_directory
112 cache_key = f"{name}_{log_dir}_{log_file}"
113 if cache_key in cls._loggers:
114 return cls._loggers[cache_key]
116 # Create new logger
117 logger = logging.getLogger(name)
118 logger.setLevel(level)
120 # Prevent duplicate handlers if logger already exists
121 if logger.handlers:
122 cls._loggers[cache_key] = logger
123 return logger
125 # Create log directory
126 log_path = Path(log_dir)
127 log_path.mkdir(exist_ok=True)
129 # Set log file name
130 if not log_file:
131 log_file = f"{name}.log"
133 full_log_path = log_path / log_file
135 # Create formatters
136 detailed_formatter = logging.Formatter(
137 "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s",
138 datefmt="%Y-%m-%d %H:%M:%S",
139 )
141 simple_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
143 # File handler with rotation
144 file_handler = RotatingFileHandler(
145 full_log_path, maxBytes=max_file_size, backupCount=backup_count, encoding="utf-8"
146 )
147 file_handler.setLevel(logging.DEBUG)
148 file_handler.setFormatter(detailed_formatter)
149 logger.addHandler(file_handler)
151 # Console handler
152 if console_output:
153 console_handler = logging.StreamHandler()
154 console_handler.setLevel(level)
155 console_handler.setFormatter(simple_formatter)
156 logger.addHandler(console_handler)
158 # Cache the logger
159 cls._loggers[cache_key] = logger
161 return logger
163 @classmethod
164 def create_service_logger(cls, service_name: str, log_level: str = "INFO") -> logging.Logger:
165 """Create a standardised logger for a service
166 :param service_name: Name of the service (e.g., 'gmail_scraper', 'job_scraper')
167 :param log_level: String representation of log level
168 :return: Configured logger"""
170 level_map = {
171 "DEBUG": logging.DEBUG,
172 "INFO": logging.INFO,
173 "WARNING": logging.WARNING,
174 "ERROR": logging.ERROR,
175 "CRITICAL": logging.CRITICAL,
176 }
178 level = level_map.get(log_level.upper(), logging.INFO)
180 return cls.get_logger(
181 name=service_name,
182 log_file=f"{service_name}.log",
183 level=level,
184 max_file_size=10 * 1024 * 1024, # 10MB
185 backup_count=5,
186 console_output=True,
187 )
190def get_last_log_line(logger_name: str) -> str | None:
191 """Get the last line from the service log file efficiently.
192 Reads from the end of the file to avoid loading the entire file.
193 :param logger_name: Name of the logger / log file"""
195 log_file_path = os.path.join(settings.log_directory, logger_name + ".log")
197 if not os.path.exists(log_file_path):
198 return None
200 try:
201 with open(log_file_path, "rb") as f:
202 # Seek to end
203 f.seek(0, 2)
204 position = f.tell()
206 if position == 0:
207 return None
209 # Read backwards to find the last non-empty line
210 chunk_size = 1024
211 buffer = b""
213 while position > 0:
214 read_size = min(chunk_size, position)
215 position -= read_size
216 f.seek(position)
217 buffer = f.read(read_size) + buffer
219 # Split and look for a complete line
220 lines = buffer.split(b"\n")
222 # Find the last non-empty line
223 for line in reversed(lines):
224 stripped = line.strip()
225 if stripped:
226 try:
227 return stripped.decode("utf-8")
228 except UnicodeDecodeError:
229 return stripped.decode("utf-8", errors="replace")
231 return None
233 except Exception as e:
234 return f"Error reading log file: {str(e)}"