Coverage for backend / app / job_rating / scraped_job_rating.py: 97%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Use Gemini LLM to rate how well scraped jobs match user qualifications."""
3import datetime as dt
4import traceback
6from sqlalchemy.orm import Session
8from app import models, utils
9from app.config import settings
10from app.database import get_db
11from app.job_rating.claude import MODEL as CLAUDE_MODEL, claude_query
12from app.job_rating.prompts import create_job_only_prompt, create_system_prompt_with_profile
13from app.service_runner.service_runner import ServiceRunner
15SERVICE_NAME = "job_rating_service"
18def ensure_length_limit(
19 text_describer: str,
20 text: str,
21 max_length: int,
22 logger=None,
23) -> tuple[str, str | None]:
24 """Ensure that the given text is not longer than the given maximum length.
25 :param text_describer: A description of the text, for logging purposes
26 :param text: The text to check
27 :param max_length: The maximum allowed length
28 :param logger: The logger to use for logging
29 :return: A tuple containing the truncated text and a note explaining why it was truncated, if any."""
31 if not text:
32 return text, None
33 if len(text) > max_length:
34 if logger:
35 logger.info(f"Job {text_describer} is too long ({len(text)}.")
36 text = text[:max_length] + "..."
37 note = f"{text_describer.capitalize()} was truncated as it was too long ({len(text)} characters. Limit is {max_length} characters)"
38 return text, note
39 else:
40 return text, None
def get_rating_active_users(db: Session) -> list[models.User]:
    """Return every user eligible for job rating.

    A user qualifies when their account is active and verified and their
    premium record is active with the job-rating feature switched on.

    :param db: Database session
    :return: List of active users with job rating active
    """
    query = db.query(models.User)
    query = query.filter(models.User.premium.has(is_active=True, job_rating_active=True))
    query = query.filter(models.User.is_active)
    query = query.filter(models.User.is_verified)
    return query.all()
def get_user_unrated_scraped_jobs(db: Session, user_id: int) -> list[models.ScrapedJob]:
    """Get all unrated scraped jobs for a given user.

    Returns only jobs that were successfully scraped and processed, are still
    active, were not imported by the user, have no rating yet, and did not hit
    an exclusion filter.

    :param db: Database session
    :param user_id: ID of the user to get jobs for
    :return: List of unrated scraped jobs"""
    # NOTE(review): `== None` (rather than `.is_(None)`) appears deliberate —
    # presumably `job_rating` and `exclusion_filter` are relationships, where
    # SQLAlchemy only supports `==`/`!=` comparison with None; confirm before
    # "fixing" this to `.is_(None)`.
    # noinspection PyComparisonWithNone
    return (
        db.query(models.ScrapedJob)
        .filter(models.ScrapedJob.owner_id == user_id)
        .filter(models.ScrapedJob.is_processed.is_(True))
        .filter(models.ScrapedJob.is_scraped.is_(True))
        .filter(models.ScrapedJob.is_failed.is_(False))
        .filter(models.ScrapedJob.job_rating == None)
        .filter(models.ScrapedJob.is_active.is_(True))
        .filter(models.ScrapedJob.is_imported.is_(False))
        .filter(models.ScrapedJob.exclusion_filter == None)
        .all()
    )
class ScrapedJobRater:
    """Rates scraped jobs against user qualifications using AI."""

    def __init__(self) -> None:
        """Initialise the job rater with a dedicated service logger."""
        self.logger = utils.AppLogger.create_service_logger(SERVICE_NAME, "INFO")

    def run(self, db: Session | None = None) -> models.JobRatingServiceLog:
        """Score all scraped jobs using AI.

        Creates a service-log row up front so a crash mid-run still leaves a
        persisted record, processes every rating-active user, and records the
        outcome and run duration on the log entry.

        :param db: Database session; a new one is created when omitted
        :return: Job rating service log entry
        """
        # NOTE(review): a session obtained from get_db() here is never closed
        # by this method — presumably the generator teardown handles it; confirm.
        db = next(get_db()) if db is None else db

        # Create service log entry
        start_time = dt.datetime.now()
        service_log = models.JobRatingServiceLog(run_datetime=start_time)
        db.add(service_log)
        db.commit()
        db.refresh(service_log)

        try:
            # Get all active users with job rating active
            users = get_rating_active_users(db)
            self.logger.info(f"Found {len(users)} active users to process")
            service_log.user_found_ids = [user.id for user in users]

            # Get latest system and job prompt templates (highest id = newest)
            system_prompt = db.query(models.AiSystemPrompt).order_by(models.AiSystemPrompt.id.desc()).first()
            job_prompt = db.query(models.AiJobPromptTemplate).order_by(models.AiJobPromptTemplate.id.desc()).first()

            # Process each user
            for user in users:
                self._process_user(db, user.id, service_log, system_prompt, job_prompt)

            # Mark service log as successful
            service_log.is_success = True
        except Exception as exception:
            self.logger.exception(f"Critical error in rating workflow: {exception}")
            service_log.is_success = False
            service_log.error_message = str(exception)
        finally:
            self.logger.info("Finished workflow")
            service_log.run_duration = (dt.datetime.now() - start_time).total_seconds()
            db.commit()
            db.refresh(service_log)
        return service_log

    def _process_user(
        self,
        db: Session,
        user_id: int,
        service_log: models.JobRatingServiceLog,
        system_prompt: models.AiSystemPrompt,
        job_prompt_template: models.AiJobPromptTemplate,
    ) -> None:
        """Process a single user's jobs.

        Skips the user entirely when they have no qualification on file.

        :param db: Database session
        :param user_id: The ID of the user to process jobs for
        :param service_log: Job rating service log entry
        :param system_prompt: Latest system prompt template
        :param job_prompt_template: Latest job prompt template
        """
        # Ensure that the user has a qualification (newest by modified_at)
        user_qualification = (
            db.query(models.UserQualification)
            .filter(models.UserQualification.owner_id == user_id)
            .order_by(models.UserQualification.modified_at.desc())
            .first()
        )
        if not user_qualification:
            self.logger.info(f"Skipping user {user_id} as no user qualification found")
            return
        else:
            self.logger.info(f"Processing user {user_id}")

        scraped_jobs = get_user_unrated_scraped_jobs(db, user_id)
        # Reassign (rather than append in place) so the ORM detects the change
        service_log.job_found_ids = service_log.job_found_ids + [job.id for job in scraped_jobs]
        self.logger.info(f"Found {len(scraped_jobs)} scraped jobs to rate")

        # Build the combined system prompt (instructions + candidate profile) once per user
        # so Anthropic caches it across all jobs for this user
        combined_system_prompt = create_system_prompt_with_profile(
            prompt_template=system_prompt.prompt,
            user_experience=user_qualification.experience,
            user_education=user_qualification.education,
            user_skills=user_qualification.skills,
            user_qualities=user_qualification.qualities,
            user_interests=user_qualification.interests,
        )

        for scraped_job in scraped_jobs:
            self._rate_job(
                db,
                scraped_job,
                user_id,
                user_qualification,
                service_log,
                system_prompt,
                job_prompt_template,
                combined_system_prompt,
            )

        service_log.user_processed_ids = service_log.user_processed_ids + [user_id]
        db.commit()

    def _skip_job(
        self,
        db: Session,
        scraped_job: models.ScrapedJob,
        service_log: models.JobRatingServiceLog,
        skip_reason: str,
        job_rating_kwargs: dict,
    ) -> None:
        """Persist a skipped JobRating for *scraped_job* and record it on the log.

        :param db: Database session
        :param scraped_job: The job being skipped
        :param service_log: Job rating service log entry
        :param skip_reason: Human-readable reason stored on the rating
        :param job_rating_kwargs: Shared JobRating constructor attributes
        """
        job_rating = models.JobRating(
            is_skipped=True,
            skip_reason=skip_reason,
            **job_rating_kwargs,
        )
        db.add(job_rating)
        service_log.job_skipped_ids = service_log.job_skipped_ids + [scraped_job.id]
        db.commit()

    def _rate_job(
        self,
        db: Session,
        scraped_job: models.ScrapedJob,
        user_id: int,
        user_qualification: models.UserQualification,
        service_log: models.JobRatingServiceLog,
        system_prompt: models.AiSystemPrompt,
        job_prompt_template: models.AiJobPromptTemplate,
        combined_system_prompt: str,
    ) -> None:
        """Rate a single scraped job.

        Jobs that are closed, have no description, or have a too-short
        description are recorded as skipped. Otherwise the job is scored via
        Claude and the result (or the failure) is persisted as a JobRating.

        :param db: Database session
        :param scraped_job: The scraped job to rate
        :param user_id: The ID of the user to rate the job for
        :param user_qualification: The user's qualification
        :param service_log: Job rating service log entry
        :param system_prompt: Latest system prompt template
        :param job_prompt_template: Latest job prompt template
        :param combined_system_prompt: Pre-built system prompt with candidate profile embedded
        """
        self.logger.info(f"Processing job ID {scraped_job.id}")
        notes = []

        # Attributes shared by every JobRating row created for this job,
        # whether it is skipped, scored successfully, or fails.
        job_rating_kwargs = dict(
            scraped_job_id=scraped_job.id,
            owner_id=user_id,
            user_qualification_id=user_qualification.id,
            system_prompt_id=system_prompt.id,
            job_prompt_template_id=job_prompt_template.id,
            llm_model=CLAUDE_MODEL,
        )

        # Check that the job is not closed (explicit flag or elapsed deadline)
        if scraped_job.is_closed or (scraped_job.deadline and scraped_job.deadline < dt.datetime.now(dt.timezone.utc)):
            self.logger.info(f"Skipping job ID {scraped_job.id} as it is closed")
            self._skip_job(db, scraped_job, service_log, "Job is closed", job_rating_kwargs)
            return

        # Check that the job description is not too short
        if scraped_job.description and len(scraped_job.description) < settings.min_scraping_description_length:
            self.logger.info(f"Skipping job ID {scraped_job.id} as its description is too short")
            self._skip_job(
                db,
                scraped_job,
                service_log,
                f"Job description too short (minimum length is {settings.min_scraping_description_length} characters)",
                job_rating_kwargs,
            )
            return

        # Ensure that the job has a description
        if not scraped_job.description:
            self.logger.info(f"Skipping job ID {scraped_job.id} as it has no description")
            self._skip_job(db, scraped_job, service_log, "Job has no description", job_rating_kwargs)
            return

        # Truncate oversize fields, collecting notes to persist on the rating
        description, description_note = ensure_length_limit(
            "description", scraped_job.description, settings.max_scraping_description_length, self.logger
        )
        if description_note:
            notes.append(description_note)
        title, title_note = ensure_length_limit(
            "title", scraped_job.title, settings.max_scraping_title_length, self.logger
        )
        if title_note:
            notes.append(title_note)
        company, company_note = ensure_length_limit(
            "company", scraped_job.company, settings.max_scraping_company_length, self.logger
        )
        if company_note:
            notes.append(company_note)

        if notes:
            job_rating_kwargs["notes"] = notes

        # score stays None until claude_query returns, so the error message
        # below can include whatever (possibly partial) response we got.
        score = None
        try:
            self.logger.info(f"Scoring job ID {scraped_job.id}")
            job_prompt = create_job_only_prompt(
                prompt_template=job_prompt_template.prompt,
                job_title=title,
                job_company=company,
                job_description=description,
            )
            score = claude_query(combined_system_prompt, job_prompt)
            job_rating = models.JobRating(
                overall_score=score["overall_score"],
                technical_score=score["technical_fit"],
                experience_score=score["experience_alignment"],
                educational_score=score["educational_match"],
                interest_score=score["interest_match"],
                feedback=score["explanation"],
                job_prompt=combined_system_prompt + "\n\n" + job_prompt,
                is_success=True,
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_succeeded_ids = service_log.job_succeeded_ids + [scraped_job.id]
            db.commit()
        except Exception as exception:
            tb = traceback.format_exc()
            self.logger.exception(f"Error in rating workflow: {exception}")
            job_rating = models.JobRating(
                is_success=False,
                error=f"Error scoring job: {exception}\n{tb}\nRaw response is {score}",
                **job_rating_kwargs,
            )
            db.add(job_rating)
            service_log.job_failed_ids = service_log.job_failed_ids + [scraped_job.id]
            db.commit()
# Module-level runner used by the service scheduler. Note that
# ScrapedJobRater() is instantiated at import time, which creates its service
# logger as a side effect.
job_rating_service_runner = ServiceRunner(
    service_name=SERVICE_NAME,
    service_function=ScrapedJobRater().run,
)