Coverage for backend / app / routers / export.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""API router for exporting data""" 

2 

3import csv 

4import io 

5import zipfile 

6from typing import Iterable, Any 

7 

8from fastapi import APIRouter, Depends 

9from fastapi.responses import StreamingResponse 

10 

11from app import database, models 

12from app.core import oauth2 

13 

14export_router = APIRouter(prefix="/export", tags=["export"]) 

15 

16 

17JOB_FIELDS = { 

18 "title": "Job Title", 

19 "description": "Job Description", 

20 "salary_min": "Min. Salary", 

21 "salary_max": "Max. Salary", 

22 "personal_rating": "Personal Rating", 

23 "url": "Job URL", 

24 "deadline": "Application Deadline", 

25 "note": "Note", 

26 "attendance_type": "Attendance Type", 

27 "application_date": "Application Date", 

28 "application_url": "Application URL", 

29 "application_status": "Application Status", 

30 "application_note": "Application Note", 

31 "applied_via": "Applied Via", 

32 "created_at": "Created At", 

33 "modified_at": "Last Modified At", 

34} 

35 

36COMPANY_FIELDS = { 

37 "name": "Company Name", 

38 "url": "Website", 

39 "description": "Description", 

40 "created_at": "Created At", 

41 "modified_at": "Last Modified At", 

42} 

43 

44PERSON_FIELDS = { 

45 "first_name": "First Name", 

46 "last_name": "Last Name", 

47 "email": "Email", 

48 "phone": "Phone", 

49 "role": "Role", 

50 "linkedin_url": "LinkedIn URL", 

51 "created_at": "Created At", 

52 "modified_at": "Last Modified At", 

53} 

54 

55AGGREGATOR_FIELDS = { 

56 "name": "Aggregator Name", 

57 "url": "Website", 

58 "created_at": "Created At", 

59 "modified_at": "Last Modified At", 

60} 

61 

62SPECULATIVE_APPLICATION_FIELDS = { 

63 "date": "Application Date", 

64 "note": "Notes", 

65 "contact_email": "Contact Email", 

66 "created_at": "Created At", 

67 "modified_at": "Last Modified At", 

68} 

69 

70SCRAPED_JOB_FIELDS = { 

71 "external_job_id": "External Job ID", 

72 "platform": "Source", 

73 "title": "Job Title", 

74 "description": "Job Description", 

75 "salary_min": "Min. Salary", 

76 "salary_max": "Max. Salary", 

77 "url": "Job URL", 

78 "deadline": "Application Deadline", 

79 "location": "Location", 

80 "attendance_type": "Attendance Type", 

81 "company": "Company", 

82 "created_at": "Created At", 

83 "modified_at": "Last Modified At", 

84} 

85 

86 

87def query_for_owner( 

88 db, 

89 model, 

90 current_owner: models.User, 

91) -> list[Any]: 

92 """Return all records for the current owner. 

93 :param db: Database session 

94 :param model: SQLAlchemy model to query 

95 :param current_owner: Current user 

96 :return: List of model instances belonging to the current owner""" 

97 

98 return db.query(model).filter(model.owner_id == current_owner.id).all() 

99 

100 

101def write_csv( 

102 headers: list[str], 

103 rows: Iterable[list[Any]], 

104) -> str: 

105 """Write CSV content to a string. 

106 :param headers: List of CSV headers 

107 :param rows: Iterable of rows, each row is a list of values 

108 :return: CSV content as a string""" 

109 

110 output = io.StringIO() 

111 writer = csv.writer(output) 

112 writer.writerow(headers) 

113 for row in rows: 

114 writer.writerow(row) 

115 return output.getvalue() 

116 

117 

118def get_model_rows( 

119 instance, 

120 fields: dict, 

121) -> list[Any]: 

122 """Extract ordered field values from a SQLAlchemy model. 

123 :param instance: SQLAlchemy model instance 

124 :param fields: Dictionary of field names to extract 

125 :return: List of field values in the order of the fields dictionary""" 

126 

127 return [getattr(instance, field) for field in fields] 

128 

129 

130@export_router.get("/") 

131def export_all( 

132 db=Depends(database.get_db), 

133 current_user=Depends(oauth2.get_current_user), 

134) -> StreamingResponse: 

135 """Export all user data as a ZIP of CSV files. 

136 :param db: Database session 

137 :param current_user: Current authenticated user""" 

138 

139 mem_zip = io.BytesIO() 

140 

141 with zipfile.ZipFile(mem_zip, "w", zipfile.ZIP_DEFLATED) as zf: 

142 

143 # Jobs 

144 jobs = query_for_owner(db, models.Job, current_user) 

145 job_rows = [] 

146 for job in jobs: 

147 job_rows.append( 

148 get_model_rows(job, JOB_FIELDS) 

149 + [ 

150 job.company.name if job.company else "", 

151 job.location.name if job.location else "", 

152 job.source_aggregator.name if job.source_aggregator else "", 

153 job.application_aggregator.name if job.application_aggregator else "", 

154 "; ".join(k.name for k in job.keywords), 

155 "; ".join(f"{p.first_name} {p.last_name}" for p in job.contacts), 

156 "; ".join(f"{i.date:%Y-%m-%d} ({i.type}) (notes: {i.note})" for i in job.interviews), 

157 "; ".join(f"{u.date:%Y-%m-%d} ({u.type}) (notes: {u.note})" for u in job.updates), 

158 ] 

159 ) 

160 zf.writestr( 

161 "jobs.csv", 

162 write_csv( 

163 list(JOB_FIELDS.values()) 

164 + [ 

165 "Company", 

166 "Location", 

167 "Source Aggregator", 

168 "Application Aggregator", 

169 "Keywords", 

170 "Contacts", 

171 "Interviews", 

172 "Updates", 

173 ], 

174 job_rows, 

175 ), 

176 ) 

177 

178 # People 

179 people = query_for_owner(db, models.Person, current_user) 

180 zf.writestr( 

181 "people.csv", 

182 write_csv( 

183 list(PERSON_FIELDS.values()) + ["Company"], 

184 [get_model_rows(p, PERSON_FIELDS) + [p.company.name if p.company else ""] for p in people], 

185 ), 

186 ) 

187 

188 # Companies 

189 companies = query_for_owner(db, models.Company, current_user) 

190 zf.writestr( 

191 "companies.csv", 

192 write_csv( 

193 list(COMPANY_FIELDS.values()) + ["People"], 

194 [ 

195 get_model_rows(c, COMPANY_FIELDS) + ["; ".join(f"{p.first_name} {p.last_name}" for p in c.persons)] 

196 for c in companies 

197 ], 

198 ), 

199 ) 

200 

201 # Aggregators 

202 aggregators = query_for_owner(db, models.Aggregator, current_user) 

203 zf.writestr( 

204 "aggregators.csv", 

205 write_csv( 

206 list(AGGREGATOR_FIELDS.values()), 

207 [get_model_rows(a, AGGREGATOR_FIELDS) for a in aggregators], 

208 ), 

209 ) 

210 

211 # Speculative Applications 

212 spec_apps = query_for_owner(db, models.SpeculativeApplication, current_user) 

213 zf.writestr( 

214 "speculative_applications.csv", 

215 write_csv( 

216 list(SPECULATIVE_APPLICATION_FIELDS.values()) + ["Company"], 

217 [ 

218 get_model_rows(sa, SPECULATIVE_APPLICATION_FIELDS) + [sa.company.name if sa.company else ""] 

219 for sa in spec_apps 

220 ], 

221 ), 

222 ) 

223 

224 # Scraped Jobs 

225 scraped_jobs = query_for_owner(db, models.ScrapedJob, current_user) 

226 zf.writestr( 

227 "scraped_jobs.csv", 

228 write_csv( 

229 list(SCRAPED_JOB_FIELDS.values()) + ["Rating"], 

230 [ 

231 get_model_rows(sj, SCRAPED_JOB_FIELDS) + [sj.job_rating.overall_score if sj.job_rating else None] 

232 for sj in scraped_jobs 

233 ], 

234 ), 

235 ) 

236 

237 mem_zip.seek(0) 

238 return StreamingResponse( 

239 mem_zip, 

240 media_type="application/x-zip-compressed", 

241 headers={"Content-Disposition": "attachment; filename=all_exports.zip"}, 

242 )