Coverage for backend / app / job_email_scraping / job_scrapers / apify.py: 100%
54 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
"""Apify Job Scraper Module

This module provides functionality to scrape job postings using the Apify API.
"""

import time

from apify_client import ApifyClient
from tqdm import tqdm

from app.config import settings
from app.job_email_scraping.schemas import JobResult
class ApifyJobScraper:
    """Base job scraper built on the Apify platform.

    Subclasses configure the class attributes below for a specific job
    platform and implement :meth:`_process_job_data`.

    :ivar base_url: Base URL for the job platform
    :ivar name: Name of the job platform
    :ivar poll_interval: Time interval (in seconds) between polling attempts
    :ivar max_attempts: Maximum number of polling attempts
    :ivar actor_id: Apify actor ID to use for scraping
    """

    base_url: str = ""
    name: str = ""
    poll_interval: int | float = 2
    max_attempts: int = 60
    actor_id: str = ""

    def __init__(self, job_ids: str | list[str]) -> None:
        """Object constructor

        :param job_ids: Single job ID or list of job IDs to scrape
        """
        # Accept a bare string for convenience; normalize to a list.
        self.job_ids = [job_ids] if isinstance(job_ids, str) else job_ids
        self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids]
        # Scale the polling budget with the number of requested jobs so a
        # large batch is not timed out with the single-job budget.
        self.max_attempts *= len(self.job_ids)

        # Load credentials from the secrets file
        self.api_key = settings.apify_api_key
        self.client = ApifyClient(self.api_key)

    def _start_actor_run(self) -> dict:
        """Start the Apify actor run (non-blocking).

        :return: Actor run information as returned by the Apify API
        """
        run_input = {
            "startUrls": [{"url": job_url} for job_url in self.job_urls],
            "proxy": {"useApifyProxy": True, "apifyProxyGroups": ["RESIDENTIAL"]},
        }
        return self.client.actor(self.actor_id).start(run_input=run_input)

    def _wait_for_data(self, run_id: str) -> None:
        """Poll the actor run until it finishes.

        :param run_id: Actor run ID
        :raises RuntimeError: If the run ends in FAILED, ABORTED, or TIMED-OUT
        :raises TimeoutError: If the run is still active after ``max_attempts``
        """
        # Create progress bar for polling attempts
        with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
            for attempt in range(self.max_attempts):
                run_info = self.client.run(run_id).get()
                status = run_info.get("status")

                # Update progress bar description with current status
                pbar.set_description(f"Status: {status}")

                if status == "SUCCEEDED":
                    pbar.update(self.max_attempts - attempt)  # Complete the bar
                    break
                elif status in ("FAILED", "ABORTED", "TIMED-OUT"):
                    raise RuntimeError(f"Actor run {status.lower()}: {run_info}")

                pbar.update(1)
                time.sleep(self.poll_interval)
            else:
                # for-loop exhausted without break -> run never completed
                raise TimeoutError("Actor run not complete after maximum attempts.")

    def _retrieve_data(self, dataset_id: str) -> list[dict]:
        """Retrieve the job data from the run's default dataset.

        :param dataset_id: Dataset ID
        :return: List of job data dictionaries
        :raises RuntimeError: If the dataset is empty
        """
        items = self.client.dataset(dataset_id).list_items().items
        if not items:
            raise RuntimeError("No data returned from actor run")
        return items

    def _process_job_data(self, job_data: dict) -> JobResult:
        """Process raw job data into a :class:`JobResult`.

        Must be overridden by platform-specific subclasses.

        :param job_data: Job data dictionary
        :return: Parsed job information
        :raises NotImplementedError: Always, in this base class
        """
        # NotImplementedError (not AssertionError) is the conventional
        # signal for an abstract-method placeholder.
        raise NotImplementedError("This method should be implemented in subclasses.")

    def scrape_job(self) -> list[JobResult]:
        """Complete workflow to scrape jobs using Apify.

        :return: One :class:`JobResult` per scraped job
        :raises RuntimeError: If the run lacks an ID or dataset, fails,
            or returns no data
        :raises TimeoutError: If the run does not finish in time
        """
        run = self._start_actor_run()
        run_id = run.get("id")
        dataset_id = run.get("defaultDatasetId")

        if not run_id:
            raise RuntimeError(f"No run_id returned: {run}")
        if not dataset_id:
            raise RuntimeError(f"No defaultDatasetId returned: {run}")

        self._wait_for_data(run_id)
        data = self._retrieve_data(dataset_id)
        return [self._process_job_data(d) for d in data]