Coverage for backend / app / job_email_scraping / job_scrapers / brightdata.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Brightdata Job Scraper Module 

2 

3This module provides functionality to scrape job postings using the BrightData API.""" 

4 

5import time 

6 

7import requests 

8from tqdm import tqdm 

9 

10from app.config import settings 

11from app.job_email_scraping.schemas import JobResult 

12 

13 

class BrightdataJobScraper:
    """Base job scraper backed by the BrightData datasets API.

    Subclasses are expected to set :attr:`base_url` / :attr:`name` and to
    implement :meth:`_process_job_data` to turn raw BrightData records into
    :class:`JobResult` objects.

    :ivar base_url: Base URL for the job platform
    :ivar name: Name of the job platform (used to resolve the dataset id
        setting ``brightdata_<name>_dataset_id``)
    :ivar poll_interval: Time interval (in seconds) between polling attempts
    :ivar max_attempts: Maximum number of polling attempts"""

    # HTTP timeout (seconds) for every BrightData request, so a stalled
    # connection cannot hang the scraper indefinitely.
    REQUEST_TIMEOUT: int | float = 30

    base_url: str = ""
    name: str = ""
    poll_interval: int | float = 2
    max_attempts: int = 60

    def __init__(
        self,
        job_ids: str | list[str],
    ) -> None:
        """Object constructor

        :param job_ids: Single job ID or list of job IDs to scrape
        :raises ValueError: If ``job_ids`` is an empty list"""

        self.job_ids = [job_ids] if isinstance(job_ids, str) else job_ids
        if not self.job_ids:
            # An empty list would otherwise zero out max_attempts below and
            # surface later as a confusing TimeoutError — fail fast instead.
            raise ValueError("job_ids must contain at least one job ID")
        self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids]
        # Scale the polling budget with the number of jobs requested.
        self.max_attempts *= len(self.job_ids)

        # Load credentials from the secrets file
        self.api_key = settings.brightdata_api_key
        self.dataset_id = getattr(settings, f"brightdata_{self.name}_dataset_id")

    def _get_snapshot(self) -> str:
        """Trigger a BrightData collection run and return its snapshot id.

        :return: Snapshot ID for the triggered collection
        :raises Exception: If the trigger request fails or no snapshot id
            is present in the response"""

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
        params = {
            "dataset_id": self.dataset_id,
            "include_errors": "true",
        }
        data = [{"url": job_url} for job_url in self.job_urls]
        response = requests.post(
            trigger_url,
            headers=headers,
            params=params,
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to trigger dataset: {response.status_code} {response.text}")
        snapshot_id = response.json().get("snapshot_id")
        if not snapshot_id:
            raise Exception(f"No snapshot_id returned: {response.text}")

        return snapshot_id

    def _wait_for_data(self, snapshot_id: str) -> None:
        """Wait for the job data associated with a specific snapshot id to be ready

        :param snapshot_id: Snapshot ID
        :raises Exception: If the progress endpoint fails or reports "failed"
        :raises TimeoutError: If the snapshot is not ready within
            ``max_attempts`` polls"""

        progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
        headers = {"Authorization": f"Bearer {self.api_key}"}

        # Create progress bar for polling attempts
        with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
            for attempt in range(self.max_attempts):
                progress_resp = requests.get(
                    progress_url, headers=headers, timeout=self.REQUEST_TIMEOUT
                )
                if progress_resp.status_code not in (200, 202):
                    raise Exception(f"Failed to get snapshot status: {progress_resp.status_code} {progress_resp.text}")

                # Guard against a missing "status" key — .lower() on None
                # would raise AttributeError.
                status = progress_resp.json().get("status") or ""

                # Update progress bar description with current status
                pbar.set_description(f"Status: {status}")

                if status.lower() == "ready":
                    pbar.update(self.max_attempts - attempt)  # Complete the bar
                    break
                elif status.lower() == "failed":
                    raise Exception("Snapshot processing failed.")

                pbar.update(1)
                time.sleep(self.poll_interval)
            else:
                raise TimeoutError("Snapshot data not ready after maximum attempts.")

    def _retrieve_data(self, snapshot_id: str) -> list[dict]:
        """Retrieve the job data associated with the snapshot id

        :param snapshot_id: Snapshot ID
        :return: List of raw job data dictionaries
        :raises Exception: If the snapshot endpoint fails or returns an
            error record"""

        snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
        params = {"format": "json"}
        headers = {"Authorization": f"Bearer {self.api_key}"}

        data_resp = requests.get(
            snapshot_url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
        )

        # Handle data not ready: 202 means "still building"; back off between
        # retries instead of hammering the API in a tight loop.
        attempted = 0
        while data_resp.status_code == 202 and attempted < 10:
            time.sleep(self.poll_interval)
            data_resp = requests.get(
                snapshot_url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
            )
            attempted += 1
        json_data = data_resp.json()

        # Handle other errors
        if data_resp.status_code != 200:
            raise Exception(f"Failed to get snapshot data: {data_resp.status_code} {data_resp.text}")
        # Guard the [0] access — an empty list would raise IndexError here.
        if isinstance(json_data, list) and json_data and "error_code" in json_data[0]:
            raise Exception(f"Failed to get snapshot data: {json_data}")
        return json_data

    def _process_job_data(self, job_data: dict) -> JobResult:
        """Process job data to extract relevant information

        :param job_data: Raw job data dictionary from BrightData
        :return: Structured job information
        :raises NotImplementedError: Always; subclasses must override this."""

        # NotImplementedError (not AssertionError) is the conventional signal
        # for an abstract hook, and survives `python -O`.
        raise NotImplementedError("This method should be implemented in subclasses.")

    def scrape_job(self) -> list[JobResult]:
        """Complete workflow to scrape the configured jobs: trigger a
        snapshot, wait for it to become ready, download it, and convert each
        record via :meth:`_process_job_data`."""

        snapshot_id = self._get_snapshot()
        self._wait_for_data(snapshot_id)
        data = self._retrieve_data(snapshot_id)
        return [self._process_job_data(d) for d in data]