The workflow system is built around the concept of a directed acyclic graph (DAG) where nodes represent processing steps and edges represent the flow of data between them. The Workflow class in workflow.py serves as the orchestrator, managing the execution flow and passing the task context between nodes.

Workflow Class

The foundation of the workflow system is the abstract Workflow class:
from abc import ABC


class Workflow(ABC):
    """Abstract base class for defining processing workflows.

    The Workflow class provides a framework for creating processing workflows
    with multiple nodes and routing logic. Each workflow must define its structure
    using a WorkflowSchema.
    """
Key capabilities:
  • Abstract base class for concrete workflows
  • Schema validation during initialization
  • Node execution and routing management
  • Task context passing between nodes
  • Optional Langfuse tracing via enable_tracing=True (default False)

Execution Methods

A Workflow instance exposes three entry points:
| Method | Signature | Use when |
| --- | --- | --- |
| run | run(event=None, *, context=None) | Calling from sync Python (Celery task, script). Wraps run_async in asyncio.run. |
| run_async | await run_async(event=None, *, context=None) | Calling from an existing event loop (FastAPI route, parent node delegating to a child workflow). |
| run_stream_async | async for chunk in run_stream_async(event) | Streaming nodes that yield SSE chunks (see the SSE Streaming example). |
run and run_async both return the final TaskContext. Pass event= for a fresh run, or context= to continue with an existing TaskContext — this is how one workflow composes another without losing accumulated state.
# Enable Langfuse tracing for this workflow instance
workflow = CustomerCareWorkflow(enable_tracing=True)

# Standard entry from a sync caller
context = workflow.run(event_data)

# Inside a FastAPI handler
context = await workflow.run_async(event_data)

# Compose a child workflow, carrying the parent's context forward
child = InvoiceWorkflow()
context = await child.run_async(context=context)
When enable_tracing=True and Langfuse credentials are missing or invalid, the workflow constructor raises LangfuseAuthenticationError.
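The relationship between the sync and async entry points can be sketched with a small stand-in. This is not the library's implementation: MiniWorkflow and MiniContext are hypothetical names that only model the two behaviours described above, namely that run wraps run_async in asyncio.run and that context= continues from an existing context instead of starting from an event.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class MiniContext:
    """Hypothetical stand-in for TaskContext."""

    event: Any = None
    outputs: dict = field(default_factory=dict)


class MiniWorkflow:
    """Hypothetical stand-in for Workflow; only the entry points are modeled."""

    async def run_async(
        self, event: Any = None, *, context: Optional[MiniContext] = None
    ) -> MiniContext:
        # Reuse the caller's context when one is passed; otherwise start fresh.
        if context is None:
            context = MiniContext(event=event)
        context.outputs[type(self).__name__] = "done"
        return context

    def run(
        self, event: Any = None, *, context: Optional[MiniContext] = None
    ) -> MiniContext:
        # asyncio.run starts a new event loop and raises RuntimeError if one
        # is already running in this thread, hence the separate async entry.
        return asyncio.run(self.run_async(event, context=context))


class ParentWorkflow(MiniWorkflow):
    pass


class ChildWorkflow(MiniWorkflow):
    pass


ctx = ParentWorkflow().run({"ticket_id": 1})  # fresh run from an event
ctx = ChildWorkflow().run(context=ctx)        # continue with the same context
```

After both calls, ctx still carries the original event and holds one output entry per workflow, which is the state-preserving behaviour that context= is meant to provide.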

Nested Workflows

Use nested workflows when one step in a larger workflow deserves its own workflow definition. A parent node can delegate to a child workflow and pass the current TaskContext forward:
class RunReplyWorkflowNode(Node):
    class OutputType(BaseModel):
        delegated: bool
        workflow: str

    async def process(self, task_context: TaskContext) -> TaskContext:
        await ReplyDraftWorkflow().run_async(context=task_context)
        self.save_output(
            self.OutputType(delegated=True, workflow="ReplyDraftWorkflow")
        )
        return task_context
When context= is provided, the child workflow reuses the existing TaskContext instead of parsing a fresh event. That means:
  • The child workflow can read the same task_context.event and previous node outputs.
  • Outputs saved by child nodes remain available to later parent nodes.
  • The workflow engine temporarily swaps in the child’s node registry while the child runs, then restores the parent’s registry afterward.
  • should_stop is reset when entering the child workflow so a child can run even if a parent step previously stopped another branch.
This pattern is useful for reusable sub-processes like drafting a reply, extracting invoice data, running a review workflow, or grouping a specialized sequence behind a single parent node. See the Nested Workflow example for a complete parent/child workflow implementation.
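The registry swap and should_stop reset described above can be illustrated with a simplified model. Everything here is an assumption made for illustration: MiniContext, its registry field, and execute_nodes are hypothetical, and the real engine's internals may differ. The sketch only shows the save, swap, run, restore sequence around a child run.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class MiniContext:
    """Hypothetical stand-in for TaskContext."""

    event: dict
    nodes: dict = field(default_factory=dict)  # outputs saved by nodes
    registry: str = ""                         # name of the active node registry
    should_stop: bool = False


class MiniWorkflow:
    """Hypothetical stand-in for Workflow."""

    name = "mini"

    async def execute_nodes(self, context: MiniContext) -> None:
        # Record which registry was active while this workflow's nodes ran.
        context.nodes[self.name] = {"registry_seen": context.registry}

    async def run_async(self, event=None, *, context=None) -> MiniContext:
        if context is None:
            context = MiniContext(event=event or {})
        previous_registry = context.registry
        context.registry = self.name   # swap in this workflow's registry
        context.should_stop = False    # a nested run always starts unstopped
        try:
            await self.execute_nodes(context)
        finally:
            context.registry = previous_registry  # restore the caller's registry
        return context


class ChildWorkflow(MiniWorkflow):
    name = "child"


class ParentWorkflow(MiniWorkflow):
    name = "parent"

    async def execute_nodes(self, context: MiniContext) -> None:
        context.should_stop = True  # pretend an earlier parent step set the flag
        await ChildWorkflow().run_async(context=context)
        context.nodes["parent"] = {
            "registry_seen": context.registry,
            "child_output_visible": "child" in context.nodes,
        }


result = asyncio.run(ParentWorkflow().run_async({"ticket_id": 7}))
```

In this model the child sees its own registry while it runs, the parent sees its registry again once the child returns, and the child's saved output stays visible to later parent steps.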

Schema Definitions

The schema.py module defines the structure and configuration of workflows using Pydantic models.

WorkflowSchema

from pydantic import BaseModel


class WorkflowSchema(BaseModel):
    """Schema definition for a complete workflow.

    WorkflowSchema defines the overall structure of a processing workflow,
    including its entry point and all constituent nodes.
    """
Key attributes:
  • description: Optional description of the workflow’s purpose
  • event_schema: Pydantic model for validating incoming events
  • start: The entry point Node class for the workflow
  • nodes: List of NodeConfig objects defining the workflow structure
Benefits:
  • Type Safety: Pydantic models ensure type validation
  • Documentation: Built-in schema documentation
  • Validation: Automatic validation of workflow structure
  • IDE Support: Full autocomplete and type checking

NodeConfig

from pydantic import BaseModel


class NodeConfig(BaseModel):
    """Configuration model for workflow nodes.

    NodeConfig defines the structure and behavior of a single node within
    a workflow, including its connections to other nodes and routing properties.
    """
NodeConfig attributes:
  • node: The Node class to be instantiated
  • connections: List of Node classes this node can connect to
  • is_router: Flag indicating if this node performs routing logic
  • description: Optional description of the node’s purpose
  • concurrent_nodes: Optional list of Node classes that can run concurrently

Workflow Validation

The validate.py module provides validation logic for workflow schemas, ensuring they form valid directed acyclic graphs (DAGs) and have proper routing configurations.
Validation features:
  1. DAG Validation - Validates that the workflow forms a proper DAG with no cycles
  2. Reachability Check - Ensures all nodes are reachable from the start node
  3. Router Validation - Validates that only router nodes have multiple connections
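
The three checks above can be sketched over a plain adjacency mapping. This is an illustrative approximation, not the contents of validate.py: the function names, the error messages, and the use of node-name strings instead of Node classes are all assumptions, and concurrent_nodes are left out of the graph for simplicity.

```python
from collections import deque


def has_cycle(graph: dict) -> bool:
    """Depth-first search with three colours; a back edge to a GRAY node is a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for nxt in graph.get(node, []):
            state = color.get(nxt, WHITE)
            if state == GRAY:
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)


def unreachable_nodes(graph: dict, start: str) -> set:
    """Breadth-first search from start; returns every node it never reaches."""
    seen = {start}
    queue = deque([start])
    while queue:
        for nxt in graph.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    all_nodes = set(graph) | {n for targets in graph.values() for n in targets}
    return all_nodes - seen


def validate_graph(graph: dict, start: str, routers: set) -> list:
    """Run all three checks and collect human-readable errors."""
    errors = []
    if has_cycle(graph):
        errors.append("workflow contains a cycle")
    for node in sorted(unreachable_nodes(graph, start)):
        errors.append(f"{node} is not reachable from {start}")
    for node, targets in graph.items():
        if len(targets) > 1 and node not in routers:
            errors.append(f"{node} has multiple connections but is not a router")
    return errors


# Connections taken from the CustomerCareWorkflow example, as name strings.
graph = {
    "AnalyzeTicketNode": ["TicketRouterNode"],
    "TicketRouterNode": [
        "CloseTicketNode",
        "EscalateTicketNode",
        "GenerateResponseNode",
        "ProcessInvoiceNode",
    ],
    "GenerateResponseNode": ["SendReplyNode"],
}
routers = {"TicketRouterNode"}

ok_errors = validate_graph(graph, "AnalyzeTicketNode", routers)
cyclic_errors = validate_graph(
    dict(graph, SendReplyNode=["AnalyzeTicketNode"]), "AnalyzeTicketNode", routers
)
```

Here ok_errors is empty, while adding an edge from SendReplyNode back to AnalyzeTicketNode makes the cycle check fail.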

Workflow Example

Here’s a complete example of a workflow implementation:
from launchpad.core.schema import WorkflowSchema, NodeConfig
from launchpad.core.workflow import Workflow
from launchpad.workflows.examples.quickstart.schema import CustomerCareEventSchema
from launchpad.workflows.examples.quickstart.nodes.analyze_ticket_node import AnalyzeTicketNode
from launchpad.workflows.examples.quickstart.nodes.close_ticket_node import CloseTicketNode
from launchpad.workflows.examples.quickstart.nodes.determine_intent_ticket_node import (
    DetermineTicketIntentNode,
)
from launchpad.workflows.examples.quickstart.nodes.escalate_ticket_node import EscalateTicketNode
from launchpad.workflows.examples.quickstart.nodes.filter_spam import FilterSpamNode
from launchpad.workflows.examples.quickstart.nodes.generate_response_node import GenerateResponseNode
from launchpad.workflows.examples.quickstart.nodes.process_invoice_node import ProcessInvoiceNode
from launchpad.workflows.examples.quickstart.nodes.send_reply_node import SendReplyNode
from launchpad.workflows.examples.quickstart.nodes.ticket_router_node import TicketRouterNode
from launchpad.workflows.examples.quickstart.nodes.validate_ticket_node import ValidateTicketNode


class CustomerCareWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="Customer care ticket processing workflow",
        event_schema=CustomerCareEventSchema,
        start=AnalyzeTicketNode,
        nodes=[
            NodeConfig(
                node=AnalyzeTicketNode,
                connections=[TicketRouterNode],
                concurrent_nodes=[
                    DetermineTicketIntentNode,
                    FilterSpamNode,
                    ValidateTicketNode,
                ],
            ),
            NodeConfig(
                node=TicketRouterNode,
                connections=[
                    CloseTicketNode,
                    EscalateTicketNode,
                    GenerateResponseNode,
                    ProcessInvoiceNode,
                ],
                is_router=True,
            ),
            NodeConfig(
                node=GenerateResponseNode,
                connections=[SendReplyNode],
            ),
        ],
    )
This example demonstrates a typical workflow pattern: concurrent analysis, routing based on results, and a terminal action node for the selected path.
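
The "concurrent analysis, then routing" shape can be shown in isolation with plain asyncio. This is a framework-free sketch, not the quickstart's actual node logic: the three analysis coroutines, the keyword rules, and the route names are all hypothetical and exist only to make the control flow visible.

```python
import asyncio


async def determine_intent(ticket: dict) -> dict:
    # Hypothetical rule: mention of an invoice means an invoice request.
    body = ticket["body"].lower()
    return {"intent": "invoice" if "invoice" in body else "question"}


async def filter_spam(ticket: dict) -> dict:
    # Hypothetical rule: a single spam phrase.
    return {"is_spam": "free money" in ticket["body"].lower()}


async def validate_ticket(ticket: dict) -> dict:
    return {"is_valid": bool(ticket["body"].strip())}


def route(analysis: dict) -> str:
    """Pick exactly one downstream path from the merged analysis results."""
    if analysis["is_spam"] or not analysis["is_valid"]:
        return "close_ticket"
    if analysis["intent"] == "invoice":
        return "process_invoice"
    return "generate_response"


async def handle(ticket: dict) -> str:
    # The three analyses are independent, so they can run concurrently.
    results = await asyncio.gather(
        determine_intent(ticket),
        filter_spam(ticket),
        validate_ticket(ticket),
    )
    analysis = {}
    for result in results:
        analysis.update(result)
    return route(analysis)


invoice_path = asyncio.run(handle({"body": "Where is my invoice?"}))
spam_path = asyncio.run(handle({"body": "Claim your FREE MONEY now"}))
default_path = asyncio.run(handle({"body": "How do I reset my password?"}))
```

The router runs only after every concurrent analysis has finished, which mirrors how TicketRouterNode follows the concurrent_nodes of AnalyzeTicketNode in the schema above.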