The AgentStreamingNode class extends AgentNode to support real-time streaming of LLM responses. Instead of waiting for the complete response, tokens are yielded as they’re generated.
Key difference from AgentNode:
- AgentNode.process() is awaited once and returns the updated TaskContext
- AgentStreamingNode.process() is an async generator that yields events, typed as AsyncIterator[Dict[str, Any]]
AgentStreamingNode Class Structure
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Dict

# AgentNode and TaskContext are provided by launchpad (TaskContext: launchpad.core.task)

class AgentStreamingNode(AgentNode, ABC):
    def __init__(self, task_context: TaskContext = None):
        super().__init__(task_context=task_context)

    @abstractmethod
    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        # Subclasses implement this as an async generator that yields chunks
        pass

    async def stream_text_deltas(
        self,
        stream_result,
        debounce_by: float = 0.01,
    ) -> AsyncIterator[dict]:
        ...

    async def stream_structured_deltas(
        self,
        stream_result,
        debounce_by: float = 0.01,
    ) -> AsyncIterator[dict]:
        ...

    def completion_chunk(self, content: str) -> dict:
        ...
Streaming Methods
stream_text_deltas
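Streams plain-text responses. Each update from stream_text() contains the full text generated so far, so the method compares it with the previous update and yields only the newly added part (the delta):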
async def stream_text_deltas(
    self,
    stream_result,
    debounce_by: float = 0.01,
) -> AsyncIterator[dict]:
    previous_text = ""
    # Each text_chunk is the full text generated so far
    async for text_chunk in stream_result.stream_text(debounce_by=debounce_by):
        if text_chunk.startswith(previous_text):
            # Normal case: keep only the newly appended suffix
            delta_text = text_chunk[len(previous_text):]
        else:
            # The text no longer extends what was sent: send it whole
            delta_text = text_chunk
        if not delta_text:
            continue  # nothing new since the last update
        previous_text = text_chunk
        yield self.completion_chunk(delta_text)
Parameters:
- stream_result - The result object yielded by `async with self.agent.run_stream(...) as result`
- debounce_by - Time window in seconds over which incoming updates are grouped before one is emitted (default: 0.01)
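To see the prefix-diff logic in isolation, the sketch below runs the same loop against a hypothetical FakeStreamResult whose stream_text() yields cumulative snapshots. That cumulative behavior is an assumption about the real stream result, inferred from the startswith check. The sketch returns plain strings instead of completion chunks so the deltas are easy to inspect:

```python
import asyncio
from typing import AsyncIterator, List


class FakeStreamResult:
    """Hypothetical stand-in for the object yielded by agent.run_stream().

    Each item it streams is the full text generated so far (an assumption).
    """

    def __init__(self, snapshots: List[str]):
        self.snapshots = snapshots

    async def stream_text(self, debounce_by: float = 0.01) -> AsyncIterator[str]:
        # debounce_by is accepted for signature compatibility but ignored here
        for snapshot in self.snapshots:
            await asyncio.sleep(0)  # hand control back to the event loop
            yield snapshot


async def text_deltas(stream_result, debounce_by: float = 0.01) -> AsyncIterator[str]:
    # Same prefix-diff logic as stream_text_deltas, minus the chunk wrapping
    previous_text = ""
    async for text_chunk in stream_result.stream_text(debounce_by=debounce_by):
        if text_chunk.startswith(previous_text):
            delta_text = text_chunk[len(previous_text):]
        else:
            delta_text = text_chunk  # text did not extend the previous one
        if not delta_text:
            continue  # nothing new since the last snapshot
        previous_text = text_chunk
        yield delta_text


async def collect(snapshots: List[str]) -> List[str]:
    return [delta async for delta in text_deltas(FakeStreamResult(snapshots))]


deltas = asyncio.run(collect(["Hel", "Hello", "Hello", "Hello, world"]))
print(deltas)  # ['Hel', 'lo', ', world']
```

The repeated "Hello" snapshot produces no output, and a snapshot that does not extend the previous text is sent whole.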
stream_structured_deltas
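Streams structured Pydantic model outputs. Each update from stream_output() is the partially validated model built so far, and its model_dump() dictionary becomes the chunk content. Structured chunks therefore carry the accumulated state rather than a per-token delta. Empty dumps are skipped: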
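async def stream_structured_deltas(
    self,
    stream_result,
    debounce_by: float = 0.01,
) -> AsyncIterator[dict]:
    # Each chunk is the partially validated output model built so far
    async for chunk in stream_result.stream_output(debounce_by=debounce_by):
        data = chunk.model_dump()  # serialize once instead of twice
        if data:  # skip empty partial outputs
            yield self.completion_chunk(data)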
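The following sketch uses hypothetical PartialOutput and FakeStructuredResult stand-ins to show what stream_structured_deltas sends. The partial shapes are invented for illustration. Every payload repeats all the fields seen so far instead of only the new text:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class PartialOutput:
    """Hypothetical stand-in for a partially validated Pydantic model."""

    def __init__(self, **fields: Any):
        self.fields = fields

    def model_dump(self) -> Dict[str, Any]:
        return dict(self.fields)


class FakeStructuredResult:
    """Hypothetical stand-in whose stream_output() yields growing partial outputs."""

    def __init__(self, partials: List[PartialOutput]):
        self.partials = partials

    async def stream_output(self, debounce_by: float = 0.01) -> AsyncIterator[PartialOutput]:
        for partial in self.partials:  # debounce_by is ignored by this fake
            yield partial


async def structured_payloads(stream_result) -> List[Dict[str, Any]]:
    # Mirrors stream_structured_deltas: dump each partial and skip empty dumps
    payloads = []
    async for chunk in stream_result.stream_output(debounce_by=0.01):
        data = chunk.model_dump()
        if data:
            payloads.append(data)
    return payloads


result = FakeStructuredResult([
    PartialOutput(),
    PartialOutput(thinking="The user"),
    PartialOutput(thinking="The user greets me", reply="Hi"),
])
print(asyncio.run(structured_payloads(result)))
# [{'thinking': 'The user'}, {'thinking': 'The user greets me', 'reply': 'Hi'}]
```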
completion_chunk
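Wraps content in a dictionary shaped like an OpenAI chat.completion.chunk object. Text streaming passes a string. stream_structured_deltas passes the model_dump() dictionary, even though the parameter is annotated as str: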
def completion_chunk(self, content: str) -> dict:
    return {
        "object": "chat.completion.chunk",
        "model": "default",
        "choices": [
            {
                "index": 0,
                "delta": {"role": "assistant", "content": content},
                "finish_reason": None,
            }
        ],
    }
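Because text chunks carry only deltas, a client rebuilds the full reply by concatenating delta.content in order. The sketch below uses a standalone copy of completion_chunk, and join_text is a hypothetical client helper. For structured chunks, where content is the accumulated dict, the last chunk already holds the complete output, so no concatenation is needed.

```python
from typing import Any, Dict, Iterable


def completion_chunk(content: str) -> Dict[str, Any]:
    # Standalone copy of AgentStreamingNode.completion_chunk for illustration
    return {
        "object": "chat.completion.chunk",
        "model": "default",
        "choices": [
            {
                "index": 0,
                "delta": {"role": "assistant", "content": content},
                "finish_reason": None,
            }
        ],
    }


def join_text(chunks: Iterable[Dict[str, Any]]) -> str:
    # Client side: concatenate the delta content of every chunk, in order
    return "".join(chunk["choices"][0]["delta"]["content"] for chunk in chunks)


chunks = [completion_chunk(delta) for delta in ["Hel", "lo", ", world"]]
print(join_text(chunks))  # Hello, world
```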
Implementation Examples
Text Streaming Node
Stream plain text responses token by token:
from typing import AsyncIterator, Dict, Any

from launchpad.core.nodes.agent import AgentConfig, ModelProvider
from launchpad.core.nodes.agent_streaming_node import AgentStreamingNode
from launchpad.core.task import TaskContext
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema


class TextStreamingNode(AgentStreamingNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
            output_type=str,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_text_deltas(result):
                yield chunk
Structured Streaming Node
Stream structured outputs with multiple fields:
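from typing import AsyncIterator, Dict, Any

from launchpad.core.nodes.agent import AgentConfig, ModelProvider
from launchpad.core.nodes.agent_streaming_node import AgentStreamingNode
from launchpad.core.task import TaskContext
from launchpad.workflows.examples.streaming.schema import OpenAIChatSchema


class StructuredStreamingNode(AgentStreamingNode):
    class OutputType(AgentStreamingNode.OutputType):
        # Fields the model fills in; each chunk carries their values so far
        thinking: str
        reply: str

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
            output_type=self.OutputType,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_structured_deltas(result):
                yield chunk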
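Since each structured chunk repeats the accumulated values, a client that wants token-style updates per field can diff successive snapshots. The sketch below applies the same prefix rule as stream_text_deltas to each field. field_deltas is a hypothetical helper, not part of launchpad, and it assumes string-valued fields such as thinking and reply.

```python
from typing import Any, Dict, Iterable, Iterator


def field_deltas(snapshots: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, str]]:
    """Hypothetical client helper: turn accumulated snapshots into per-field deltas."""
    previous: Dict[str, str] = {}
    for snapshot in snapshots:
        delta: Dict[str, str] = {}
        for field, value in snapshot.items():
            text = str(value)  # assumes string-valued fields
            before = previous.get(field, "")
            # Same prefix-diff rule as stream_text_deltas, applied per field
            new = text[len(before):] if text.startswith(before) else text
            if new:
                delta[field] = new
            previous[field] = text
        if delta:  # skip snapshots that added nothing
            yield delta


snapshots = [
    {"thinking": "The user"},
    {"thinking": "The user greets me", "reply": "Hi"},
    {"thinking": "The user greets me", "reply": "Hi there!"},
]
print(list(field_deltas(snapshots)))
# [{'thinking': 'The user'}, {'thinking': ' greets me', 'reply': 'Hi'}, {'reply': ' there!'}]
```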
Using in Workflows
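The Workflow class checks whether each node is an AgentStreamingNode. Streaming nodes are iterated and their events are yielded straight to the caller; all other nodes are awaited and return the updated TaskContext: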
# In workflow.run_stream_async()
if isinstance(node_instance, AgentStreamingNode):
    # Streaming node: forward every event as soon as it is produced
    async for stream_event in node_instance.process(task_context):
        yield stream_event  # Events flow directly to client
else:
    # Regular node: await it and keep the updated context
    task_context = await node_instance.process(task_context)
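The branching can be exercised without launchpad by using stand-in classes. In this sketch FakeTaskContext, RegularNode, StreamingNode, and run_stream are all hypothetical, and the streamed events are simplified dicts rather than full completion chunks:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class FakeTaskContext:
    """Hypothetical stand-in for TaskContext that records which nodes ran."""

    def __init__(self) -> None:
        self.steps: List[str] = []


class RegularNode:
    """Stand-in for a regular node: process() returns the updated context."""

    async def process(self, task_context: FakeTaskContext) -> FakeTaskContext:
        task_context.steps.append("regular")
        return task_context


class StreamingNode:
    """Stand-in for an AgentStreamingNode: process() is an async generator."""

    async def process(self, task_context: FakeTaskContext) -> AsyncIterator[Dict[str, Any]]:
        for token in ["Hel", "lo"]:
            yield {"delta": token}


async def run_stream(nodes: List[Any], task_context: FakeTaskContext) -> AsyncIterator[Dict[str, Any]]:
    # Same branching as the run_stream_async() excerpt
    for node_instance in nodes:
        if isinstance(node_instance, StreamingNode):
            async for stream_event in node_instance.process(task_context):
                yield stream_event  # forwarded straight to the caller
        else:
            task_context = await node_instance.process(task_context)


async def main() -> List[Dict[str, Any]]:
    ctx = FakeTaskContext()
    events = [event async for event in run_stream([RegularNode(), StreamingNode()], ctx)]
    print(events, ctx.steps)  # [{'delta': 'Hel'}, {'delta': 'lo'}] ['regular']
    return events


events = asyncio.run(main())
```

In this branching the streaming path never reassigns task_context, so a streaming node can pass state to later nodes only by mutating the context object it receives.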
Use run_stream_async() instead of run() or run_async() when your workflow contains streaming nodes:
import asyncio

async def main():
    workflow = MyStreamingWorkflow(enable_tracing=True)
    async for event in workflow.run_stream_async(event_data):
        print(event)  # Process each streaming event

asyncio.run(main())
Key Features
- Delta extraction - Text streaming transmits only new tokens, not accumulated text (structured chunks carry the accumulated model)
- Debouncing - Configurable delay to batch rapid updates
- OpenAI format - Chunks mirror the shape of OpenAI chat.completion.chunk objects (object, model, choices[].delta)
- Structured support - Stream complex Pydantic models, not just text
- Workflow integration - Automatic detection and handling by the workflow engine