This example demonstrates how to implement true Server-Sent Events (SSE) streaming using the OpenAI-compatible /v1/chat/completions endpoint. Unlike simulated streaming, this delivers tokens to the client as they are generated.
The streaming endpoint follows the OpenAI API specification, making it compatible with existing OpenAI client libraries and tools.
Why SSE Streaming?
- Real-time delivery - Tokens stream to the client as they’re generated
- Reduced perceived latency - Users see responses immediately, not after full generation
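- Native browser support - SSE runs over plain HTTP, so browsers can read it with fetch() streams (the EventSource API only sends GET requests, so it can't call this POST endpoint directly)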
- OpenAI compatibility - Drop-in replacement for OpenAI streaming endpoints
How It Works
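The streaming architecture connects your workflow directly to the HTTP response: the endpoint starts the workflow with run_stream_async(), event_stream_generator turns each chunk the workflow yields into an SSE event, and StreamingResponse sends those events to the client as soon as they are produced.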
The Streaming Endpoint
The endpoint accepts OpenAI-compatible chat completion requests and returns an SSE stream:
from fastapi import APIRouter
from starlette.responses import StreamingResponse
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema
from launchpad.utils.event_stream_generator import event_stream_generator
from launchpad.workflows.examples.streaming.workflow import ExampleStreamingWorkflow

router = APIRouter()


@router.post("/chat/completions", dependencies=[])
async def handle_chat_completion_streaming(data: OpenAIChatSchema) -> StreamingResponse:
    workflow = ExampleStreamingWorkflow(enable_tracing=True)
    # Run the workflow as an async stream of chunks instead of awaiting the full result.
    workflow_stream = workflow.run_stream_async(data.model_dump())
    # Wrap each chunk as an SSE event and send it as soon as it is produced.
    return StreamingResponse(
        event_stream_generator(workflow_stream),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",  # don't let clients or proxies cache the stream
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable response buffering in nginx
        },
    )
The endpoint is mounted under the /v1 prefix by app/launchpad/api/router.py, so the full path is POST /v1/chat/completions.
Example Workflow
The ExampleStreamingWorkflow (registered as WorkflowRegistry.STREAMING) demonstrates a two-node streaming pipeline:
from launchpad.core.schema import WorkflowSchema, NodeConfig
from launchpad.core.workflow import Workflow
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema
from launchpad.workflows.examples.streaming.nodes.text_streaming_node import TextStreamingNode
from launchpad.workflows.examples.streaming.nodes.structured_streaming_node import StructuredStreamingNode


class ExampleStreamingWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="SSE streaming example with text and structured output",
        event_schema=OpenAIChatSchema,
        start=TextStreamingNode,
        nodes=[
            NodeConfig(
                node=TextStreamingNode,
                connections=[StructuredStreamingNode],
            ),
            NodeConfig(
                node=StructuredStreamingNode,
                connections=[],
            ),
        ],
    )
Streaming Node Examples
Text Streaming
Stream plain text responses token by token:
from typing import AsyncIterator, Dict, Any
from launchpad.core.nodes.agent import AgentConfig, ModelProvider
from launchpad.core.nodes.agent_streaming_node import AgentStreamingNode
from launchpad.core.task import TaskContext
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema


class TextStreamingNode(AgentStreamingNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
            output_type=str,  # plain-text output
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            # Yield only the newly generated text, not the accumulated response.
            async for chunk in self.stream_text_deltas(result):
                yield chunk
Structured Streaming
Stream structured Pydantic model outputs:
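from typing import AsyncIterator, Dict, Any
from launchpad.core.nodes.agent import AgentConfig, ModelProvider
from launchpad.core.nodes.agent_streaming_node import AgentStreamingNode
from launchpad.core.task import TaskContext
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema


class StructuredStreamingNode(AgentStreamingNode):
    # Structured output schema the agent must return.
    class OutputType(AgentStreamingNode.OutputType):
        thinking: str
        reply: str

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
            output_type=self.OutputType,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            # Yield incremental updates to the structured output as they arrive.
            async for chunk in self.stream_structured_deltas(result):
                yield chunk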
Testing the Endpoint
Use curl to test the streaming endpoint (-N disables curl's output buffering so events print as soon as they arrive):
curl -N -X POST http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "default",
    "messages": [
      {"role": "user", "content": "Hello, how are you?"}
    ]
  }'
You’ll see SSE events arrive in real time, each separated by a blank line:
data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "Hello"}, "finish_reason": null}]}

data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "!"}, "finish_reason": null}]}

data: [DONE]
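Any HTTP client that reads the response body line by line can consume this stream. Below is a stdlib-only sketch (the function name iter_sse_content is hypothetical) that parses `data:` lines, stops at `[DONE]`, and reassembles the content deltas. It is exercised on the two sample events shown above rather than on a live connection.

```python
import json
from typing import Iterable, Iterator


def iter_sse_content(lines: Iterable[str]) -> Iterator[str]:
    """Yield delta content from SSE 'data:' lines until the [DONE] sentinel."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other SSE fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                yield content


SAMPLE = [
    'data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "Hello"}, "finish_reason": null}]}',
    "",
    'data: {"object": "chat.completion.chunk", "model": "default", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "!"}, "finish_reason": null}]}',
    "",
    "data: [DONE]",
]

if __name__ == "__main__":
    print("".join(iter_sse_content(SAMPLE)))
```

Against the running server, pass in the lines of the streamed response body decoded to str, for example by iterating over the response returned by urllib.request.urlopen and decoding each line.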
Key Features
- Delta extraction - Only sends new tokens, not the full accumulated text
- Debouncing - Configurable delay (default 10ms) to batch rapid updates
- OpenAI chunk format - Compatible with standard OpenAI client libraries
- Multi-node streaming - Chain multiple streaming nodes in a single workflow
- Langfuse tracing - Full observability with enable_tracing=True