Coverage for backend / app / payments / routers / test_routers.py: 100%

12 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 21:34 +0000

1"""Mock webhook endpoints for testing Stripe subscription flows.""" 

2 

3import asyncio 

4import datetime as dt 

5 

6from fastapi import APIRouter, Depends, HTTPException 

7from pydantic import BaseModel, Field 

8from starlette import status 

9 

10from app.core.oauth2 import get_current_user 

11from app.models import User 

12from app.payments import logger, stripe 

13 

14payment_test_router = APIRouter(prefix="/test", tags=["testing"]) 

15 

16 

17@payment_test_router.delete("/delete-all-customers") 

18async def delete_all_stripe_customers() -> dict: # pragma: no cover 

19 """Delete ALL Stripe customers, test clocks, and cancel all subscriptions. 

20 WARNING: This permanently deletes ALL customers and test clocks from Stripe 

21 and immediately cancels all active subscriptions across your entire account. 

22 This action cannot be undone. 

23 :return: dict with deletion summary""" 

24 

25 deleted_customers = 0 

26 deleted_clocks = 0 

27 failed_deletions = [] 

28 

29 try: 

30 # Delete all customers with pagination 

31 has_more = True 

32 starting_after = None 

33 

34 while has_more: 

35 params = {"limit": 100} 

36 if starting_after: 

37 params["starting_after"] = starting_after # noqa 

38 

39 customers = await stripe.Customer.list_async(**params) 

40 

41 for customer in customers.data: 

42 try: 

43 deleted_customer = await stripe.Customer.delete_async(customer.id) 

44 if deleted_customer.get("deleted"): 

45 deleted_customers += 1 

46 logger.info(f"Deleted Stripe customer {customer.id}") 

47 else: 

48 failed_deletions.append(customer.id) 

49 except stripe.error.StripeError as e: 

50 logger.error(f"Failed to delete customer {customer.id}: {str(e)}") 

51 failed_deletions.append(customer.id) 

52 

53 has_more = customers.has_more 

54 if has_more and customers.data: 

55 starting_after = customers.data[-1].id 

56 

57 # Delete all test clocks with pagination 

58 has_more = True 

59 starting_after = None 

60 

61 while has_more: 

62 params = {"limit": 100} 

63 if starting_after: 

64 params["starting_after"] = starting_after # noqa 

65 

66 test_clocks = await stripe.test_helpers.TestClock.list_async(**params) 

67 

68 for clock in test_clocks.data: 

69 try: 

70 await stripe.test_helpers.TestClock.delete_async(clock.id) 

71 deleted_clocks += 1 

72 logger.info(f"Deleted test clock {clock.id}") 

73 except stripe.error.StripeError as e: 

74 logger.error(f"Failed to delete test clock {clock.id}: {str(e)}") 

75 failed_deletions.append(clock.id) 

76 

77 has_more = test_clocks.has_more 

78 if has_more and test_clocks.data: 

79 starting_after = test_clocks.data[-1].id 

80 

81 return { 

82 "success": True, 

83 "message": f"Deleted {deleted_customers} customers and {deleted_clocks} test clocks", 

84 "deleted_customers": deleted_customers, 

85 "deleted_clocks": deleted_clocks, 

86 "failed_count": len(failed_deletions), 

87 "failed_ids": failed_deletions if failed_deletions else None, 

88 } 

89 

90 except stripe.error.StripeError as e: 

91 logger.error(f"Stripe error during bulk deletion: {str(e)}", exc_info=True) 

92 raise HTTPException( 

93 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

94 detail="Payment service error. Please try again.", 

95 ) 

96 except Exception as e: 

97 logger.error(f"Unexpected error during bulk deletion: {str(e)}", exc_info=True) 

98 raise HTTPException( 

99 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

100 detail="An error occurred during deletion.", 

101 ) 

102 

103 

104class AdvanceClockRequest(BaseModel): 

105 days: int = Field(gt=0, le=730, description="Number of days to advance (1-730)") 

106 test_clock_id: str | None = Field( 

107 None, description="Optional test clock ID. If not provided, uses user's test clock" 

108 ) 

109 

110 

111@payment_test_router.post("/advance-test-clock") 

112async def advance_test_clock( 

113 request: AdvanceClockRequest, 

114 current_user: User = Depends(get_current_user), 

115) -> dict: # pragma: no cover 

116 """Advance a Stripe test clock by X days for testing subscriptions. 

117 This allows you to simulate time passing to test trial expirations, 

118 billing cycles, and subscription renewals without waiting. 

119 :param request: Request with days to advance and optional test_clock_id 

120 :param current_user: Authenticated user from JWT token 

121 :return: dict with updated test clock information 

122 :raises HTTPException: If test clock doesn't exist or Stripe error occurs""" 

123 

124 try: 

125 # Get test clock ID (from request or retrieve user's test clock) 

126 test_clock_id = request.test_clock_id 

127 

128 if not test_clock_id: 

129 # If no clock ID provided, find the user's customer test clock 

130 if not current_user.stripe_details.customer_id: 

131 raise HTTPException( 

132 status_code=status.HTTP_404_NOT_FOUND, 

133 detail="No Stripe customer found. Create a subscription first.", 

134 ) 

135 

136 # Retrieve customer to get their test clock 

137 customer = await stripe.Customer.retrieve_async(current_user.stripe_details.customer_id) 

138 test_clock_id = customer.get("test_clock") 

139 

140 if not test_clock_id: 

141 raise HTTPException( 

142 status_code=status.HTTP_404_NOT_FOUND, 

143 detail="Customer is not associated with a test clock", 

144 ) 

145 

146 # Retrieve current test clock to get frozen_time 

147 test_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id) 

148 

149 if test_clock.status != "ready": 

150 raise HTTPException( 

151 status_code=409, 

152 detail=f"Test clock is currently {test_clock.status}. Wait until ready.", 

153 ) 

154 

155 # Calculate new frozen time (current + days) 

156 current_frozen_time = test_clock.frozen_time 

157 seconds_to_advance = request.days * 24 * 60 * 60 

158 new_frozen_time = current_frozen_time + seconds_to_advance 

159 

160 # Advance the test clock 

161 await stripe.test_helpers.TestClock.advance_async(test_clock_id, frozen_time=new_frozen_time) 

162 

163 logger.info(f"Advanced test clock {test_clock_id} by {request.days} days for user {current_user.id}") 

164 

165 # Wait for the test clock to be ready again (poll up to 30 seconds) 

166 max_wait_time = 30 

167 poll_interval = 0.5 

168 elapsed_time = 0 

169 

170 while elapsed_time < max_wait_time: 

171 await asyncio.sleep(poll_interval) 

172 elapsed_time += poll_interval 

173 

174 updated_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id) 

175 if updated_clock.status == "ready": 

176 break 

177 else: 

178 # Timeout waiting for clock to be ready 

179 raise HTTPException( 

180 status_code=status.HTTP_408_REQUEST_TIMEOUT, 

181 detail=f"Test clock did not become ready within {max_wait_time} seconds", 

182 ) 

183 

184 # Retrieve final status 

185 updated_clock = await stripe.test_helpers.TestClock.retrieve_async(test_clock_id) 

186 

187 return { 

188 "success": True, 

189 "test_clock_id": test_clock_id, 

190 "days_advanced": request.days, 

191 "previous_time": dt.datetime.fromtimestamp(current_frozen_time).isoformat(), 

192 "new_time": dt.datetime.fromtimestamp(new_frozen_time).isoformat(), 

193 "status": updated_clock.status, 

194 "message": f"Advanced test clock by {request.days} days", 

195 } 

196 

197 except stripe.error.InvalidRequestError as e: 

198 logger.error(f"Invalid test clock request: {str(e)}") 

199 raise HTTPException(status_code=404, detail="Test clock not found or invalid advancement") 

200 except stripe.error.StripeError as e: 

201 logger.error(f"Stripe error advancing test clock: {str(e)}", exc_info=True) 

202 raise HTTPException(status_code=503, detail="Payment service temporarily unavailable. Please try again.") 

203 except HTTPException: 

204 raise 

205 except Exception as e: 

206 logger.error(f"Unexpected error advancing test clock: {str(e)}", exc_info=True) 

207 raise HTTPException(status_code=500, detail="An error occurred. Please try again.")