Skip to main content
This node replaces ParallelNode (since v3.0.0). It executes multiple child nodes concurrently when their operations do not depend on each other. Use cases:
  • Independent Validations - Running validation steps concurrently without dependencies
  • Parallel Transformations - Applying independent transformations or checks simultaneously
  • Performance Optimization - Reducing overall task completion time by leveraging parallelism
  • Guardrails Processing - Running multiple guardrails or safety checks simultaneously
After implementing the child nodes (which can be regular Node or AgentNode instances), add them to the NodeConfig in the WorkflowSchema using the concurrent_nodes parameter.

ConcurrentNode Class

class ConcurrentNode(Node, ABC):
    """
    Base class for nodes that execute other nodes concurrently using asyncio.

    This class provides a method to execute a list of nodes concurrently on a single thread,
    using asyncio.gather. This ensures that I/O-bound operations can proceed in parallel
    without blocking the main thread or event loop.

    Subclasses must implement the `process` method to define the specific logic of the concurrent node.
    """

    async def execute_nodes_concurrently(self, task_context: TaskContext):
        node_config: NodeConfig = task_context.metadata["nodes"][self.__class__]
        coroutines = [
            node().process(task_context) for node in node_config.parallel_nodes
        ]
        return await asyncio.gather(*coroutines)

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        pass

Implementation Example

class AnalyzeTicketNode(ConcurrentNode):
    async def process(self, task_context: TaskContext) -> TaskContext:
        await self.execute_nodes_concurrently(task_context)
        return task_context

WorkflowSchema Configuration

class CustomerCareWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="Customer care workflow with concurrent analysis",
        event_schema=CustomerCareEventSchema,
        start=AnalyzeTicketNode,
        nodes=[
            NodeConfig(
                node=AnalyzeTicketNode,
                connections=[TicketRouterNode],
                description="Concurrent analysis of customer ticket",
                concurrent_nodes=[
                    DetermineTicketIntentNode,
                    FilterSpamNode,
                    ValidateTicketNode,
                ],
            ),
        ],
    )

How It Works

  1. Node Configuration - Configure the concurrent nodes in your WorkflowSchema by specifying them in the concurrent_nodes list
  2. Concurrent Execution - When execute_nodes_concurrently() is called, it creates coroutines for each child node and runs them simultaneously using asyncio.gather()
  3. Result Collection - All child nodes process the same task_context and can store their results independently using task_context.update_node()
  4. Workflow Continuation - After all concurrent nodes complete, the workflow continues to the next node in the pipeline

Performance Considerations

When to use: I/O-bound operations; independent steps; reduce total time; multiple validations or analyses. When not to use: dependent outputs; CPU-bound tasks; overhead outweighs benefits; strict sequential processing needed.
I