Coverage for backend / app / emails / email_service.py: 72%

199 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Module for sending and reading emails using SMTP and IMAP.""" 

2 

import email
import imaplib
import smtplib
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import parsedate_to_datetime
from pathlib import Path

from fastapi.templating import Jinja2Templates

from app.config import settings
from app.emails.utils import clean_email_address, build_multi_from_query
from app.utils import AppLogger

17 

# Module-level Jinja2 template loader pointing at the relative "templates" directory.
# EmailService does not use this object: each instance builds its own loader in __init__,
# anchored next to this file. NOTE(review): presumably kept for other importers - verify before removing.
templates = Jinja2Templates(directory="templates")

19 

20 

class EmailService(object):
    """Send templated emails over SMTP and search, read and delete mailbox messages over IMAP.

    Server hosts and ports are read from ``settings`` once, when the class is defined; the
    credentials are supplied per instance. While ``settings.test_mode`` is truthy, outgoing
    emails are recorded in ``test_emails`` instead of being sent and deletions are no-ops."""

    smtp_server = settings.email_smtp_host
    smtp_port = settings.email_smtp_port
    imap_server = settings.email_imap_host
    imap_port = settings.email_imap_port

    # Date header layouts tried in order by _parse_date before falling back to the stdlib parser.
    _DATE_FORMATS = (
        "%a, %d %b %Y %H:%M:%S %z",  # Standard RFC 2822: "Thu, 14 Aug 2025 02:25:53 +0000"
        "%a, %d %b %Y %H:%M:%S %z (UTC)",  # Same, followed by a literal "(UTC)" comment
        "%a, %d %b %Y %H:%M:%S",  # Without timezone
        "%d %b %Y %H:%M:%S %z",  # Without day name
        "%a, %d %b %Y %H:%M:%S GMT",  # GMT timezone
        "%a, %d %b %Y %H:%M:%S UTC",  # UTC timezone
    )

    def __init__(self, email_username: str, email_password: str) -> None:
        """Initialise the EmailService class
        :param email_username: The email username for SMTP/IMAP authentication.
        :param email_password: The email password for SMTP/IMAP authentication."""

        self.email_username = email_username
        self.email_password = email_password
        self.logger = AppLogger.create_service_logger("email_service", "INFO")
        # Emails captured by send_email while settings.test_mode is on
        self.test_emails: list[dict] = []

        # Templates live in the "templates" directory next to this file
        current_dir = Path(__file__).parent
        self.templates = Jinja2Templates(directory=str(current_dir / "templates"))

    @property
    def current_datetime(self) -> str:
        """Get the current UTC date and time formatted as a string."""

        # The format string hard-codes the "UTC" suffix, so the clock must actually be read in UTC
        return datetime.now(timezone.utc).strftime("%B %d, %Y at %I:%M %p UTC")

    def _render_template(self, template_name: str, **context) -> str:
        """Render one of the service's Jinja2 templates.
        :param template_name: File name of the template inside the service's templates directory.
        :param context: Variables made available to the template.
        :return: The rendered HTML string."""

        return self.templates.env.get_template(template_name).render(**context)

    def send_email(
        self,
        recipient: str,
        subject: str,
        body: str,
        sender: str | None = None,
        message_type: str = "",
    ) -> None:
        """Send an email to the specified recipient.
        In test mode the email is appended to ``test_emails`` and nothing is sent.
        :param recipient: The recipient's email address.
        :param subject: The subject of the email.
        :param body: The body of the email in HTML format.
        :param sender: The sender's email address alias (optional, defaults to the login username).
        :param message_type: The type of email being sent (for logging purposes).
        :raises Exception: Any error raised while building or sending the message is logged and re-raised."""

        # Same fallback in both modes: a missing or empty alias means "send as the login user"
        from_address = sender or self.email_username

        if settings.test_mode:
            self.test_emails.append(
                {
                    "recipient": recipient,
                    "subject": subject,
                    "body": body,
                    "sender": from_address,
                    "timestamp": datetime.now().isoformat(),
                }
            )
            return

        try:
            msg = MIMEMultipart()
            msg["From"] = from_address
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "html"))

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email_username, self.email_password)
                # The envelope sender is always the authenticated account; the alias only sets the From header
                server.sendmail(self.email_username, recipient, msg.as_string())
            # message_type is passed as a log argument so a literal "%" in it cannot break formatting
            self.logger.info("%s email sent to %s with subject: %s", message_type, recipient, subject)
        except Exception as e:
            self.logger.error("Failed to send %s email to %s: %s", message_type, recipient, str(e))
            raise

    def send_verification_email(
        self,
        recipient: str,
        verification_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send a verification email to the specified recipient.
        :param recipient: The recipient's email address.
        :param verification_url: The email verification URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render_template(
            "email_confirmation.html",
            name=recipient_name if recipient_name else "there",
            confirmation_url=verification_url,
            token_expiry_min=settings.verification_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Please verify your email",
            html_content,
            settings.support_email,
            "Email verification",
        )

    def send_email_change_verification(
        self,
        recipient: str,
        verification_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send an email change verification email to the specified recipient.
        :param recipient: The recipient's email address.
        :param verification_url: The email change verification URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render_template(
            "email_change.html",
            name=recipient_name if recipient_name else "there",
            confirmation_url=verification_url,
            token_expiry_min=settings.email_change_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Please verify your email",
            html_content,
            settings.support_email,
            "Email change verification",
        )

    def send_password_reset_email(
        self,
        recipient: str,
        reset_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send a password reset email to the specified recipient.
        :param recipient: The recipient's email address.
        :param reset_url: The password reset URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render_template(
            "password_reset.html",
            name=recipient_name if recipient_name else "there",
            reset_url=reset_url,
            token_expiry_min=settings.password_reset_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Reset your password",
            html_content,
            settings.support_email,
            "Password Reset",
        )

    def send_password_changed_notification(
        self,
        recipient: str,
    ) -> None:
        """Send an email to the specified recipient mentioning that the password was changed.
        :param recipient: The recipient's email address."""

        html_content = self._render_template(
            "password_changed.html",
            change_date=self.current_datetime,
            support_email=settings.support_email,
        )

        self.send_email(
            recipient,
            "Your JAM Password Has Been Changed",
            html_content,
            settings.support_email,
            "Password changed notification",
        )

    def send_email_change_notification(
        self,
        recipient: str,
        old_email: str,
    ) -> None:
        """Send an email to the specified recipient mentioning that the email was changed.
        :param recipient: The recipient's (new) email address.
        :param old_email: The old email address before the change."""

        html_content = self._render_template(
            "email_changed.html",
            change_date=self.current_datetime,
            support_email=settings.support_email,
            old_email=old_email,
            new_email=recipient,
        )

        self.send_email(
            recipient,
            "Your JAM Email Address Has Been Changed",
            html_content,
            settings.support_email,
            "Email change notification",
        )

    def send_trial_end_notification(
        self,
        recipient: str,
        end_date: datetime,
    ) -> None:
        """Send an email to the specified recipient reminding them that their free trial is ending.
        :param recipient: The recipient's email address.
        :param end_date: The trial end date."""

        html_content = self._render_template(
            "trial_end_reminder.html",
            upgrade_url=settings.frontend_url + "/settings/premium",
            end_date=end_date.strftime("%B %d, %Y"),
            support_email=settings.support_email,
        )

        # NOTE(review): every other subject in this class says "JAM"; confirm "TOAST" is intended here.
        self.send_email(
            recipient,
            "Your TOAST Free Trial is Ending Soon",
            html_content,
            settings.support_email,
            "Trial end notification",
        )

    def send_new_version_email(
        self,
        recipient: str,
        version: str,
        features: list[dict],
    ) -> None:
        """Send a new version announcement email to the specified recipient.
        :param recipient: The recipient's email address.
        :param version: The version string (e.g. "1.2.0").
        :param features: List of feature dicts with title, description, and optional image_url."""

        html_content = self._render_template(
            "new_version.html",
            version=version,
            features=features,
            app_url=settings.frontend_url,
        )

        self.send_email(
            recipient,
            f"JAM v{version} is Here! \u2014 See What's New",
            html_content,
            settings.info_email,
            "New version announcement",
        )

    def _connect_imap(self) -> imaplib.IMAP4_SSL:
        """Connect to IMAP server and login.
        :return: IMAP connection object"""

        mail = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
        mail.login(self.email_username, self.email_password)
        return mail

    def _close_imap(self, mail: imaplib.IMAP4_SSL) -> None:
        """Close the selected mailbox (if any) and always log out.
        :param mail: Connection returned by _connect_imap."""

        try:
            mail.close()
        except (imaplib.IMAP4.error, OSError) as e:
            # CLOSE is only legal once a mailbox is selected. If select (or the connection) failed,
            # this must neither hide the original error nor prevent the logout below.
            self.logger.debug("IMAP close skipped: %s", e)
        finally:
            mail.logout()

    @staticmethod
    def _imap_quote(value: str) -> str:
        """Return ``value`` as an IMAP quoted string.
        :param value: Raw search value.
        :return: The value wrapped in double quotes with backslashes and double quotes escaped."""

        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def get_test_emails(self, recipient: str | None = None) -> list[dict]:
        """Get test emails for a specific recipient or all test emails.
        :param recipient: When given, only emails sent to exactly this address are returned.
        :return: The captured emails (the internal list itself when no recipient is given).
        :raises ValueError: If test mode is not enabled."""

        if not settings.test_mode:
            raise ValueError("Test mode is not enabled")

        if recipient:
            return [e for e in self.test_emails if e["recipient"] == recipient]
        return self.test_emails

    def clear_test_emails(self) -> None:
        """Clear all stored test emails (does nothing outside test mode)."""

        if settings.test_mode:
            self.test_emails = []

    def get_email_ids(
        self,
        recipient_email: str = "",
        sender_email: str = "",
        inbox: str = "INBOX",
        timedelta_days: int | float = 1,
        subject_contains: str = "",
        from_email: list[str] | str = "",
        to_email: str = "",
    ) -> list[str]:
        """Search for messages matching a query. All given filters are combined.
        :param recipient_email: Matched against the X-Forwarded-To header (e.g. jam.jobscraper@emmanuelpean.me)
        :param sender_email: Matched against the Delivered-To header (e.g. emmanuelpean@gmail.com)
        :param inbox: Mailbox to search in (default is INBOX)
        :param timedelta_days: Only messages since this many days ago; a falsy value disables the date filter
        :param subject_contains: Filter by subject content
        :param from_email: Filter by 'From' email address(es)
        :param to_email: Filter by 'To' email address
        :return: List of message UIDs matching the query (empty if the search fails or matches nothing)"""

        mail = self._connect_imap()

        try:
            mail.select(inbox)

            search_criteria = []

            if timedelta_days:
                # NOTE(review): %b follows the process locale while IMAP expects English month
                # abbreviations - confirm the service runs under a C/English locale.
                since_date = (datetime.now() - timedelta(days=timedelta_days)).strftime("%d-%b-%Y")
                search_criteria.append(f"SINCE {since_date}")

            if recipient_email:
                search_criteria.append(f"HEADER X-Forwarded-To {self._imap_quote(recipient_email)}")

            if sender_email:
                search_criteria.append(f"HEADER Delivered-To {self._imap_quote(sender_email)}")

            if subject_contains:
                search_criteria.append(f"SUBJECT {self._imap_quote(subject_contains)}")

            if from_email:
                search_criteria.append(build_multi_from_query(from_email))

            if to_email:
                search_criteria.append(f"HEADER To {self._imap_quote(to_email)}")

            # Default to all if no criteria
            search_query = " ".join(search_criteria) if search_criteria else "ALL"

            # UIDs are used instead of sequence numbers because they stay stable across sessions
            # noinspection PyTypeChecker
            status, message_ids = mail.uid("search", None, search_query)

            if status != "OK" or not message_ids or not message_ids[0]:
                return []

            return [uid.decode() for uid in message_ids[0].split()]

        finally:
            self._close_imap(mail)

    @classmethod
    def _parse_date(cls, date: str | None) -> datetime | None:
        """Parse a Date header value.
        :param date: Raw header value, or None when the header is missing.
        :return: The parsed datetime, or None if the header is missing or cannot be parsed."""

        if not date:
            return None

        date = str(date).strip()
        for date_format in cls._DATE_FORMATS:
            try:
                return datetime.strptime(date, date_format)
            except ValueError:
                continue

        # Last resort: the stdlib RFC 2822 parser copes with trailing comments, named zones, etc.
        try:
            return parsedate_to_datetime(date)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _decode_payload(payload: bytes | None, charset: str) -> str:
        """Decode a message part payload to text.
        :param payload: Transfer-decoded payload bytes (None yields an empty string).
        :param charset: Charset declared by the part.
        :return: The decoded text; falls back to windows-1252 with replacement characters when the
        declared charset is unknown or does not match the bytes."""

        if payload is None:
            return ""
        try:
            return payload.decode(charset)
        except (UnicodeDecodeError, LookupError):
            return payload.decode("windows-1252", errors="replace")

    def get_email_data(
        self,
        email_id: str,
        inbox: str = "INBOX",
    ) -> dict[str, str | datetime | None] | None:
        """Get the content of a specific email by ID.
        :param email_id: The email message UID (unique identifier)
        :param inbox: Mailbox the UID belongs to (default is INBOX)
        :return: Dictionary with keys id, message_id, subject, from, to, date and body, or None if the
        message could not be fetched. ``date`` is None when the Date header is missing or unparseable;
        ``body`` is the HTML part when there is one, otherwise the plain-text part."""

        mail = self._connect_imap()

        try:
            mail.select(inbox)

            status, msg_data = mail.uid("fetch", email_id, "(RFC822)")

            # imaplib can answer "OK" with [None] for a UID that does not exist, so the payload
            # shape has to be checked as well as the status.
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                return None

            # noinspection PyUnresolvedReferences
            msg = email.message_from_bytes(msg_data[0][1])

            subject = self._decode_header(msg["Subject"])
            from_email = clean_email_address(self._decode_header(msg["From"]))
            to_email = clean_email_address(self._decode_header(msg["To"]))
            message_id = msg.get("Message-ID", "").strip()
            date_received = self._parse_date(msg["Date"])

            body_text = ""
            html_text = ""

            if msg.is_multipart():
                for part in msg.walk():
                    if "attachment" in str(part.get("Content-Disposition")):
                        continue

                    content_type = part.get_content_type()
                    if content_type not in ("text/plain", "text/html"):
                        continue

                    decoded = self._decode_payload(
                        part.get_payload(decode=True),
                        part.get_content_charset() or "utf-8",
                    )
                    # When several parts share a type, the last one wins
                    if content_type == "text/plain":
                        body_text = decoded
                    else:
                        html_text = decoded
            else:
                # Single-part message: honour the declared charset, like the multipart branch
                body_text = self._decode_payload(
                    msg.get_payload(decode=True),
                    msg.get_content_charset() or "utf-8",
                )
                if msg.get_content_type() == "text/html":
                    html_text = body_text

            # Prefer the HTML body, falling back to plain text when there is none
            final_body = html_text or body_text

            return {
                "id": email_id,
                "message_id": message_id,
                "subject": subject,
                "from": from_email,
                "to": to_email,
                "date": date_received,
                "body": final_body,
            }

        finally:
            self._close_imap(mail)

    def get_emails(self, *args, **kwargs) -> list[dict[str, str | datetime | None]]:
        """Get multiple emails matching criteria.
        :param args: arguments passed to get_email_ids
        :param kwargs: Keyword arguments passed to get_email_ids
        :return: List of email content dictionaries, highest UID first"""

        email_ids = self.get_email_ids(*args, **kwargs)

        # UIDs are only valid in the mailbox they were searched in, so fetch from that same
        # mailbox ("inbox" is the third positional parameter of get_email_ids).
        inbox = kwargs.get("inbox", args[2] if len(args) > 2 else "INBOX")

        emails = []
        for email_id in reversed(email_ids):  # Most recent first
            content = self.get_email_data(email_id, inbox=inbox)
            if content:
                emails.append(content)

        return emails

    @staticmethod
    def _decode_header(header: str | None) -> str:
        """Decode email header.
        :param header: Raw header string
        :return: Decoded header string (empty for a missing header). Bytes that do not match their
        declared charset are replaced rather than raising, and an unknown charset is read as UTF-8."""

        if not header:
            return ""

        fragments = []
        for part, encoding in decode_header(header):
            if not isinstance(part, bytes):
                fragments.append(part)
                continue
            try:
                fragments.append(part.decode(encoding or "utf-8", errors="replace"))
            except LookupError:
                # Charset label Python does not know (e.g. "unknown-8bit")
                fragments.append(part.decode("utf-8", errors="replace"))

        return "".join(fragments)

    def delete_email(
        self,
        email_id: str,
        inbox: str = "INBOX",
    ) -> bool:
        """Delete an email by ID from a mailbox. In test mode nothing is deleted and True is returned.
        :param email_id: The email message UID to delete
        :param inbox: Mailbox the UID belongs to (default is INBOX)
        :return: True if deletion successful, False otherwise"""

        if settings.test_mode:
            return True

        mail = self._connect_imap()

        try:
            mail.select(inbox)

            # Mark the email as deleted using UID
            status, _ = mail.uid("store", email_id, "+FLAGS", "\\Deleted")

            if status != "OK":
                return False

            # Permanently removes every message flagged \Deleted in this mailbox, not only this one
            mail.expunge()

            return True

        finally:
            self._close_imap(mail)

524 

525 

# Shared module-level service instance, authenticated with the main mailbox credentials from settings.
email_service = EmailService(
    email_username=settings.main_email_username,
    email_password=settings.main_email_password,
)