Coverage for backend / app / job_email_scraping / email_scraper.py: 80%
316 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Email scraper service.
3Reads job alert emails from a shared inbox, extracts job IDs for supported
4platforms (LinkedIn, Indeed, Veganjobs, etc.), stores email and job metadata in the
5database, scrapes full job details (via platform scrapers or from email
content), and records run statistics in a JobEmailScrapingServiceLog."""
8import datetime as dt
9import traceback
11from sqlalchemy import or_
13from app import models
14from app.config import settings
15from app.database import get_db
16from app.emails.email_service import EmailService
17from app.geolocation.geolocation import geocode_location
18from app.job_email_scraping.email_parsers import JOB_PARSERS, ALERT_NAME_EXTRACTORS, PLATFORM_SENDER_EMAILS
19from app.job_email_scraping.email_parsers.utils import Platform, remove_style_tags
20from app.job_email_scraping.filtering import is_job_filtered_out, is_job_favoured
21from app.job_email_scraping.gmail import extract_forwarding_confirmation_link, extract_gmail_originator
22from app.job_email_scraping.job_scrapers import SCRAPERS
23from app.job_email_scraping.location_parser import LocationParser
24from app.job_email_scraping.models import (
25 JobEmail,
26 ScrapedJob,
27 JobEmailScrapingServiceLog,
28 JobEmailScrapingPlatformStat,
29 JobEmailScrapingServiceError,
30)
31from app.job_email_scraping.schemas import JobResult
32from app.resources import CURRENCIES
33from app.service_runner.service_runner import ServiceRunner
34from app.utils import AppLogger
36SERVICE_NAME = "email_scraper_service"
39class JobEmailScraper(EmailService):
40 """Job Email Alert Scraper"""
42 def __init__(self, db=None) -> None:
43 """Object constructor
44 :param db: optional database session for testing"""
46 EmailService.__init__(self, settings.scraper_email_username, settings.scraper_email_password)
47 self.location_parser = LocationParser()
48 self.logger = AppLogger.create_service_logger(SERVICE_NAME, "INFO")
49 self.db = next(get_db()) if db is None else db
51 def create_service_log(self, **kwargs) -> JobEmailScrapingServiceLog:
52 """Create a new service log entry
53 :param kwargs: JobEmailScrapingServiceLog keyword arguments"""
55 # noinspection PyArgumentList
56 service_log_entry = JobEmailScrapingServiceLog(**kwargs)
57 self.db.add(service_log_entry)
58 self.db.commit()
59 self.db.refresh(service_log_entry)
60 return service_log_entry
62 def upsert_platform_stat(
63 self,
64 service_log: JobEmailScrapingServiceLog,
65 platform: Platform,
66 **kwargs,
67 ) -> JobEmailScrapingPlatformStat:
68 """Create a new platform statistics entry
69 :param service_log: associated JobEmailScrapingServiceLog instance
70 :param platform: Platform enum value
71 :param kwargs: PlatformStat keyword arguments"""
73 # Check the platform_stats entry exists
74 platform_stats = (
75 self.db.query(JobEmailScrapingPlatformStat)
76 .join(JobEmailScrapingServiceLog)
77 .filter(JobEmailScrapingServiceLog.id == service_log.id)
78 .filter(JobEmailScrapingPlatformStat.name == platform)
79 .first()
80 )
82 # Update existing entry by adding the new values
83 if not platform_stats:
84 platform_stats = JobEmailScrapingPlatformStat(service_log_id=service_log.id, name=platform)
85 self.db.add(platform_stats)
87 for key in kwargs:
88 if isinstance(kwargs[key], list):
89 value = kwargs[key]
90 else:
91 value = [kwargs[key]]
92 setattr(platform_stats, key, list(set(getattr(platform_stats, key) + value)))
94 self.db.commit()
95 self.db.refresh(platform_stats)
96 return platform_stats
98 def log_service_error(
99 self,
100 service_log: JobEmailScrapingServiceLog,
101 exc: Exception | str,
102 ) -> JobEmailScrapingServiceError:
103 """Create a JobEmailScrapingServiceError for a caught exception.
104 :param service_log: associated JobEmailScrapingServiceLog instance
105 :param exc: the caught exception
106 :return: JobEmailScrapingServiceError instance"""
108 tb = traceback.format_exc()
109 # noinspection PyArgumentList
110 err = JobEmailScrapingServiceError(
111 error_type=type(exc).__name__,
112 message=str(exc),
113 traceback=tb,
114 service_log_id=service_log.id,
115 )
116 self.db.add(err)
117 self.db.commit()
118 return err
120 def get_user_monthly_scrape_count(self, owner_id: int) -> int:
121 """Get the count of jobs scraped by a user in the current month.
122 :param owner_id: User ID
123 :return: Number of jobs scraped this month"""
125 start_of_month = dt.datetime.now(dt.timezone.utc).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
126 count = (
127 self.db.query(ScrapedJob)
128 .filter(ScrapedJob.owner_id == owner_id)
129 .filter(ScrapedJob.is_scraped)
130 .filter(ScrapedJob.is_failed.is_(False))
131 .filter(ScrapedJob.scrape_datetime >= start_of_month)
132 .count()
133 )
134 return count
136 def is_user_over_monthly_quota(self, owner_id: int) -> bool:
137 """Check if a user has exceeded their monthly scrape quota.
138 :param owner_id: User ID
139 :return: True if the user has exceeded their quota"""
141 return self.get_user_monthly_scrape_count(owner_id) >= settings.monthly_scrape_quota
143 def extract_forwarding_email_confirmation(self, service_log: JobEmailScrapingServiceLog):
144 """Extract and save forwarding confirmation links from emails sent by different email providers
145 :param service_log: associated JobEmailScrapingServiceLog instance"""
147 email_platforms = {"gmail": "forwarding-noreply@google.com"}
148 for email_platform in email_platforms:
149 try:
150 email_ids = self.get_email_ids(from_email=email_platforms[email_platform], timedelta_days=365)
151 except:
152 self.log_service_error(service_log, f"Failed to get email with platform {email_platform}.")
153 continue
155 for email_id in email_ids:
156 try:
157 existing_entry = (
158 self.db.query(models.ForwardingConfirmationLink)
159 .filter(models.ForwardingConfirmationLink.email_external_id == email_id)
160 .first()
161 )
162 if existing_entry:
163 self.logger.info(f"Forwarding confirmation link for email {email_id} already exists. Skipping.")
164 continue
165 else:
166 email = self.get_email_data(email_id)
167 except:
168 self.log_service_error(
169 service_log, f"Failed to read email {email_id} with platform {email_platform}."
170 )
171 continue
172 try:
173 link = extract_forwarding_confirmation_link(email["body"])
174 if not link:
175 message = (
176 "Forwarding confirmation link not found in email body. Skipping forwarding confirmation."
177 )
178 self.logger.warning(message)
179 continue
180 user_email = extract_gmail_originator(email["body"])
181 user = self.db.query(models.User).filter(models.User.email == user_email).first()
182 if not user:
183 message = (
184 f"User with email {user_email} not found in database. Skipping forwarding confirmation."
185 )
186 self.logger.warning(message)
187 continue
188 # noinspection PyArgumentList
189 confirmation = models.ForwardingConfirmationLink(
190 email_external_id=email["id"],
191 url=link,
192 platform=email_platform,
193 owner_id=user.id,
194 )
195 self.db.add(confirmation)
196 self.db.commit()
197 except:
198 message = f"Failed to extract forwarding confirmation link from email {email["id"]}"
199 self.log_service_error(service_log, message)
201 # ------------------------------------------------ EMAIL PROCESSING ------------------------------------------------
203 def get_and_save_email_to_db(
204 self,
205 email_id: str,
206 user: models.User,
207 service_log_id: int,
208 forwarded: bool = False,
209 ) -> tuple[JobEmail, bool]:
210 """Read and save an email to the database
211 :param email_id: Email ID
212 :param user: User entry associated with this email
213 :param service_log_id: ID of the JobEmailScrapingServiceLog instance associated with this email
214 :param forwarded: Whether the email was forwarded
215 :return: JobEmails instance and whether the record was created or already existing"""
217 # Check if the email already exists and return it if it does
218 existing_email = self.db.query(JobEmail).filter(JobEmail.external_email_id == email_id).first()
220 # Return the existing email
221 if existing_email:
222 return existing_email, False
224 else:
226 # Read the email content and determine the platform
227 message = self.get_email_data(email_id)
229 # Determine the platform
230 platform = PLATFORM_SENDER_EMAILS.get(message["from"].lower())
231 if not platform:
232 for plat in PLATFORM_SENDER_EMAILS:
233 if plat in message["body"].lower():
234 platform = PLATFORM_SENDER_EMAILS[plat]
235 break
236 if not platform:
237 raise ValueError("Email body does not contain a valid platform identifier.")
238 alert_name = ALERT_NAME_EXTRACTORS[platform](message["subject"], message["body"])
239 sender = message["from"] if forwarded else message["to"]
240 # Create a new email record
241 email_record = JobEmail(
242 owner_id=user.id,
243 service_log_id=service_log_id,
244 external_email_id=email_id,
245 subject=message["subject"],
246 sender=sender,
247 date_received=message["date"],
248 body=remove_style_tags(message["body"]),
249 platform=platform,
250 alert_name=alert_name,
251 )
252 self.db.add(email_record)
253 self.db.commit()
254 self.db.refresh(email_record)
256 return email_record, True
258 # ------------------------------------------------- JOB PROCESSING -------------------------------------------------
260 def process_job_result(self, job_result: JobResult) -> dict:
261 """Process a single JobResult and extract relevant data
262 :param job_result: JobResult instance
263 :return dictionary of extracted job data"""
265 result = {} # noqa
267 # Location & attendance type
268 result["location"] = job_result.location
269 result["parsed_location"], result["attendance_type"] = self.location_parser.parse_location(result["location"])
271 if result["parsed_location"]:
272 geolocation = geocode_location(result["parsed_location"], self.db, self.logger)
273 if geolocation:
274 result["geolocation_id"] = geolocation.id
275 result["location_postcode"] = geolocation.postcode
276 result["location_city"] = geolocation.city
277 result["location_country"] = geolocation.country
279 # Salary
280 result["salary_min"] = job_result.job.salary.min_amount
281 result["salary_max"] = job_result.job.salary.max_amount
282 result["salary_currency"] = None
283 currency_code = (job_result.job.salary.currency or "").lower()
284 for currency in CURRENCIES:
285 if currency_code in (currency["code"].lower(), currency["symbol_native"].lower()):
286 result["salary_currency"] = currency["code"]
287 break
289 # Job data
290 result["raw_url"] = job_result.job.raw_url
291 result["url"] = job_result.job.url
292 result["title"] = job_result.job.title
293 result["description"] = job_result.job.description
294 result["company"] = job_result.company
295 result["deadline"] = job_result.job.deadline
296 result["external_job_id"] = job_result.job_id
297 result["platform"] = job_result.platform
298 result["is_closed"] = job_result.job.is_closed
300 return result
302 def save_job_base_info_to_db(
303 self,
304 email_record: JobEmail,
305 job_results: list[JobResult],
306 ) -> list[ScrapedJob]:
307 """Save extracted job IDs from an email to the database and link them to the email.
308 If they already exist, just link them to the email.
309 :param email_record: associated JobAlertEmail record instance
310 :param job_results: list of job IDs to save
311 :return: list of ScrapedJob instances created or already existing in the database"""
313 job_records = []
315 for job_result in job_results:
317 # Check if the job already exists for this owner
318 existing_entry = (
319 self.db.query(ScrapedJob)
320 .filter(ScrapedJob.external_job_id == job_result.job_id)
321 .filter(ScrapedJob.owner_id == email_record.owner_id)
322 .first()
323 )
325 # Create new job record if it doesn't exist
326 if not existing_entry:
327 data = self.process_job_result(job_result)
329 # noinspection PyArgumentList
330 new_job = ScrapedJob(
331 owner_id=email_record.owner_id,
332 service_log_id=email_record.service_log_id,
333 **data,
334 )
335 new_job.emails.append(email_record)
336 self.db.add(new_job)
337 job_records.append(new_job)
339 # Link existing job record to the email
340 else:
341 # Check if this email is already linked to avoid duplicates
342 if email_record not in existing_entry.emails:
343 existing_entry.emails.append(email_record)
344 job_records.append(existing_entry)
346 # Commit and refresh the records
347 self.db.commit()
348 for job_record in job_records:
349 self.db.refresh(job_record)
351 return job_records
353 def update_scraped_job_data(
354 self,
355 job_record: ScrapedJob,
356 job_result: JobResult | None,
357 ) -> None:
358 """Update the job records with the scraped data
359 :param job_record: ScrapedJob instance
360 :param job_result: scraped job data"""
362 # Update the job data
363 if job_result is not None:
364 data = self.process_job_result(job_result)
365 for key in data:
366 if data[key] is not None:
367 setattr(job_record, key, data[key])
369 # Scraping information
370 job_record.scrape_datetime = dt.datetime.now()
371 job_record.is_scraped = True
373 self.db.commit()
375 def copy_existing_entry(
376 self,
377 job_record1: ScrapedJob,
378 job_record2: ScrapedJob,
379 ) -> ScrapedJob:
380 """Copy the data from job_record1 to job_record2
381 :param job_record1: Source ScrapedJob instance
382 :param job_record2: Target ScrapedJob instance
383 :return: Updated job_record2 instance"""
385 columns = [
386 # Only scraping info are necessary
387 "is_processed",
388 "is_scraped",
389 "scrape_datetime",
390 # Job details
391 "title",
392 "description",
393 "salary_min",
394 "salary_max",
395 "salary_currency",
396 "url",
397 "deadline",
398 "company",
399 "is_closed",
400 "location",
401 "location_city",
402 "location_country",
403 "location_postcode",
404 "attendance_type",
405 ]
406 for key in columns:
407 if getattr(job_record2, key) is not None:
408 setattr(job_record2, key, getattr(job_record1, key))
409 self.db.commit()
410 return job_record2
412 # ----------------------------------------------------- RUNNER -----------------------------------------------------
414 def run_scraping(self, timedelta_days: int | float = 1) -> JobEmailScrapingServiceLog:
415 """Run the email scraping workflow
416 :param timedelta_days: Number of days to search for emails"""
418 start_time = dt.datetime.now()
419 self.logger.info("Starting email scraping workflow")
420 service_log = self.create_service_log(run_datetime=start_time)
421 self.extract_forwarding_email_confirmation(service_log)
423 try:
424 # Process emails for all users
425 self.process_emails(timedelta_days, service_log)
427 # Scrape remaining jobs that haven't been scraped yet
428 self.scrape_jobs(service_log)
430 # Log final statistics
431 service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
432 service_log.is_success = True
434 except Exception as exception:
435 self.logger.exception(f"Critical error in scraping workflow: {exception}")
436 service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
437 service_log.is_success = False
438 service_log.error_message = str(exception)
439 finally:
440 self.logger.info("Finished email scraping workflow")
442 self.db.commit()
443 return service_log
    def process_emails(
        self,
        timedelta_days: int | float,
        service_log: JobEmailScrapingServiceLog,
    ) -> None:
        """For each user, get and save each new email, then extract the job ids and job data.
        :param timedelta_days: Number of days to search for emails
        :param service_log: JobEmailScrapingServiceLog entry"""

        # Get the list of active users with TOAST active
        # (active + verified users whose premium subscription has job scraping enabled)
        users = (
            self.db.query(models.User)
            .filter(models.User.premium.has(is_active=True, job_scraping_active=True))
            .filter(models.User.is_active)
            .filter(models.User.is_verified)
            .all()
        )
        self.logger.info(f"Found {len(users)} users to process.")
        service_log.user_found_ids = [user.id for user in users]

        # For each user...
        for user in users:
            self.logger.info(f"Processing user: {user.email} (ID: {user.id})")

            # Get the list of all emails
            forwarded = False
            try:
                # First search for alerts delivered straight to the shared inbox
                email_ids = self.get_email_ids(
                    recipient_email=settings.scraper_email_username,
                    sender_email=user.email,
                    timedelta_days=timedelta_days,
                    from_email=list(PLATFORM_SENDER_EMAILS.keys()),
                )
                # If no emails found, look for forwarded emails
                if not email_ids:
                    email_ids = self.get_email_ids(
                        from_email=user.email,
                        to_email=settings.scraper_email_username,
                        timedelta_days=timedelta_days,
                    )
                    forwarded = True
                service_log.email_found_n += len(email_ids)
                self.logger.info(f"Found {len(email_ids)} emails")
            except Exception as exception:
                # Record the failure and fall through with an empty list so the
                # user is still marked as processed below
                self.log_service_error(service_log, exception)
                self.logger.exception(f"Failed to search messages due to error: {exception}. Skipping user.")
                email_ids = []

            # For each email...
            for email_id in email_ids:
                self.logger.info(f"Processing email with ID: {email_id}")
                try:
                    email_record, is_new = self.get_and_save_email_to_db(email_id, user, service_log.id, forwarded)

                    # Extract jobs if this is a new email
                    if is_new:
                        self.upsert_platform_stat(service_log, email_record.platform, email_saved_ids=email_record.id)
                        self.extract_email_data(email_record, service_log)
                    else:
                        self.upsert_platform_stat(service_log, email_record.platform, email_skipped_ids=email_record.id)
                        self.logger.info("Email already exists in database. Skipping email.")

                except Exception as exception:
                    self.log_service_error(service_log, exception)
                    self.logger.exception(
                        f"Failed to get and save email data due to error: {exception}. Skipping email."
                    )
                    continue  # next email

            # Reassigned (not appended) so SQLAlchemy detects the column change
            # noinspection PyAugmentAssignment
            service_log.user_processed_ids = service_log.user_processed_ids + [user.id]
517 def extract_email_data(
518 self,
519 email_record: JobEmail,
520 service_log: JobEmailScrapingServiceLog,
521 ) -> None:
522 """Extract job ids from an email and save them to the database.
523 May also extract job data directly from the email for some platforms depending on settings.
524 :param email_record: JobEmail record
525 :param service_log: Service log entry
526 :return: Dictionary of jobs data if the job data were directly extracted from the email"""
528 try:
529 jobs = JOB_PARSERS[email_record.platform](email_record.body)
530 except Exception as exception:
531 self.log_service_error(service_log, exception)
532 self.logger.exception(
533 f"Failed to parse email ID {email_record.external_email_id} due to error: {exception}. Skipping email."
534 )
535 return None # skip the email parsing
537 # Update the email record with the number of jobs found
538 email_record.job_found_n = len(jobs)
539 self.db.commit()
541 # Save the extracted job ids to the database
542 try:
543 scraped_jobs = self.save_job_base_info_to_db(email_record, jobs)
544 self.upsert_platform_stat(service_log, email_record.platform, job_found_ids=[j.id for j in scraped_jobs])
545 self.logger.info(f"Extracted and saved {len(jobs)} job IDs from {email_record.platform}")
546 except Exception as exception:
547 error = f"Failed to save job IDs for email ID {email_record.external_email_id} due to error: {exception}. Skipping email."
548 self.log_service_error(service_log, error)
549 self.logger.exception(error)
551 def scrape_jobs(self, service_log: JobEmailScrapingServiceLog) -> None:
552 """Process all unscraped jobs, including those scheduled for retry.
553 :param service_log: Service log entry"""
555 # List all unprocessed job records, including those whose retry window has passed
556 now = dt.datetime.now(dt.timezone.utc)
557 job_records = (
558 self.db.query(ScrapedJob)
559 .filter(ScrapedJob.is_processed.is_(False))
560 .filter(
561 or_(
562 ScrapedJob.next_retry_at.is_(None),
563 ScrapedJob.next_retry_at <= now,
564 )
565 )
566 .all()
567 )
568 platforms = set([job.platform for job in job_records])
569 for platform in platforms:
570 job_ids = [job.id for job in job_records if job.platform == platform]
571 self.upsert_platform_stat(service_log, platform, job_to_process_ids=job_ids)
573 # For each job record...
574 for job_record in job_records:
576 # Check if filtered out
577 try:
578 if job_filter_rule := is_job_filtered_out(self.db, job_record):
579 self.logger.info(
580 f"Job ID {job_record.external_job_id} filtered out for user ID {job_record.owner_id} "
581 f"due to rule {job_filter_rule.name}"
582 )
583 job_record.is_processed = True
584 job_record.exclusion_filter_id = job_filter_rule.id
585 self.db.commit()
586 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_filtered_ids=job_record.id)
587 continue # next job record
588 except Exception as exception:
589 error = (
590 f"Failed to check filtering for job ID {job_record.external_job_id} due to error: {exception}. "
591 f"Proceeding with scraping."
592 )
593 self.log_service_error(service_log, error)
594 self.logger.exception(error)
596 # Check if favoured
597 try:
598 if favoured_rule := is_job_favoured(self.db, job_record):
599 self.logger.info(
600 f"Job ID {job_record.external_job_id} favoured for user ID {job_record.owner_id} "
601 f"due to rule {favoured_rule.name}"
602 )
603 job_record.favourite_filter_id = favoured_rule.id
604 self.db.commit()
605 except Exception as exception:
606 error = (
607 f"Failed to check favouring for job ID {job_record.external_job_id} due to error: {exception}. "
608 f"Proceeding with scraping."
609 )
610 self.log_service_error(service_log, error)
611 self.logger.exception(error)
613 # Find any existing successfully scraped job data in the database
614 existing_data = (
615 self.db.query(ScrapedJob)
616 .filter(ScrapedJob.external_job_id == job_record.external_job_id)
617 .filter(ScrapedJob.platform == job_record.platform)
618 .filter(ScrapedJob.is_processed)
619 .filter(ScrapedJob.is_scraped)
620 .first()
621 )
623 # If previously scraped data exists, copy it to the unscraped record
624 if existing_data:
625 self.logger.info(
626 f"Job ID {job_record.external_job_id} already has scraped data in the database. "
627 f"Copying data to unscraped record."
628 )
629 self.copy_existing_entry(existing_data, job_record)
630 job_record.is_processed = True
631 self.db.commit()
632 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_copied_ids=job_record.id)
633 continue # next job record
635 # Check if user has exceeded their monthly scrape quota
636 if self.is_user_over_monthly_quota(job_record.owner_id):
637 self.logger.info(
638 f"User ID {job_record.owner_id} has exceeded their monthly scrape quota of "
639 f"{settings.monthly_scrape_quota}. Skipping job ID {job_record.external_job_id}."
640 )
641 job_record.is_skipped = True
642 job_record.skip_reason = f"Monthly scrape quota of {settings.monthly_scrape_quota} exceeded"
643 job_record.is_processed = True
644 self.db.commit()
645 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_skipped_ids=job_record.id)
646 continue # next job record
648 # Otherwise, scrape the job data
649 if job_record.platform in SCRAPERS:
650 scraper = SCRAPERS[job_record.platform](job_record.external_job_id)
651 self.logger.info(f"Scraping job ID: {job_record.external_job_id}")
652 try:
653 job_data = scraper.scrape_job()[0]
654 self.update_scraped_job_data(job_record, job_data)
655 job_record.is_processed = True
656 self.db.commit()
657 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_succeeded_ids=job_record.id)
658 except:
659 message = (
660 f"Failed to scrape job data for job ID {job_record.external_job_id} due to error: "
661 f"{traceback.format_exc()}. Skipping job."
662 )
663 self.logger.exception(message)
664 job_record.scrape_error = job_record.scrape_error + [
665 {"datetime": dt.datetime.now(dt.timezone.utc).isoformat(), "error": traceback.format_exc()}
666 ]
667 job_record.retry_count += 1
668 if job_record.retry_count < 3:
669 job_record.next_retry_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(
670 hours=settings.scrape_retry_delay_hours
671 )
672 self.logger.info(
673 f"Scheduled retry {job_record.retry_count}/3 for job ID "
674 f"{job_record.external_job_id} in {settings.scrape_retry_delay_hours}h"
675 )
676 else:
677 job_record.is_processed = True
678 job_record.is_failed = True
679 self.logger.info(f"Job ID {job_record.external_job_id} permanently failed after 3 attempts")
680 self.db.commit()
681 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_failed_ids=job_record.id)
682 else:
683 self.logger.info(f"Unknown platform for job {job_record.external_job_id}. Skipping job.")
684 job_record.is_skipped = True
685 job_record.skip_reason = f"Unknown platform {job_record.platform}"
686 job_record.is_processed = True
687 self.db.commit()
688 continue # next job record
# Module-level runner registration for the scheduler.
# NOTE(review): JobEmailScraper() is constructed at import time, so importing
# this module runs EmailService.__init__ with the scraper credentials — confirm
# this side effect is intended wherever the module is imported.
job_scraping_service_runner = ServiceRunner(
    service_name=SERVICE_NAME,
    service_function=JobEmailScraper().run_scraping,
    service_kwargs=dict(timedelta_days=3),
)