The workflow system is built around the concept of a directed acyclic graph (DAG) where nodes represent processing steps and edges represent the flow of data between them. The Workflow class in workflow.py serves as the orchestrator, managing the execution flow and passing the task context between nodes.

Workflow Class

The foundation of the workflow system is the abstract Workflow class:
from abc import ABC

class Workflow(ABC):
    """Abstract base class for defining processing workflows.

    The Workflow class provides a framework for creating processing workflows
    with multiple nodes and routing logic. Each workflow must define its structure
    using a WorkflowSchema.
    """
Key capabilities:
  • Abstract base class for concrete workflows
  • Schema validation during initialization
  • Node execution and routing management
  • Task context passing between nodes
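The orchestration role described above (run the start node, pass a task context along, follow connections, let routers choose a branch) can be illustrated with a minimal, self-contained sketch. This is not the project's actual Workflow implementation. The names TaskContext, Node.process, RouterNode.route, and SketchWorkflow are hypothetical, chosen only for illustration:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Type


@dataclass
class TaskContext:
    """Hypothetical context object passed from node to node."""
    event: Dict[str, Any]
    nodes: Dict[str, Any] = field(default_factory=dict)  # outputs keyed by node name


class Node(ABC):
    @abstractmethod
    def process(self, task_context: TaskContext) -> TaskContext:
        """Do this step's work and return the (possibly updated) context."""


class RouterNode(Node):
    """Hypothetical node type that also decides which node runs next."""

    def process(self, task_context: TaskContext) -> TaskContext:
        return task_context  # routers in this sketch do no work of their own

    @abstractmethod
    def route(self, task_context: TaskContext) -> Optional[Type[Node]]:
        """Return the next node class, or None to stop."""


class SketchWorkflow:
    """Hypothetical orchestrator: runs `start`, then follows connections."""

    def __init__(self, start: Type[Node], connections: Dict[Type[Node], List[Type[Node]]]):
        self.start = start
        self.connections = connections

    def run(self, event: Dict[str, Any]) -> TaskContext:
        context = TaskContext(event=event)
        current: Optional[Type[Node]] = self.start
        while current is not None:
            node = current()
            context = node.process(context)
            if isinstance(node, RouterNode):
                current = node.route(context)  # the router picks the branch
            else:
                targets = self.connections.get(current, [])
                current = targets[0] if targets else None  # a terminal node ends the run
        return context
```

Non-router nodes in this sketch have at most one connection, so the loop simply takes it. Any branching decision is delegated to the router node itself.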

Schema Definitions

The schema.py module defines the structure and configuration of workflows using Pydantic models.

WorkflowSchema

from pydantic import BaseModel

class WorkflowSchema(BaseModel):
    """Schema definition for a complete workflow.

    WorkflowSchema defines the overall structure of a processing workflow,
    including its entry point and all constituent nodes.
    """
Key attributes:
  • description: Optional description of the workflow’s purpose
  • event_schema: Pydantic model for validating incoming events
  • start: The entry point Node class for the workflow
  • nodes: List of NodeConfig objects defining the workflow structure
Benefits:
  • Type Safety: Pydantic models ensure type validation
  • Documentation: Built-in schema documentation
  • Validation: Automatic validation of workflow structure
  • IDE Support: Full autocomplete and type checking
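To illustrate how event_schema guards the entry point, here is a sketch that uses only the standard library. The dataclasses stand in for Pydantic models, so the type checks Pydantic would perform automatically are written out by hand. ExampleEvent, its fields, SketchWorkflowSchema, and parse_event are all hypothetical:

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type


@dataclass
class ExampleEvent:
    """Hypothetical event schema; a dataclass stands in for a Pydantic model."""
    user_id: str
    amount: float

    def __post_init__(self) -> None:
        # Pydantic would perform these checks (and coercions) automatically.
        if not isinstance(self.user_id, str):
            raise TypeError("user_id must be a str")
        if isinstance(self.amount, bool) or not isinstance(self.amount, (int, float)):
            raise TypeError("amount must be a number")


@dataclass
class SketchWorkflowSchema:
    """Stdlib stand-in mirroring WorkflowSchema's four attributes."""
    event_schema: Type
    start: Type
    nodes: List[Any]
    description: Optional[str] = None

    def parse_event(self, raw: Dict[str, Any]) -> Any:
        # With a Pydantic v2 model this would be self.event_schema.model_validate(raw).
        # Missing fields raise TypeError from the dataclass __init__.
        return self.event_schema(**raw)
```

Rejecting a malformed event here means no node ever sees invalid input. That is the practical payoff of the "Type Safety" and "Validation" benefits listed above.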

NodeConfig

from pydantic import BaseModel

class NodeConfig(BaseModel):
    """Configuration model for workflow nodes.

    NodeConfig defines the structure and behavior of a single node within
    a workflow, including its connections to other nodes and routing properties.
    """
NodeConfig attributes:
  • node: The Node class to be instantiated
  • connections: List of Node classes this node can connect to
  • is_router: Flag indicating if this node performs routing logic
  • description: Optional description of the node’s purpose
  • concurrent_nodes: Optional list of Node classes that can run concurrently
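A list of NodeConfig entries is effectively an adjacency list. The sketch below uses a standard-library stand-in (SketchNodeConfig, a hypothetical name) with the same five attributes and turns such a list into a graph plus a set of router nodes. It also rejects duplicate entries. The concurrent_nodes attribute is carried along but not interpreted here:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple, Type


class Node:
    """Placeholder base class for the sketch."""


@dataclass
class SketchNodeConfig:
    """Stdlib stand-in mirroring NodeConfig's five attributes."""
    node: Type[Node]
    connections: List[Type[Node]] = field(default_factory=list)
    is_router: bool = False
    description: Optional[str] = None
    concurrent_nodes: Optional[List[Type[Node]]] = None  # not used by this sketch


def build_graph(
    configs: List[SketchNodeConfig],
) -> Tuple[Dict[Type[Node], List[Type[Node]]], Set[Type[Node]]]:
    """Return an adjacency map (node -> allowed next nodes) and the set of routers."""
    graph: Dict[Type[Node], List[Type[Node]]] = {}
    routers: Set[Type[Node]] = set()
    for cfg in configs:
        if cfg.node in graph:
            raise ValueError(f"duplicate NodeConfig for {cfg.node.__name__}")
        graph[cfg.node] = list(cfg.connections)
        if cfg.is_router:
            routers.add(cfg.node)
    return graph, routers
```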

Workflow Validation

The validate.py module provides validation logic for workflow schemas, ensuring they form valid directed acyclic graphs (DAGs) and have proper routing configurations.
Validation features:
  1. DAG Validation - Validates that the workflow forms a proper DAG with no cycles
  2. Reachability Check - Ensures all nodes are reachable from the start node
  3. Router Validation - Validates that only router nodes have multiple connections
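The three checks can be sketched over a plain adjacency map keyed by node name. This is an illustrative reimplementation, not the contents of validate.py. The function name validate_workflow and the string-keyed graph are assumptions. It uses depth-first search with three colours for cycle detection and breadth-first search from the start node for reachability:

```python
from collections import deque
from typing import Dict, List, Set


def validate_workflow(graph: Dict[str, List[str]], start: str, routers: Set[str]) -> None:
    """Hypothetical validator: raise ValueError if any of the three rules fails."""
    # 1. DAG check: DFS; reaching a GREY (in-progress) node means a back edge, i.e. a cycle.
    WHITE, GREY, BLACK = 0, 1, 2
    colour: Dict[str, int] = {}

    def visit(node: str) -> None:
        colour[node] = GREY
        for nxt in graph.get(node, []):
            state = colour.get(nxt, WHITE)
            if state == GREY:
                raise ValueError(f"cycle detected through {node} -> {nxt}")
            if state == WHITE:
                visit(nxt)
        colour[node] = BLACK

    for node in graph:
        if colour.get(node, WHITE) == WHITE:
            visit(node)

    # 2. Reachability: BFS from the start node must touch every configured node.
    seen = {start}
    queue = deque([start])
    while queue:
        for nxt in graph.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    unreachable = set(graph) - seen
    if unreachable:
        raise ValueError(f"unreachable nodes: {sorted(unreachable)}")

    # 3. Router rule: only router nodes may have more than one outgoing connection.
    for node, targets in graph.items():
        if len(targets) > 1 and node not in routers:
            raise ValueError(f"{node} has {len(targets)} connections but is not a router")
```

Encoded by class name, the ExampleWorkflow graph from the next section passes all three checks when "ExampleRouter" is passed as the only router. It fails rule 3 if the router flag is left out.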

Workflow Example

Here’s a complete example of a workflow implementation:
from core.schema import WorkflowSchema, NodeConfig
from core.workflow import Workflow
from schemas.example_schema import ExampleEventSchema
from workflows.example_workflow_nodes.initial_node import InitialNode
from workflows.example_workflow_nodes.process_node import ProcessNode
from workflows.example_workflow_nodes.router_node import ExampleRouter
from workflows.example_workflow_nodes.success_node import SuccessNode
from workflows.example_workflow_nodes.error_node import ErrorNode

class ExampleWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="Example workflow for processing data",
        event_schema=ExampleEventSchema,
        start=InitialNode,
        nodes=[
            NodeConfig(
                node=InitialNode,
                connections=[ProcessNode],
                description="Initial processing of the event",
            ),
            NodeConfig(
                node=ProcessNode,
                connections=[ExampleRouter],
                description="Process the data",
            ),
            NodeConfig(
                node=ExampleRouter,
                connections=[SuccessNode, ErrorNode],
                is_router=True,
                description="Route based on processing results",
            ),
            NodeConfig(
                node=SuccessNode,
                connections=[],
                description="Handle successful processing",
            ),
            NodeConfig(
                node=ErrorNode,
                connections=[],
                description="Handle processing errors",
            ),
        ],
    )
This example demonstrates a typical workflow pattern: initial processing, data transformation, routing based on results, and terminal nodes for success and error handling.
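The routing step is the only place where this example branches. A hedged sketch of what ExampleRouter's decision might look like follows. The route method, the TaskContext shape, and the assumption that ProcessNode stores a dict with an "ok" flag under its own name are all hypothetical. The node classes are placeholders, not imports from the workflows package:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Type


class Node:
    """Placeholder base class for the sketch."""


class SuccessNode(Node):
    pass


class ErrorNode(Node):
    pass


@dataclass
class TaskContext:
    """Hypothetical context: the event plus per-node outputs."""
    event: Dict[str, Any]
    nodes: Dict[str, Any] = field(default_factory=dict)


class ExampleRouter(Node):
    """Hypothetical routing rule: branch on ProcessNode's (assumed) "ok" flag."""

    def route(self, task_context: TaskContext) -> Type[Node]:
        result = task_context.nodes.get("ProcessNode", {})
        # Missing or falsy results fall through to the error branch.
        return SuccessNode if result.get("ok") else ErrorNode
```

Defaulting to ErrorNode when ProcessNode left no result keeps the router total. Every run ends in one of the two terminal nodes listed in its connections.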