This example demonstrates how to implement true Server-Sent Events (SSE) streaming using the OpenAI-compatible /v1/chat/completions endpoint. Unlike simulated streaming, this delivers tokens to the client as they are generated.
The streaming endpoint follows the OpenAI API specification, making it compatible with existing OpenAI client libraries and tools.
Why SSE Streaming?
- Real-time delivery - Tokens stream to the client as they’re generated
- Reduced perceived latency - Users see responses immediately, not after full generation
- Native browser support - SSE works out of the box with EventSource API
- OpenAI compatibility - Drop-in replacement for OpenAI streaming endpoints (see the client sketch below)
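Because the endpoint follows the OpenAI spec, the official openai Python client can consume it directly. A minimal sketch, assuming the server from this example runs on localhost:8080 (as in the curl test further below); the API key value is a placeholder:

```python
from openai import OpenAI

# Point the client at the local server; the key is a placeholder unless your deployment checks it
client = OpenAI(base_url="http://localhost:8080/v1", api_key="placeholder")

stream = client.chat.completions.create(
    model="default",
    messages=[{"role": "user", "content": "Hello, how are you?"}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
```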
How It Works
The streaming architecture connects your workflow directly to the HTTP response:
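A simplified view of the data flow, reconstructed from the code in this section:

```text
Client ──POST /v1/chat/completions──▶ FastAPI endpoint
                                          │ workflow.run_stream_async(...)
                                          ▼
                                 Workflow nodes (async chunk stream)
                                          │ event_stream_generator(...)
                                          ▼
Client ◀────── SSE "data:" events ────── StreamingResponse
```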
The Streaming Endpoint
The endpoint accepts OpenAI-compatible chat completion requests and returns an SSE stream:
```python
from fastapi import APIRouter
from starlette.responses import StreamingResponse

from schemas.openai_schema import OpenAIChatSchema
from utils.event_stream_generator import event_stream_generator
from workflows.streaming_example_workflow import StreamingExampleWorkflow

router = APIRouter()


@router.post("/chat/completions")
async def handle_chat_completion_streaming(data: OpenAIChatSchema) -> StreamingResponse:
    workflow = StreamingExampleWorkflow(enable_tracing=True)
    workflow_stream = workflow.run_stream_async(data.model_dump())
    return StreamingResponse(
        event_stream_generator(workflow_stream),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",  # prevent intermediaries from caching the stream
            "Connection": "keep-alive",  # keep the connection open for the full stream
            "X-Accel-Buffering": "no",  # disable nginx buffering so chunks flush immediately
        },
    )
```
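event_stream_generator is imported from the project's utils and its implementation isn't shown here. A minimal sketch of what such a generator does, assuming the workflow yields dicts already shaped like OpenAI chunks (compare the sample output under "Testing the Endpoint"):

```python
import json
from typing import Any, AsyncIterator, Dict


async def event_stream_generator(
    workflow_stream: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[str]:
    """Wrap each workflow chunk in the SSE wire format: a data: line plus a blank line."""
    async for chunk in workflow_stream:
        yield f"data: {json.dumps(chunk)}\n\n"
    # Terminate the stream the way OpenAI does
    yield "data: [DONE]\n\n"
```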
Example Workflow
The StreamingExampleWorkflow demonstrates a two-node streaming pipeline:
```python
from core.schema import WorkflowSchema, NodeConfig
from core.workflow import Workflow
from schemas.openai_schema import OpenAIChatSchema


class StreamingExampleWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="SSE streaming example with text and structured output",
        event_schema=OpenAIChatSchema,
        start=TextStreamingNode,  # both nodes are defined in the next section
        nodes=[
            NodeConfig(
                node=TextStreamingNode,
                connections=[StructuredStreamingNode],  # text node hands off to the structured node
            ),
            NodeConfig(
                node=StructuredStreamingNode,
                connections=[],  # terminal node: the stream ends here
            ),
        ],
    )
```
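For a quick check without HTTP, you can consume the workflow stream directly. A hedged sketch, assuming run_stream_async accepts the same dict payload the endpoint passes in and yields OpenAI-style chunk dicts:

```python
import asyncio


async def main() -> None:
    workflow = StreamingExampleWorkflow()
    payload = {"model": "default", "messages": [{"role": "user", "content": "Hi"}]}
    async for chunk in workflow.run_stream_async(payload):
        # Each chunk mirrors a chat.completion.chunk dict, as in the sample output below
        print(chunk["choices"][0]["delta"].get("content", ""), end="", flush=True)


asyncio.run(main())
```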
Streaming Node Examples
Text Streaming
Stream plain text responses token by token:
```python
from typing import Any, AsyncIterator, Dict

from core.nodes.agent import AgentConfig, ModelProvider
from core.nodes.agent_streaming_node import AgentStreamingNode
from core.task import TaskContext  # assumed path; adjust to wherever TaskContext lives in your project
from schemas.openai_schema import OpenAIChatSchema


class TextStreamingNode(AgentStreamingNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=str,  # plain text output
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            # Emit only newly generated text, token by token
            async for chunk in self.stream_text_deltas(result):
                yield chunk
```
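stream_text_deltas is a helper provided by AgentStreamingNode and isn't shown in this example. A minimal sketch of the delta-extraction idea behind it, assuming an async iterator of accumulated-text snapshots (the function and parameter names here are hypothetical):

```python
from typing import Any, AsyncIterator, Dict


async def text_deltas_sketch(
    snapshots: AsyncIterator[str],
    model: str = "default",
) -> AsyncIterator[Dict[str, Any]]:
    """Turn accumulated-text snapshots into OpenAI-style delta chunks."""
    sent = ""
    async for text in snapshots:
        delta = text[len(sent):]  # only the part not yet emitted
        sent = text
        if delta:
            yield {
                "object": "chat.completion.chunk",
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "delta": {"role": "assistant", "content": delta},
                        "finish_reason": None,
                    }
                ],
            }
```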
Structured Streaming
Stream structured Pydantic model outputs:
```python
class StructuredStreamingNode(AgentStreamingNode):
    class OutputType(AgentStreamingNode.OutputType):
        thinking: str
        reply: str

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=self.OutputType,  # stream this Pydantic model incrementally
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            # Emit incremental updates to the structured output
            async for chunk in self.stream_structured_deltas(result):
                yield chunk
```
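Similarly, stream_structured_deltas is an AgentStreamingNode helper not shown here. One plausible sketch of the idea, assuming an async iterator of progressively filled Pydantic models (all names hypothetical; the real helper may differ):

```python
from typing import AsyncIterator

from pydantic import BaseModel


async def structured_deltas_sketch(
    partials: AsyncIterator[BaseModel],
) -> AsyncIterator[str]:
    """Serialize each partial model and emit only the JSON that was appended."""
    sent = ""
    async for partial in partials:
        serialized = partial.model_dump_json()
        if serialized.startswith(sent):
            delta = serialized[len(sent):]  # the newly appended JSON suffix
        else:
            delta = serialized  # structure changed mid-stream; fall back to the full snapshot
        sent = serialized
        if delta:
            yield delta
```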
Testing the Endpoint
Use curl to test the streaming endpoint:
```bash
curl -X POST http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "default",
    "messages": [
      {"role": "user", "content": "Hello, how are you?"}
    ]
  }'
```
You’ll see SSE events stream in real time:

```text
data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "Hello"}, "finish_reason": null}]}

data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "!"}, "finish_reason": null}]}

data: [DONE]
```
Key Features
- Delta extraction - Only sends new tokens, not the full accumulated text
- Debouncing - Configurable delay (default 10ms) to batch rapid updates (see the sketch after this list)
- OpenAI chunk format - Compatible with standard OpenAI client libraries
- Multi-node streaming - Chain multiple streaming nodes in a single workflow
- Langfuse tracing - Full observability with enable_tracing=True
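The debouncing above keeps clients from being flooded with tiny updates. A minimal sketch of the batching idea, with hypothetical names; the real implementation lives in AgentStreamingNode and may differ:

```python
import time
from typing import AsyncIterator


async def debounce_deltas(
    deltas: AsyncIterator[str],
    delay: float = 0.01,  # 10 ms default, matching the feature list above
) -> AsyncIterator[str]:
    """Accumulate incoming deltas and flush at most once per `delay` seconds."""
    buffer = ""
    last_flush = time.monotonic()
    async for delta in deltas:
        buffer += delta
        now = time.monotonic()
        if now - last_flush >= delay:
            yield buffer  # flush the batched text as one chunk
            buffer = ""
            last_flush = now
    if buffer:
        yield buffer  # flush whatever remains when the stream ends
```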