The AgentStreamingNode class extends AgentNode to support real-time streaming of LLM responses. Instead of waiting for the complete response, tokens are yielded as they are generated.
Key difference from AgentNode:

- `AgentNode.process()` returns `TaskContext`
- `AgentStreamingNode.process()` returns `AsyncIterator[Dict[str, Any]]`
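The contrast can be sketched as follows. `TaskContext` and both node classes here are minimal hypothetical stand-ins for the framework's own types, shown only to illustrate the two return shapes:

```python
from typing import Any, AsyncIterator, Dict

class TaskContext:
    """Hypothetical stand-in: carries workflow state between nodes."""

class AgentNode:
    async def process(self, ctx: TaskContext) -> TaskContext:
        # Waits for the complete LLM response, then returns once.
        return ctx

class AgentStreamingNode(AgentNode):
    async def process(self, ctx: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        # An async generator: yields events as tokens are generated
        # instead of returning a single finished context.
        yield {"delta": "partial text"}
```

Calling `AgentStreamingNode().process(ctx)` returns an async iterator you consume with `async for`, rather than a coroutine you await once.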
AgentStreamingNode Class Structure
Streaming Methods
stream_text_deltas
Streams plain text responses, extracting only the new tokens (deltas) from each chunk.

- `stream_result` - The streaming result from `agent.run_stream()`
- `debounce_by` - Delay in seconds between updates (default: 0.01)
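A minimal sketch of how the delta extraction might work, assuming `stream_result` yields progressively accumulated text; this is illustrative, not the framework's actual code:

```python
import asyncio
from typing import Any, AsyncIterator, Dict

async def stream_text_deltas(
    stream_result: AsyncIterator[str],  # assumed to yield accumulated text
    debounce_by: float = 0.01,
) -> AsyncIterator[Dict[str, Any]]:
    """Yield only the newly generated tokens (deltas) from each chunk."""
    emitted = ""
    async for accumulated in stream_result:
        delta = accumulated[len(emitted):]  # text not yet sent downstream
        emitted = accumulated
        if delta:
            yield {"delta": delta}
            await asyncio.sleep(debounce_by)  # batch rapid updates
```

Setting `debounce_by=0` disables the artificial delay, which is useful in tests.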
stream_structured_deltas
Streams structured Pydantic model outputs.

completion_chunk
Formats content into OpenAI-compatible completion chunks.

Implementation Examples
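As a first sketch, the `completion_chunk` helper described above could assemble an OpenAI-style `chat.completion.chunk` dict like this. The field layout follows OpenAI's public streaming format; the exact signature and defaults are assumptions:

```python
import time
import uuid
from typing import Any, Dict, Optional

def completion_chunk(
    content: str,
    model: str = "unknown",  # assumption: the real node likely passes its model name
    finish_reason: Optional[str] = None,
) -> Dict[str, Any]:
    """Wrap a text delta in an OpenAI-compatible chat.completion.chunk dict."""
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {"content": content},
                "finish_reason": finish_reason,  # "stop" on the final chunk
            }
        ],
    }
```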
Text Streaming Node
Stream plain text responses token by token.

Structured Streaming Node

Stream structured outputs with multiple fields.

Using in Workflows
The `Workflow` class automatically detects AgentStreamingNode instances and yields their events directly.
Use `run_stream_async()` instead of `run()` or `run_async()` when your workflow contains streaming nodes.
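One way the automatic detection might work is to check whether a node's `process()` call returns an async generator. Everything below is a hypothetical sketch with stand-in classes, not the framework's actual implementation:

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Dict, List

class AgentNode:
    async def process(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        return ctx

class AgentStreamingNode(AgentNode):
    async def process(self, ctx: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]:
        yield {"delta": "hi"}

class Workflow:
    def __init__(self, nodes: List[AgentNode]) -> None:
        self.nodes = nodes

    async def run_stream_async(self, ctx: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]:
        for node in self.nodes:
            result = node.process(ctx)
            if inspect.isasyncgen(result):
                # Streaming node: yield its events directly to the caller.
                async for event in result:
                    yield event
            else:
                # Regular node: await the coroutine and carry the context forward.
                ctx = await result
        yield {"done": True, "context": ctx}
```

With this shape, a caller consumes the whole workflow via `async for event in workflow.run_stream_async(ctx)`, receiving streaming deltas interleaved with a final completion event.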
Key Features
- Delta extraction - Only transmits new tokens, not accumulated text
- Debouncing - Configurable delay to batch rapid updates
- OpenAI format - Chunks follow the OpenAI streaming specification
- Structured support - Stream complex Pydantic models, not just text
- Workflow integration - Automatic detection and handling by the workflow engine