Coverage for backend/app/eis/email_scraper.py: 58% (339 statements)

1"""Gmail Email Retrieval and LinkedIn Job Extraction Module
3This module provides functionality to authenticate with Gmail using OAuth 2.0,
4retrieve email messages, and extract LinkedIn job IDs from email content.
5It offers a complete workflow for accessing Gmail data and parsing job-related
6information from email bodies."""
8import base64
9import json
10import os
11import pickle
12import re
13import threading
14import traceback
15from datetime import datetime
16from email.utils import parseaddr
18import cloudscraper
19from google.auth.transport.requests import Request
20from google.oauth2.credentials import Credentials
21from google_auth_oauthlib.flow import InstalledAppFlow
22from googleapiclient.discovery import build
24from app.database import session_local
25from app.eis import schemas
26from app.eis.job_scraper import LinkedinJobScraper, IndeedJobScraper, extract_indeed_jobs_from_email
27from app.eis.models import JobAlertEmail, ScrapedJob, EisServiceLog
28from app.models import User
29from app.utils import get_gmail_logger
31logger = get_gmail_logger()


def clean_email_address(sender_field: str) -> str:
    """Extract a clean email address from the sender field
    Handles formats like:
    - 'John Doe <john.doe@gmail.com>'
    - 'john.doe@gmail.com'
    - '"John Doe" <john.doe@gmail.com>'"""

    name, email = parseaddr(sender_field)
    return email.lower().strip() if email else sender_field.lower().strip()


def get_user_id_from_email(email: str, db) -> int:
    """Get the user id for an email address; raises AssertionError if no matching user exists"""

    entry = db.query(User).filter(User.email == email).first()
    if entry:
        return entry.id
    else:
        raise AssertionError(f"User with email '{email}' not found in database.")


class GmailScraper:
    """Gmail scraper"""

    def __init__(
        self,
        token_file: str = "token.pickle",
        secrets_file: str = "eis_secrets.json",
        skip_indeed_brightapi_scraping: bool = True,
    ) -> None:
        """Object constructor
        :param token_file: Path to the token pickle file
        :param secrets_file: Path to the secrets JSON file containing OAuth credentials
        :param skip_indeed_brightapi_scraping: if True, use the email content to extract the indeed job data."""

        self.token_file = token_file
        self.secrets_file = secrets_file
        self.skip_indeed_brightapi_scraping = skip_indeed_brightapi_scraping
        self.SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
        self.service = None

        # Load credentials from the external file and authenticate
        self.credentials_config = self._load_credentials()
        self.authenticate()

    def _load_credentials(self) -> dict:
        """Load OAuth credentials from the secrets file"""

        try:
            with open(self.secrets_file, "r") as f:
                secrets = json.load(f)
                return secrets["google_auth"]
        except FileNotFoundError:
            raise FileNotFoundError(f"Secrets file '{self.secrets_file}' not found.")
        except (json.JSONDecodeError, KeyError) as e:
            raise ValueError(f"Invalid JSON format in secrets file '{self.secrets_file}': {e}")

    def authenticate(self) -> None:
        """Authenticate and create the Gmail service"""

        credentials: Credentials | None = None

        # Load existing token
        if os.path.exists(self.token_file):
            with open(self.token_file, "rb") as token:
                credentials = pickle.load(token)

        # If there are no valid credentials, request authorisation
        if not credentials or not credentials.valid:
            if credentials and credentials.expired and credentials.refresh_token:
                credentials.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_config(self.credentials_config, self.SCOPES)
                credentials = flow.run_local_server(port=0)

            # Save the credentials for the next run
            with open(self.token_file, "wb") as token:
                # noinspection PyTypeChecker
                pickle.dump(credentials, token)

        self.service = build("gmail", "v1", credentials=credentials)

    # ------------------------------------------------- EMAIL READING -------------------------------------------------

    def get_email_ids(
        self,
        sender_email: str = "",
        inbox_only: bool = True,
        timedelta_days: int | float = 1,
    ) -> list[str]:
        """Search for messages matching a query
        :param sender_email: Sender email address
        :param inbox_only: Search only in the inbox
        :param timedelta_days: Number of days to search for emails
        :return: List of message IDs matching the query"""

        query = ""
        # query += f" from:{sender_email}" if sender_email else ""
        query += f" deliveredto:{sender_email}" if sender_email else ""
        query += " in:inbox" if inbox_only else ""
        query += f" newer_than:{timedelta_days}d" if timedelta_days else ""
        query = query.strip()
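        # Illustrative query for sender_email="user@example.com", inbox_only=True, timedelta_days=1:
        #   "deliveredto:user@example.com in:inbox newer_than:1d"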

        result = self.service.users().messages().list(userId="me", q=query).execute()
        messages = result.get("messages", [])
        return [msg["id"] for msg in messages]

    def _extract_email_body(self, payload: dict) -> str:
        """Extract email body from payload
        :param payload: Email payload dictionary
        :return: Email body content as a string"""
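        # Multipart payloads: use the first text/plain part if present, otherwise the last
        # text/html part; non-multipart payloads are decoded only when they are text/plain.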

        body = ""

        if "parts" in payload:
            for part in payload["parts"]:
                if part["mimeType"] == "text/plain":
                    data = part["body"]["data"]
                    body = self._decode_base64(data)
                    break
                elif part["mimeType"] == "text/html":
                    data = part["body"]["data"]
                    body = self._decode_base64(data)
        else:
            if payload["mimeType"] == "text/plain":
                data = payload["body"]["data"]
                body = self._decode_base64(data)

        return body

    @staticmethod
    def _decode_base64(data: str) -> str:
        """Decode base64 URL-safe string"""

        return base64.urlsafe_b64decode(data).decode("utf-8")

    def get_email_data(
        self,
        message_id: str,
        sender: str,
    ) -> schemas.JobAlertEmailCreate:
        """Extract readable content from an email
        :param message_id: Message ID
        :param sender: Sender email address
        :return: JobAlertEmailCreate object containing email metadata and body content"""

        message = self.service.users().messages().get(userId="me", id=message_id, format="full").execute()

        payload = message["payload"]
        headers = payload.get("headers", [])

        # Extract data
        subject = next((h["value"] for h in headers if h["name"] == "Subject"), "No Subject")
        date = next((h["value"] for h in headers if h["name"] == "Date"), "Unknown Date")
        body = self._extract_email_body(payload)

        if "linkedin" in body.lower():
            platform = "linkedin"
        elif "indeed" in body.lower():
            platform = "indeed"
        else:
            raise ValueError("Email body does not contain a valid platform identifier.")

        # Common email date formats to try
        date_formats = [
            "%a, %d %b %Y %H:%M:%S %z",  # Standard RFC 2822: "Thu, 14 Aug 2025 02:25:53 +0000"
            "%a, %d %b %Y %H:%M:%S %z (UTC)",  # Original format with (UTC)
            "%a, %d %b %Y %H:%M:%S",  # Without timezone
            "%d %b %Y %H:%M:%S %z",  # Without day name
            "%a, %d %b %Y %H:%M:%S GMT",  # GMT timezone
            "%a, %d %b %Y %H:%M:%S UTC",  # UTC timezone
        ]

        date_received = None
        for date_format in date_formats:
            try:
                date_received = datetime.strptime(date, date_format)
                break
            except ValueError:
                continue
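        # If no format matches, date_received stays None (assumed acceptable for the schema field).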

        return schemas.JobAlertEmailCreate(
            external_email_id=message_id,
            subject=subject,
            sender=clean_email_address(sender),
            date_received=date_received,
            body=body,
            platform=platform,
        )

    @staticmethod
    def save_email_to_db(
        email_data: schemas.JobAlertEmailCreate,
        service_log_id: int,
        db,
    ) -> tuple[JobAlertEmail, bool]:
        """Save the email to the database
        :param email_data: JobAlertEmailCreate schema containing email metadata
        :param service_log_id: ID of the EisServiceLog instance associated with this email
        :param db: SQLAlchemy database session
        :return: JobAlertEmail instance and whether the record was created or already existing"""

        # Check if email already exists
        existing_email = (
            db.query(JobAlertEmail).filter(JobAlertEmail.external_email_id == email_data.external_email_id).first()
        )

        if existing_email:
            return existing_email, False

        # Create new email record
        # noinspection PyArgumentList
        email_record = JobAlertEmail(
            owner_id=get_user_id_from_email(email_data.sender, db),
            service_log_id=service_log_id,
            **email_data.model_dump(exclude_unset=True),
        )
        db.add(email_record)
        db.commit()
        db.refresh(email_record)

        return email_record, True

    # -------------------------------------------------- JOB SCRAPING --------------------------------------------------

    @classmethod
    def extract_linkedin_job_ids(cls, body: str) -> list[str]:
        """Extract LinkedIn job IDs from the email body
        :param body: Email body content as string
        :return: List of unique LinkedIn job IDs"""
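        # Illustrative: a body containing "https://www.linkedin.com/comm/jobs/view/4012345678/?trk=..."
        # yields ["4012345678"]; dict.fromkeys drops duplicates while preserving order.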

        pattern = r"linkedin\.com/(?:comm/)?jobs/view/(\d+)"
        job_ids = re.findall(pattern, body, re.IGNORECASE)
        return list(dict.fromkeys(job_ids))

    @classmethod
    def get_indeed_redirected_url(cls, job_url: str) -> str:
        """Get the redirected URL from the Indeed job URL
        :param job_url: Indeed job URL
        :return: Redirected URL"""

        scraper = cloudscraper.create_scraper()
        response = scraper.get(job_url, allow_redirects=True)
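        # Retry (re-creating the scraper each time) until the link resolves to a final
        # ".../viewjob?jk=..." URL, presumably because the redirect does not always land there first time.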
        max_attempts = 100
        iteration = 0
        while "indeed.com/viewjob?jk" not in response.url:
            scraper = cloudscraper.create_scraper()
            response = scraper.get(job_url, allow_redirects=True)
            iteration += 1
            if iteration > max_attempts:
                break
        return response.url

    @classmethod
    def extract_indeed_job_ids(cls, body: str) -> list[str]:
        """Extract Indeed job advertisement IDs from email body URLs
        :param body: Email body content as string
        :return: List of unique Indeed job IDs"""
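        # Illustrative: ".../rc/clk/dl?jk=abc123def456&from=..." yields "abc123def456", while pagead
        # links carrying an "mo=" parameter are first resolved via get_indeed_redirected_url.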

        pattern = r"https?://(?:uk\.)?indeed\.com/(?:pagead|rc)/clk/dl\?[^>\s]+"
        job_urls = re.findall(pattern, body, re.IGNORECASE)
        job_urls = list(dict.fromkeys(job_urls))
        job_ids = []

        for url in job_urls:
            # Resolve pagead-style URLs (identified by their 'mo' parameter) to the final job URL first
            ad_match = re.search(r"[?&]mo=([^&>\s]+)", url, re.IGNORECASE)
            if ad_match:
                url = cls.get_indeed_redirected_url(url)

            # Try to extract the 'jk' parameter (for rc URLs)
            jk_match = re.search(r"[?&]jk=([^&>\s]+)", url, re.IGNORECASE)
            if jk_match:
                job_ids.append(jk_match.group(1))

        return list(dict.fromkeys(job_ids))

    @staticmethod
    def save_jobs_to_db(
        email_record: JobAlertEmail,
        job_ids: list[str],
        db,
    ) -> list[ScrapedJob]:
        """Save extracted job IDs to the database and link them to the email
        :param email_record: JobAlertEmail record instance
        :param job_ids: List of job IDs to save
        :param db: SQLAlchemy database session
        :return: List of ScrapedJob instances created or already existing in the database"""

        job_records = []

        for job_id in job_ids:

            # Check if the job already exists for this owner
            existing_entry = (
                db.query(ScrapedJob)
                .filter(
                    ScrapedJob.external_job_id == job_id,
                    ScrapedJob.owner_id == email_record.owner_id,
                )
                .first()
            )

            if not existing_entry:

                # Create new job record
                # noinspection PyArgumentList
                new_job = ScrapedJob(
                    external_job_id=job_id,
                    owner_id=email_record.owner_id,
                )
                new_job.emails.append(email_record)
                db.add(new_job)
                job_records.append(new_job)

            else:
                # Check if this email is already linked to avoid duplicates
                if email_record not in existing_entry.emails:
                    existing_entry.emails.append(email_record)
                job_records.append(existing_entry)

        db.commit()

        # Refresh all records
        for job_record in job_records:
            db.refresh(job_record)

        return job_records

    @staticmethod
    def save_job_data_to_db(
        job_records: list[ScrapedJob] | ScrapedJob,
        job_data: list[dict] | dict,
        scraped_date: datetime,
        db,
    ) -> None:
        """Save job data to the database"""

        if not isinstance(job_records, list):
            job_records = [job_records]
        if not isinstance(job_data, list):
            job_data = [job_data]

        for job, record in zip(job_data, job_records):
            record.company = job["company"]
            record.location = job["location"]
            record.salary_min = job["job"]["salary"]["min_amount"]
            record.salary_max = job["job"]["salary"]["max_amount"]
            record.title = job["job"]["title"]
            record.description = job["job"]["description"]
            record.url = job["job"]["url"]
            record.scrape_datetime = scraped_date
            record.is_scraped = True
        db.commit()

    # ----------------------------------------------------- RUNNER -----------------------------------------------------

    def run_scraping(self, timedelta_days: int | float = 10) -> EisServiceLog:
        """Run the email scraping workflow
        :param timedelta_days: Number of days to search for emails"""
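        # Workflow (as implemented below): create an EisServiceLog row, process each user's
        # alert emails, scrape any jobs not yet scraped, then record duration and success/failure.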

        start_time = datetime.now()
        logger.info("Starting email scraping workflow")

        with session_local() as db:

            # Service log
            # noinspection PyArgumentList
            service_log_entry = EisServiceLog(
                name="Email Scraper Service",
                run_datetime=start_time,
            )
            db.add(service_log_entry)
            db.commit()
            db.refresh(service_log_entry)

            try:
                # Process emails for all users
                jobs_data = self._process_user_emails(db, timedelta_days, service_log_entry)

                # Scrape remaining jobs that haven't been scraped yet
                self._scrape_remaining_jobs(db, service_log_entry, jobs_data)

                # Log final statistics
                service_log_entry.run_duration = (datetime.now() - start_time).total_seconds()
                success = True
                error_message = None
                # AppLogger.log_execution_time(logger, start_time, "Gmail scraping workflow")
                # AppLogger.log_stats(logger, service_log_entry, "Gmail Scraping Results")

            except Exception as exception:
                logger.exception(f"Critical error in scraping workflow: {exception}")
                service_log_entry.run_duration = (datetime.now() - start_time).total_seconds()
                success = False
                error_message = str(exception)

            service_log_entry.is_success = success
            service_log_entry.error_message = error_message
            db.commit()
            return service_log_entry

    def _process_user_emails(
        self,
        db,
        timedelta_days: int | float,
        service_log_entry: EisServiceLog,
    ) -> dict:
        """Process emails for all users
        :param db: Database session
        :param timedelta_days: Number of days to search for emails
        :param service_log_entry: Service log entry"""

        users = db.query(User).all()
        logger.info(f"Found {len(users)} users to process")

        # For each user...
        jobs_data = {}
        for user in users:
            logger.info(f"Processing user: {user.email} (ID: {user.id})")

            # Get the list of all emails
            try:
                email_external_ids = self.get_email_ids(user.email, True, timedelta_days)
                service_log_entry.users_processed_n += 1
                service_log_entry.emails_found_n += len(email_external_ids)
            except Exception as exception:
                logger.exception(f"Failed to search messages due to error: {exception}. Skipping user.")
                continue  # next user

            # For each email...
            for email_external_id in email_external_ids:
                logger.info(f"Processing email with ID: {email_external_id}")
                try:
                    email_data = self.get_email_data(email_external_id, user.email)
                    email_record, is_new = self.save_email_to_db(email_data, service_log_entry.id, db)

                    # Process jobs if this is a new email
                    if is_new:
                        service_log_entry.emails_saved_n += 1
                        jobs_data.update(self._process_email(db, email_record, service_log_entry))
                    else:
                        logger.info("Email already exists in database. Skipping email.")

                except Exception as exception:
                    message = f"Failed to get and save email data for email ID {email_external_id} due to error: {exception}. Skipping email."
                    logger.exception(message)
                    continue  # next email

        return jobs_data

    def _process_email(
        self,
        db,
        email_record: JobAlertEmail,
        service_log_entry: EisServiceLog,
    ) -> dict:
        """Extract job ids from an email
        :param db: Database session
        :param email_record: JobAlertEmail record
        :param service_log_entry: Service log entry"""

        jobs_data = {}

        # LinkedIn jobs
        if email_record.platform == "linkedin":
            job_ids = self.extract_linkedin_job_ids(email_record.body)
            service_log_entry.linkedin_job_n += len(job_ids)

        # Indeed
        elif email_record.platform == "indeed":

            # Use the email body to extract the job information instead of using the Bright API
            if self.skip_indeed_brightapi_scraping:
                jobs = extract_indeed_jobs_from_email(email_record.body)
                for job in jobs:
                    try:
                        job_id = self.extract_indeed_job_ids(job["job"]["url"])[0]
                        jobs_data[job_id] = job
                    except Exception as exception:
                        message = f"Failed to extract job ID for job URL {job['job']['url']} due to error: {exception}. Skipping job."
                        logger.exception(message)
                        continue
                job_ids = list(jobs_data.keys())
            else:
                job_ids = self.extract_indeed_job_ids(email_record.body)
            service_log_entry.indeed_job_n += len(job_ids)

        else:
            logger.info(f"No job IDs found in email: {email_record.external_email_id}. Skipping email.")
            return jobs_data

        # Save the extracted job ids to the database
        try:
            self.save_jobs_to_db(email_record, job_ids, db)
            service_log_entry.jobs_extracted_n += len(job_ids)
            logger.info(f"Extracted {len(job_ids)} job IDs from {email_record.platform}")
        except Exception as exception:
            logger.exception(
                f"Failed to save job IDs for email ID {email_record.external_email_id} due to error: {exception}. Skipping email."
            )
            return jobs_data

        return jobs_data

    def _scrape_remaining_jobs(
        self,
        db,
        service_log_entry: EisServiceLog,
        jobs_data: dict,
    ) -> None:
        """Scrape all remaining unscraped jobs
        :param db: Database session
        :param service_log_entry: Service log entry
        :param jobs_data: Dictionary of jobs data"""

        job_records = (
            db.query(ScrapedJob)
            .filter(ScrapedJob.is_scraped.is_(False))
            .filter(ScrapedJob.is_failed.is_(False))
            .distinct(ScrapedJob.external_job_id)
            .all()
        )

        for job_record in job_records:
            if job_record.emails[0].platform == "linkedin":
                scraper = LinkedinJobScraper(job_record.external_job_id)
            elif job_record.emails[0].platform == "indeed":
                if not self.skip_indeed_brightapi_scraping:
                    scraper = IndeedJobScraper(job_record.external_job_id)
                else:
                    scraper = None
            else:
                logger.info(f"Unknown platform for job {job_record.external_job_id}. Skipping job.")
                continue  # next job record

            # Scrape the data and save them to the database
            logger.info(f"Scraping job ID: {job_record.external_job_id}")
            try:
                if scraper is not None:
                    job_data = scraper.scrape_job()
                else:
                    job_data = jobs_data[job_record.external_job_id]
                scrape_datetime = datetime.now()
                self.save_job_data_to_db(job_record, job_data, scrape_datetime, db)
                same_jobs = (
                    db.query(ScrapedJob)
                    .filter(ScrapedJob.is_scraped.is_(False))
                    .filter(ScrapedJob.is_failed.is_(False))
                    .filter(ScrapedJob.external_job_id == job_record.external_job_id)
                    .all()
                )
                for same_job in same_jobs:
                    self.save_job_data_to_db(same_job, job_data, scrape_datetime, db)

                service_log_entry.job_success_n += 1
            except Exception:
                message = f"Failed to scrape job data for job ID {job_record.external_job_id} due to error: {traceback.format_exc()}. Skipping job."
                logger.exception(message)
                job_record.is_scraped = True
                job_record.is_failed = True
                job_record.scrape_error = f"{traceback.format_exc()}"
                db.commit()
                service_log_entry.job_fail_n += 1


class GmailScraperService:
    """Service wrapper for GmailScraper with start/stop functionality"""

    def __init__(self) -> None:
        """Initialise the service with a GmailScraper instance."""

        self.scraper = GmailScraper()
        self.is_running = False
        self.thread = None
        self.stop_event = threading.Event()

    def start(self, period_hours: float = 3.0) -> None:
        """Start the scraping service
        :param period_hours: Hours between each scraping run"""

        if self.is_running:
            logger.info("Service is already running")
            return

        self.is_running = True
        self.stop_event.clear()

        # Start the service in a separate thread
        self.thread = threading.Thread(target=self._run_service, args=(period_hours,))
        self.thread.daemon = False
        self.thread.start()

        logger.info(f"Gmail scraping service started with {period_hours}h interval")

    def stop(self) -> None:
        """Stop the scraping service"""

        if not self.is_running:
            logger.info("Service is not running")
            return

        logger.info("Stopping Gmail scraping service...")
        self.is_running = False
        self.stop_event.set()

        if self.thread:
            while self.thread.is_alive():
                self.thread.join(timeout=5)  # Wait up to 5 seconds for clean shutdown

        logger.info("Gmail scraping service stopped")

    def _run_service(self, period_hours: float) -> None:
        """Internal method that runs the scraping loop
        :param period_hours: Hours between each scraping run"""

        while self.is_running and not self.stop_event.is_set():
            try:
                logger.info(f"[{datetime.now()}] Starting scraping run...")

                # Run the scraping
                result = self.scraper.run_scraping(timedelta_days=2)

                duration = result.run_duration or 0  # run_scraping returns an EisServiceLog, not a dict
                sleep_time = max([0, period_hours * 3600 - duration])
                logger.info(f"[{datetime.now()}] Scraping completed in {duration:.2f}s. Sleeping for {sleep_time:.2f}s")
                if self.stop_event.wait(timeout=sleep_time):
                    break

            except Exception as e:
                logger.info(f"[{datetime.now()}] Error in scraping service: {e}")
                # Sleep for a shorter time on error to retry sooner
                if self.stop_event.wait(timeout=300):  # 5 minutes
                    break

    def status(self) -> dict:
        """Get the current status of the service"""

        return {
            "is_running": self.is_running,
            "thread_alive": self.thread.is_alive() if self.thread else False,
            "thread_name": self.thread.name if self.thread else None,
        }
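

# Illustrative usage (assumes valid OAuth secrets/token files are available):
#   service = GmailScraperService()
#   service.start(period_hours=3.0)  # begin periodic scraping in a background thread
#   print(service.status())          # e.g. {"is_running": True, "thread_alive": True, ...}
#   service.stop()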


if __name__ == "__main__":
    gmail = GmailScraper()
    # emails = gmail.get_email_ids("emmanuelpean@gmail.com", inbox_only=True, timedelta_days=2)
    # email_d = gmail.get_email_data(emails[2], "")
    # print(email_d.body)
    # print(email_d)
    # gmail.save_email_to_db(email_d, next(get_db()))
    gmail.run_scraping(2)

    # service = GmailScraperService()
    # service.start()

    # gmail = GmailScraper()