Coverage for backend/app/routers/dashboard.py: 100%
48 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-22 15:38 +0000
1"""API router for dashboard data"""
3from datetime import datetime, timedelta
5from fastapi import APIRouter, Depends
6from sqlalchemy import or_
7from sqlalchemy.orm import Session
9from app import models, database, oauth2, schemas
11router = APIRouter(prefix="/dashboard", tags=["dashboard"])
14@router.get("/")
15def get_dashboard_data(
16 db: Session = Depends(database.get_db),
17 current_user: models.User = Depends(oauth2.get_current_user),
18) -> dict:
19 """Get dashboard data including job applications, interviews, and job application updates.
20 :param db: Database session
21 :param current_user: Authenticated user"""
23 update_limit = current_user.update_limit
24 chase_threshold = current_user.chase_threshold
25 deadline_threshold = current_user.deadline_threshold
27 # ---------------------------------------------------- ALL DATA ----------------------------------------------------
29 # noinspection PyTypeChecker
30 job_query = db.query(models.Job).filter(models.Job.owner_id == current_user.id)
31 jobs = job_query.all()
33 job_application_query = job_query.filter(
34 or_(models.Job.application_date.isnot(None), models.Job.application_status.isnot(None))
35 )
36 job_applications = job_application_query.all()
38 job_application_pending = job_application_query.filter(
39 models.Job.application_status.notin_(["rejected", "withdrawn"])
40 ).all()
42 # noinspection PyTypeChecker
43 interview_query = db.query(models.Interview).filter(models.Interview.owner_id == current_user.id)
44 interviews = interview_query.all()
46 # noinspection PyTypeChecker
47 updates = (
48 db.query(models.JobApplicationUpdate).filter(models.JobApplicationUpdate.owner_id == current_user.id).all()
49 )
51 # --------------------------------------------------- STATISTICS ---------------------------------------------------
53 statistics = {
54 "jobs": len(jobs),
55 "job_applications": len(job_applications),
56 "job_application_pending": len(job_application_pending),
57 "interviews": len(interviews),
58 }
60 # -------------------------------------------------- NEED CHASING --------------------------------------------------
62 # Filter by last update date in Python and prepare job data
63 needs_chase = []
64 for job in job_application_pending:
65 # Convert job application to Pydantic schema to access computed fields
66 job_schema = schemas.JobOut.model_validate(job, from_attributes=True)
68 # If we have a last update date, check if it's older than the threshold
69 if job_schema.days_since_last_update is not None:
70 if job_schema.days_since_last_update > chase_threshold:
71 needs_chase.append(job_schema)
73 # ----------------------------------------------------- UPDATES ----------------------------------------------------
75 # Create unified update objects
76 all_updates = []
78 # Add job applications as "Application" updates
79 for job in job_applications:
80 job_out = schemas.JobOut.model_validate(job, from_attributes=True)
81 update_item = {
82 "data": job_out,
83 "date": job_out.application_date,
84 "type": "Application",
85 "job": job_out,
86 }
87 all_updates.append(update_item)
89 # Add interviews as "Interview" updates
90 for interview in interviews:
91 interview_out = schemas.InterviewOut.model_validate(interview, from_attributes=True)
92 job_out = schemas.JobOut.model_validate(interview.job, from_attributes=True)
93 update_item = {
94 "data": interview_out,
95 "date": interview_out.date,
96 "type": "Interview",
97 "job": job_out,
98 }
99 all_updates.append(update_item)
101 # Add job application updates
102 for update in updates:
103 update_out = schemas.JobApplicationUpdateOut.model_validate(update, from_attributes=True)
104 job_out = schemas.JobOut.model_validate(update.job, from_attributes=True)
105 update_item = {
106 "data": update_out,
107 "date": update_out.date,
108 "type": "Job Application Update",
109 "job": job_out,
110 }
111 all_updates.append(update_item)
113 # Sort by date (most recent first) and apply the limit
114 all_updates.sort(key=lambda x: x["date"], reverse=True)
115 all_updates = all_updates[:update_limit]
117 # ---------------------------------------------- UPCOMING INTERVIEWS -----------------------------------------------
119 upcoming_interviews = (
120 interview_query.filter(models.Interview.date >= datetime.now()).order_by(models.Interview.date).all()
121 )
122 upcoming_interviews = [
123 schemas.InterviewOut.model_validate(interview, from_attributes=True) for interview in upcoming_interviews
124 ]
126 # --------------------------------=-------------- UPCOMING DEADLINES -----------------------------------------------
128 upcoming_deadlines = (
129 job_query.filter(models.Job.application_date.is_(None), models.Job.application_status.is_(None))
130 .filter((models.Job.deadline - datetime.now()) <= timedelta(days=deadline_threshold))
131 .order_by(models.Job.deadline)
132 .all()
133 )
134 upcoming_deadlines = [schemas.JobOut.model_validate(job, from_attributes=True) for job in upcoming_deadlines]
136 return dict(
137 statistics=statistics,
138 needs_chase=needs_chase,
139 all_updates=all_updates,
140 upcoming_interviews=upcoming_interviews,
141 upcoming_deadlines=upcoming_deadlines,
142 )