Coverage for backend / app / payments / routers / test_routers.py: 100%
12 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:34 +0000
1"""Mock webhook endpoints for testing Stripe subscription flows."""
3import asyncio
4import datetime as dt
6from fastapi import APIRouter, Depends, HTTPException
7from pydantic import BaseModel, Field
8from starlette import status
10from app.core.oauth2 import get_current_user
11from app.models import User
12from app.payments import logger, stripe
14payment_test_router = APIRouter(prefix="/test", tags=["testing"])
17@payment_test_router.delete("/delete-all-customers")
18async def delete_all_stripe_customers() -> dict: # pragma: no cover
19 """Delete ALL Stripe customers, test clocks, and cancel all subscriptions.
20 WARNING: This permanently deletes ALL customers and test clocks from Stripe
21 and immediately cancels all active subscriptions across your entire account.
22 This action cannot be undone.
23 :return: dict with deletion summary"""
25 deleted_customers = 0
26 deleted_clocks = 0
27 failed_deletions = []
29 try:
30 # Delete all customers with pagination
31 has_more = True
32 starting_after = None
34 while has_more:
35 params = {"limit": 100}
36 if starting_after:
37 params["starting_after"] = starting_after # noqa
39 customers = await stripe.Customer.list_async(**params)
41 for customer in customers.data:
42 try:
43 deleted_customer = await stripe.Customer.delete_async(customer.id)
44 if deleted_customer.get("deleted"):
45 deleted_customers += 1
46 logger.info(f"Deleted Stripe customer {customer.id}")
47 else:
48 failed_deletions.append(customer.id)
49 except stripe.error.StripeError as e:
50 logger.error(f"Failed to delete customer {customer.id}: {str(e)}")
51 failed_deletions.append(customer.id)
53 has_more = customers.has_more
54 if has_more and customers.data:
55 starting_after = customers.data[-1].id
57 # Delete all test clocks with pagination
58 has_more = True
59 starting_after = None
61 while has_more:
62 params = {"limit": 100}
63 if starting_after:
64 params["starting_after"] = starting_after # noqa
66 test_clocks = await stripe.test_helpers.TestClock.list_async(**params)
68 for clock in test_clocks.data:
69 try:
70 await stripe.test_helpers.TestClock.delete_async(clock.id)
71 deleted_clocks += 1
72 logger.info(f"Deleted test clock {clock.id}")
73 except stripe.error.StripeError as e:
74 logger.error(f"Failed to delete test clock {clock.id}: {str(e)}")
75 failed_deletions.append(clock.id)
77 has_more = test_clocks.has_more
78 if has_more and test_clocks.data:
79 starting_after = test_clocks.data[-1].id
81 return {
82 "success": True,
83 "message": f"Deleted {deleted_customers} customers and {deleted_clocks} test clocks",
84 "deleted_customers": deleted_customers,
85 "deleted_clocks": deleted_clocks,
86 "failed_count": len(failed_deletions),
87 "failed_ids": failed_deletions if failed_deletions else None,
88 }
90 except stripe.error.StripeError as e:
91 logger.error(f"Stripe error during bulk deletion: {str(e)}", exc_info=True)
92 raise HTTPException(
93 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
94 detail="Payment service error. Please try again.",
95 )
96 except Exception as e:
97 logger.error(f"Unexpected error during bulk deletion: {str(e)}", exc_info=True)
98 raise HTTPException(
99 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
100 detail="An error occurred during deletion.",
101 )
104class AdvanceClockRequest(BaseModel):
105 days: int = Field(gt=0, le=730, description="Number of days to advance (1-730)")
106 test_clock_id: str | None = Field(
107 None, description="Optional test clock ID. If not provided, uses user's test clock"
108 )
111@payment_test_router.post("/advance-test-clock")
112async def advance_test_clock(
113 request: AdvanceClockRequest,
114 current_user: User = Depends(get_current_user),
115) -> dict: # pragma: no cover
116 """Advance a Stripe test clock by X days for testing subscriptions.
117 This allows you to simulate time passing to test trial expirations,
118 billing cycles, and subscription renewals without waiting.
119 :param request: Request with days to advance and optional test_clock_id
120 :param current_user: Authenticated user from JWT token
121 :return: dict with updated test clock information
122 :raises HTTPException: If test clock doesn't exist or Stripe error occurs"""
124 try:
125 # Get test clock ID (from request or retrieve user's test clock)
126 test_clock_id = request.test_clock_id
128 if not test_clock_id:
129 # If no clock ID provided, find the user's customer test clock
130 if not current_user.stripe_details.customer_id:
131 raise HTTPException(
132 status_code=status.HTTP_404_NOT_FOUND,
133 detail="No Stripe customer found. Create a subscription first.",
134 )
136 # Retrieve customer to get their test clock
137 customer = await stripe.Customer.retrieve_async(current_user.stripe_details.customer_id)
138 test_clock_id = customer.get("test_clock")
140 if not test_clock_id:
141 raise HTTPException(
142 status_code=status.HTTP_404_NOT_FOUND,
143 detail="Customer is not associated with a test clock",
144 )
146 # Retrieve current test clock to get frozen_time
147 test_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id)
149 if test_clock.status != "ready":
150 raise HTTPException(
151 status_code=409,
152 detail=f"Test clock is currently {test_clock.status}. Wait until ready.",
153 )
155 # Calculate new frozen time (current + days)
156 current_frozen_time = test_clock.frozen_time
157 seconds_to_advance = request.days * 24 * 60 * 60
158 new_frozen_time = current_frozen_time + seconds_to_advance
160 # Advance the test clock
161 await stripe.test_helpers.TestClock.advance_async(test_clock_id, frozen_time=new_frozen_time)
163 logger.info(f"Advanced test clock {test_clock_id} by {request.days} days for user {current_user.id}")
165 # Wait for the test clock to be ready again (poll up to 30 seconds)
166 max_wait_time = 30
167 poll_interval = 0.5
168 elapsed_time = 0
170 while elapsed_time < max_wait_time:
171 await asyncio.sleep(poll_interval)
172 elapsed_time += poll_interval
174 updated_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id)
175 if updated_clock.status == "ready":
176 break
177 else:
178 # Timeout waiting for clock to be ready
179 raise HTTPException(
180 status_code=status.HTTP_408_REQUEST_TIMEOUT,
181 detail=f"Test clock did not become ready within {max_wait_time} seconds",
182 )
184 # Retrieve final status
185 updated_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id)
187 return {
188 "success": True,
189 "test_clock_id": test_clock_id,
190 "days_advanced": request.days,
191 "previous_time": dt.datetime.fromtimestamp(current_frozen_time).isoformat(),
192 "new_time": dt.datetime.fromtimestamp(new_frozen_time).isoformat(),
193 "status": updated_clock.status,
194 "message": f"Advanced test clock by {request.days} days",
195 }
197 except stripe.error.InvalidRequestError as e:
198 logger.error(f"Invalid test clock request: {str(e)}")
199 raise HTTPException(status_code=404, detail="Test clock not found or invalid advancement")
200 except stripe.error.StripeError as e:
201 logger.error(f"Stripe error advancing test clock: {str(e)}", exc_info=True)
202 raise HTTPException(status_code=503, detail="Payment service temporarily unavailable. Please try again.")
203 except HTTPException:
204 raise
205 except Exception as e:
206 logger.error(f"Unexpected error advancing test clock: {str(e)}", exc_info=True)
207 raise HTTPException(status_code=500, detail="An error occurred. Please try again.")