1"""LinkedIn and Indeed Job Scraper Module
3This module provides functionality to scrape LinkedIn job postings using the BrightData API.
4It offers a complete workflow to trigger data collection, monitor processing status, and
5retrieve scraped job information."""
7import json
8import os
9import re
10import time
12import requests
13from tqdm import tqdm


class JobScrapper:
    """Base class for BrightData-backed job scrapers"""

    base_url: str = ""
    name: str = ""
    poll_interval: int | float = 2
    max_attempts: int = 60

    def __init__(
        self,
        job_ids: str | list[str],
    ) -> None:
        """Object constructor
        :param job_ids: Single job ID or list of job IDs to scrape"""
        self.job_ids = [job_ids] if isinstance(job_ids, str) else job_ids
        self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids]
        self.secrets_file = os.path.join(os.path.dirname(__file__), "eis_secrets.json")
        # Scale the polling budget with the number of requested jobs
        self.max_attempts *= len(self.job_ids)

        # Load credentials from the secrets file
        credentials = self._load_credentials()
        self.api_key = credentials["api_key"]
        self.dataset_id = credentials[f"{self.name}_dataset_id"]

    def _load_credentials(self) -> dict:
        """Load BrightData credentials from the secrets file"""
        if not os.path.exists(self.secrets_file):
            raise FileNotFoundError(
                f"Secrets file '{self.secrets_file}' not found. "
                "Please create it with your BrightData API credentials."
            )

        try:
            with open(self.secrets_file, "r") as f:
                secrets = json.load(f)
            return secrets["brightdata"]
        except (json.JSONDecodeError, KeyError) as e:
            raise ValueError(f"Invalid secrets file format or missing 'brightdata' section: {e}") from e
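
    # Expected layout of ``eis_secrets.json``, inferred from the keys read in
    # __init__ (values are placeholders):
    #
    #   {
    #       "brightdata": {
    #           "api_key": "<BrightData API key>",
    #           "linkedin_dataset_id": "<dataset id>",
    #           "indeed_dataset_id": "<dataset id>"
    #       }
    #   }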

    def get_snapshot(self) -> str:
        """Trigger the scrape and return the snapshot id"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        # Step 1: Trigger the job and get snapshot_id
        trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
        params = {
            "dataset_id": self.dataset_id,
            "include_errors": "true",
        }
        data = [{"url": job_url} for job_url in self.job_urls]
        response = requests.post(trigger_url, headers=headers, params=params, json=data, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Failed to trigger dataset: {response.status_code} {response.text}")
        snapshot_id = response.json().get("snapshot_id")
        if not snapshot_id:
            raise Exception(f"No snapshot_id returned: {response.text}")

        return snapshot_id
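
    # The trigger endpoint is expected to answer with a JSON body carrying the
    # snapshot handle, e.g. {"snapshot_id": "s_abc123"} (id format illustrative).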

    def wait_for_data(self, snapshot_id: str) -> None:
        """Wait for the job data associated with a specific snapshot id to be ready
        :param snapshot_id: Snapshot ID"""
        # Step 2: Poll for status
        progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
        headers = {"Authorization": f"Bearer {self.api_key}"}

        # Create progress bar for polling attempts
        with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
            for attempt in range(self.max_attempts):
                progress_resp = requests.get(progress_url, headers=headers, timeout=30)
                if progress_resp.status_code not in (200, 202):
                    raise Exception(f"Failed to get snapshot status: {progress_resp.status_code} {progress_resp.text}")

                # Guard against a missing "status" field so .lower() cannot fail
                status = progress_resp.json().get("status") or ""

                # Update progress bar description with current status
                pbar.set_description(f"Status: {status}")

                if status.lower() == "ready":
                    pbar.update(self.max_attempts - attempt)  # Complete the bar
                    break
                elif status.lower() == "failed":
                    raise Exception("Snapshot processing failed.")

                pbar.update(1)
                time.sleep(self.poll_interval)
            else:
                raise TimeoutError("Snapshot data not ready after maximum attempts.")

    def retrieve_data(self, snapshot_id: str) -> list[dict]:
        """Retrieve the job data associated with the snapshot id
        :param snapshot_id: Snapshot ID
        :return: List of job data dictionaries"""
        snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
        params = {"format": "json"}
        headers = {"Authorization": f"Bearer {self.api_key}"}

        data_resp = requests.get(snapshot_url, headers=headers, params=params, timeout=30)
        attempted = 0
        while data_resp.status_code == 202 and attempted < 10:
            # 202 means the snapshot is still being prepared; back off before retrying
            time.sleep(self.poll_interval)
            data_resp = requests.get(snapshot_url, headers=headers, params=params, timeout=30)
            attempted += 1
        if data_resp.status_code != 200:
            raise Exception(f"Failed to get snapshot data: {data_resp.status_code} {data_resp.text}")
        return data_resp.json()

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""
        raise NotImplementedError("Subclasses must implement process_job_data")
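
    # Sketch of the normalised dictionary subclasses return (mirrors the
    # implementations below; any field may be None when absent upstream):
    #
    #   {
    #       "company": ..., "company_id": ..., "location": ...,
    #       "job": {
    #           "title": ..., "description": ..., "url": ...,
    #           "salary": {"min_amount": ..., "max_amount": ...},
    #       },
    #       "raw": <unmodified scraper payload>,
    #   }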

    def scrape_job(self) -> list[dict]:
        """Complete workflow to scrape the requested job postings"""
        snapshot_id = self.get_snapshot()
        self.wait_for_data(snapshot_id)
        data = self.retrieve_data(snapshot_id)
        return [self.process_job_data(d) for d in data]


class IndeedJobScraper(JobScrapper):
    """Indeed Scraper"""

    base_url = "https://www.indeed.com/viewjob?jk="
    name = "indeed"
    poll_interval: int | float = 10
    max_attempts: int = 100

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""
        results = dict()
        results["company"] = job_data.get("company_name")
        results["company_id"] = job_data.get("company_url")
        results["location"] = job_data.get("location")
        results["job"] = dict()
        results["job"]["title"] = job_data.get("job_title")
        # str.strip() removes characters, not substrings, so drop the
        # "Show more"/"Show less" artefacts explicitly (and tolerate a missing field)
        description = job_data.get("description_text") or ""
        results["job"]["description"] = description.replace("Show more", "").replace("Show less", "").strip()
        results["job"]["url"] = job_data.get("url")
        results["raw"] = job_data
        results["job"]["salary"] = dict(min_amount=None, max_amount=None)
        if salary_range := job_data.get("salary_formatted"):
            pattern = r"£(\d+(?:,\d+)?(?:k|K)?(?:\.\d+)?)\s*[-–]\s*£(\d+(?:,\d+)?(?:k|K)?(?:\.\d+)?)\s+(?:a|per)\s+(?:year|annum)"
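            # The pattern above matches annual GBP ranges such as
            # "£30,000 - £40,000 a year" or "£45k – £55k per annum" (examples illustrative)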
            if match := re.search(pattern, salary_range):

                def to_gbp(raw: str) -> float:
                    # Normalise thousands separators and the "30k" shorthand the
                    # pattern can capture (float("30k") would otherwise raise)
                    raw = raw.replace(",", "").lower()
                    return float(raw[:-1]) * 1000 if raw.endswith("k") else float(raw)

                results["job"]["salary"]["min_amount"] = to_gbp(match.group(1))
                results["job"]["salary"]["max_amount"] = to_gbp(match.group(2))

        return results


class LinkedinJobScraper(JobScrapper):
    """LinkedIn Scraper"""

    base_url = "https://www.linkedin.com/jobs/view/"
    name = "linkedin"
    poll_interval: int | float = 2
    max_attempts: int = 60

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""
        results = dict()
        results["company"] = job_data.get("company_name")
        results["company_id"] = job_data.get("company_id")
        results["location"] = job_data.get("job_location")
        results["job"] = dict()
        results["job"]["title"] = job_data.get("job_title")
        # str.strip() removes characters, not substrings, so drop the
        # "Show more"/"Show less" artefacts explicitly (and tolerate a missing field)
        summary = job_data.get("job_summary") or ""
        results["job"]["description"] = summary.replace("Show more", "").replace("Show less", "").strip()
        results["job"]["url"] = job_data.get("url")
        results["job"]["salary"] = dict(min_amount=None, max_amount=None)
        results["raw"] = job_data
        base_salary = job_data.get("base_salary", {}) or {}
        currency = base_salary.get("currency") or ""
        payment_period = base_salary.get("payment_period") or ""
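
        # Assumed shape of the upstream ``base_salary`` payload, inferred from the
        # fields read here (values illustrative):
        #   {"currency": "GBP", "payment_period": "yr", "min_amount": 50000, "max_amount": 65000}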
        if currency.lower() in ("£", "gbp") and payment_period.lower() == "yr":
            results["job"]["salary"]["min_amount"] = base_salary.get("min_amount")
            results["job"]["salary"]["max_amount"] = base_salary.get("max_amount")

        return results


def extract_indeed_jobs_from_email(body: str) -> list[dict[str, str | dict]]:
    """Extract job information directly from an Indeed email body
    :param body: Email body content as string
    :return: List of dictionaries containing job information"""
    jobs = []

    # Split the email body into candidate job sections; the slice drops the
    # email's header and footer blocks, which hold no job listings
    job_sections = body.split("\n\n")[2:-4]

    for section in job_sections:
        if not section.strip():
            continue

        job_info = parse_indeed_job_section(section)
        if job_info:
            jobs.append(job_info)

    return jobs
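

# Illustrative sketch of a single job section as it appears in the email body
# (content hypothetical; assembled from the patterns parse_indeed_job_section
# looks for):
#
#   Senior Python Developer
#   Acme Ltd - London
#   £50,000 - £65,000 a year
#   Easily apply
#   Just posted
#   We are looking for an experienced engineer to ...
#   https://uk.indeed.com/pagead/clk/dl?...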
def parse_indeed_job_section(section: str) -> dict[str, str | dict] | None:
    """Parse a single job section from an Indeed email
    :param section: Job section text
    :return: Dictionary with job information or None if parsing fails"""
    lines = [line.strip() for line in section.strip().split("\n") if line.strip()]

    if len(lines) < 2:
        return None

    # Initialize job info with a flat structure first
    job_info = {
        "title": lines[0],
        "company": "",
        "location": "",
        "salary": "",
        "description": "",
        "url": "",
    }

    # First line is the job title; second is typically "Company - Location" or
    # just the company (len(lines) >= 2 is guaranteed by the early return above)
    company_location_line = lines[1]

    # Try to split company and location
    # Pattern: "Company Name - Location"
    if " - " in company_location_line:
        parts = company_location_line.split(" - ", 1)
        job_info["company"] = parts[0].strip()
        job_info["location"] = parts[1].strip()
    else:
        job_info["company"] = company_location_line.strip()

    # Look for salary information
    salary_pattern = r"£([\d,]+(?:\.\d{2})?)\s*-\s*£([\d,]+(?:\.\d{2})?)\s*a\s*year"
    salary_min = None
    salary_max = None

    for line in lines:
        salary_match = re.search(salary_pattern, line, re.IGNORECASE)
        if salary_match:
            job_info["salary"] = f"£{salary_match.group(1)} - £{salary_match.group(2)} a year"
            # Parse numeric values for min/max
            try:
                salary_min = float(salary_match.group(1).replace(",", ""))
                salary_max = float(salary_match.group(2).replace(",", ""))
            except ValueError:
                pass
            break

    # Look for the job description (usually starts after company/location and salary)
    description_lines = []
    found_description_start = False

    for line in lines[2:]:  # Start from the third line
        # Skip salary lines
        if re.search(salary_pattern, line, re.IGNORECASE):
            continue
        # Skip time indicators
        if re.search(r"(just posted|(\d+\s+(day|hour)s?\s+ago))", line, re.IGNORECASE):
            continue
        # Capture URLs rather than treating them as description text
        if line.startswith("http"):
            job_info["url"] = line
            continue
        # Skip "Easily apply" type lines
        if re.search(r"easily apply|apply now", line, re.IGNORECASE):
            continue

        # This should be description content
        if line and not found_description_start:
            found_description_start = True

        if found_description_start:
            description_lines.append(line)

    job_info["description"] = " ".join(description_lines).strip()

    # Clean up the description: collapse whitespace and truncate overly long text
    job_info["description"] = re.sub(r"\s+", " ", job_info["description"])
    job_info["description"] = (
        job_info["description"][:500] + "..." if len(job_info["description"]) > 500 else job_info["description"]
    )

    # Try to find the URL if not already found
    if not job_info["url"]:
        url_matches = re.findall(r"https?://(?:uk\.)?indeed\.com/(?:pagead|rc)/clk/dl\?[^>\s]+", section, re.IGNORECASE)
        if url_matches:
            job_info["url"] = url_matches[0]

    # Only return if we have at least a title
    if not job_info["title"]:
        return None

    # Transform to match the structure returned by the scraper classes
    results = {
        "company": job_info["company"],
        "location": job_info["location"],
        "job": {
            "title": job_info["title"],
            "description": job_info["description"],
            "url": job_info["url"],
            "salary": {"min_amount": salary_min, "max_amount": salary_max},
        },
        "raw": section,
    }

    return results


# Usage example:
if __name__ == "__main__":
    scraper = LinkedinJobScraper(["4280160167"])
    job_data1 = scraper.scrape_job()
    print(job_data1)

    scraper = IndeedJobScraper("7b9119575c72cb5c")
    job_data1 = scraper.scrape_job()
    print(job_data1)