Coverage for backend / app / job_email_scraping / routers / forwarding_confirmation.py: 100%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""FastAPI routers for the job email scraping service endpoints. 

2 

3Provides REST API endpoints for managing job alert emails, scraped job postings, 

4and service execution logs with CRUD operations and admin access controls.""" 

5 

6from fastapi import APIRouter, Depends, HTTPException 

7from sqlalchemy.orm import Session 

8from starlette import status 

9 

10from app import models 

11from app.core.oauth2 import get_current_user 

12from app.database import get_db 

13from app.job_email_scraping import schemas 

14 

15forwarding_confirmation_router = APIRouter( 

16 prefix="/forwarding-confirmation-links", 

17 tags=["forwarding-confirmation-links"], 

18) 

19 

20 

21@forwarding_confirmation_router.get("/pending", response_model=schemas.ForwardingConfirmationLinkOut | None) 

22def get_pending_confirmation_links( 

23 current_user: models.User = Depends(get_current_user), 

24 db: Session = Depends(get_db), 

25) -> models.ForwardingConfirmationLink | None: 

26 """Get all unused forwarding confirmation links for the current user. 

27 :param current_user: Current authenticated user 

28 :param db: Database session 

29 :return: List of pending forwarding confirmation links""" 

30 

31 entry = ( 

32 db.query(models.ForwardingConfirmationLink) 

33 .filter(models.ForwardingConfirmationLink.owner_id == current_user.id) 

34 .order_by(models.ForwardingConfirmationLink.created_at.desc()) 

35 .first() 

36 ) 

37 if entry and entry.is_used: 

38 return None 

39 else: 

40 return entry 

41 

42 

43@forwarding_confirmation_router.put("/{link_id}", response_model=schemas.ForwardingConfirmationLinkOut) 

44def update_confirmation_link( 

45 link_id: int, 

46 update_data: schemas.ForwardingConfirmationLinkUpdate, 

47 current_user: models.User = Depends(get_current_user), 

48 db: Session = Depends(get_db), 

49) -> models.ForwardingConfirmationLink: 

50 """Update a forwarding confirmation link (mark as used). 

51 :param link_id: ID of the link to update 

52 :param update_data: Update data 

53 :param current_user: Current authenticated user 

54 :param db: Database session 

55 :return: Updated forwarding confirmation link""" 

56 

57 link = ( 

58 db.query(models.ForwardingConfirmationLink) 

59 .filter(models.ForwardingConfirmationLink.id == link_id) 

60 .filter(models.ForwardingConfirmationLink.owner_id == current_user.id) 

61 .first() 

62 ) 

63 

64 if not link: 

65 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Confirmation link not found") 

66 

67 for key, value in update_data.model_dump(exclude_unset=True).items(): 

68 setattr(link, key, value) 

69 

70 db.commit() 

71 db.refresh(link) 

72 return link