How to Handle Webhooks in FastAPI
Basic Webhook Endpoint
FastAPI makes webhook handling elegant with type hints and automatic validation:
```python
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel

app = FastAPI()


class WebhookEvent(BaseModel):
    type: str
    data: dict


@app.post("/webhooks")
async def handle_webhook(event: WebhookEvent):
    match event.type:
        case "payment.completed":
            await handle_payment(event.data)
        case "user.created":
            await handle_user_created(event.data)
        case _:
            print(f"Unhandled event: {event.type}")

    return {"received": True}
```
FastAPI automatically parses and validates the JSON body against the Pydantic model. If the payload does not match the schema, FastAPI returns a 422 error. For webhook endpoints, you may want to use a more flexible schema since providers can change payload formats.
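One way to loosen the schema is sketched below. It assumes Pydantic v2 (ConfigDict and model_extra do not exist in v1), and FlexibleWebhookEvent is an illustrative name rather than anything FastAPI provides. Unknown top-level fields are kept instead of dropped, and a missing data key no longer triggers a 422.

```python
from typing import Any

from pydantic import BaseModel, ConfigDict, Field


class FlexibleWebhookEvent(BaseModel):
    # Keep unknown top-level fields so new provider fields are not lost.
    model_config = ConfigDict(extra="allow")

    type: str
    # Default to an empty dict so events without "data" still validate.
    data: dict[str, Any] = Field(default_factory=dict)


event = FlexibleWebhookEvent.model_validate(
    {"type": "user.created", "data": {"id": 1}, "api_version": "2024-01-01"}
)
# Fields not declared on the model are collected in model_extra.
print(event.type, event.data, event.model_extra)
```

The trade-off is that a looser model catches fewer malformed payloads, so the handler has to check for the keys it actually reads.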
Raw Body and Signature Verification
For signature verification, access the raw body using Request directly:
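```python
import hashlib
import hmac
import json
import os

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()


@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()  # raw bytes, exactly as sent
    signature = request.headers.get("stripe-signature", "")
    secret = os.environ["STRIPE_WEBHOOK_SECRET"]

    # Parse the "t=...,v1=..." Stripe signature header
    try:
        elements = dict(item.split("=", 1) for item in signature.split(","))
        timestamp, received = elements["t"], elements["v1"]
    except (KeyError, ValueError):
        # Missing or malformed header: reject instead of crashing with a 500
        raise HTTPException(status_code=401, detail="Invalid signature")

    # The signed payload is "<timestamp>.<raw body>", built from bytes
    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(expected.encode(), received.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = json.loads(payload)
    await process_event(event)

    return {"received": True}
```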
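Note the use of await request.body() to get the raw bytes. The signature is computed over the exact bytes the provider sent, so verifying against a parsed and re-serialized body can fail when whitespace or key order differs. Use hmac.compare_digest() for a constant-time comparison, which avoids leaking information about the expected signature through response timing.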
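A valid signature alone does not stop an attacker from replaying a captured request later. The sketch below adds a timestamp tolerance check on top of the same t=...,v1=... scheme, using only the standard library. The verify_signature name, the 300-second default, and the now parameter (there to make the function testable) are assumptions for illustration, not part of FastAPI or Stripe's SDK.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_signature(
    payload: bytes,
    header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if header holds a valid, recent signature for payload."""
    timestamp = None
    candidates = []
    for item in header.split(","):
        key, sep, value = item.strip().partition("=")
        if not sep:
            return False  # element without "=", header is malformed
        if key == "t":
            timestamp = value
        elif key == "v1":
            candidates.append(value)  # more than one v1 may be present
    if timestamp is None or not candidates:
        return False

    try:
        sent_at = int(timestamp)
    except ValueError:
        return False

    # Reject signatures whose timestamp is too far from the current time.
    current = time.time() if now is None else now
    if abs(current - sent_at) > tolerance:
        return False

    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input.
    return any(
        hmac.compare_digest(expected.encode(), candidate.encode())
        for candidate in candidates
    )
```

Inside the route, a False result would map to the same 401 response as a bad signature.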
Dependency Injection for Verification
FastAPI's dependency injection system is perfect for creating reusable verification logic:
```python
import hashlib
import hmac
import json
import os

from fastapi import Depends, Header, HTTPException, Request


async def verify_stripe_webhook(
    request: Request,
    stripe_signature: str = Header(alias="stripe-signature"),
) -> dict:
    payload = await request.body()
    secret = os.environ["STRIPE_WEBHOOK_SECRET"]

    try:
        elements = dict(item.split("=", 1) for item in stripe_signature.split(","))
        timestamp, received = elements["t"], elements["v1"]
    except (KeyError, ValueError):
        # Malformed header: reject instead of crashing with a 500
        raise HTTPException(status_code=401, detail="Invalid signature")

    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(expected.encode(), received.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    return json.loads(payload)


@app.post("/webhooks/stripe")
async def stripe_webhook(event: dict = Depends(verify_stripe_webhook)):
    # event is already verified and parsed
    match event["type"]:
        case "payment_intent.succeeded":
            await handle_payment(event["data"]["object"])
        case "checkout.session.completed":
            await handle_checkout(event["data"]["object"])

    return {"received": True}
```
The dependency handles verification and parsing, keeping the route handler focused on business logic. If verification fails, the HTTPException raised in the dependency ends the request: FastAPI returns the 401 error and the route handler never runs.
Background Tasks
FastAPI provides built-in background tasks that run after the response has been sent, so the provider gets a fast acknowledgment while processing continues:
```python
from fastapi import BackgroundTasks


async def process_event_background(event_type: str, event_data: dict):
    match event_type:
        case "payment_intent.succeeded":
            await update_order_status(event_data["id"], "paid")
        case "invoice.paid":
            await extend_subscription(event_data["subscription"])


@app.post("/webhooks/stripe")
async def stripe_webhook(
    background_tasks: BackgroundTasks,
    event: dict = Depends(verify_stripe_webhook),
):
    event_id = event["id"]

    # Idempotency check
    if await is_event_processed(event_id):
        return {"received": True}

    await mark_event_processed(event_id)

    # Process in background
    background_tasks.add_task(
        process_event_background, event["type"], event["data"]["object"]
    )

    return {"received": True}
```
For heavier workloads, use Celery or an async task queue like arq instead of FastAPI's built-in background tasks. The built-in tasks run in the same process and do not survive server restarts.
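The idempotency check in the handler above relies on is_event_processed and mark_event_processed, which are left to the application. Because checking and marking are separate steps, two concurrent deliveries of the same event could both pass the check. One possible way to close that gap is a single atomic claim, sketched here with the standard library's sqlite3 module. ProcessedEvents and claim are hypothetical names, and the calls are blocking, so an async application would likely run them in a thread or use an async database driver instead.

```python
import sqlite3


class ProcessedEvents:
    """Tracks handled event IDs; a sketch, not a production store."""

    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)"
        )
        self._db.commit()

    def claim(self, event_id: str) -> bool:
        """Return True the first time an event ID is seen, False on repeats."""
        # The primary key makes the insert a no-op for IDs already stored,
        # so checking and marking happen in one statement.
        cursor = self._db.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
        self._db.commit()
        return cursor.rowcount == 1


store = ProcessedEvents()
first = store.claim("evt_test_123")   # new ID, row inserted
second = store.claim("evt_test_123")  # duplicate delivery, insert ignored
print(first, second)
```

With a store like this, the handler would return early whenever claim reports a repeat and schedule the background task only on the first delivery.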
Testing with ReqPour
Run FastAPI's development server and the ReqPour relay:
```bash
uvicorn main:app --reload --port 8000

# In another terminal:
npx reqpour relay --to http://localhost:8000
```

FastAPI's TestClient makes webhook testing straightforward:
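```python
from fastapi.testclient import TestClient

# Assumes the app and the dependency live in main.py, as in "uvicorn main:app"
from main import app, verify_stripe_webhook

client = TestClient(app)


def test_payment_webhook():
    payload = {
        "id": "evt_test_123",
        "type": "payment_intent.succeeded",
        "data": {"object": {"id": "pi_456", "amount": 2500}},
    }
    # An unsigned request would be rejected by the verification dependency,
    # so replace it for this test and exercise only the route handler.
    app.dependency_overrides[verify_stripe_webhook] = lambda: payload
    try:
        response = client.post("/webhooks/stripe", json=payload)
    finally:
        app.dependency_overrides.clear()

    assert response.status_code == 200
    assert response.json() == {"received": True}
```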
Use payloads from the ReqPour dashboard as test fixtures. FastAPI's automatic OpenAPI documentation also generates an interactive docs page at /docs where you can test webhook endpoints manually during development.
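To exercise the signature check itself in a test, the request needs a header signed with the same secret the endpoint reads. A minimal sketch follows, assuming the t=...,v1=... scheme used in the verification code earlier. sign_payload is a hypothetical helper, not part of FastAPI or Stripe's SDK.

```python
import hashlib
import hmac
import json
import time
from typing import Optional


def sign_payload(
    payload: dict, secret: str, timestamp: Optional[int] = None
) -> tuple[bytes, dict[str, str]]:
    """Serialize payload and build matching signature headers for a test request."""
    # Serialize once and sign those exact bytes; the same bytes must be sent.
    body = json.dumps(payload, separators=(",", ":")).encode()
    ts = int(time.time()) if timestamp is None else timestamp
    signature = hmac.new(
        secret.encode(), f"{ts}.".encode() + body, hashlib.sha256
    ).hexdigest()
    headers = {
        "stripe-signature": f"t={ts},v1={signature}",
        "content-type": "application/json",
    }
    return body, headers


body, headers = sign_payload(
    {"id": "evt_test_123", "type": "payment_intent.succeeded"}, "whsec_test"
)
print(headers["stripe-signature"])
```

In a test, set STRIPE_WEBHOOK_SECRET to the same value and send the returned bytes unchanged, for example with client.post("/webhooks/stripe", content=body, headers=headers) on the httpx-based TestClient. Passing json= instead would re-serialize the payload and could invalidate the signature.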