This example demonstrates how to implement true Server-Sent Events (SSE) streaming using the OpenAI-compatible /v1/chat/completions endpoint. Unlike simulated streaming, this delivers tokens to the client as they are generated.
The streaming endpoint follows the OpenAI API specification, making it compatible with existing OpenAI client libraries and tools.

Why SSE Streaming?

  • Real-time delivery - Tokens stream to the client as they’re generated
  • Reduced perceived latency - Users see responses immediately, not after full generation
  • Native browser support - SSE works out of the box with EventSource API
  • OpenAI compatibility - Drop-in replacement for OpenAI streaming endpoints

How It Works

The streaming architecture connects your workflow directly to the HTTP response: each streaming node yields output chunks as the model generates them, an event stream generator serializes every chunk as an SSE event, and FastAPI's StreamingResponse writes those events to the client immediately.

The Streaming Endpoint

The endpoint accepts OpenAI-compatible chat completion requests and returns an SSE stream:
from fastapi import APIRouter
from starlette.responses import StreamingResponse

from schemas.openai_schema import OpenAIChatSchema
from utils.event_stream_generator import event_stream_generator
from workflows.streaming_example_workflow import StreamingExampleWorkflow

router = APIRouter()

@router.post("/chat/completions")
async def handle_chat_completion_streaming(data: OpenAIChatSchema) -> StreamingResponse:
    # Run the workflow and obtain an async iterator of output chunks.
    workflow = StreamingExampleWorkflow(enable_tracing=True)
    workflow_stream = workflow.run_stream_async(data.model_dump())

    return StreamingResponse(
        event_stream_generator(workflow_stream),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",  # never cache the event stream
            "Connection": "keep-alive",  # keep the connection open while streaming
            "X-Accel-Buffering": "no",  # disable proxy buffering (e.g. nginx)
        },
    )
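
The event_stream_generator helper is imported from utils but not shown on this page. A minimal sketch of what such a generator could look like, assuming the workflow yields ready-made OpenAI-style chunk dictionaries, is:
import json
from typing import Any, AsyncIterator, Dict

async def event_stream_generator(
    workflow_stream: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[str]:
    # Sketch only: the real utils.event_stream_generator may differ.
    # Each chunk is serialized as one SSE "data:" event; the OpenAI-style
    # [DONE] terminator is sent once the workflow stream is exhausted.
    async for chunk in workflow_stream:
        yield f"data: {json.dumps(chunk)}\n\n"
    yield "data: [DONE]\n\n"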

Example Workflow

The StreamingExampleWorkflow demonstrates a two-node streaming pipeline:
from core.schema import WorkflowSchema, NodeConfig
from core.workflow import Workflow
from schemas.openai_schema import OpenAIChatSchema

# TextStreamingNode and StructuredStreamingNode are defined in the
# "Streaming Node Examples" section below.
class StreamingExampleWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="SSE streaming example with text and structured output",
        event_schema=OpenAIChatSchema,
        start=TextStreamingNode,
        nodes=[
            NodeConfig(
                node=TextStreamingNode,
                connections=[StructuredStreamingNode],
            ),
            NodeConfig(
                node=StructuredStreamingNode,
                connections=[],
            ),
        ],
    )
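
The workflow can also be exercised outside the API, for example in a quick script. The following is a sketch assuming run_stream_async accepts a plain request dict (as the endpoint passes data.model_dump()) and yields chunk dictionaries:
import asyncio

async def main() -> None:
    workflow = StreamingExampleWorkflow(enable_tracing=False)
    request = {
        "model": "default",
        "messages": [{"role": "user", "content": "Hello, how are you?"}],
    }
    # Print each chunk as soon as the workflow yields it.
    async for chunk in workflow.run_stream_async(request):
        print(chunk)

asyncio.run(main())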

Streaming Node Examples

Text Streaming

Stream plain text responses token by token:
from typing import Any, AsyncIterator, Dict

from core.nodes.agent import AgentConfig, ModelProvider
from core.nodes.agent_streaming_node import AgentStreamingNode
from schemas.openai_schema import OpenAIChatSchema

# TaskContext is the per-request context object supplied by the workflow
# framework; its import is omitted here, as in the other examples.
class TextStreamingNode(AgentStreamingNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=str,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_text_deltas(result):
                yield chunk
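
stream_text_deltas comes from the AgentStreamingNode base class and is not shown on this page. Conceptually it tracks the text already emitted and yields only the new suffix; a rough sketch under that assumption (result.stream_text() is a placeholder for however the underlying agent exposes the accumulated response text):
from typing import Any, AsyncIterator, Dict

async def stream_text_deltas_sketch(result: Any) -> AsyncIterator[Dict[str, Any]]:
    # Sketch only: the real helper also debounces updates (default 10 ms)
    # and emits OpenAI-formatted chunks.
    sent = ""
    async for accumulated in result.stream_text():
        delta = accumulated[len(sent):]  # delta extraction: only the new tokens
        sent = accumulated
        if delta:
            yield {"content": delta}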

Structured Streaming

Stream structured Pydantic model outputs:
class StructuredStreamingNode(AgentStreamingNode):
    class OutputType(AgentStreamingNode.OutputType):
        thinking: str
        reply: str

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=self.OutputType,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_structured_deltas(result):
                yield chunk
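
stream_structured_deltas is likewise a base-class helper that is not shown here. One plausible approach, sketched purely as an assumption, is to diff successive partial OutputType instances field by field and emit only the newly generated text (result.stream_partial_outputs() is a placeholder name):
from typing import Any, AsyncIterator, Dict

async def stream_structured_deltas_sketch(result: Any) -> AsyncIterator[Dict[str, Any]]:
    # Sketch only: the real helper may stream partial structured output differently.
    previous: Dict[str, str] = {}
    async for partial in result.stream_partial_outputs():
        current = {k: (v or "") for k, v in partial.model_dump().items()}
        for field, text in current.items():
            delta = text[len(previous.get(field, "")):]
            if delta:
                yield {"field": field, "delta": delta}
        previous = current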

Testing the Endpoint

Use curl to test the streaming endpoint:
curl -X POST http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "default",
    "messages": [
      {"role": "user", "content": "Hello, how are you?"}
    ]
  }'
You’ll see SSE events stream in real time:
data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "Hello"}, "finish_reason": null}]}

data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "!"}, "finish_reason": null}]}

data: [DONE]
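
Because the stream follows the OpenAI chunk format, the official OpenAI Python client can consume it directly. Here the base_url points at the local server from the curl command above, and the api_key is a placeholder on the assumption that this example server does not validate it:
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8080/v1", api_key="not-needed")

stream = client.chat.completions.create(
    model="default",
    messages=[{"role": "user", "content": "Hello, how are you?"}],
    stream=True,
)

# Print tokens as they arrive.
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
print()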

Key Features

  • Delta extraction - Only sends new tokens, not the full accumulated text
  • Debouncing - Configurable delay (default 10ms) to batch rapid updates
  • OpenAI chunk format - Compatible with standard OpenAI client libraries (see the sketch after this list)
  • Multi-node streaming - Chain multiple streaming nodes in a single workflow
  • Langfuse tracing - Full observability with enable_tracing=True
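
For reference, a text delta can be wrapped into the chunk shape shown in the curl output with a small helper like this sketch (the real project may build its chunks elsewhere, e.g. inside the streaming helpers):
from typing import Any, Dict

def make_chat_completion_chunk(
    delta_text: str, model: str = "default", finished: bool = False
) -> Dict[str, Any]:
    # Mirrors the chat.completion.chunk events shown above.
    return {
        "object": "chat.completion.chunk",
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {} if finished else {"role": "assistant", "content": delta_text},
                "finish_reason": "stop" if finished else None,
            }
        ],
    }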