Coverage for backend / app / job_email_scraping / job_scrapers / apify.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Apify Job Scraper Module 

2 

3This module provides functionality to scrape job postings using the Apify API.""" 

4 

5import time 

6 

7from apify_client import ApifyClient 

8from tqdm import tqdm 

9 

10from app.config import settings 

11from app.job_email_scraping.schemas import JobResult 

12 

13 

class ApifyJobScraper(object):
    """Base class for job scrapers backed by an Apify actor.

    Subclasses configure the class attributes below and implement
    ``_process_job_data`` to convert one raw dataset item into a ``JobResult``.

    :ivar base_url: Base URL for the job platform; each job ID is appended to it
    :ivar name: Name of the job platform
    :ivar poll_interval: Time interval (in seconds) between polling attempts
    :ivar max_attempts: Maximum number of polling attempts per job; the constructor
        multiplies it by the number of job IDs
    :ivar actor_id: Apify actor ID to use for scraping
    :ivar job_ids: Job IDs to scrape
    :ivar job_urls: URLs built as ``base_url + job_id`` for every job ID
    :ivar api_key: Apify API key read from the application settings
    :ivar client: Apify client used for every API call"""

    base_url: str = ""
    name: str = ""
    poll_interval: int | float = 2
    max_attempts: int = 60
    actor_id: str = ""

    def __init__(self, job_ids: str | list[str]) -> None:
        """Object constructor
        :param job_ids: A single job ID or a list of job IDs to scrape"""

        # A bare string is a single ID; anything else is copied into a new list so
        # later mutation of the caller's list (or an exhausted iterator) has no effect.
        self.job_ids = [job_ids] if isinstance(job_ids, str) else list(job_ids)
        self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids]
        # The polling budget scales linearly with the number of jobs in the batch.
        self.max_attempts *= len(self.job_ids)

        # Load credentials from the application settings
        self.api_key = settings.apify_api_key
        self.client = ApifyClient(self.api_key)

    def _start_actor_run(self) -> dict:
        """Start the Apify actor run (non-blocking)
        :return: Actor run information"""

        run_input = {
            "startUrls": [{"url": job_url} for job_url in self.job_urls],
            # Route the actor's requests through Apify's residential proxy group
            "proxy": {"useApifyProxy": True, "apifyProxyGroups": ["RESIDENTIAL"]},
        }
        run = self.client.actor(self.actor_id).start(run_input=run_input)
        return run

    def _wait_for_data(self, run_id: str) -> None:
        """Poll the actor run until it completes
        :param run_id: Actor run ID
        :raises Exception: If the run cannot be found, or it ends as FAILED, ABORTED or TIMED-OUT
        :raises TimeoutError: If the run has not succeeded after ``max_attempts`` polls"""

        # Create progress bar for polling attempts
        with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
            for attempt in range(self.max_attempts):
                # Sleep only *between* polls: the first poll is immediate and there is
                # no pointless wait after the final poll before giving up.
                if attempt:
                    time.sleep(self.poll_interval)

                run_info = self.client.run(run_id).get()
                if run_info is None:
                    # The client returns None when the run does not exist
                    raise Exception(f"Actor run {run_id} not found")
                status = run_info.get("status")

                # Update progress bar description with current status
                pbar.set_description(f"Status: {status}")

                if status == "SUCCEEDED":
                    pbar.update(self.max_attempts - attempt)  # Complete the bar
                    return
                if status in ("FAILED", "ABORTED", "TIMED-OUT"):
                    raise Exception(f"Actor run {status.lower()}: {run_info}")

                pbar.update(1)

            raise TimeoutError("Actor run not complete after maximum attempts.")

    def _retrieve_data(self, dataset_id: str) -> list[dict]:
        """Retrieve the job data from the dataset
        :param dataset_id: Dataset ID
        :return: List of job data dictionaries
        :raises Exception: If the dataset contains no items"""

        items = self.client.dataset(dataset_id).list_items().items
        if not items:
            raise Exception("No data returned from actor run")
        return items

    def _process_job_data(self, job_data: dict) -> JobResult:
        """Process job data to extract relevant information (must be overridden)
        :param job_data: One raw item from the actor's dataset
        :return: Processed job information
        :raises AssertionError: Always, in this base class"""

        raise AssertionError("This method should be implemented in subclasses.")

    def scrape_job(self) -> list[JobResult]:
        """Complete workflow to scrape jobs using Apify: start the actor, wait for
        it to finish, fetch its dataset and process every item
        :return: One processed result per dataset item; empty if there are no job IDs
        :raises Exception: If the run metadata is incomplete or the run fails
        :raises TimeoutError: If the run does not finish in time"""

        # Nothing to scrape: do not start an actor run with no URLs, which would
        # otherwise end in an immediate TimeoutError since max_attempts would be 0.
        if not self.job_ids:
            return []

        run = self._start_actor_run()
        run_id = run.get("id")
        dataset_id = run.get("defaultDatasetId")

        if not run_id:
            raise Exception(f"No run_id returned: {run}")
        if not dataset_id:
            raise Exception(f"No defaultDatasetId returned: {run}")

        self._wait_for_data(run_id)
        data = self._retrieve_data(dataset_id)
        return [self._process_job_data(d) for d in data]