Next, set up an endpoint to receive events. The endpoint validates the input, persists the event, and enqueues it for processing.

Endpoint Implementation

The implementation below creates a simple POST endpoint without authentication (for demonstration purposes):
import json
from http import HTTPStatus

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from starlette.responses import Response

from celery_worker.config import celery_app
from database.event import Event
from database.repository import GenericRepository
from database.session import db_session
from schemas.customer_care_schema import CustomerCareEventSchema
from workflows.workflow_registry import WorkflowRegistry

router = APIRouter()


@router.post("/", dependencies=[])
def handle_event(
    data: CustomerCareEventSchema,
    session: Session = Depends(db_session),
) -> Response:
    # Store event in database
    repository = GenericRepository(
        session=session,
        model=Event,
    )
    raw_event = data.model_dump(mode="json")
    event = Event(data=raw_event, workflow_type=get_workflow_type())
    repository.create(obj=event)

    # Queue processing task
    # send_task returns an AsyncResult; its .id attribute is the task ID
    task = celery_app.send_task(
        "process_incoming_event",
        args=[str(event.id)],
    )

    # Return acceptance response
    return Response(
        content=json.dumps({"message": f"process_incoming_event started `{task.id}`"}),
        media_type="application/json",
        status_code=HTTPStatus.ACCEPTED,
    )


def get_workflow_type() -> str:
    return WorkflowRegistry.CUSTOMER_CARE.name

Key Actions Performed

  1. Data validation: FastAPI validates the request body against CustomerCareEventSchema. If validation fails, the API returns a 422 response with details.
  2. Database storage: The event is stored with its raw JSON payload, the workflow type from the registry, a generated ID, and timestamps.
  3. Task queuing: A Celery task named process_incoming_event is queued with the event ID. The call returns a result whose ID can be used to track the task.
  4. Response: The endpoint returns HTTP 202 (Accepted) to indicate asynchronous processing.
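
The order of the four steps above can be sketched without FastAPI, SQLAlchemy, or Celery. This is a minimal illustration, not the endpoint itself: StoredEvent, EVENTS, QUEUE, and REQUIRED_FIELDS are hypothetical in-memory stand-ins for the Event model, the database, the Celery broker, and the schema, and the response body is simplified.

```python
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Hypothetical required fields; the real CustomerCareEventSchema is not shown here.
REQUIRED_FIELDS = {"ticket_id", "message"}


@dataclass
class StoredEvent:
    """In-memory stand-in for the Event model (assumption, not the real model)."""

    data: dict
    workflow_type: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


EVENTS: dict = {}  # stand-in for the database table, keyed by event ID
QUEUE: list = []   # stand-in for the Celery broker


def handle_event(payload: dict) -> tuple:
    """Return (status_code, json_body) for an incoming payload."""
    # 1. Validation: reject early, before anything is stored or queued.
    missing = sorted(REQUIRED_FIELDS - set(payload))
    if missing:
        return 422, json.dumps({"detail": f"missing fields: {missing}"})

    # 2. Storage: keep the raw payload with a generated ID and a timestamp.
    event = StoredEvent(data=payload, workflow_type="CUSTOMER_CARE")
    EVENTS[event.id] = event

    # 3. Queuing: pass only the event ID; the worker loads the event itself.
    QUEUE.append(("process_incoming_event", [str(event.id)]))

    # 4. Response: 202 signals that processing has been accepted, not finished.
    return 202, json.dumps({"event_id": str(event.id)})
```

Passing only the event ID to the queue keeps the task message small and lets the worker read the stored record as the single source of truth.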

Understanding the Components

  • GenericRepository: Provides database operations (create, read, update, delete) for any model
  • db_session: FastAPI dependency that provides a database session for the request
  • celery_app: The Celery application instance for queuing background tasks
  • WorkflowRegistry: Enum containing all registered workflows in the system
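
To make the WorkflowRegistry entry concrete, here is a small sketch of how an Enum member's name can serve as the stored workflow type, as get_workflow_type() does in the endpoint. The member values and the lookup_workflow helper are hypothetical. The real registry's contents, and how a worker resolves the stored name, are not shown in this guide.

```python
from enum import Enum


class WorkflowRegistry(Enum):
    # Hypothetical members and values; the real registry is defined elsewhere.
    CUSTOMER_CARE = "customer_care"
    BILLING = "billing"


def get_workflow_type() -> str:
    # .name is the member's identifier ("CUSTOMER_CARE"), not its value.
    return WorkflowRegistry.CUSTOMER_CARE.name


def lookup_workflow(workflow_type: str) -> WorkflowRegistry:
    # Subscripting an Enum class looks a member up by name.
    # An unknown name raises KeyError.
    return WorkflowRegistry[workflow_type]
```

Storing the name rather than the value means the database column stays stable even if the value attached to a member changes later.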

Response Status Code

The endpoint returns HTTP 202 (Accepted) rather than 200 (OK) because workflow processing happens asynchronously. The response tells the client that the request has been queued, not that it has been processed. The actual work runs in the background.

Security Considerations

Security Note: This example endpoint has no authentication for simplicity. In production, you should add authentication middleware, implement rate limiting, validate API keys or JWT tokens, and use HTTPS for secure communication.
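
As one illustration of the measures listed above, API key validation might start from a constant-time comparison like the sketch below. The environment variable name EVENTS_API_KEY and the helper is_valid_api_key are assumptions made for this example, not part of the project.

```python
import hmac
import os
from typing import Optional

# Hypothetical environment variable holding the expected key.
API_KEY_ENV_VAR = "EVENTS_API_KEY"


def is_valid_api_key(provided: Optional[str], expected: Optional[str] = None) -> bool:
    """Return True only if the provided key matches the expected one."""
    if expected is None:
        expected = os.environ.get(API_KEY_ENV_VAR)
    # Fail closed: a missing key on either side is never valid.
    if not provided or not expected:
        return False
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
```

In FastAPI, a check like this would typically live in a dependency that reads the key from a request header and raises an HTTPException with status 401 when the check fails. That dependency could then be listed in the route's currently empty dependencies=[] argument.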