The Node class is the foundation for all other node types. Each specialized node implements process(), which holds that step's core logic and defines how the node handles data during execution. Sharing this one interface keeps node implementations consistent and extensible.

Node Class

The base Node class provides the essential structure for all workflow processing steps:
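from abc import ABC, abstractmethod
from typing import Optional, Type

from pydantic import BaseModel

# TaskContext comes from the framework; its import is omitted here.


class Node(ABC):
    def save_output(self, output: BaseModel):
        # Store this node's result under its own class name
        self.task_context.nodes[self.node_name] = output

    def get_output(self, node_class: Type["Node"]) -> Optional[BaseModel]:
        # Look up another node's result by class name; None if nothing is stored
        return self.task_context.nodes.get(node_class.__name__, None)

    @property
    def node_name(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        pass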
Key features:
  • Abstract base class for a consistent interface
  • node_name property returns the class name, which is also the key used to store and look up outputs
  • Abstract process method for subclasses
  • Async support for non‑blocking operations
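As a minimal sketch of what the abstract base class enforces, the snippet below uses a trimmed stand-in for Node (not the framework's own class) with two hypothetical subclasses, IncompleteNode and CompleteNode. It relies only on standard abc behavior: a subclass that does not implement process() cannot be instantiated.

```python
from abc import ABC, abstractmethod


class Node(ABC):
    """Trimmed stand-in for the base class above; only what this demo needs."""

    @property
    def node_name(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    async def process(self, task_context):
        ...


class IncompleteNode(Node):
    """Hypothetical subclass that forgets to implement process()."""


class CompleteNode(Node):
    """Hypothetical subclass that satisfies the interface."""

    async def process(self, task_context):
        return task_context


try:
    IncompleteNode()
    instantiated = True
except TypeError:
    # ABC refuses to instantiate a class with unimplemented abstract methods.
    instantiated = False

print(instantiated)              # False
print(CompleteNode().node_name)  # CompleteNode
```

The error surfaces when the node is constructed, before any workflow runs, so a missing process() is caught early.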

Basic Implementation

Here is a simple example of a custom node:
class ValidateInputNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Your custom processing logic here
        return task_context
Always return the task_context to maintain data flow through the pipeline.
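To illustrate why the return value matters, here is a sketch of a sequential runner. The TaskContext dataclass, the run_pipeline function, and both node classes are hypothetical stand-ins written for this example; the framework's actual runner may pass the context differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class TaskContext:
    """Hypothetical stand-in for the framework's TaskContext."""
    event: Any = None
    nodes: Dict[str, Any] = field(default_factory=dict)


class AppendNameNode:
    """Duck-typed node: records its visit, then hands the context on."""

    async def process(self, task_context: TaskContext) -> TaskContext:
        task_context.nodes[type(self).__name__] = "visited"
        return task_context


class ForgetfulNode:
    """Buggy node: forgets the return statement."""

    async def process(self, task_context: TaskContext):
        pass  # implicitly returns None


async def run_pipeline(nodes: List[Any], ctx: TaskContext) -> TaskContext:
    # Hypothetical runner: each node receives what the previous one returned.
    for node in nodes:
        ctx = await node.process(ctx)
        if ctx is None:
            raise RuntimeError(f"{type(node).__name__} did not return the task context")
    return ctx


ok = asyncio.run(run_pipeline([AppendNameNode()], TaskContext()))
print(ok.nodes)  # {'AppendNameNode': 'visited'}

error_message = ""
try:
    asyncio.run(run_pipeline([ForgetfulNode(), AppendNameNode()], TaskContext()))
except RuntimeError as exc:
    error_message = str(exc)
print(error_message)  # ForgetfulNode did not return the task context
```

In a runner shaped like this, a node that returns nothing leaves the next node with no context to work on, which is the failure the rule above prevents.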

Storing and Accessing Node Results

Why store results? Saving a node's output in the task context lets later nodes in the workflow read it.

Storing Node Results

Use save_output() to store results in the task context:
class ValidateInputNode(Node):
    class OutputType(BaseModel):
        is_valid: bool
        validation_score: float

    def validate_input(self, event: ExampleEvent):
        # Your validation logic here
        return self.OutputType(is_valid=True, validation_score=0.95)

    async def process(self, task_context: TaskContext) -> TaskContext:
        event: ExampleEvent = task_context.event

        result = self.validate_input(event)

        # Store the result using save_output
        self.save_output(result)

        return task_context

Accessing Node Results

Retrieve results from previous nodes using get_output():
class CalculateDifferenceNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Access results from the ValidateInputNode
        validation_result: ValidateInputNode.OutputType = self.get_output(ValidateInputNode)

        # get_output returns None if ValidateInputNode has not stored a result,
        # so check for that before using the validation result
        if validation_result and validation_result.is_valid:
            # Process valid data
            pass

        return task_context
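The two nodes above can be wired together in a dependency-free sketch. Everything framework-related here is an assumption made for illustration: TaskContext is a stand-in exposing only event and nodes, Node binds the context in its constructor, ExampleEvent is given hypothetical before and after fields, and dataclasses replace pydantic models so the snippet runs with the standard library alone.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type


@dataclass
class ExampleEvent:
    """Hypothetical event shape; the real ExampleEvent fields are not shown."""
    before: float
    after: float


@dataclass
class TaskContext:
    """Hypothetical stand-in exposing only `event` and `nodes`."""
    event: Any = None
    nodes: Dict[str, Any] = field(default_factory=dict)


class Node:
    def __init__(self, task_context: TaskContext):
        # Assumption: the context is bound when the node is constructed.
        self.task_context = task_context

    def save_output(self, output: Any) -> None:
        self.task_context.nodes[type(self).__name__] = output

    def get_output(self, node_class: Type["Node"]) -> Optional[Any]:
        return self.task_context.nodes.get(node_class.__name__)


class ValidateInputNode(Node):
    @dataclass
    class OutputType:
        is_valid: bool

    async def process(self, task_context: TaskContext) -> TaskContext:
        event = task_context.event
        self.save_output(self.OutputType(is_valid=event.after >= 0))
        return task_context


class CalculateDifferenceNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        validation = self.get_output(ValidateInputNode)
        # get_output returns None when the upstream node has not run.
        if validation is not None and validation.is_valid:
            event = task_context.event
            self.save_output(event.after - event.before)
        return task_context


async def run(node_classes, event):
    ctx = TaskContext(event=event)
    for node_class in node_classes:
        ctx = await node_class(ctx).process(ctx)
    return ctx


full = asyncio.run(run([ValidateInputNode, CalculateDifferenceNode], ExampleEvent(2.0, 5.0)))
print(full.nodes["CalculateDifferenceNode"])  # 3.0

skipped = asyncio.run(run([CalculateDifferenceNode], ExampleEvent(2.0, 5.0)))
print("CalculateDifferenceNode" in skipped.nodes)  # False
```

The second run shows the None guard at work: without an upstream validation result, the downstream node stores nothing and still returns the context.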

Key benefits

  • Consistency - A predictable interface across nodes
  • Flexibility - Customize while maintaining structure
  • Composability - Combine and reorder nodes freely
  • Testability - Test nodes independently with mock contexts
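As a sketch of the testability point, a node can be exercised in isolation by seeding a mock context with the upstream result it expects. ThresholdNode, make_context, and run_node are hypothetical names introduced for this example, and the mock context is a SimpleNamespace rather than the framework's TaskContext.

```python
import asyncio
from types import SimpleNamespace


class ThresholdNode:
    """Hypothetical node under test; mirrors the save/get pattern above."""

    def __init__(self, task_context):
        # Assumption: the context is bound at construction time.
        self.task_context = task_context

    async def process(self, task_context):
        upstream = self.task_context.nodes.get("ValidateInputNode")
        passed = upstream is not None and upstream.validation_score >= 0.9
        self.task_context.nodes[type(self).__name__] = passed
        return task_context


def make_context(**nodes):
    # A mock context needs only the attributes the node reads.
    return SimpleNamespace(event=None, nodes=dict(nodes))


def run_node(ctx):
    return asyncio.run(ThresholdNode(ctx).process(ctx))


# Seed the upstream result directly instead of running ValidateInputNode.
high = make_context(ValidateInputNode=SimpleNamespace(validation_score=0.95))
low = make_context(ValidateInputNode=SimpleNamespace(validation_score=0.40))
empty = make_context()

assert run_node(high).nodes["ThresholdNode"] is True
assert run_node(low).nodes["ThresholdNode"] is False
assert run_node(empty).nodes["ThresholdNode"] is False
print("all node tests passed")
```

Because results are looked up by class name in a plain mapping, the test never has to construct or run the upstream node.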