Coverage for backend/app/routers/export.py: 100%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-22 15:38 +0000

1"""API router for exporting data""" 

2 

3import csv 

4import io 

5import zipfile 

6 

7from fastapi import APIRouter, Depends 

8from fastapi.responses import StreamingResponse 

9from sqlalchemy.orm import Session 

10 

11from app import database, oauth2 

12 

# Router for the export endpoints: routes registered on it are served under
# the "/export" prefix and carry the "export" tag.
router = APIRouter(
    prefix="/export",
    tags=["export"],
)

14 

15from app import models 

16 

# Model classes collected in one list.
# NOTE(review): nothing in the code visible here reads MODEL_LIST; presumably
# it is consumed elsewhere -- confirm before changing or removing it.
MODEL_LIST = [
    models.Job, models.Company, models.Keyword, models.Person,
    models.Location, models.Aggregator, models.Interview,
    models.JobApplicationUpdate,
]

27 

28 

def _name_or_blank(related) -> str:
    """Return ``related.name``, or an empty string when the relation is unset."""
    return related.name if related else ""


def _format_contact(person) -> str:
    """Return the contact's full name, skipping name parts that are missing."""
    return " ".join(part for part in (person.first_name, person.last_name) if part)


def _format_dated_entry(entry) -> str:
    """Render an interview/update as ``YYYY-MM-DD (type)``.

    The date is left out when it is unset, so one incomplete record cannot
    make the whole export fail.
    """
    label = f"({entry.type})"
    if entry.date is None:
        return label
    return f"{entry.date.strftime('%Y-%m-%d')} {label}"


@router.get("/")
def export_jobs_with_all_columns(
    db: Session = Depends(database.get_db),
    current_user: models.User = Depends(oauth2.get_current_user),
):
    """Export the current user's jobs and their related data as one CSV file.

    Each row holds the job's own columns (IDs and foreign keys excluded)
    followed by flattened related data: company, location, both aggregators,
    keywords, contacts, interviews and updates. Multi-valued relations are
    joined with "; ".

    Args:
        db: Database session provided by the ``database.get_db`` dependency.
        current_user: Authenticated user; only jobs whose ``owner_id`` equals
            this user's ``id`` are exported.

    Returns:
        A ``StreamingResponse`` with media type ``text/csv``, delivered as the
        attachment ``jobs_export.csv``.
    """
    jobs = db.query(models.Job).filter(models.Job.owner_id == current_user.id).all()

    # Job columns to export, in output order (IDs and foreign keys excluded).
    job_fields = [
        "title",
        "description",
        "salary_min",
        "salary_max",
        "personal_rating",
        "url",
        "deadline",
        "note",
        "attendance_type",
        "application_date",
        "application_url",
        "application_status",
        "application_note",
        "applied_via",
        "created_at",
        "modified_at",
    ]
    # Header labels for the flattened related data appended to every row.
    related_headers = [
        "Company",
        "Location",
        "Source Aggregator",
        "Application Aggregator",
        "Keywords",
        "Contacts",
        "Interviews",
        "Updates",
    ]

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(job_fields + related_headers)

    for job in jobs:
        row = [getattr(job, field) for field in job_fields]
        row += [
            _name_or_blank(job.company),
            _name_or_blank(job.location),
            _name_or_blank(job.source),
            _name_or_blank(job.application_aggregator),
            "; ".join(keyword.name for keyword in job.keywords),
            "; ".join(_format_contact(person) for person in job.contacts),
            "; ".join(_format_dated_entry(interview) for interview in job.interviews),
            "; ".join(_format_dated_entry(update) for update in job.updates),
        ]
        writer.writerow(row)

    # The CSV is already fully built in memory. Iterating a StringIO yields one
    # line at a time, which would send the body as many tiny chunks, so hand
    # the response a single chunk instead; the bytes sent are the same.
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=jobs_export.csv"},
    )