Coverage for backend / app / job_email_scraping / routers / forwarding_confirmation.py: 100%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
"""FastAPI router for the forwarding confirmation link endpoints.

Provides REST API endpoints for fetching the current user's pending forwarding
confirmation link and for updating a confirmation link owned by that user."""
6from fastapi import APIRouter, Depends, HTTPException
7from sqlalchemy.orm import Session
8from starlette import status
10from app import models
11from app.core.oauth2 import get_current_user
12from app.database import get_db
13from app.job_email_scraping import schemas
# Router for the forwarding confirmation link endpoints; every route registered
# on it is served under the "/forwarding-confirmation-links" prefix.
forwarding_confirmation_router = APIRouter(
    tags=["forwarding-confirmation-links"],
    prefix="/forwarding-confirmation-links",
)
@forwarding_confirmation_router.get("/pending", response_model=schemas.ForwardingConfirmationLinkOut | None)
def get_pending_confirmation_links(
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> models.ForwardingConfirmationLink | None:
    """Get the current user's most recent forwarding confirmation link if it is still unused.

    Only the newest link (ordered by ``created_at``) owned by the user is considered.
    NOTE(review): an older unused link is never returned when a newer link exists,
    because the query does not filter on ``is_used`` - confirm this is intended.

    :param current_user: Current authenticated user
    :param db: Database session
    :return: The most recent link when it has not been used; None when that link
        is already used or when the user has no links at all"""

    entry = (
        db.query(models.ForwardingConfirmationLink)
        .filter(models.ForwardingConfirmationLink.owner_id == current_user.id)
        .order_by(models.ForwardingConfirmationLink.created_at.desc())
        .first()
    )
    # A link that has already been used is no longer pending, so report nothing.
    if entry and entry.is_used:
        return None
    return entry
@forwarding_confirmation_router.put("/{link_id}", response_model=schemas.ForwardingConfirmationLinkOut)
def update_confirmation_link(
    link_id: int,
    update_data: schemas.ForwardingConfirmationLinkUpdate,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> models.ForwardingConfirmationLink:
    """Apply a partial update to one of the current user's forwarding confirmation links
    (e.g. mark it as used).

    Only the fields explicitly set in the request payload are written to the link.

    :param link_id: ID of the link to update
    :param update_data: Update data
    :param current_user: Current authenticated user
    :param db: Database session
    :return: Updated forwarding confirmation link
    :raises HTTPException: 404 when no link with this ID is owned by the current user"""

    link_model = models.ForwardingConfirmationLink

    # Restrict the lookup to the caller's own links so another user's link looks absent.
    owned_links = db.query(link_model).filter(link_model.id == link_id).filter(link_model.owner_id == current_user.id)
    target = owned_links.first()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Confirmation link not found")

    # exclude_unset keeps fields the client did not send from overwriting stored values.
    changes = update_data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target