Coverage for backend / app / job_email_scraping / email_scraper.py: 80%

316 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Email scraper service. 

2 

3Reads job alert emails from a shared inbox, extracts job IDs for supported 

4platforms (LinkedIn, Indeed, Veganjobs, etc.), stores email and job metadata in the 

5database, scrapes full job details (via platform scrapers or from email 

content), and records run statistics in a JobEmailScrapingServiceLog."""

7 

8import datetime as dt 

9import traceback 

10 

11from sqlalchemy import or_ 

12 

13from app import models 

14from app.config import settings 

15from app.database import get_db 

16from app.emails.email_service import EmailService 

17from app.geolocation.geolocation import geocode_location 

18from app.job_email_scraping.email_parsers import JOB_PARSERS, ALERT_NAME_EXTRACTORS, PLATFORM_SENDER_EMAILS 

19from app.job_email_scraping.email_parsers.utils import Platform, remove_style_tags 

20from app.job_email_scraping.filtering import is_job_filtered_out, is_job_favoured 

21from app.job_email_scraping.gmail import extract_forwarding_confirmation_link, extract_gmail_originator 

22from app.job_email_scraping.job_scrapers import SCRAPERS 

23from app.job_email_scraping.location_parser import LocationParser 

24from app.job_email_scraping.models import ( 

25 JobEmail, 

26 ScrapedJob, 

27 JobEmailScrapingServiceLog, 

28 JobEmailScrapingPlatformStat, 

29 JobEmailScrapingServiceError, 

30) 

31from app.job_email_scraping.schemas import JobResult 

32from app.resources import CURRENCIES 

33from app.service_runner.service_runner import ServiceRunner 

34from app.utils import AppLogger 

35 

36SERVICE_NAME = "email_scraper_service" 

37 

38 

39class JobEmailScraper(EmailService): 

40 """Job Email Alert Scraper""" 

41 

42 def __init__(self, db=None) -> None: 

43 """Object constructor 

44 :param db: optional database session for testing""" 

45 

46 EmailService.__init__(self, settings.scraper_email_username, settings.scraper_email_password) 

47 self.location_parser = LocationParser() 

48 self.logger = AppLogger.create_service_logger(SERVICE_NAME, "INFO") 

49 self.db = next(get_db()) if db is None else db 

50 

51 def create_service_log(self, **kwargs) -> JobEmailScrapingServiceLog: 

52 """Create a new service log entry 

53 :param kwargs: JobEmailScrapingServiceLog keyword arguments""" 

54 

55 # noinspection PyArgumentList 

56 service_log_entry = JobEmailScrapingServiceLog(**kwargs) 

57 self.db.add(service_log_entry) 

58 self.db.commit() 

59 self.db.refresh(service_log_entry) 

60 return service_log_entry 

61 

62 def upsert_platform_stat( 

63 self, 

64 service_log: JobEmailScrapingServiceLog, 

65 platform: Platform, 

66 **kwargs, 

67 ) -> JobEmailScrapingPlatformStat: 

68 """Create a new platform statistics entry 

69 :param service_log: associated JobEmailScrapingServiceLog instance 

70 :param platform: Platform enum value 

71 :param kwargs: PlatformStat keyword arguments""" 

72 

73 # Check the platform_stats entry exists 

74 platform_stats = ( 

75 self.db.query(JobEmailScrapingPlatformStat) 

76 .join(JobEmailScrapingServiceLog) 

77 .filter(JobEmailScrapingServiceLog.id == service_log.id) 

78 .filter(JobEmailScrapingPlatformStat.name == platform) 

79 .first() 

80 ) 

81 

82 # Update existing entry by adding the new values 

83 if not platform_stats: 

84 platform_stats = JobEmailScrapingPlatformStat(service_log_id=service_log.id, name=platform) 

85 self.db.add(platform_stats) 

86 

87 for key in kwargs: 

88 if isinstance(kwargs[key], list): 

89 value = kwargs[key] 

90 else: 

91 value = [kwargs[key]] 

92 setattr(platform_stats, key, list(set(getattr(platform_stats, key) + value))) 

93 

94 self.db.commit() 

95 self.db.refresh(platform_stats) 

96 return platform_stats 

97 

98 def log_service_error( 

99 self, 

100 service_log: JobEmailScrapingServiceLog, 

101 exc: Exception | str, 

102 ) -> JobEmailScrapingServiceError: 

103 """Create a JobEmailScrapingServiceError for a caught exception. 

104 :param service_log: associated JobEmailScrapingServiceLog instance 

105 :param exc: the caught exception 

106 :return: JobEmailScrapingServiceError instance""" 

107 

108 tb = traceback.format_exc() 

109 # noinspection PyArgumentList 

110 err = JobEmailScrapingServiceError( 

111 error_type=type(exc).__name__, 

112 message=str(exc), 

113 traceback=tb, 

114 service_log_id=service_log.id, 

115 ) 

116 self.db.add(err) 

117 self.db.commit() 

118 return err 

119 

120 def get_user_monthly_scrape_count(self, owner_id: int) -> int: 

121 """Get the count of jobs scraped by a user in the current month. 

122 :param owner_id: User ID 

123 :return: Number of jobs scraped this month""" 

124 

125 start_of_month = dt.datetime.now(dt.timezone.utc).replace(day=1, hour=0, minute=0, second=0, microsecond=0) 

126 count = ( 

127 self.db.query(ScrapedJob) 

128 .filter(ScrapedJob.owner_id == owner_id) 

129 .filter(ScrapedJob.is_scraped) 

130 .filter(ScrapedJob.is_failed.is_(False)) 

131 .filter(ScrapedJob.scrape_datetime >= start_of_month) 

132 .count() 

133 ) 

134 return count 

135 

136 def is_user_over_monthly_quota(self, owner_id: int) -> bool: 

137 """Check if a user has exceeded their monthly scrape quota. 

138 :param owner_id: User ID 

139 :return: True if the user has exceeded their quota""" 

140 

141 return self.get_user_monthly_scrape_count(owner_id) >= settings.monthly_scrape_quota 

142 

143 def extract_forwarding_email_confirmation(self, service_log: JobEmailScrapingServiceLog): 

144 """Extract and save forwarding confirmation links from emails sent by different email providers 

145 :param service_log: associated JobEmailScrapingServiceLog instance""" 

146 

147 email_platforms = {"gmail": "forwarding-noreply@google.com"} 

148 for email_platform in email_platforms: 

149 try: 

150 email_ids = self.get_email_ids(from_email=email_platforms[email_platform], timedelta_days=365) 

151 except: 

152 self.log_service_error(service_log, f"Failed to get email with platform {email_platform}.") 

153 continue 

154 

155 for email_id in email_ids: 

156 try: 

157 existing_entry = ( 

158 self.db.query(models.ForwardingConfirmationLink) 

159 .filter(models.ForwardingConfirmationLink.email_external_id == email_id) 

160 .first() 

161 ) 

162 if existing_entry: 

163 self.logger.info(f"Forwarding confirmation link for email {email_id} already exists. Skipping.") 

164 continue 

165 else: 

166 email = self.get_email_data(email_id) 

167 except: 

168 self.log_service_error( 

169 service_log, f"Failed to read email {email_id} with platform {email_platform}." 

170 ) 

171 continue 

172 try: 

173 link = extract_forwarding_confirmation_link(email["body"]) 

174 if not link: 

175 message = ( 

176 "Forwarding confirmation link not found in email body. Skipping forwarding confirmation." 

177 ) 

178 self.logger.warning(message) 

179 continue 

180 user_email = extract_gmail_originator(email["body"]) 

181 user = self.db.query(models.User).filter(models.User.email == user_email).first() 

182 if not user: 

183 message = ( 

184 f"User with email {user_email} not found in database. Skipping forwarding confirmation." 

185 ) 

186 self.logger.warning(message) 

187 continue 

188 # noinspection PyArgumentList 

189 confirmation = models.ForwardingConfirmationLink( 

190 email_external_id=email["id"], 

191 url=link, 

192 platform=email_platform, 

193 owner_id=user.id, 

194 ) 

195 self.db.add(confirmation) 

196 self.db.commit() 

197 except: 

198 message = f"Failed to extract forwarding confirmation link from email {email["id"]}" 

199 self.log_service_error(service_log, message) 

200 

201 # ------------------------------------------------ EMAIL PROCESSING ------------------------------------------------ 

202 

203 def get_and_save_email_to_db( 

204 self, 

205 email_id: str, 

206 user: models.User, 

207 service_log_id: int, 

208 forwarded: bool = False, 

209 ) -> tuple[JobEmail, bool]: 

210 """Read and save an email to the database 

211 :param email_id: Email ID 

212 :param user: User entry associated with this email 

213 :param service_log_id: ID of the JobEmailScrapingServiceLog instance associated with this email 

214 :param forwarded: Whether the email was forwarded 

215 :return: JobEmails instance and whether the record was created or already existing""" 

216 

217 # Check if the email already exists and return it if it does 

218 existing_email = self.db.query(JobEmail).filter(JobEmail.external_email_id == email_id).first() 

219 

220 # Return the existing email 

221 if existing_email: 

222 return existing_email, False 

223 

224 else: 

225 

226 # Read the email content and determine the platform 

227 message = self.get_email_data(email_id) 

228 

229 # Determine the platform 

230 platform = PLATFORM_SENDER_EMAILS.get(message["from"].lower()) 

231 if not platform: 

232 for plat in PLATFORM_SENDER_EMAILS: 

233 if plat in message["body"].lower(): 

234 platform = PLATFORM_SENDER_EMAILS[plat] 

235 break 

236 if not platform: 

237 raise ValueError("Email body does not contain a valid platform identifier.") 

238 alert_name = ALERT_NAME_EXTRACTORS[platform](message["subject"], message["body"]) 

239 sender = message["from"] if forwarded else message["to"] 

240 # Create a new email record 

241 email_record = JobEmail( 

242 owner_id=user.id, 

243 service_log_id=service_log_id, 

244 external_email_id=email_id, 

245 subject=message["subject"], 

246 sender=sender, 

247 date_received=message["date"], 

248 body=remove_style_tags(message["body"]), 

249 platform=platform, 

250 alert_name=alert_name, 

251 ) 

252 self.db.add(email_record) 

253 self.db.commit() 

254 self.db.refresh(email_record) 

255 

256 return email_record, True 

257 

258 # ------------------------------------------------- JOB PROCESSING ------------------------------------------------- 

259 

260 def process_job_result(self, job_result: JobResult) -> dict: 

261 """Process a single JobResult and extract relevant data 

262 :param job_result: JobResult instance 

263 :return dictionary of extracted job data""" 

264 

265 result = {} # noqa 

266 

267 # Location & attendance type 

268 result["location"] = job_result.location 

269 result["parsed_location"], result["attendance_type"] = self.location_parser.parse_location(result["location"]) 

270 

271 if result["parsed_location"]: 

272 geolocation = geocode_location(result["parsed_location"], self.db, self.logger) 

273 if geolocation: 

274 result["geolocation_id"] = geolocation.id 

275 result["location_postcode"] = geolocation.postcode 

276 result["location_city"] = geolocation.city 

277 result["location_country"] = geolocation.country 

278 

279 # Salary 

280 result["salary_min"] = job_result.job.salary.min_amount 

281 result["salary_max"] = job_result.job.salary.max_amount 

282 result["salary_currency"] = None 

283 currency_code = (job_result.job.salary.currency or "").lower() 

284 for currency in CURRENCIES: 

285 if currency_code in (currency["code"].lower(), currency["symbol_native"].lower()): 

286 result["salary_currency"] = currency["code"] 

287 break 

288 

289 # Job data 

290 result["raw_url"] = job_result.job.raw_url 

291 result["url"] = job_result.job.url 

292 result["title"] = job_result.job.title 

293 result["description"] = job_result.job.description 

294 result["company"] = job_result.company 

295 result["deadline"] = job_result.job.deadline 

296 result["external_job_id"] = job_result.job_id 

297 result["platform"] = job_result.platform 

298 result["is_closed"] = job_result.job.is_closed 

299 

300 return result 

301 

302 def save_job_base_info_to_db( 

303 self, 

304 email_record: JobEmail, 

305 job_results: list[JobResult], 

306 ) -> list[ScrapedJob]: 

307 """Save extracted job IDs from an email to the database and link them to the email. 

308 If they already exist, just link them to the email. 

309 :param email_record: associated JobAlertEmail record instance 

310 :param job_results: list of job IDs to save 

311 :return: list of ScrapedJob instances created or already existing in the database""" 

312 

313 job_records = [] 

314 

315 for job_result in job_results: 

316 

317 # Check if the job already exists for this owner 

318 existing_entry = ( 

319 self.db.query(ScrapedJob) 

320 .filter(ScrapedJob.external_job_id == job_result.job_id) 

321 .filter(ScrapedJob.owner_id == email_record.owner_id) 

322 .first() 

323 ) 

324 

325 # Create new job record if it doesn't exist 

326 if not existing_entry: 

327 data = self.process_job_result(job_result) 

328 

329 # noinspection PyArgumentList 

330 new_job = ScrapedJob( 

331 owner_id=email_record.owner_id, 

332 service_log_id=email_record.service_log_id, 

333 **data, 

334 ) 

335 new_job.emails.append(email_record) 

336 self.db.add(new_job) 

337 job_records.append(new_job) 

338 

339 # Link existing job record to the email 

340 else: 

341 # Check if this email is already linked to avoid duplicates 

342 if email_record not in existing_entry.emails: 

343 existing_entry.emails.append(email_record) 

344 job_records.append(existing_entry) 

345 

346 # Commit and refresh the records 

347 self.db.commit() 

348 for job_record in job_records: 

349 self.db.refresh(job_record) 

350 

351 return job_records 

352 

353 def update_scraped_job_data( 

354 self, 

355 job_record: ScrapedJob, 

356 job_result: JobResult | None, 

357 ) -> None: 

358 """Update the job records with the scraped data 

359 :param job_record: ScrapedJob instance 

360 :param job_result: scraped job data""" 

361 

362 # Update the job data 

363 if job_result is not None: 

364 data = self.process_job_result(job_result) 

365 for key in data: 

366 if data[key] is not None: 

367 setattr(job_record, key, data[key]) 

368 

369 # Scraping information 

370 job_record.scrape_datetime = dt.datetime.now() 

371 job_record.is_scraped = True 

372 

373 self.db.commit() 

374 

375 def copy_existing_entry( 

376 self, 

377 job_record1: ScrapedJob, 

378 job_record2: ScrapedJob, 

379 ) -> ScrapedJob: 

380 """Copy the data from job_record1 to job_record2 

381 :param job_record1: Source ScrapedJob instance 

382 :param job_record2: Target ScrapedJob instance 

383 :return: Updated job_record2 instance""" 

384 

385 columns = [ 

386 # Only scraping info are necessary 

387 "is_processed", 

388 "is_scraped", 

389 "scrape_datetime", 

390 # Job details 

391 "title", 

392 "description", 

393 "salary_min", 

394 "salary_max", 

395 "salary_currency", 

396 "url", 

397 "deadline", 

398 "company", 

399 "is_closed", 

400 "location", 

401 "location_city", 

402 "location_country", 

403 "location_postcode", 

404 "attendance_type", 

405 ] 

406 for key in columns: 

407 if getattr(job_record2, key) is not None: 

408 setattr(job_record2, key, getattr(job_record1, key)) 

409 self.db.commit() 

410 return job_record2 

411 

412 # ----------------------------------------------------- RUNNER ----------------------------------------------------- 

413 

    def run_scraping(self, timedelta_days: int | float = 1) -> JobEmailScrapingServiceLog:
        """Run the email scraping workflow.

        Creates a service-log row, harvests forwarding-confirmation links, then
        processes emails and scrapes pending jobs, recording duration and
        success/failure on the log entry.

        :param timedelta_days: Number of days to search for emails
        :return: the committed JobEmailScrapingServiceLog for this run"""

        start_time = dt.datetime.now()
        self.logger.info("Starting email scraping workflow")
        service_log = self.create_service_log(run_datetime=start_time)
        # Runs outside the try: its failures are logged per-email internally
        # and must not mark the whole run as failed.
        self.extract_forwarding_email_confirmation(service_log)

        try:
            # Process emails for all users
            self.process_emails(timedelta_days, service_log)

            # Scrape remaining jobs that haven't been scraped yet
            self.scrape_jobs(service_log)

            # Log final statistics
            service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
            service_log.is_success = True

        except Exception as exception:
            # Any uncaught error marks the run failed but still records duration
            self.logger.exception(f"Critical error in scraping workflow: {exception}")
            service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
            service_log.is_success = False
            service_log.error_message = str(exception)
        finally:
            self.logger.info("Finished email scraping workflow")

        # Persist the final state of the log entry regardless of outcome
        self.db.commit()
        return service_log

444 

    def process_emails(
        self,
        timedelta_days: int | float,
        service_log: JobEmailScrapingServiceLog,
    ) -> None:
        """For each user, get and save each new email, then extract the job ids and job data.

        Per-user and per-email failures are logged to the service log and
        skipped; the loop always continues with the next item.

        :param timedelta_days: Number of days to search for emails
        :param service_log: JobEmailScrapingServiceLog entry"""

        # Get the list of active users with TOAST active
        # NOTE(review): "TOAST" presumably names the premium job-scraping
        # feature gated by premium.job_scraping_active — confirm.
        users = (
            self.db.query(models.User)
            .filter(models.User.premium.has(is_active=True, job_scraping_active=True))
            .filter(models.User.is_active)
            .filter(models.User.is_verified)
            .all()
        )
        self.logger.info(f"Found {len(users)} users to process.")
        service_log.user_found_ids = [user.id for user in users]

        # For each user...
        for user in users:
            self.logger.info(f"Processing user: {user.email} (ID: {user.id})")

            # Get the list of all emails
            forwarded = False
            try:
                # First: alerts sent directly to the shared inbox on the user's behalf
                email_ids = self.get_email_ids(
                    recipient_email=settings.scraper_email_username,
                    sender_email=user.email,
                    timedelta_days=timedelta_days,
                    from_email=list(PLATFORM_SENDER_EMAILS.keys()),
                )
                # If no emails found, look for forwarded emails
                if not email_ids:
                    email_ids = self.get_email_ids(
                        from_email=user.email,
                        to_email=settings.scraper_email_username,
                        timedelta_days=timedelta_days,
                    )
                    forwarded = True
                service_log.email_found_n += len(email_ids)
                self.logger.info(f"Found {len(email_ids)} emails")
            except Exception as exception:
                # Inbox search failed: record the error and skip this user's emails
                self.log_service_error(service_log, exception)
                self.logger.exception(f"Failed to search messages due to error: {exception}. Skipping user.")
                email_ids = []

            # For each email...
            for email_id in email_ids:
                self.logger.info(f"Processing email with ID: {email_id}")
                try:
                    email_record, is_new = self.get_and_save_email_to_db(email_id, user, service_log.id, forwarded)

                    # Extract jobs if this is a new email
                    if is_new:
                        self.upsert_platform_stat(service_log, email_record.platform, email_saved_ids=email_record.id)
                        self.extract_email_data(email_record, service_log)
                    else:
                        self.upsert_platform_stat(service_log, email_record.platform, email_skipped_ids=email_record.id)
                        self.logger.info("Email already exists in database. Skipping email.")

                except Exception as exception:
                    self.log_service_error(service_log, exception)
                    self.logger.exception(
                        f"Failed to get and save email data due to error: {exception}. Skipping email."
                    )
                    continue  # next email

            # Reassignment (not +=) so SQLAlchemy detects the list-column change
            # noinspection PyAugmentAssignment
            service_log.user_processed_ids = service_log.user_processed_ids + [user.id]

516 

    def extract_email_data(
        self,
        email_record: JobEmail,
        service_log: JobEmailScrapingServiceLog,
    ) -> None:
        """Extract job ids from an email and save them to the database.

        May also extract job data directly from the email for some platforms
        depending on settings. Parse or save failures are logged to the service
        log and the email is skipped.

        :param email_record: JobEmail record
        :param service_log: Service log entry
        :return: None; results are persisted to the database"""

        # Run the platform-specific parser over the stored (style-stripped) body
        try:
            jobs = JOB_PARSERS[email_record.platform](email_record.body)
        except Exception as exception:
            self.log_service_error(service_log, exception)
            self.logger.exception(
                f"Failed to parse email ID {email_record.external_email_id} due to error: {exception}. Skipping email."
            )
            return None  # skip the email parsing

        # Update the email record with the number of jobs found
        email_record.job_found_n = len(jobs)
        self.db.commit()

        # Save the extracted job ids to the database and update per-platform stats
        try:
            scraped_jobs = self.save_job_base_info_to_db(email_record, jobs)
            self.upsert_platform_stat(service_log, email_record.platform, job_found_ids=[j.id for j in scraped_jobs])
            self.logger.info(f"Extracted and saved {len(jobs)} job IDs from {email_record.platform}")
        except Exception as exception:
            error = f"Failed to save job IDs for email ID {email_record.external_email_id} due to error: {exception}. Skipping email."
            self.log_service_error(service_log, error)
            self.logger.exception(error)

550 

551 def scrape_jobs(self, service_log: JobEmailScrapingServiceLog) -> None: 

552 """Process all unscraped jobs, including those scheduled for retry. 

553 :param service_log: Service log entry""" 

554 

555 # List all unprocessed job records, including those whose retry window has passed 

556 now = dt.datetime.now(dt.timezone.utc) 

557 job_records = ( 

558 self.db.query(ScrapedJob) 

559 .filter(ScrapedJob.is_processed.is_(False)) 

560 .filter( 

561 or_( 

562 ScrapedJob.next_retry_at.is_(None), 

563 ScrapedJob.next_retry_at <= now, 

564 ) 

565 ) 

566 .all() 

567 ) 

568 platforms = set([job.platform for job in job_records]) 

569 for platform in platforms: 

570 job_ids = [job.id for job in job_records if job.platform == platform] 

571 self.upsert_platform_stat(service_log, platform, job_to_process_ids=job_ids) 

572 

573 # For each job record... 

574 for job_record in job_records: 

575 

576 # Check if filtered out 

577 try: 

578 if job_filter_rule := is_job_filtered_out(self.db, job_record): 

579 self.logger.info( 

580 f"Job ID {job_record.external_job_id} filtered out for user ID {job_record.owner_id} " 

581 f"due to rule {job_filter_rule.name}" 

582 ) 

583 job_record.is_processed = True 

584 job_record.exclusion_filter_id = job_filter_rule.id 

585 self.db.commit() 

586 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_filtered_ids=job_record.id) 

587 continue # next job record 

588 except Exception as exception: 

589 error = ( 

590 f"Failed to check filtering for job ID {job_record.external_job_id} due to error: {exception}. " 

591 f"Proceeding with scraping." 

592 ) 

593 self.log_service_error(service_log, error) 

594 self.logger.exception(error) 

595 

596 # Check if favoured 

597 try: 

598 if favoured_rule := is_job_favoured(self.db, job_record): 

599 self.logger.info( 

600 f"Job ID {job_record.external_job_id} favoured for user ID {job_record.owner_id} " 

601 f"due to rule {favoured_rule.name}" 

602 ) 

603 job_record.favourite_filter_id = favoured_rule.id 

604 self.db.commit() 

605 except Exception as exception: 

606 error = ( 

607 f"Failed to check favouring for job ID {job_record.external_job_id} due to error: {exception}. " 

608 f"Proceeding with scraping." 

609 ) 

610 self.log_service_error(service_log, error) 

611 self.logger.exception(error) 

612 

613 # Find any existing successfully scraped job data in the database 

614 existing_data = ( 

615 self.db.query(ScrapedJob) 

616 .filter(ScrapedJob.external_job_id == job_record.external_job_id) 

617 .filter(ScrapedJob.platform == job_record.platform) 

618 .filter(ScrapedJob.is_processed) 

619 .filter(ScrapedJob.is_scraped) 

620 .first() 

621 ) 

622 

623 # If previously scraped data exists, copy it to the unscraped record 

624 if existing_data: 

625 self.logger.info( 

626 f"Job ID {job_record.external_job_id} already has scraped data in the database. " 

627 f"Copying data to unscraped record." 

628 ) 

629 self.copy_existing_entry(existing_data, job_record) 

630 job_record.is_processed = True 

631 self.db.commit() 

632 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_copied_ids=job_record.id) 

633 continue # next job record 

634 

635 # Check if user has exceeded their monthly scrape quota 

636 if self.is_user_over_monthly_quota(job_record.owner_id): 

637 self.logger.info( 

638 f"User ID {job_record.owner_id} has exceeded their monthly scrape quota of " 

639 f"{settings.monthly_scrape_quota}. Skipping job ID {job_record.external_job_id}." 

640 ) 

641 job_record.is_skipped = True 

642 job_record.skip_reason = f"Monthly scrape quota of {settings.monthly_scrape_quota} exceeded" 

643 job_record.is_processed = True 

644 self.db.commit() 

645 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_skipped_ids=job_record.id) 

646 continue # next job record 

647 

648 # Otherwise, scrape the job data 

649 if job_record.platform in SCRAPERS: 

650 scraper = SCRAPERS[job_record.platform](job_record.external_job_id) 

651 self.logger.info(f"Scraping job ID: {job_record.external_job_id}") 

652 try: 

653 job_data = scraper.scrape_job()[0] 

654 self.update_scraped_job_data(job_record, job_data) 

655 job_record.is_processed = True 

656 self.db.commit() 

657 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_succeeded_ids=job_record.id) 

658 except: 

659 message = ( 

660 f"Failed to scrape job data for job ID {job_record.external_job_id} due to error: " 

661 f"{traceback.format_exc()}. Skipping job." 

662 ) 

663 self.logger.exception(message) 

664 job_record.scrape_error = job_record.scrape_error + [ 

665 {"datetime": dt.datetime.now(dt.timezone.utc).isoformat(), "error": traceback.format_exc()} 

666 ] 

667 job_record.retry_count += 1 

668 if job_record.retry_count < 3: 

669 job_record.next_retry_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta( 

670 hours=settings.scrape_retry_delay_hours 

671 ) 

672 self.logger.info( 

673 f"Scheduled retry {job_record.retry_count}/3 for job ID " 

674 f"{job_record.external_job_id} in {settings.scrape_retry_delay_hours}h" 

675 ) 

676 else: 

677 job_record.is_processed = True 

678 job_record.is_failed = True 

679 self.logger.info(f"Job ID {job_record.external_job_id} permanently failed after 3 attempts") 

680 self.db.commit() 

681 self.upsert_platform_stat(service_log, job_record.platform, job_scrape_failed_ids=job_record.id) 

682 else: 

683 self.logger.info(f"Unknown platform for job {job_record.external_job_id}. Skipping job.") 

684 job_record.is_skipped = True 

685 job_record.skip_reason = f"Unknown platform {job_record.platform}" 

686 job_record.is_processed = True 

687 self.db.commit() 

688 continue # next job record 

689 

690 

# Module-level runner wiring for the scheduler.
# NOTE(review): JobEmailScraper() is constructed at import time, which appears
# to open the mailbox connection and a DB session as a module-level side
# effect — confirm this is intended wherever this module is imported.
job_scraping_service_runner = ServiceRunner(
    service_name=SERVICE_NAME,
    service_function=JobEmailScraper().run_scraping,
    service_kwargs=dict(timedelta_days=3),
)