The Node class is the foundation for all other node types. Each specialized node implements the process() method, which contains the step’s core logic.
Use process() to define how the node handles data during execution. This design keeps implementations consistent and extensible.
Node Class
The base Node class provides the essential structure for all workflow processing steps:
from abc import ABC, abstractmethod
from typing import Optional, Type

from pydantic import BaseModel

# TaskContext and OutputType are defined elsewhere in the project.

class Node(ABC):
    def save_output(self, output: BaseModel):
        self.task_context.nodes[self.node_name] = output

    def get_output(self, node_class: Type["Node"]) -> Optional[OutputType]:
        return self.task_context.nodes.get(node_class.__name__, None)

    @property
    def node_name(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        pass

    async def cleanup(self) -> None:
        """Release per-instance resources. Called even if process() raised."""
        pass
Key features:
- Abstract base class for a consistent interface
- Node name property uses the class name
- Abstract process() method that subclasses must implement
- Async support for non-blocking operations
- Optional cleanup() hook for releasing clients/connections, called by the orchestrator after each node run (including when an exception propagates)
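The cleanup guarantee in the last bullet can be illustrated with a try/finally block. This is only a sketch and not the orchestrator's actual code: run_node, FailingNode, the simplified Node stand-in, and the plain dict used as a task context are all assumptions made for illustration.

```python
import asyncio
from abc import ABC, abstractmethod


class Node(ABC):
    """Simplified stand-in for the base class (assumption, not the real one)."""

    @abstractmethod
    async def process(self, task_context: dict) -> dict: ...

    async def cleanup(self) -> None:
        """Default no-op; subclasses override it to release resources."""


class FailingNode(Node):
    """Hypothetical node whose process() always raises."""

    def __init__(self) -> None:
        self.cleaned_up = False

    async def process(self, task_context: dict) -> dict:
        raise RuntimeError("simulated failure")

    async def cleanup(self) -> None:
        self.cleaned_up = True


async def run_node(node: Node, task_context: dict) -> dict:
    """Hypothetical runner: cleanup() runs whether or not process() raises."""
    try:
        return await node.process(task_context)
    finally:
        await node.cleanup()


async def demo() -> bool:
    node = FailingNode()
    try:
        await run_node(node, {})
    except RuntimeError:
        pass  # the exception still reaches the caller after cleanup()
    return node.cleaned_up
```

Calling asyncio.run(demo()) exercises the failure path: the RuntimeError propagates out of run_node, yet cleanup() has already run by the time the caller sees it.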
Basic Implementation
Here is a simple example of a custom node:
class ValidateInputNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Your custom processing logic here
        return task_context
Always return the task_context to maintain data flow through the pipeline.
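To see why the return value matters, consider a minimal sequential runner. The sketch below is an assumption about how nodes might be chained, not the framework's real orchestrator: run_pipeline, StripNode, UpperNode, and the dataclass-based TaskContext stand-in are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class TaskContext:
    """Stand-in context (assumption): an event plus per-node outputs."""
    event: Any = None
    nodes: Dict[str, Any] = field(default_factory=dict)


class StripNode:
    async def process(self, task_context: TaskContext) -> TaskContext:
        task_context.nodes["StripNode"] = task_context.event.strip()
        return task_context


class UpperNode:
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Reads what StripNode stored earlier in the same context
        task_context.nodes["UpperNode"] = task_context.nodes["StripNode"].upper()
        return task_context


async def run_pipeline(nodes, task_context: TaskContext) -> TaskContext:
    """Hypothetical runner: each node's return value is passed to the next."""
    for node in nodes:
        task_context = await node.process(task_context)
    return task_context
```

In this sketch, a node that forgot its return statement would hand None to the next node, which would then fail with an AttributeError as soon as it touched task_context.nodes.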
Storing and Accessing Node Results
Why store results? Persist outputs so later nodes can use them.
Storing Node Results
Use save_output() to store a result in the task context. The result is saved in task_context.nodes under the node's class name:
class ValidateInputNode(Node):
    class OutputType(BaseModel):
        is_valid: bool
        validation_score: float

    def validate_input(self, event: ExampleEvent):
        # Your validation logic here
        return self.OutputType(is_valid=True, validation_score=0.95)

    async def process(self, task_context: TaskContext) -> TaskContext:
        event: ExampleEvent = task_context.event
        result = self.validate_input(event)
        # Store the result using save_output
        self.save_output(result)
        return task_context
Accessing Node Results
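Retrieve results from previous nodes using get_output(). Pass the node class whose output you want; the call returns None if that node has not stored anything: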
class CalculateDifferenceNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Access results from the ValidateInputNode
        validation_result: ValidateInputNode.OutputType = self.get_output(ValidateInputNode)
        # Use the validation result in your processing
        if validation_result and validation_result.is_valid:
            # Process valid data
            pass
        return task_context
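The store-then-retrieve round trip can be tried without the framework installed. The following is a rough sketch under stated assumptions: TaskContext and ValidationResult are dataclass stand-ins rather than the real Pydantic models, and passing task_context to the node's constructor is a guess at how self.task_context gets set, since the snippets above do not show it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TaskContext:
    """Stand-in context (assumption): an event plus per-node outputs."""
    event: Any = None
    nodes: Dict[str, Any] = field(default_factory=dict)


class Node:
    """Simplified stand-in. Assumption: the context is given to __init__."""

    def __init__(self, task_context: TaskContext) -> None:
        self.task_context = task_context

    def save_output(self, output: Any) -> None:
        # Outputs are keyed by the class name of the node that produced them
        self.task_context.nodes[self.__class__.__name__] = output

    def get_output(self, node_class: type) -> Optional[Any]:
        return self.task_context.nodes.get(node_class.__name__)


@dataclass
class ValidationResult:
    is_valid: bool
    validation_score: float


class ValidateInputNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        self.save_output(ValidationResult(is_valid=True, validation_score=0.95))
        return task_context


class CalculateDifferenceNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        result = self.get_output(ValidateInputNode)
        # result is None when ValidateInputNode has not stored anything yet
        self.save_output("processed" if result and result.is_valid else "skipped")
        return task_context
```

Running CalculateDifferenceNode on an empty context stores "skipped", which is why the None check on the retrieved result is worth keeping; once ValidateInputNode has run on the same context, the second node stores "processed".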
Key benefits
- Consistency - A predictable interface across nodes
- Flexibility - Customize while maintaining structure
- Composability - Combine and reorder nodes freely
- Testability - Test nodes independently with mock contexts
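As an illustration of the testability point, a downstream node can be exercised on its own by pre-seeding a mock context with a fake upstream result. This is a sketch only: ScoreGateNode, make_context, run, and the 0.5 threshold are hypothetical, and SimpleNamespace stands in for both the task context and the upstream output.

```python
import asyncio
from types import SimpleNamespace


class ScoreGateNode:
    """Hypothetical node: passes only if an upstream score clears a threshold."""

    async def process(self, task_context):
        upstream = task_context.nodes.get("ValidateInputNode")
        passed = upstream is not None and upstream.validation_score >= 0.5
        task_context.nodes["ScoreGateNode"] = passed
        return task_context


def make_context(score=None):
    """Build a mock context, optionally pre-seeded with a fake upstream result."""
    nodes = {}
    if score is not None:
        nodes["ValidateInputNode"] = SimpleNamespace(validation_score=score)
    return SimpleNamespace(event=None, nodes=nodes)


def run(score=None):
    """Run the node once against a fresh mock context and return its output."""
    ctx = asyncio.run(ScoreGateNode().process(make_context(score)))
    return ctx.nodes["ScoreGateNode"]
```

Because the node only reads from and writes to the context it is handed, the upstream node never has to run: each test case is just a different seeded context.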