The router node enables dynamic routing: it selects the next node in a workflow by evaluating an ordered list of routing rules. Each rule is a RouterNode. If no rule matches, the router uses a fallback node.

Router components:
  • BaseRouter - Orchestrates routing decisions and manages the routing process
  • RouterNode - Individual routing rules implementing conditional logic

Router Classes

BaseRouter

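from typing import Optional


class BaseRouter(Node):
    def process(self, task_context: TaskContext) -> TaskContext:
        # Record the chosen node's name under this router's entry in the context.
        # Assumes route() returned a node, i.e. a rule matched or a fallback is set.
        next_node = self.route(task_context)
        task_context.nodes[self.node_name] = {"next_node": next_node.node_name}
        return task_context

    def route(self, task_context: TaskContext) -> Optional[Node]:
        # Evaluate rules in order; the first one that returns a node wins.
        for route_node in self.routes:
            next_node = route_node.determine_next_node(task_context)
            if next_node:
                return next_node
        # No rule matched: use the fallback, or None if no fallback is configured.
        return self.fallback if self.fallback else None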

RouterNode

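from abc import ABC, abstractmethod
from typing import Optional


class RouterNode(ABC):
    @abstractmethod
    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        # Return the node to run next, or None if this rule does not match.
        pass

    @property
    def node_name(self):
        return self.__class__.__name__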

How routing works

  1. Process Method Called - The BaseRouter’s process() method runs during workflow execution
  2. Route Evaluation - The router iterates through each RouterNode, calling determine_next_node()
  3. First Match Wins - The first router rule that returns a node determines the next step
  4. Fallback Handling - If no rules match, the router uses the fallback node
  5. Result Storage - The selected next node is stored in the task context
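
The five steps above can be traced end to end in a small self-contained sketch. The `TaskContext`, `Node`, `UrgentNode`, `DefaultNode`, `UrgentRule`, and `SketchRouter` classes below are hypothetical stand-ins written for illustration only; they mirror the shape of the classes shown earlier but are not the framework's implementations.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TaskContext:
    """Hypothetical stand-in: carries the event and per-node results."""
    event: Dict[str, Any]
    nodes: Dict[str, Any] = field(default_factory=dict)


class Node:
    """Hypothetical stand-in for the framework's Node base class."""

    @property
    def node_name(self) -> str:
        return self.__class__.__name__


class RouterNode(ABC):
    @abstractmethod
    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        ...


class UrgentNode(Node): ...
class DefaultNode(Node): ...


class UrgentRule(RouterNode):
    """Illustrative rule: matches only when the event is flagged urgent."""

    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        return UrgentNode() if task_context.event.get("urgent") else None


class SketchRouter(Node):
    def __init__(self, routes: List[RouterNode], fallback: Node):
        self.routes = routes
        self.fallback = fallback

    def route(self, task_context: TaskContext) -> Node:
        # Steps 2 and 3: evaluate rules in order, first match wins.
        for rule in self.routes:
            next_node = rule.determine_next_node(task_context)
            if next_node:
                return next_node
        # Step 4: no rule matched, so use the fallback.
        return self.fallback

    def process(self, task_context: TaskContext) -> TaskContext:
        # Steps 1 and 5: run the routing and store the chosen node's name.
        next_node = self.route(task_context)
        task_context.nodes[self.node_name] = {"next_node": next_node.node_name}
        return task_context


router = SketchRouter(routes=[UrgentRule()], fallback=DefaultNode())
urgent = router.process(TaskContext(event={"urgent": True}))
normal = router.process(TaskContext(event={"urgent": False}))
print(urgent.nodes["SketchRouter"])  # {'next_node': 'UrgentNode'}
print(normal.nodes["SketchRouter"])  # {'next_node': 'DefaultNode'}
```

Note that the router stores only the name of the selected node, not the node instance; how the workflow engine resolves that name is outside this sketch.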

Implementation example

Main Router

class TicketRouterNode(BaseRouter):
    def __init__(self):
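        # Rules are evaluated top to bottom; the first one that matches wins.
        self.routes = [
            CloseTicketRouter(),
            EscalationRouter(),
            InvoiceRouter(),
        ]
        # Used when none of the rules above returns a node.
        self.fallback = GenerateResponseNode()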

Router rule examples

class CloseTicketRouter(RouterNode):
    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        output: FilterSpamNode.OutputType = task_context.nodes["FilterSpamNode"][
            "result"
        ].output
        # Close the ticket only when the spam filter is confident the sender is not human.
        if not output.is_human and output.confidence > 0.8:
            return CloseTicketNode()
        return None

class EscalationRouter(RouterNode):
    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        analysis = task_context.nodes["DetermineTicketIntentNode"]["result"].output
        # Escalate when either the detected intent or the overall analysis requests it.
        if analysis.intent.escalate or analysis.escalate:
            return EscalateTicketNode()
        return None

Best practices

  • Order Matters - Arrange router nodes in order of priority; first match wins
  • Clear Conditions - Keep routing conditions explicit and easy to reason about
  • Fallback Strategy - Provide a meaningful fallback node
  • Test Coverage - Test all routing paths
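
As a rough illustration of the ordering and test-coverage points, the sketch below reduces each rule to a plain function and checks one case per routing path, plus one case where two rules match at once. The function names, the ticket dictionary keys, and the `route` helper are assumptions made for this example, not part of the framework.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical simplification: a rule is a callable that returns the name of
# the next node, or None when it does not match.
Rule = Callable[[Dict[str, Any]], Optional[str]]


def close_if_spam(ticket: Dict[str, Any]) -> Optional[str]:
    return "CloseTicketNode" if ticket.get("spam_confidence", 0.0) > 0.8 else None


def escalate_if_flagged(ticket: Dict[str, Any]) -> Optional[str]:
    return "EscalateTicketNode" if ticket.get("escalate") else None


def route(rules: List[Rule], ticket: Dict[str, Any],
          fallback: str = "GenerateResponseNode") -> str:
    # First match wins; fall back when no rule matches.
    for rule in rules:
        choice = rule(ticket)
        if choice is not None:
            return choice
    return fallback


rules: List[Rule] = [close_if_spam, escalate_if_flagged]

# One case per path, including the fallback and an overlapping match.
cases = [
    ({"spam_confidence": 0.95}, "CloseTicketNode"),
    ({"escalate": True}, "EscalateTicketNode"),
    ({}, "GenerateResponseNode"),
    ({"spam_confidence": 0.95, "escalate": True}, "CloseTicketNode"),
]
for ticket, expected in cases:
    assert route(rules, ticket) == expected

# Reversing the rule order changes the outcome for the overlapping case.
both = {"spam_confidence": 0.95, "escalate": True}
assert route(list(reversed(rules)), both) == "EscalateTicketNode"
```

The overlapping case is the one most likely to be missed: it passes or fails depending purely on rule order, so it is worth pinning down in a test.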

Configuration in WorkflowSchema

NodeConfig(
    node=TicketRouterNode,
    connections=[CloseTicketNode, EscalateTicketNode, ProcessInvoiceNode, GenerateResponseNode],
    is_router=True,
    description="Route tickets based on analysis results",
)
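
In the configuration above, `connections` appears to list every node the router can select, including the fallback. Assuming that is the intent, a small check like the following could catch a target that a rule returns but the configuration does not declare. `NodeConfigSketch` and `undeclared_targets` are hypothetical names for this sketch; it uses node names as strings and does not reflect the real `NodeConfig` class.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class NodeConfigSketch:
    """Hypothetical stand-in mirroring the four fields used above."""
    node: str
    connections: List[str]
    is_router: bool = False
    description: str = ""


def undeclared_targets(config: NodeConfigSketch, possible_targets: Set[str]) -> Set[str]:
    # Targets the router may select that are missing from the declared connections.
    return possible_targets - set(config.connections)


config = NodeConfigSketch(
    node="TicketRouterNode",
    connections=[
        "CloseTicketNode",
        "EscalateTicketNode",
        "ProcessInvoiceNode",
        "GenerateResponseNode",
    ],
    is_router=True,
    description="Route tickets based on analysis results",
)

# Every node a rule can return, plus the fallback.
possible = {
    "CloseTicketNode",
    "EscalateTicketNode",
    "ProcessInvoiceNode",
    "GenerateResponseNode",
}
missing = undeclared_targets(config, possible)
print(missing)  # set()
```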