Coverage for backend / app / routers / export.py: 100%
47 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""API router for exporting data"""
3import csv
4import io
5import zipfile
6from typing import Iterable, Any
8from fastapi import APIRouter, Depends
9from fastapi.responses import StreamingResponse
11from app import database, models
12from app.core import oauth2
14export_router = APIRouter(prefix="/export", tags=["export"])
17JOB_FIELDS = {
18 "title": "Job Title",
19 "description": "Job Description",
20 "salary_min": "Min. Salary",
21 "salary_max": "Max. Salary",
22 "personal_rating": "Personal Rating",
23 "url": "Job URL",
24 "deadline": "Application Deadline",
25 "note": "Note",
26 "attendance_type": "Attendance Type",
27 "application_date": "Application Date",
28 "application_url": "Application URL",
29 "application_status": "Application Status",
30 "application_note": "Application Note",
31 "applied_via": "Applied Via",
32 "created_at": "Created At",
33 "modified_at": "Last Modified At",
34}
36COMPANY_FIELDS = {
37 "name": "Company Name",
38 "url": "Website",
39 "description": "Description",
40 "created_at": "Created At",
41 "modified_at": "Last Modified At",
42}
44PERSON_FIELDS = {
45 "first_name": "First Name",
46 "last_name": "Last Name",
47 "email": "Email",
48 "phone": "Phone",
49 "role": "Role",
50 "linkedin_url": "LinkedIn URL",
51 "created_at": "Created At",
52 "modified_at": "Last Modified At",
53}
55AGGREGATOR_FIELDS = {
56 "name": "Aggregator Name",
57 "url": "Website",
58 "created_at": "Created At",
59 "modified_at": "Last Modified At",
60}
62SPECULATIVE_APPLICATION_FIELDS = {
63 "date": "Application Date",
64 "note": "Notes",
65 "contact_email": "Contact Email",
66 "created_at": "Created At",
67 "modified_at": "Last Modified At",
68}
70SCRAPED_JOB_FIELDS = {
71 "external_job_id": "External Job ID",
72 "platform": "Source",
73 "title": "Job Title",
74 "description": "Job Description",
75 "salary_min": "Min. Salary",
76 "salary_max": "Max. Salary",
77 "url": "Job URL",
78 "deadline": "Application Deadline",
79 "location": "Location",
80 "attendance_type": "Attendance Type",
81 "company": "Company",
82 "created_at": "Created At",
83 "modified_at": "Last Modified At",
84}
87def query_for_owner(
88 db,
89 model,
90 current_owner: models.User,
91) -> list[Any]:
92 """Return all records for the current owner.
93 :param db: Database session
94 :param model: SQLAlchemy model to query
95 :param current_owner: Current user
96 :return: List of model instances belonging to the current owner"""
98 return db.query(model).filter(model.owner_id == current_owner.id).all()
101def write_csv(
102 headers: list[str],
103 rows: Iterable[list[Any]],
104) -> str:
105 """Write CSV content to a string.
106 :param headers: List of CSV headers
107 :param rows: Iterable of rows, each row is a list of values
108 :return: CSV content as a string"""
110 output = io.StringIO()
111 writer = csv.writer(output)
112 writer.writerow(headers)
113 for row in rows:
114 writer.writerow(row)
115 return output.getvalue()
118def get_model_rows(
119 instance,
120 fields: dict,
121) -> list[Any]:
122 """Extract ordered field values from a SQLAlchemy model.
123 :param instance: SQLAlchemy model instance
124 :param fields: Dictionary of field names to extract
125 :return: List of field values in the order of the fields dictionary"""
127 return [getattr(instance, field) for field in fields]
130@export_router.get("/")
131def export_all(
132 db=Depends(database.get_db),
133 current_user=Depends(oauth2.get_current_user),
134) -> StreamingResponse:
135 """Export all user data as a ZIP of CSV files.
136 :param db: Database session
137 :param current_user: Current authenticated user"""
139 mem_zip = io.BytesIO()
141 with zipfile.ZipFile(mem_zip, "w", zipfile.ZIP_DEFLATED) as zf:
143 # Jobs
144 jobs = query_for_owner(db, models.Job, current_user)
145 job_rows = []
146 for job in jobs:
147 job_rows.append(
148 get_model_rows(job, JOB_FIELDS)
149 + [
150 job.company.name if job.company else "",
151 job.location.name if job.location else "",
152 job.source_aggregator.name if job.source_aggregator else "",
153 job.application_aggregator.name if job.application_aggregator else "",
154 "; ".join(k.name for k in job.keywords),
155 "; ".join(f"{p.first_name} {p.last_name}" for p in job.contacts),
156 "; ".join(f"{i.date:%Y-%m-%d} ({i.type}) (notes: {i.note})" for i in job.interviews),
157 "; ".join(f"{u.date:%Y-%m-%d} ({u.type}) (notes: {u.note})" for u in job.updates),
158 ]
159 )
160 zf.writestr(
161 "jobs.csv",
162 write_csv(
163 list(JOB_FIELDS.values())
164 + [
165 "Company",
166 "Location",
167 "Source Aggregator",
168 "Application Aggregator",
169 "Keywords",
170 "Contacts",
171 "Interviews",
172 "Updates",
173 ],
174 job_rows,
175 ),
176 )
178 # People
179 people = query_for_owner(db, models.Person, current_user)
180 zf.writestr(
181 "people.csv",
182 write_csv(
183 list(PERSON_FIELDS.values()) + ["Company"],
184 [get_model_rows(p, PERSON_FIELDS) + [p.company.name if p.company else ""] for p in people],
185 ),
186 )
188 # Companies
189 companies = query_for_owner(db, models.Company, current_user)
190 zf.writestr(
191 "companies.csv",
192 write_csv(
193 list(COMPANY_FIELDS.values()) + ["People"],
194 [
195 get_model_rows(c, COMPANY_FIELDS) + ["; ".join(f"{p.first_name} {p.last_name}" for p in c.persons)]
196 for c in companies
197 ],
198 ),
199 )
201 # Aggregators
202 aggregators = query_for_owner(db, models.Aggregator, current_user)
203 zf.writestr(
204 "aggregators.csv",
205 write_csv(
206 list(AGGREGATOR_FIELDS.values()),
207 [get_model_rows(a, AGGREGATOR_FIELDS) for a in aggregators],
208 ),
209 )
211 # Speculative Applications
212 spec_apps = query_for_owner(db, models.SpeculativeApplication, current_user)
213 zf.writestr(
214 "speculative_applications.csv",
215 write_csv(
216 list(SPECULATIVE_APPLICATION_FIELDS.values()) + ["Company"],
217 [
218 get_model_rows(sa, SPECULATIVE_APPLICATION_FIELDS) + [sa.company.name if sa.company else ""]
219 for sa in spec_apps
220 ],
221 ),
222 )
224 # Scraped Jobs
225 scraped_jobs = query_for_owner(db, models.ScrapedJob, current_user)
226 zf.writestr(
227 "scraped_jobs.csv",
228 write_csv(
229 list(SCRAPED_JOB_FIELDS.values()) + ["Rating"],
230 [
231 get_model_rows(sj, SCRAPED_JOB_FIELDS) + [sj.job_rating.overall_score if sj.job_rating else None]
232 for sj in scraped_jobs
233 ],
234 ),
235 )
237 mem_zip.seek(0)
238 return StreamingResponse(
239 mem_zip,
240 media_type="application/x-zip-compressed",
241 headers={"Content-Disposition": "attachment; filename=all_exports.zip"},
242 )