Coverage for backend/app/eis/job_scraper.py: 43%

194 statements  

coverage.py v7.10.7, created at 2025-09-22 15:38 +0000

1"""LinkedIn and Indeed Job Scraper Module 

2 

3This module provides functionality to scrape LinkedIn job postings using the BrightData API. 

4It offers a complete workflow to trigger data collection, monitor processing status, and 

5retrieve scraped job information.""" 

6 

7import json 

8import os 

9import re 

10import time 

11 

12import requests 

13from tqdm import tqdm 

14 

15 

16class JobScrapper(object): 

17 """Job Scraper""" 

18 

19 base_url: str = "" 

20 name: str = "" 

21 poll_interval: int | float = 2 

22 max_attempts: int = 60 

23 

24 def __init__( 

25 self, 

26 job_ids: str | list[str], 

27 ) -> None: 

28 """Object constructor 

29 :param job_ids: List of job IDs to scrape""" 

30 

31 self.job_ids = [job_ids] if isinstance(job_ids, str) else job_ids 

32 self.job_urls = [f"{self.base_url}{job_id}" for job_id in self.job_ids] 

33 self.secrets_file = os.path.join(os.path.dirname(__file__), "eis_secrets.json") 

34 self.poll_interval = self.poll_interval 

35 self.max_attempts *= len(self.job_ids) 

36 

37 # Load credentials from the secrets file 

38 credentials = self._load_credentials() 

39 self.api_key = credentials["api_key"] 

40 self.dataset_id = credentials[f"{self.name}_dataset_id"] 

41 

42 def _load_credentials(self) -> dict: 

43 """Load BrightData credentials from the secrets file""" 

44 

45 if not os.path.exists(self.secrets_file): 

46 raise FileNotFoundError( 

47 f"Secrets file '{self.secrets_file}' not found. " 

48 "Please create it with your BrightData API credentials." 

49 ) 

50 

51 try: 

52 with open(self.secrets_file, "r") as f: 

53 secrets = json.load(f) 

54 return secrets["brightdata"] 

55 except (json.JSONDecodeError, KeyError) as e: 

56 raise ValueError(f"Invalid secrets file format or missing 'brightdata' section: {e}") 
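
    # Illustrative sketch, not from the source: based on the keys read in __init__
    # ("api_key" and f"{name}_dataset_id" for the names "linkedin" and "indeed"),
    # the "brightdata" section of eis_secrets.json is assumed to look roughly like:
    #
    #   {
    #       "brightdata": {
    #           "api_key": "<BrightData API key>",
    #           "linkedin_dataset_id": "<dataset id>",
    #           "indeed_dataset_id": "<dataset id>"
    #       }
    #   }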

    def get_snapshot(self) -> str:
        """Get the snapshot id"""

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        # Step 1: Trigger the job and get snapshot_id
        trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
        params = {
            "dataset_id": self.dataset_id,
            "include_errors": "true",
        }
        data = [{"url": job_url} for job_url in self.job_urls]
        response = requests.post(trigger_url, headers=headers, params=params, json=data)
        if response.status_code != 200:
            raise Exception(f"Failed to trigger dataset: {response.status_code} {response.text}")
        snapshot_id = response.json().get("snapshot_id")
        if not snapshot_id:
            raise Exception(f"No snapshot_id returned: {response.text}")

        return snapshot_id
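
    # Illustrative sketch, not from the source: the trigger call above posts one
    # record per job URL and only reads "snapshot_id" from the JSON response.
    #
    #   request body:  [{"url": "https://www.linkedin.com/jobs/view/4280160167"}]
    #   response body: {"snapshot_id": "s_example123"}   # other fields are ignored here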

    def wait_for_data(self, snapshot_id: str) -> None:
        """Wait for the job data associated with a specific snapshot id to be ready
        :param snapshot_id: Snapshot ID"""

        # Step 2: Poll for status
        progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
        headers = {"Authorization": f"Bearer {self.api_key}"}

        # Create progress bar for polling attempts
        with tqdm(total=self.max_attempts, desc="Waiting for data", unit="attempt") as pbar:
            for attempt in range(self.max_attempts):
                progress_resp = requests.get(progress_url, headers=headers)
                if progress_resp.status_code not in (200, 202):
                    raise Exception(f"Failed to get snapshot status: {progress_resp.status_code} {progress_resp.text}")

                status = progress_resp.json().get("status")

                # Update progress bar description with current status
                pbar.set_description(f"Status: {status}")

                if status.lower() == "ready":
                    pbar.update(self.max_attempts - attempt)  # Complete the bar
                    break
                elif status.lower() == "failed":
                    raise Exception("Snapshot processing failed.")

                pbar.update(1)
                time.sleep(self.poll_interval)
            else:
                raise TimeoutError("Snapshot data not ready after maximum attempts.")

    def retrieve_data(self, snapshot_id: str) -> list[dict]:
        """Retrieve the job data associated with the snapshot id
        :param snapshot_id: Snapshot ID
        :return: List of job data dictionaries"""

        snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}"
        params = {"format": "json"}
        headers = {"Authorization": f"Bearer {self.api_key}"}

        data_resp = requests.get(snapshot_url, headers=headers, params=params)
        attempted = 0
        # A 202 response means the snapshot is still being prepared; retry a few times
        while data_resp.status_code == 202 and attempted < 10:
            time.sleep(self.poll_interval)
            data_resp = requests.get(snapshot_url, headers=headers, params=params)
            attempted += 1
        if data_resp.status_code != 200:
            raise Exception(f"Failed to get snapshot data: {data_resp.status_code} {data_resp.text}")
        return data_resp.json()

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""

        raise NotImplementedError("Subclasses must implement process_job_data")

    def scrape_job(self) -> list[dict]:
        """Complete workflow to scrape the requested jobs"""

        snapshot_id = self.get_snapshot()
        self.wait_for_data(snapshot_id)
        data = self.retrieve_data(snapshot_id)
        return [self.process_job_data(d) for d in data]


class IndeedJobScraper(JobScrapper):
    """Indeed Scraper"""

    base_url = "https://www.indeed.com/viewjob?jk="
    name = "indeed"
    poll_interval: int | float = 10
    max_attempts: int = 100

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""

        results = dict()
        results["company"] = job_data.get("company_name")
        results["company_id"] = job_data.get("company_url")
        results["location"] = job_data.get("location")
        results["job"] = dict()
        results["job"]["title"] = job_data.get("job_title")
        # str.strip() removes characters, not substrings, so drop the trailing
        # "Show more Show less" artefact explicitly
        description = (job_data.get("description_text") or "").strip()
        results["job"]["description"] = description.removesuffix("Show more Show less").strip()
        results["job"]["url"] = job_data.get("url")
        results["raw"] = job_data
        results["job"]["salary"] = dict(min_amount=None, max_amount=None)
        if salary_range := job_data.get("salary_formatted"):
            pattern = r"£(\d+(?:,\d+)?(?:k|K)?(?:\.\d+)?)\s*[-–]\s*£(\d+(?:,\d+)?(?:k|K)?(?:\.\d+)?)\s+(?:a|per)\s+(?:year|annum)"
            if match := re.search(pattern, salary_range):

                def to_amount(text: str) -> float:
                    """Convert salary strings such as '40,000' or '40k' to a number"""
                    text = text.replace(",", "").lower()
                    return float(text.rstrip("k")) * (1000 if text.endswith("k") else 1)

                results["job"]["salary"]["min_amount"] = to_amount(match.group(1))
                results["job"]["salary"]["max_amount"] = to_amount(match.group(2))

        return results
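
    # Illustrative output shape (placeholder values, assumed field contents):
    #
    #   {
    #       "company": "Acme Ltd",
    #       "company_id": "https://www.indeed.com/cmp/acme",
    #       "location": "London",
    #       "job": {
    #           "title": "Data Engineer",
    #           "description": "Build and maintain data pipelines ...",
    #           "url": "https://www.indeed.com/viewjob?jk=7b9119575c72cb5c",
    #           "salary": {"min_amount": 40000.0, "max_amount": 50000.0},
    #       },
    #       "raw": {...},  # the untouched BrightData record
    #   }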


class LinkedinJobScraper(JobScrapper):
    """LinkedIn Scraper"""

    base_url = "https://www.linkedin.com/jobs/view/"
    name = "linkedin"
    poll_interval: int | float = 2
    max_attempts: int = 60

    def process_job_data(self, job_data: dict) -> dict:
        """Process job data to extract relevant information
        :param job_data: Job data dictionary
        :return: Dictionary containing job information"""

        results = dict()
        results["company"] = job_data.get("company_name")
        results["company_id"] = job_data.get("company_id")
        results["location"] = job_data.get("job_location")
        results["job"] = dict()
        results["job"]["title"] = job_data.get("job_title")
        # str.strip() removes characters, not substrings, so drop the trailing
        # "Show more Show less" artefact explicitly
        summary = (job_data.get("job_summary") or "").strip()
        results["job"]["description"] = summary.removesuffix("Show more Show less").strip()
        results["job"]["url"] = job_data.get("url")
        results["job"]["salary"] = dict(min_amount=None, max_amount=None)
        results["raw"] = job_data
        base_salary = job_data.get("base_salary", {}) or {}
        currency = base_salary.get("currency") or ""
        payment_period = base_salary.get("payment_period") or ""

        # Only keep salaries quoted in GBP per year
        if currency.lower() in ("£", "gbp") and payment_period.lower() == "yr":
            results["job"]["salary"]["min_amount"] = base_salary.get("min_amount")
            results["job"]["salary"]["max_amount"] = base_salary.get("max_amount")

        return results
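
    # Illustrative sketch, not from the source: the GBP/year filter above assumes a
    # "base_salary" record shaped roughly like
    #
    #   {"currency": "GBP", "payment_period": "yr", "min_amount": 45000, "max_amount": 60000}
    #
    # Salaries in other currencies or payment periods are left as None.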


def extract_indeed_jobs_from_email(body: str) -> list[dict[str, str | dict]]:
    """Extract job information directly from an Indeed email body
    :param body: Email body content as string
    :return: List of dictionaries containing job information"""

    jobs = []

    # Split the email body by job entries
    # Look for patterns that indicate job separations
    job_sections = body.split("\n\n")[2:-4]

    for section in job_sections:
        if not section.strip():
            continue

        job_info = parse_indeed_job_section(section)
        if job_info:
            jobs.append(job_info)

    return jobs
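
# Illustrative assumption about the email layout (not from the source): the body is
# split on blank lines, the first two sections (greeting/header) and the last four
# (footer and unsubscribe links) are dropped, and each remaining section is parsed
# as one job posting by parse_indeed_job_section below.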


def parse_indeed_job_section(section: str) -> dict[str, str | dict] | None:
    """Parse a single job section from an Indeed email
    :param section: Job section text
    :return: Dictionary with job information or None if parsing fails"""

    lines = [line.strip() for line in section.strip().split("\n") if line.strip()]

    if len(lines) < 2:
        return None

    # Initialize job info with a flat structure first
    job_info = {
        "title": lines[0],
        "company": "",
        "location": "",
        "salary": "",
        "description": "",
        "url": "",
    }

    # First line is typically the job title
    # Second line is typically "company - location" or just the company
    if len(lines) > 1:
        company_location_line = lines[1]

        # Try to split company and location
        # Pattern: "Company Name - Location"
        if " - " in company_location_line:
            parts = company_location_line.split(" - ", 1)
            job_info["company"] = parts[0].strip()
            job_info["location"] = parts[1].strip()
        else:
            job_info["company"] = company_location_line.strip()

    # Look for salary information
    salary_pattern = r"£([\d,]+(?:\.\d{2})?)\s*-\s*£([\d,]+(?:\.\d{2})?)\s*a\s*year"
    salary_min = None
    salary_max = None

    for line in lines:
        salary_match = re.search(salary_pattern, line, re.IGNORECASE)
        if salary_match:
            job_info["salary"] = f"£{salary_match.group(1)} - £{salary_match.group(2)} a year"
            # Parse numeric values for min/max
            try:
                salary_min = float(salary_match.group(1).replace(",", ""))
                salary_max = float(salary_match.group(2).replace(",", ""))
            except ValueError:
                pass
            break

    # Look for the job description (usually starts after company/location and salary)
    description_lines = []
    found_description_start = False

    for line in lines[2:]:  # Start from the third line
        # Skip salary lines
        if re.search(salary_pattern, line, re.IGNORECASE):
            continue
        # Skip time indicators
        if re.search(r"(just posted|(\d+\s+(day|hour)s?\s+ago))", line, re.IGNORECASE):
            continue
        # Capture URLs instead of treating them as description text
        if line.startswith("http"):
            job_info["url"] = line
            continue
        # Skip "Easily apply" type lines
        if re.search(r"easily apply|apply now", line, re.IGNORECASE):
            continue

        # This should be description content
        if line and not found_description_start:
            found_description_start = True

        if found_description_start:
            description_lines.append(line)

    job_info["description"] = " ".join(description_lines).strip()

    # Clean up description - collapse whitespace and truncate long text
    job_info["description"] = re.sub(r"\s+", " ", job_info["description"])
    job_info["description"] = (
        job_info["description"][:500] + "..." if len(job_info["description"]) > 500 else job_info["description"]
    )

    # Try to find a URL if not already found
    if not job_info["url"]:
        url_matches = re.findall(r"https?://(?:uk\.)?indeed\.com/(?:pagead|rc)/clk/dl\?[^>\s]+", section, re.IGNORECASE)
        if url_matches:
            job_info["url"] = url_matches[0]

    # Only return if we have at least a title
    if not job_info["title"]:
        return None

    # Transform to match the structure returned by the scraper classes
    results = {
        "company": job_info["company"],
        "location": job_info["location"],
        "job": {
            "title": job_info["title"],
            "description": job_info["description"],
            "url": job_info["url"],
            "salary": {"min_amount": salary_min, "max_amount": salary_max},
        },
        "raw": section,
    }

    return results
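
# Illustrative example with made-up section text: a section such as
#
#   Data Engineer
#   Acme Ltd - London
#   £40,000 - £50,000 a year
#   Build and maintain data pipelines for the analytics team.
#   https://uk.indeed.com/rc/clk/dl?jk=example
#
# would parse to something like
#
#   {"company": "Acme Ltd", "location": "London",
#    "job": {"title": "Data Engineer",
#            "description": "Build and maintain data pipelines for the analytics team.",
#            "url": "https://uk.indeed.com/rc/clk/dl?jk=example",
#            "salary": {"min_amount": 40000.0, "max_amount": 50000.0}},
#    "raw": "<the original section text>"}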


# Usage example:
if __name__ == "__main__":
    scraper = LinkedinJobScraper(["4280160167"])
    job_data1 = scraper.scrape_job()
    print(job_data1)

    scraper = IndeedJobScraper("7b9119575c72cb5c")
    job_data2 = scraper.scrape_job()
    print(job_data2)