Coverage for backend/app/routers/dashboard.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-22 15:38 +0000

1"""API router for dashboard data""" 

2 

3from datetime import datetime, timedelta 

4 

5from fastapi import APIRouter, Depends 

6from sqlalchemy import or_ 

7from sqlalchemy.orm import Session 

8 

9from app import models, database, oauth2, schemas 

10 

11router = APIRouter(prefix="/dashboard", tags=["dashboard"]) 

12 

13 

14@router.get("/") 

15def get_dashboard_data( 

16 db: Session = Depends(database.get_db), 

17 current_user: models.User = Depends(oauth2.get_current_user), 

18) -> dict: 

19 """Get dashboard data including job applications, interviews, and job application updates. 

20 :param db: Database session 

21 :param current_user: Authenticated user""" 

22 

23 update_limit = current_user.update_limit 

24 chase_threshold = current_user.chase_threshold 

25 deadline_threshold = current_user.deadline_threshold 

26 

27 # ---------------------------------------------------- ALL DATA ---------------------------------------------------- 

28 

29 # noinspection PyTypeChecker 

30 job_query = db.query(models.Job).filter(models.Job.owner_id == current_user.id) 

31 jobs = job_query.all() 

32 

33 job_application_query = job_query.filter( 

34 or_(models.Job.application_date.isnot(None), models.Job.application_status.isnot(None)) 

35 ) 

36 job_applications = job_application_query.all() 

37 

38 job_application_pending = job_application_query.filter( 

39 models.Job.application_status.notin_(["rejected", "withdrawn"]) 

40 ).all() 

41 

42 # noinspection PyTypeChecker 

43 interview_query = db.query(models.Interview).filter(models.Interview.owner_id == current_user.id) 

44 interviews = interview_query.all() 

45 

46 # noinspection PyTypeChecker 

47 updates = ( 

48 db.query(models.JobApplicationUpdate).filter(models.JobApplicationUpdate.owner_id == current_user.id).all() 

49 ) 

50 

51 # --------------------------------------------------- STATISTICS --------------------------------------------------- 

52 

53 statistics = { 

54 "jobs": len(jobs), 

55 "job_applications": len(job_applications), 

56 "job_application_pending": len(job_application_pending), 

57 "interviews": len(interviews), 

58 } 

59 

60 # -------------------------------------------------- NEED CHASING -------------------------------------------------- 

61 

62 # Filter by last update date in Python and prepare job data 

63 needs_chase = [] 

64 for job in job_application_pending: 

65 # Convert job application to Pydantic schema to access computed fields 

66 job_schema = schemas.JobOut.model_validate(job, from_attributes=True) 

67 

68 # If we have a last update date, check if it's older than the threshold 

69 if job_schema.days_since_last_update is not None: 

70 if job_schema.days_since_last_update > chase_threshold: 

71 needs_chase.append(job_schema) 

72 

73 # ----------------------------------------------------- UPDATES ---------------------------------------------------- 

74 

75 # Create unified update objects 

76 all_updates = [] 

77 

78 # Add job applications as "Application" updates 

79 for job in job_applications: 

80 job_out = schemas.JobOut.model_validate(job, from_attributes=True) 

81 update_item = { 

82 "data": job_out, 

83 "date": job_out.application_date, 

84 "type": "Application", 

85 "job": job_out, 

86 } 

87 all_updates.append(update_item) 

88 

89 # Add interviews as "Interview" updates 

90 for interview in interviews: 

91 interview_out = schemas.InterviewOut.model_validate(interview, from_attributes=True) 

92 job_out = schemas.JobOut.model_validate(interview.job, from_attributes=True) 

93 update_item = { 

94 "data": interview_out, 

95 "date": interview_out.date, 

96 "type": "Interview", 

97 "job": job_out, 

98 } 

99 all_updates.append(update_item) 

100 

101 # Add job application updates 

102 for update in updates: 

103 update_out = schemas.JobApplicationUpdateOut.model_validate(update, from_attributes=True) 

104 job_out = schemas.JobOut.model_validate(update.job, from_attributes=True) 

105 update_item = { 

106 "data": update_out, 

107 "date": update_out.date, 

108 "type": "Job Application Update", 

109 "job": job_out, 

110 } 

111 all_updates.append(update_item) 

112 

113 # Sort by date (most recent first) and apply the limit 

114 all_updates.sort(key=lambda x: x["date"], reverse=True) 

115 all_updates = all_updates[:update_limit] 

116 

117 # ---------------------------------------------- UPCOMING INTERVIEWS ----------------------------------------------- 

118 

119 upcoming_interviews = ( 

120 interview_query.filter(models.Interview.date >= datetime.now()).order_by(models.Interview.date).all() 

121 ) 

122 upcoming_interviews = [ 

123 schemas.InterviewOut.model_validate(interview, from_attributes=True) for interview in upcoming_interviews 

124 ] 

125 

126 # --------------------------------=-------------- UPCOMING DEADLINES ----------------------------------------------- 

127 

128 upcoming_deadlines = ( 

129 job_query.filter(models.Job.application_date.is_(None), models.Job.application_status.is_(None)) 

130 .filter((models.Job.deadline - datetime.now()) <= timedelta(days=deadline_threshold)) 

131 .order_by(models.Job.deadline) 

132 .all() 

133 ) 

134 upcoming_deadlines = [schemas.JobOut.model_validate(job, from_attributes=True) for job in upcoming_deadlines] 

135 

136 return dict( 

137 statistics=statistics, 

138 needs_chase=needs_chase, 

139 all_updates=all_updates, 

140 upcoming_interviews=upcoming_interviews, 

141 upcoming_deadlines=upcoming_deadlines, 

142 )