Coverage for backend / app / emails / email_service.py: 72%
199 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
"""Module for sending and reading emails using SMTP and IMAP."""
import email
import imaplib
import smtplib
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import parsedate_to_datetime
from pathlib import Path

from fastapi.templating import Jinja2Templates

from app.config import settings
from app.emails.utils import clean_email_address, build_multi_from_query
from app.utils import AppLogger
# Module-level template loader pointing at the relative "templates" directory.
# NOTE(review): nothing in this file reads this object -- EmailService builds its own loader
# from the directory next to this module -- so it may only exist for external importers;
# confirm before removing.
templates = Jinja2Templates(directory="templates")
class EmailService:
    """Email service class for sending and reading emails.

    Outgoing mail is rendered from the Jinja2 templates stored next to this module and sent over
    SMTP with STARTTLS; incoming mail is searched, fetched and deleted over IMAP (SSL).
    When ``settings.test_mode`` is enabled, outgoing emails are stored in ``test_emails`` instead
    of being sent, and ``delete_email`` does not contact the server."""

    # Server endpoints are read from the settings once, when the class is defined.
    smtp_server = settings.email_smtp_host
    smtp_port = settings.email_smtp_port
    imap_server = settings.email_imap_host
    imap_port = settings.email_imap_port

    # Date header layouts tried in order before falling back to the standard library parser.
    _DATE_FORMATS = (
        "%a, %d %b %Y %H:%M:%S %z",  # Standard RFC 2822: "Thu, 14 Aug 2025 02:25:53 +0000"
        "%a, %d %b %Y %H:%M:%S %z (UTC)",  # Original format with (UTC)
        "%a, %d %b %Y %H:%M:%S",  # Without timezone
        "%d %b %Y %H:%M:%S %z",  # Without day name
        "%a, %d %b %Y %H:%M:%S GMT",  # GMT timezone
        "%a, %d %b %Y %H:%M:%S UTC",  # UTC timezone
    )

    def __init__(self, email_username: str, email_password: str) -> None:
        """Initialise the EmailService class
        :param email_username: The email username for SMTP/IMAP authentication.
        :param email_password: The email password for SMTP/IMAP authentication."""

        self.email_username = email_username
        self.email_password = email_password
        self.logger = AppLogger.create_service_logger("email_service", "INFO")
        # Emails captured instead of being sent while settings.test_mode is enabled
        self.test_emails: list[dict] = []

        # Setup Jinja2 templates using FastAPI's built-in class, anchored to this module's
        # directory so that rendering does not depend on the working directory
        current_dir = Path(__file__).parent
        self.templates = Jinja2Templates(directory=str(current_dir / "templates"))

    @property
    def current_datetime(self) -> str:
        """Get the current UTC date and time formatted as a string."""

        # The rendered text is labelled "UTC", so the clock must be read in UTC rather than
        # in the server's local timezone.
        return datetime.now(timezone.utc).strftime("%B %d, %Y at %I:%M %p UTC")

    def _render(self, template_name: str, **context) -> str:
        """Render one of the email templates.
        :param template_name: File name of the template inside the templates directory.
        :param context: Variables made available to the template.
        :return: The rendered HTML."""

        return self.templates.env.get_template(template_name).render(**context)

    def send_email(
        self,
        recipient: str,
        subject: str,
        body: str,
        sender: str | None = None,
        message_type: str = "",
    ) -> None:
        """Send an email to the specified recipient.
        In test mode the email is appended to ``test_emails`` and nothing is sent.
        :param recipient: The recipient's email address.
        :param subject: The subject of the email.
        :param body: The body of the email in HTML format.
        :param sender: The sender's email address alias (optional, defaults to the login username).
        :param message_type: The type of email being sent (for logging purposes).
        :raises Exception: Any error raised while building or sending the message is logged and re-raised."""

        # Same fallback in test and real mode: a missing or empty alias means "send as the login user"
        from_address = sender or self.email_username

        if settings.test_mode:
            self.test_emails.append(
                {
                    "recipient": recipient,
                    "subject": subject,
                    "body": body,
                    "sender": from_address,
                    "timestamp": datetime.now().isoformat(),
                }
            )
            return

        try:
            msg = MIMEMultipart()
            msg["From"] = from_address
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "html"))

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email_username, self.email_password)
                # The envelope sender stays the authenticated account even when an alias is in "From"
                server.sendmail(self.email_username, recipient, msg.as_string())
            # All values go through lazy %-formatting so a "%" in message_type cannot break the format string
            self.logger.info("%s email sent to %s with subject: %s", message_type, recipient, subject)
        except Exception as e:
            self.logger.error("Failed to send %s email to %s: %s", message_type, recipient, str(e))
            raise

    def send_verification_email(
        self,
        recipient: str,
        verification_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send a verification email to the specified recipient.
        :param recipient: The recipient's email address.
        :param verification_url: The email verification URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render(
            "email_confirmation.html",
            name=recipient_name if recipient_name else "there",
            confirmation_url=verification_url,
            token_expiry_min=settings.verification_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Please verify your email",
            html_content,
            settings.support_email,
            "Email verification",
        )

    def send_email_change_verification(
        self,
        recipient: str,
        verification_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send an email change verification email to the specified recipient.
        :param recipient: The recipient's email address.
        :param verification_url: The email change verification URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render(
            "email_change.html",
            name=recipient_name if recipient_name else "there",
            confirmation_url=verification_url,
            token_expiry_min=settings.email_change_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Please verify your email",
            html_content,
            settings.support_email,
            "Email change verification",
        )

    def send_password_reset_email(
        self,
        recipient: str,
        reset_url: str,
        recipient_name: str | None = None,
    ) -> None:
        """Send a password reset email to the specified recipient.
        :param recipient: The recipient's email address.
        :param reset_url: The password reset URL.
        :param recipient_name: The recipient name ("there" is used in the greeting when missing)."""

        html_content = self._render(
            "password_reset.html",
            name=recipient_name if recipient_name else "there",
            reset_url=reset_url,
            token_expiry_min=settings.password_reset_token_expiration_minutes,
        )

        self.send_email(
            recipient,
            "Reset your password",
            html_content,
            settings.support_email,
            "Password Reset",
        )

    def send_password_changed_notification(
        self,
        recipient: str,
    ) -> None:
        """Send an email to the specified recipient mentioning that the password was changed.
        :param recipient: The recipient's email address."""

        html_content = self._render(
            "password_changed.html",
            change_date=self.current_datetime,
            support_email=settings.support_email,
        )

        subject = "Your JAM Password Has Been Changed"
        self.send_email(
            recipient,
            subject,
            html_content,
            settings.support_email,
            "Password changed notification",
        )

    def send_email_change_notification(
        self,
        recipient: str,
        old_email: str,
    ) -> None:
        """Send an email to the specified recipient mentioning that the email was changed.
        :param recipient: The recipient's email address (passed to the template as the new address).
        :param old_email: The old email address before the change."""

        html_content = self._render(
            "email_changed.html",
            change_date=self.current_datetime,
            support_email=settings.support_email,
            old_email=old_email,
            new_email=recipient,
        )

        subject = "Your JAM Email Address Has Been Changed"
        self.send_email(
            recipient,
            subject,
            html_content,
            settings.support_email,
            "Email change notification",
        )

    def send_trial_end_notification(
        self,
        recipient: str,
        end_date: datetime,
    ) -> None:
        """Send an email to the specified recipient reminding them that their free trial is ending.
        :param recipient: The recipient's email address.
        :param end_date: The trial end date."""

        html_content = self._render(
            "trial_end_reminder.html",
            upgrade_url=settings.frontend_url + "/settings/premium",
            end_date=end_date.strftime("%B %d, %Y"),
            support_email=settings.support_email,
        )

        # NOTE(review): every other subject in this class says "JAM"; "TOAST" looks like a
        # leftover product name -- confirm before changing the user-facing text.
        subject = "Your TOAST Free Trial is Ending Soon"
        self.send_email(
            recipient,
            subject,
            html_content,
            settings.support_email,
            "Trial end notification",
        )

    def send_new_version_email(
        self,
        recipient: str,
        version: str,
        features: list[dict],
    ) -> None:
        """Send a new version announcement email to the specified recipient.
        :param recipient: The recipient's email address.
        :param version: The version string (e.g. "1.2.0").
        :param features: List of feature dicts with title, description, and optional image_url."""

        html_content = self._render(
            "new_version.html",
            version=version,
            features=features,
            app_url=settings.frontend_url,
        )

        self.send_email(
            recipient,
            f"JAM v{version} is Here! \u2014 See What's New",
            html_content,
            settings.info_email,
            "New version announcement",
        )

    def _connect_imap(self) -> imaplib.IMAP4_SSL:
        """Connect to IMAP server and login.
        :return: IMAP connection object"""

        mail = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
        mail.login(self.email_username, self.email_password)
        return mail

    @staticmethod
    def _disconnect_imap(mail: imaplib.IMAP4_SSL) -> None:
        """Close the selected mailbox (if any) and always log out.
        :param mail: IMAP connection object returned by _connect_imap"""

        try:
            mail.close()
        except imaplib.IMAP4.error:
            # CLOSE is only legal once a mailbox is selected. If the earlier SELECT failed, this
            # error must not hide the original exception nor prevent the logout below.
            pass
        finally:
            mail.logout()

    @staticmethod
    def _imap_quote(value: str) -> str:
        """Wrap a value in an IMAP quoted string, escaping backslashes and double quotes.
        :param value: Raw search value
        :return: Quoted value safe to embed in a search query"""

        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def get_test_emails(self, recipient: str | None = None) -> list[dict]:
        """Get test emails for a specific recipient or all test emails.
        :param recipient: Only return emails stored for this recipient (optional).
        :return: Matching stored emails; without a recipient, the stored list itself.
        :raises ValueError: If test mode is not enabled."""

        if not settings.test_mode:
            raise ValueError("Test mode is not enabled")

        if recipient:
            return [stored for stored in self.test_emails if stored["recipient"] == recipient]
        return self.test_emails

    def clear_test_emails(self) -> None:
        """Clear all stored test emails (no-op outside test mode)."""

        if settings.test_mode:
            self.test_emails = []

    def get_email_ids(
        self,
        recipient_email: str = "",
        sender_email: str = "",
        inbox: str = "INBOX",
        timedelta_days: int | float = 1,
        subject_contains: str = "",
        from_email: list[str] | str = "",
        to_email: str = "",
    ) -> list[str]:
        """Search for messages matching a query.
        :param recipient_email: Filter on the X-Forwarded-To header (e.g. jam.jobscraper@emmanuelpean.me)
        :param sender_email: Filter on the Delivered-To header (e.g. emmanuelpean@gmail.com)
        :param inbox: Mailbox to search in (default is INBOX)
        :param timedelta_days: Number of days to search for emails (falsy disables the date filter)
        :param subject_contains: Filter by subject content
        :param from_email: Filter by 'From' email address
        :param to_email: Filter by 'To' email address
        :return: List of message UIDs matching the query (empty if the search fails)"""

        mail = self._connect_imap()

        try:
            # Select mailbox
            mail.select(inbox)

            # Build IMAP search criteria
            search_criteria = []

            # Date filter
            if timedelta_days:
                since_date = (datetime.now() - timedelta(days=timedelta_days)).strftime("%d-%b-%Y")
                search_criteria.append(f"SINCE {since_date}")

            if recipient_email:
                search_criteria.append(f"HEADER X-Forwarded-To {self._imap_quote(recipient_email)}")

            # NOTE(review): the parameter is called "sender" but the criterion matches the
            # Delivered-To header -- confirm this is intended for the forwarding setup.
            if sender_email:
                search_criteria.append(f"HEADER Delivered-To {self._imap_quote(sender_email)}")

            # Subject filter
            if subject_contains:
                search_criteria.append(f"SUBJECT {self._imap_quote(subject_contains)}")

            # From email filter
            if from_email:
                search_criteria.append(build_multi_from_query(from_email))

            # To email filter
            if to_email:
                search_criteria.append(f"HEADER To {self._imap_quote(to_email)}")

            # Default to all if no criteria
            search_query = " ".join(search_criteria) if search_criteria else "ALL"

            # Execute search using UID instead of sequence numbers
            # noinspection PyTypeChecker
            status, message_ids = mail.uid("search", None, search_query)

            # imaplib can answer OK with a [None] payload when the server sent no data line
            if status != "OK" or not message_ids or message_ids[0] is None:
                return []

            # Parse message UIDs (more stable than sequence numbers)
            return [msg_id.decode() for msg_id in message_ids[0].split()]

        finally:
            self._disconnect_imap(mail)

    @classmethod
    def _parse_date(cls, raw_date) -> datetime | None:
        """Parse a Date header value.
        :param raw_date: Raw header value (may be None when the header is missing)
        :return: Parsed datetime, or None if the value is missing or cannot be parsed"""

        if not raw_date:
            return None

        text = str(raw_date).strip()
        for date_format in cls._DATE_FORMATS:
            try:
                return datetime.strptime(text, date_format)
            except ValueError:
                continue

        # Last resort for layouts not listed above
        try:
            return parsedate_to_datetime(text)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _decode_payload(payload: bytes, charset: str | None) -> str:
        """Decode a message part payload.
        :param payload: Raw payload bytes
        :param charset: Charset declared by the part (utf-8 is assumed when missing)
        :return: Decoded text; windows-1252 with replacement is used if the declared charset fails"""

        try:
            return payload.decode(charset or "utf-8")
        except (UnicodeDecodeError, LookupError):
            # LookupError covers charset names Python does not know
            return payload.decode("windows-1252", errors="replace")

    def get_email_data(
        self,
        email_id: str,
        inbox: str = "INBOX",
    ) -> dict[str, str | datetime | None] | None:
        """Get the content of a specific email by ID.
        :param email_id: The email message UID (unique identifier)
        :param inbox: Mailbox the UID belongs to (default is INBOX)
        :return: Dictionary with email details (id, message_id, subject, from, to, date, body),
        or None if the message cannot be fetched. "date" is None when the header cannot be parsed."""

        mail = self._connect_imap()

        try:
            mail.select(inbox)

            # Fetch the email using UID
            status, msg_data = mail.uid("fetch", email_id, "(RFC822)")

            # An unknown UID yields OK with a [None] payload rather than an error status
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                return None

            # Parse email content
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)

            # Extract headers
            subject = self._decode_header(msg["Subject"])
            from_email = clean_email_address(self._decode_header(msg["From"]))
            to_email = clean_email_address(self._decode_header(msg["To"]))
            message_id = msg.get("Message-ID", "").strip()

            # Extract date
            date_received = self._parse_date(msg["Date"])

            body_text = ""
            html_text = ""

            if msg.is_multipart():
                for part in msg.walk():
                    content_type = part.get_content_type()
                    if content_type not in ("text/plain", "text/html"):
                        continue
                    if "attachment" in str(part.get("Content-Disposition")):
                        continue

                    payload = part.get_payload(decode=True)
                    if payload is None:
                        continue

                    decoded = self._decode_payload(payload, part.get_content_charset())
                    if content_type == "text/plain":
                        body_text = decoded
                    else:
                        html_text = decoded
            else:
                # single-part message: honour the declared charset instead of assuming utf-8
                payload = msg.get_payload(decode=True)
                if payload is not None:
                    body_text = self._decode_payload(payload, msg.get_content_charset())
                if msg.get_content_type() == "text/html":
                    html_text = body_text

            # Prefer the HTML version, falling back to plain text when there is none
            final_body = html_text or body_text

            return {
                "id": email_id,
                "message_id": message_id,
                "subject": subject,
                "from": from_email,
                "to": to_email,
                "date": date_received,
                "body": final_body,
            }

        finally:
            self._disconnect_imap(mail)

    def get_emails(self, *args, **kwargs) -> list[dict[str, str | datetime | None]]:
        """Get multiple emails matching criteria.
        :param args: arguments passed to get_email_ids
        :param kwargs: Keyword arguments passed to get_email_ids
        :return: List of email content dictionaries, most recent first"""

        email_ids = self.get_email_ids(*args, **kwargs)

        # UIDs are only valid inside the mailbox they were searched in, so fetch from the same
        # one (inbox is the third positional parameter of get_email_ids).
        if "inbox" in kwargs:
            inbox = kwargs["inbox"]
        elif len(args) > 2:
            inbox = args[2]
        else:
            inbox = "INBOX"
        fetch_options = {} if inbox == "INBOX" else {"inbox": inbox}

        emails = []
        for email_id in reversed(email_ids):  # Most recent first
            content = self.get_email_data(email_id, **fetch_options)
            if content:
                emails.append(content)

        return emails

    @staticmethod
    def _decode_header(header: str | None) -> str:
        """Decode email header.
        :param header: Raw header string
        :return: Decoded header string (empty string if the header is missing)"""

        if not header:
            return ""

        decoded_parts = []
        for part, encoding in decode_header(header):
            if isinstance(part, bytes):
                try:
                    decoded_parts.append(part.decode(encoding or "utf-8", errors="replace"))
                except LookupError:
                    # Unknown charset label: fall back to utf-8 rather than failing the whole message
                    decoded_parts.append(part.decode("utf-8", errors="replace"))
            else:
                decoded_parts.append(part)

        return "".join(decoded_parts)

    def delete_email(
        self,
        email_id: str,
        inbox: str = "INBOX",
    ) -> bool:
        """Delete an email by ID.
        In test mode nothing is deleted and True is returned.
        :param email_id: The email message UID to delete
        :param inbox: Mailbox the UID belongs to (default is INBOX)
        :return: True if deletion successful, False otherwise"""

        if settings.test_mode:
            return True

        mail = self._connect_imap()

        try:
            mail.select(inbox)

            # Mark the email as deleted using UID
            status, _ = mail.uid("store", email_id, "+FLAGS", "\\Deleted")

            if status != "OK":
                return False

            # Permanently remove emails marked as deleted
            mail.expunge()

            return True

        finally:
            self._disconnect_imap(mail)
# Shared module-level instance, created at import time with the main mailbox credentials from settings.
email_service = EmailService(settings.main_email_username, settings.main_email_password)