Coverage for backend/app/job_rating/scraped_job_rating.py: 97%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Use Gemini LLM to rate how well scraped jobs match user qualifications.""" 

2 

3import datetime as dt 

4import traceback 

5 

6from sqlalchemy.orm import Session 

7 

8from app import models, utils 

9from app.config import settings 

10from app.database import get_db 

11from app.job_rating.claude import MODEL as CLAUDE_MODEL, claude_query 

12from app.job_rating.prompts import create_job_only_prompt, create_system_prompt_with_profile 

13from app.service_runner.service_runner import ServiceRunner 

14 

15SERVICE_NAME = "job_rating_service" 

16 

17 

18def ensure_length_limit( 

19 text_describer: str, 

20 text: str, 

21 max_length: int, 

22 logger=None, 

23) -> tuple[str, str | None]: 

24 """Ensure that the given text is not longer than the given maximum length. 

25 :param text_describer: A description of the text, for logging purposes 

26 :param text: The text to check 

27 :param max_length: The maximum allowed length 

28 :param logger: The logger to use for logging 

29 :return: A tuple containing the truncated text and a note explaining why it was truncated, if any.""" 

30 

31 if not text: 

32 return text, None 

33 if len(text) > max_length: 

34 if logger: 

35 logger.info(f"Job {text_describer} is too long ({len(text)}.") 

36 text = text[:max_length] + "..." 

37 note = f"{text_describer.capitalize()} was truncated as it was too long ({len(text)} characters. Limit is {max_length} characters)" 

38 return text, note 

39 else: 

40 return text, None 

41 

42 

def get_rating_active_users(db: Session) -> list[models.User]:
    """Get all active users with job rating active.

    A user qualifies when their premium record is active with job rating
    enabled, and the user account itself is both active and verified.

    :param db: Database session
    :return: List of active users with job rating active"""

    # Multiple conditions in a single filter() call are ANDed together,
    # equivalent to chaining separate .filter() calls.
    qualifying_users = db.query(models.User).filter(
        models.User.premium.has(is_active=True, job_rating_active=True),
        models.User.is_active,
        models.User.is_verified,
    )
    return qualifying_users.all()

55 

56 

def get_user_unrated_scraped_jobs(db: Session, user_id: int) -> list[models.ScrapedJob]:
    """Get all unrated scraped jobs for a given user.

    A job qualifies when scraping has fully succeeded (processed, scraped,
    not failed), it is still active, was not imported by the user, has no
    rating yet, and was not caught by an exclusion filter.

    :param db: Database session
    :param user_id: ID of the user to get jobs for
    :return: List of unrated scraped jobs"""

    # Consistency fix: null checks now use .is_(None) like every other
    # boolean filter in this query (was `== None`, which needed a
    # noinspection suppression and is flagged by linters).
    return (
        db.query(models.ScrapedJob)
        .filter(models.ScrapedJob.owner_id == user_id)
        .filter(models.ScrapedJob.is_processed.is_(True))
        .filter(models.ScrapedJob.is_scraped.is_(True))
        .filter(models.ScrapedJob.is_failed.is_(False))
        .filter(models.ScrapedJob.job_rating.is_(None))
        .filter(models.ScrapedJob.is_active.is_(True))
        .filter(models.ScrapedJob.is_imported.is_(False))
        .filter(models.ScrapedJob.exclusion_filter.is_(None))
        .all()
    )

76 

77 

class ScrapedJobRater:
    """Rates scraped jobs against user qualifications using AI.

    One ``run()`` pass creates a ``JobRatingServiceLog`` row, iterates every
    rating-active user, and rates each of their unrated scraped jobs via the
    Claude LLM — persisting one ``JobRating`` row per job, whether the job was
    rated successfully, skipped, or failed.
    """

    def __init__(self) -> None:
        """Initialise the job rater."""

        # Dedicated service logger at INFO level.
        self.logger = utils.AppLogger.create_service_logger(SERVICE_NAME, "INFO")

    def run(self, db: Session | None = None) -> models.JobRatingServiceLog:
        """Score all scraped jobs using AI.
        :param db: Database session; when omitted, a fresh session is taken from get_db()
        :return: Job rating service log entry recording the outcome of this run"""

        db = next(get_db()) if db is None else db

        # Create service log entry up front and commit it immediately, so a
        # crash mid-run still leaves a persisted record of the attempt.
        start_time = dt.datetime.now()
        service_log = models.JobRatingServiceLog(run_datetime=start_time)
        db.add(service_log)
        db.commit()
        db.refresh(service_log)

        try:
            # Get all active users with job rating active
            users = get_rating_active_users(db)
            self.logger.info(f"Found {len(users)} active users to process")
            service_log.user_found_ids = [user.id for user in users]

            # Get latest system and job prompt templates (highest id wins);
            # the same pair is reused for every user in this run.
            system_prompt = db.query(models.AiSystemPrompt).order_by(models.AiSystemPrompt.id.desc()).first()
            job_prompt = db.query(models.AiJobPromptTemplate).order_by(models.AiJobPromptTemplate.id.desc()).first()

            # Process each user
            for user in users:
                self._process_user(db, user.id, service_log, system_prompt, job_prompt)

            # Mark service log as successful
            service_log.is_success = True

        except Exception as exception:
            # Per-job errors are caught inside _rate_job; anything surfacing
            # here is a run-level failure, recorded on the service log.
            self.logger.exception(f"Critical error in rating workflow: {exception}")
            service_log.is_success = False
            service_log.error_message = str(exception)
        finally:
            self.logger.info("Finished workflow")

        # Wall-clock duration in seconds; committed along with any success /
        # failure flags set above.
        service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
        db.commit()
        db.refresh(service_log)
        return service_log

    def _process_user(
        self,
        db: Session,
        user_id: int,
        service_log: models.JobRatingServiceLog,
        system_prompt: models.AiSystemPrompt,
        job_prompt_template: models.AiJobPromptTemplate,
    ) -> None:
        """Process a single user's jobs: find their unrated scraped jobs and
        rate each one against their most recent qualification.
        :param db: Database session
        :param user_id: The ID of the user to process jobs for
        :param service_log: Job rating service log entry
        :param system_prompt: Latest system prompt template
        :param job_prompt_template: Latest job prompt template"""

        # Ensure that the user has a qualification — most recently modified wins.
        user_qualification = (
            db.query(models.UserQualification)
            .filter(models.UserQualification.owner_id == user_id)
            .order_by(models.UserQualification.modified_at.desc())
            .first()
        )
        if not user_qualification:
            # Nothing to rate against; the user is skipped silently (no log row fields set).
            self.logger.info(f"Skipping user {user_id} as no user qualification found")
            return
        else:
            self.logger.info(f"Processing user {user_id}")
        scraped_jobs = get_user_unrated_scraped_jobs(db, user_id)
        # Reassigned rather than appended in place — NOTE(review): presumably so
        # the ORM detects the change on this list column; confirm the column
        # type does not track in-place mutation.
        service_log.job_found_ids = service_log.job_found_ids + [job.id for job in scraped_jobs]
        self.logger.info(f"Found {len(scraped_jobs)} scraped jobs to rate")

        # Build the combined system prompt (instructions + candidate profile) once per user
        # so Anthropic caches it across all jobs for this user
        combined_system_prompt = create_system_prompt_with_profile(
            prompt_template=system_prompt.prompt,
            user_experience=user_qualification.experience,
            user_education=user_qualification.education,
            user_skills=user_qualification.skills,
            user_qualities=user_qualification.qualities,
            user_interests=user_qualification.interests,
        )

        for scraped_job in scraped_jobs:
            self._rate_job(
                db,
                scraped_job,
                user_id,
                user_qualification,
                service_log,
                system_prompt,
                job_prompt_template,
                combined_system_prompt,
            )

        # Record the user as processed only after all their jobs were handled.
        service_log.user_processed_ids = service_log.user_processed_ids + [user_id]
        db.commit()

    def _rate_job(
        self,
        db: Session,
        scraped_job: models.ScrapedJob,
        user_id: int,
        user_qualification: models.UserQualification,
        service_log: models.JobRatingServiceLog,
        system_prompt: models.AiSystemPrompt,
        job_prompt_template: models.AiJobPromptTemplate,
        combined_system_prompt: str,
    ) -> None:
        """Rate a single scraped job.

        Writes exactly one JobRating row: a skip row (closed / too-short /
        missing description), a success row with the LLM's scores, or a
        failure row capturing the exception. Each outcome is committed
        immediately so one bad job cannot roll back the others.

        :param db: Database session
        :param scraped_job: The scraped job to rate
        :param user_id: The ID of the user to rate the job for
        :param user_qualification: The user's qualification
        :param service_log: Job rating service log entry
        :param system_prompt: Latest system prompt template
        :param job_prompt_template: Latest job prompt template
        :param combined_system_prompt: Pre-built system prompt with candidate profile embedded"""

        self.logger.info(f"Processing job ID {scraped_job.id}")
        notes = []

        # Fields shared by every JobRating row written below (skip, success, failure).
        job_rating_kwargs = dict(
            scraped_job_id=scraped_job.id,
            owner_id=user_id,
            user_qualification_id=user_qualification.id,
            system_prompt_id=system_prompt.id,
            job_prompt_template_id=job_prompt_template.id,
            llm_model=CLAUDE_MODEL,
        )

        # Check that the job is not closed — explicit flag or expired deadline
        # (compared against timezone-aware UTC now).
        if scraped_job.is_closed or (scraped_job.deadline and scraped_job.deadline < dt.datetime.now(dt.timezone.utc)):
            self.logger.info(f"Skipping job ID {scraped_job.id} as it is closed")
            job_rating = models.JobRating(
                is_skipped=True,
                skip_reason="Job is closed",
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_skipped_ids = service_log.job_skipped_ids + [scraped_job.id]
            db.commit()
            return

        # Check that the job description is not too short
        if scraped_job.description and len(scraped_job.description) < settings.min_scraping_description_length:
            self.logger.info(f"Skipping job ID {scraped_job.id} as its description is too short")
            job_rating = models.JobRating(
                is_skipped=True,
                skip_reason=f"Job description too short (minimum length is {settings.min_scraping_description_length} characters)",
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_skipped_ids = service_log.job_skipped_ids + [scraped_job.id]
            db.commit()
            return

        # Ensure that the job has a description (None or empty string)
        if not scraped_job.description:
            self.logger.info(f"Skipping job ID {scraped_job.id} as it has no description")
            job_rating = models.JobRating(
                is_skipped=True,
                skip_reason="Job has no description",
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_skipped_ids = service_log.job_skipped_ids + [scraped_job.id]
            db.commit()
            return

        # Clamp over-long fields before prompting; any truncation notes are
        # stored on the rating row for transparency.
        description, description_note = ensure_length_limit(
            "description", scraped_job.description, settings.max_scraping_description_length, self.logger
        )
        if description_note:
            notes.append(description_note)
        title, title_note = ensure_length_limit(
            "title", scraped_job.title, settings.max_scraping_title_length, self.logger
        )
        if title_note:
            notes.append(title_note)
        company, company_note = ensure_length_limit(
            "company", scraped_job.company, settings.max_scraping_company_length, self.logger
        )
        if company_note:
            notes.append(company_note)

        if notes:
            job_rating_kwargs["notes"] = notes

        # Initialised outside the try so the except block can include the raw
        # LLM response (or None if the query itself failed) in the error text.
        score = None
        try:
            self.logger.info(f"Scoring job ID {scraped_job.id}")
            job_prompt = create_job_only_prompt(
                prompt_template=job_prompt_template.prompt,
                job_title=title,
                job_company=company,
                job_description=description,
            )
            # NOTE(review): score is indexed like a dict below — presumably
            # claude_query returns parsed JSON; confirm its contract.
            score = claude_query(combined_system_prompt, job_prompt)
            job_rating = models.JobRating(
                overall_score=score["overall_score"],
                technical_score=score["technical_fit"],
                experience_score=score["experience_alignment"],
                educational_score=score["educational_match"],
                interest_score=score["interest_match"],
                feedback=score["explanation"],
                job_prompt=combined_system_prompt + "\n\n" + job_prompt,
                is_success=True,
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_succeeded_ids = service_log.job_succeeded_ids + [scraped_job.id]
            db.commit()
        except Exception as exception:
            # Catch-all so one failing job (LLM error, missing score key, ...)
            # does not abort the rest of the user's jobs.
            tb = traceback.format_exc()
            self.logger.exception(f"Error in rating workflow: {exception}")
            job_rating = models.JobRating(
                is_success=False,
                error=f"Error scoring job: {exception}\n{tb}\nRaw response is {score}",
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_failed_ids = service_log.job_failed_ids + [scraped_job.id]
            db.commit()

312 

313 

# Module-level wiring: instantiate the rater at import time and expose its
# bound `run` method through the shared ServiceRunner framework under
# SERVICE_NAME.
job_rating_service_runner = ServiceRunner(
    service_name=SERVICE_NAME,
    service_function=ScrapedJobRater().run,
)