Coverage for backend / app / job_email_scraping / job_scrapers / brightdata.py: 100%
69 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Brightdata Job Scraper Module
3This module provides functionality to scrape job postings using the BrightData API."""
5import time
7import requests
8from tqdm import tqdm
10from app.config import settings
11from app.job_email_scraping.schemas import JobResult
14class BrightdataJobScraper(object):
15 """Job Scraper
16 :ivar base_url: Base URL for the job platform
17 :ivar name: Name of the job platform
18 :ivar poll_interval: Time interval (in seconds) between polling attempts
19 :ivar max_attempts: Maximum number of polling attempts"""
21 base_url: str = ""
22 name: str = ""
23 poll_interval: int | float = 2
24 max_attempts: int = 60
26 def __init__(
27 self,
28 job_ids: str | list[str],
29 ) -> None:
30 """Object constructor
31 :param job_ids: List of job IDs to scrape"""
33 self.job_ids = [job_ids] if isinstance(job_ids, str) else job_ids
34 self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids]
35 self.poll_interval = self.poll_interval
36 self.max_attempts *= len(self.job_ids)
38 # Load credentials from the secrets file
39 self.api_key = settings.brightdata_api_key
40 self.dataset_id = getattr(settings, f"brightdata_{self.name}_dataset_id")
42 def _get_snapshot(self) -> str:
43 """Get the snapshot id"""
45 headers = {
46 "Authorization": f"Bearer {self.api_key}",
47 "Content-Type": "application/json",
48 }
50 trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
51 params = {
52 "dataset_id": self.dataset_id,
53 "include_errors": "true",
54 }
55 data = [{"url": job_url} for job_url in self.job_urls]
56 response = requests.post(trigger_url, headers=headers, params=params, json=data)
57 if response.status_code != 200:
58 raise Exception(f"Failed to trigger dataset: {response.status_code} {response.text}")
59 snapshot_id = response.json().get("snapshot_id")
60 if not snapshot_id:
61 raise Exception(f"No snapshot_id returned: {response.text}")
63 return snapshot_id
65 def _wait_for_data(self, snapshot_id: str) -> None:
66 """Wait for the job data associated with a specific snapshot id to be ready
67 :param snapshot_id: Snapshot ID"""
69 progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
70 headers = {"Authorization": f"Bearer {self.api_key}"}
72 # Create progress bar for polling attempts
73 with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
74 for attempt in range(self.max_attempts):
75 progress_resp = requests.get(progress_url, headers=headers)
76 if progress_resp.status_code not in (200, 202):
77 raise Exception(f"Failed to get snapshot status: {progress_resp.status_code} {progress_resp.text}")
79 status = progress_resp.json().get("status")
81 # Update progress bar description with current status
82 pbar.set_description(f"Status: {status}")
84 if status.lower() == "ready":
85 pbar.update(self.max_attempts - attempt) # Complete the bar
86 break
87 elif status.lower() == "failed":
88 raise Exception("Snapshot processing failed.")
90 pbar.update(1)
91 time.sleep(self.poll_interval)
92 else:
93 raise TimeoutError("Snapshot data not ready after maximum attempts.")
95 def _retrieve_data(self, snapshot_id: str) -> list[dict]:
96 """Retrieve the job data associated with the snapshot id
97 :param snapshot_id: Snapshot ID
98 :return: Job data dictionary"""
100 snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
101 params = {"format": "json"}
102 headers = {"Authorization": f"Bearer {self.api_key}"}
104 data_resp = requests.get(snapshot_url, headers=headers, params=params)
106 # Handle data not ready
107 attempted = 0
108 while data_resp.status_code == 202 and attempted < 10:
109 data_resp = requests.get(snapshot_url, headers=headers, params=params)
110 attempted += 1
111 json_data = data_resp.json()
113 # Handle other errors
114 if data_resp.status_code != 200:
115 raise Exception(f"Failed to get snapshot data: {data_resp.status_code} {data_resp.text}")
116 if isinstance(json_data, list) and "error_code" in json_data[0]:
117 raise Exception(f"Failed to get snapshot data: {json_data}")
118 return json_data
120 def _process_job_data(self, job_data: dict) -> JobResult:
121 """Process job data to extract relevant information
122 :param job_data: Job data dictionary
123 :return: Dictionary containing job information"""
125 raise AssertionError("This method should be implemented in subclasses.")
127 def scrape_job(self) -> list[JobResult]:
128 """Complete workflow to scrape a LinkedIn job"""
130 snapshot_id = self._get_snapshot()
131 self._wait_for_data(snapshot_id)
132 data = self._retrieve_data(snapshot_id)
133 return [self._process_job_data(d) for d in data]