The AgentStreamingNode class extends AgentNode to support real-time streaming of LLM responses. Instead of waiting for the complete response, tokens are yielded as they’re generated. Key difference from AgentNode:
  • AgentNode.process() returns TaskContext
  • AgentStreamingNode.process() returns AsyncIterator[Dict[str, Any]]
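
In practice this changes how callers consume a node: one result is awaited, the other is iterated. A minimal, self-contained sketch of the two calling conventions (the stand-in coroutines are illustrative, not part of the framework):
import asyncio
from typing import Any, AsyncIterator, Dict

async def non_streaming_process() -> Dict[str, Any]:
    # AgentNode-style: a single awaited result.
    return {"result": "complete response"}

async def streaming_process() -> AsyncIterator[Dict[str, Any]]:
    # AgentStreamingNode-style: chunks yielded as they arrive.
    for token in ("Hel", "lo", "!"):
        yield {"delta": token}

async def main() -> None:
    print(await non_streaming_process())      # one result, all at once
    async for chunk in streaming_process():   # many chunks, incrementally
        print(chunk)

asyncio.run(main())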

AgentStreamingNode Class Structure

class AgentStreamingNode(AgentNode, ABC):
    def __init__(self, task_context: TaskContext | None = None):
        super().__init__(task_context=task_context)

    @abstractmethod
    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        pass

    async def stream_text_deltas(
        self,
        stream_result,
        debounce_by: float = 0.01,
    ) -> AsyncIterator[dict]:
        ...

    async def stream_structured_deltas(
        self,
        stream_result,
        debounce_by: float = 0.01,
    ) -> AsyncIterator[dict]:
        ...

    def completion_chunk(self, content: str | dict) -> dict:
        ...

Streaming Methods

stream_text_deltas

Streams plain text responses, extracting only the new tokens (deltas) from each chunk:
async def stream_text_deltas(
    self,
    stream_result,
    debounce_by: float = 0.01,
) -> AsyncIterator[dict]:
    previous_text = ""
    async for text_chunk in stream_result.stream_text(debounce_by=debounce_by):
        # stream_text() yields the accumulated text so far; diff it against
        # what was already emitted to recover just the new tokens.
        if text_chunk.startswith(previous_text):
            delta_text = text_chunk[len(previous_text):]
        else:
            # Fallback: if the accumulated text diverges, emit the chunk whole.
            delta_text = text_chunk
        if not delta_text:
            continue  # Nothing new in this debounce tick.
        previous_text = text_chunk
        yield self.completion_chunk(delta_text)
Parameters:
  • stream_result - The streaming result from agent.run_stream()
  • debounce_by - Delay in seconds between updates (default: 0.01)
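
The delta logic is needed because stream_text() yields the accumulated text so far rather than individual tokens, as the startswith comparison above implies. A self-contained illustration using a fake stream result, a stand-in for the real object under that assumption:
import asyncio
from typing import AsyncIterator

class FakeStreamResult:
    # Stand-in mimicking the assumed accumulated-text behaviour of stream_text().
    async def stream_text(self, debounce_by: float = 0.01) -> AsyncIterator[str]:
        for accumulated in ("Hel", "Hello", "Hello, wor", "Hello, world!"):
            yield accumulated

async def demo() -> None:
    previous_text = ""
    async for text_chunk in FakeStreamResult().stream_text():
        # Same delta extraction as stream_text_deltas:
        if text_chunk.startswith(previous_text):
            delta = text_chunk[len(previous_text):]
        else:
            delta = text_chunk
        previous_text = text_chunk
        print(repr(delta))  # 'Hel', 'lo', ', wor', 'ld!'

asyncio.run(demo())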

stream_structured_deltas

Streams structured Pydantic model outputs:
async def stream_structured_deltas(
    self,
    stream_result,
    debounce_by: float = 0.01,
) -> AsyncIterator[dict]:
    async for chunk in stream_result.stream(debounce_by=debounce_by):
        # Dump the partial model once and reuse it, skipping empty dumps.
        dump = chunk.model_dump()
        if dump:
            yield self.completion_chunk(dump)
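
Unlike the text variant, there is no delta extraction here: each yielded chunk carries the full model dump accumulated so far. A self-contained sketch with a fake stream (the progressively-filled-model behaviour is an assumption about the underlying streaming API):
import asyncio
from pydantic import BaseModel

class Output(BaseModel):
    thinking: str = ""
    reply: str = ""

class FakeStructuredStream:
    # Stand-in: assumed to yield progressively more complete models.
    async def stream(self, debounce_by: float = 0.01):
        yield Output(thinking="Weighing the question")
        yield Output(thinking="Weighing the question...", reply="Here is the ans")
        yield Output(thinking="Weighing the question...", reply="Here is the answer.")

async def demo() -> None:
    async for chunk in FakeStructuredStream().stream():
        dump = chunk.model_dump()
        if dump:  # same guard as stream_structured_deltas
            print(dump)

asyncio.run(demo())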

completion_chunk

Formats content into OpenAI-compatible completion chunks. The content may be a text delta or, for structured streaming, a model dump:
def completion_chunk(self, content: str | dict) -> dict:
    return {
        "object": "chat.completion.chunk",
        "model": "default",
        "choices": [
            {
                "index": 0,
                "delta": {"role": "assistant", "content": content},
                "finish_reason": None,
            }
        ],
    }
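
Clients speaking the OpenAI streaming protocol typically receive these chunks as server-sent events. A sketch of that serialization step; to_sse is a hypothetical helper, not part of the class:
import json
from typing import Any, Dict

def to_sse(chunk: Dict[str, Any]) -> str:
    # Wrap one completion chunk in the SSE wire format OpenAI clients expect.
    return f"data: {json.dumps(chunk)}\n\n"

chunk = {
    "object": "chat.completion.chunk",
    "model": "default",
    "choices": [
        {"index": 0, "delta": {"role": "assistant", "content": "Hel"}, "finish_reason": None}
    ],
}
print(to_sse(chunk), end="")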

Implementation Examples

Text Streaming Node

Stream plain text responses token by token:
from typing import AsyncIterator, Dict, Any
from core.nodes.agent import AgentConfig, ModelProvider
from core.nodes.agent_streaming_node import AgentStreamingNode
from core.task import TaskContext
from schemas.openai_schema import OpenAIChatSchema

class TextStreamingNode(AgentStreamingNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=str,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_text_deltas(result):
                yield chunk

Structured Streaming Node

Stream structured outputs with multiple fields (imports as in the previous example):
class StructuredStreamingNode(AgentStreamingNode):
    class OutputType(AgentStreamingNode.OutputType):
        thinking: str
        reply: str

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.MISTRAL,
            model_name="mistral-small-2506",
            output_type=self.OutputType,
        )

    async def process(self, task_context: TaskContext) -> AsyncIterator[Dict[str, Any]]:
        event: OpenAIChatSchema = task_context.event
        async with self.agent.run_stream(user_prompt=event.get_message()) as result:
            async for chunk in self.stream_structured_deltas(result):
                yield chunk

Using in Workflows

The Workflow class automatically detects AgentStreamingNode instances and yields their events directly:
# In workflow.run_stream_async()
if isinstance(node_instance, AgentStreamingNode):
    async for stream_event in node_instance.process(task_context):
        yield stream_event  # Events flow directly to client
else:
    task_context = await node_instance.process(task_context)
Use run_stream_async() instead of run() or run_async() when your workflow contains streaming nodes:
workflow = MyStreamingWorkflow(enable_tracing=True)
async for event in workflow.run_stream_async(event_data):
    # Process each streaming event
    print(event)
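
To push these events to a browser or an OpenAI-compatible client, the stream can be wrapped in a server-sent-events HTTP response. A hedged sketch using FastAPI; the route, payload handling, and MyStreamingWorkflow wiring are illustrative assumptions, not part of the framework:
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: Request) -> StreamingResponse:
    event_data = await request.json()
    workflow = MyStreamingWorkflow(enable_tracing=True)  # as in the snippet above

    async def event_stream():
        # Each workflow event is already an OpenAI-style chunk; wrap it as SSE.
        async for chunk in workflow.run_stream_async(event_data):
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"  # conventional OpenAI stream terminator

    return StreamingResponse(event_stream(), media_type="text/event-stream")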

Key Features

  • Delta extraction - Only transmits new tokens, not accumulated text
  • Debouncing - Configurable delay to batch rapid updates
  • OpenAI format - Chunks follow the OpenAI streaming specification
  • Structured support - Stream complex Pydantic models, not just text
  • Workflow integration - Automatic detection and handling by the workflow engine