The workflow system is built around the concept of a directed acyclic graph (DAG) where nodes represent processing steps and edges represent the flow of data between them. The Workflow class in workflow.py serves as the orchestrator, managing the execution flow and passing the task context between nodes.
Workflow Class
The foundation of the workflow system is the abstract Workflow class:
class Workflow(ABC):
    """Abstract base class for defining processing workflows.

    The Workflow class provides a framework for creating processing workflows
    with multiple nodes and routing logic. Each workflow must define its structure
    using a WorkflowSchema.
    """
Key capabilities:
- Abstract base class for concrete workflows
- Schema validation during initialization
- Node execution and routing management
- Task context passing between nodes
- Optional Langfuse tracing via enable_tracing=True (default False)
Execution Methods
A Workflow instance exposes three entry points:
| Method | Signature | Use when |
|---|---|---|
| run | run(event=None, *, context=None) | Calling from sync Python (Celery task, script). Wraps run_async in asyncio.run. |
| run_async | await run_async(event=None, *, context=None) | Calling from an existing event loop (FastAPI route, parent node delegating to a child workflow). |
| run_stream_async | async for chunk in run_stream_async(event) | Streaming nodes that yield SSE chunks (see the SSE Streaming example). |
run and run_async both return the final TaskContext. Pass event= for a fresh run, or context= to continue with an existing TaskContext. Passing context= is how one workflow composes another without losing accumulated state.
# Enable Langfuse tracing for this workflow instance
workflow = CustomerCareWorkflow(enable_tracing=True)
# Standard entry from a sync caller
context = workflow.run(event_data)
# Inside a FastAPI handler
context = await workflow.run_async(event_data)
# Compose a child workflow, carrying the parent's context forward
child = InvoiceWorkflow()
context = await child.run_async(context=context)
When enable_tracing=True and Langfuse credentials are missing or invalid, the workflow constructor raises LangfuseAuthenticationError.
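The relationship between the two main entry points can be illustrated with a minimal, self-contained sketch. It does not use the real Workflow class: SketchWorkflow, ChildWorkflow, and this TaskContext dataclass are hypothetical stand-ins, and the only behavior modeled is what the table above states (run wraps run_async in asyncio.run, and context= continues with an existing context).

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class TaskContext:
    """Stand-in for the real TaskContext: the event plus accumulated node outputs."""
    event: Any = None
    nodes: dict = field(default_factory=dict)


class SketchWorkflow:
    """Hypothetical workflow that models only the entry-point plumbing."""

    async def run_async(
        self, event: Any = None, *, context: Optional[TaskContext] = None
    ) -> TaskContext:
        # Reuse the caller's context when one is given; otherwise start fresh.
        if context is None:
            context = TaskContext(event=event)
        # A real workflow would execute its nodes here; the sketch records a marker.
        context.nodes[type(self).__name__] = {"ran": True}
        return context

    def run(
        self, event: Any = None, *, context: Optional[TaskContext] = None
    ) -> TaskContext:
        # Sync wrapper. asyncio.run raises RuntimeError if a loop is already running,
        # which is why async callers should await run_async directly.
        return asyncio.run(self.run_async(event, context=context))


class ChildWorkflow(SketchWorkflow):
    pass


parent_context = SketchWorkflow().run({"ticket_id": 1})
combined = ChildWorkflow().run(context=parent_context)
print(sorted(combined.nodes))
```

Because the second call receives context=, it returns the same object it was given, with the first workflow's output still present.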
Nested Workflows
Use nested workflows when one step in a larger workflow deserves its own workflow definition. A parent node can delegate to a child workflow and pass the current TaskContext forward:
class RunReplyWorkflowNode(Node):
    class OutputType(BaseModel):
        delegated: bool
        workflow: str

    async def process(self, task_context: TaskContext) -> TaskContext:
        await ReplyDraftWorkflow().run_async(context=task_context)
        self.save_output(
            self.OutputType(delegated=True, workflow="ReplyDraftWorkflow")
        )
        return task_context
When context= is provided, the child workflow reuses the existing TaskContext instead of parsing a fresh event. That means:
- The child workflow can read the same task_context.event and previous node outputs.
- Outputs saved by child nodes remain available to later parent nodes.
- The workflow engine temporarily swaps in the child’s node registry while the child runs, then restores the parent’s registry afterward.
should_stop is reset when entering the child workflow so a child can run even if a parent step previously stopped another branch.
This pattern is useful for reusable sub-processes like drafting a reply, extracting invoice data, running a review workflow, or grouping a specialized sequence behind a single parent node.
See the Nested Workflow example for a complete parent/child workflow implementation.
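The context-sharing behavior described in this section can be sketched without the framework. Everything below is an assumption made for illustration: the registry field on TaskContext, the node_names tuple, and the run_node helper are hypothetical names, not the engine's internals. The sketch only shows the shape of the idea: swap in the child's nodes, reset should_stop on entry, and restore the parent's nodes afterward.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskContext:
    """Stand-in context; `registry` is a hypothetical field for the active node set."""
    event: Any = None
    nodes: dict = field(default_factory=dict)
    registry: tuple = ()
    should_stop: bool = False


class SketchWorkflow:
    node_names: tuple = ()

    async def run_async(self, event=None, *, context=None):
        if context is None:
            context = TaskContext(event=event)
        caller_registry = context.registry
        context.registry = self.node_names  # swap in this workflow's nodes
        context.should_stop = False         # reset on entry
        try:
            for name in self.node_names:
                if context.should_stop:
                    break
                await self.run_node(name, context)
        finally:
            context.registry = caller_registry  # restore the caller's nodes
        return context

    async def run_node(self, name, context):
        # Record which workflow ran the node and which registry was active.
        context.nodes[name] = {
            "workflow": type(self).__name__,
            "registry": list(context.registry),
        }


class ReplyDraftSketch(SketchWorkflow):
    node_names = ("DraftReply",)


class ParentSketch(SketchWorkflow):
    node_names = ("Analyze", "RunReplyWorkflow", "Send")

    async def run_node(self, name, context):
        if name == "RunReplyWorkflow":
            # Delegate to the child, carrying the same context forward.
            await ReplyDraftSketch().run_async(context=context)
        await super().run_node(name, context)


result = asyncio.run(ParentSketch().run_async({"ticket_id": 1}))
print(list(result.nodes))
```

The child's DraftReply output ends up in the same nodes mapping that the parent's Send step sees, and the registry recorded for Send is the parent's again.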
Schema Definitions
The schema.py module defines the structure and configuration of workflows using Pydantic models.
WorkflowSchema
class WorkflowSchema(BaseModel):
    """Schema definition for a complete workflow.

    WorkflowSchema defines the overall structure of a processing workflow,
    including its entry point and all constituent nodes.
    """
Key attributes:
- description: Optional description of the workflow’s purpose
- event_schema: Pydantic model for validating incoming events
- start: The entry point Node class for the workflow
- nodes: List of NodeConfig objects defining the workflow structure
Benefits:
- Type Safety: Pydantic models ensure type validation
- Documentation: Built-in schema documentation
- Validation: Automatic validation of workflow structure
- IDE Support: Full autocomplete and type checking
NodeConfig
class NodeConfig(BaseModel):
    """Configuration model for workflow nodes.

    NodeConfig defines the structure and behavior of a single node within
    a workflow, including its connections to other nodes and routing properties.
    """
NodeConfig attributes:
- node: The Node class to be instantiated
- connections: List of Node classes this node can connect to
- is_router: Flag indicating if this node performs routing logic
- description: Optional description of the node’s purpose
- concurrent_nodes: Optional list of Node classes that can run concurrently
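To make the shape of these two models concrete, here is a rough sketch that mirrors the listed attributes. It deliberately uses standard-library dataclasses instead of Pydantic so it runs without dependencies, which means it performs no validation. The field defaults, the Node stand-in, and the adjacency helper are assumptions for illustration and are not taken from schema.py.

```python
from dataclasses import dataclass
from typing import Optional


class Node:
    """Stand-in base class for workflow nodes."""


@dataclass
class NodeConfig:
    node: type
    connections: list
    is_router: bool = False                  # assumed default
    description: Optional[str] = None
    concurrent_nodes: Optional[list] = None


@dataclass
class WorkflowSchema:
    start: type
    event_schema: type
    nodes: list
    description: Optional[str] = None


def adjacency(schema: WorkflowSchema) -> dict:
    """Hypothetical helper: map each configured node name to its successors' names."""
    return {
        cfg.node.__name__: [target.__name__ for target in cfg.connections]
        for cfg in schema.nodes
    }


class Analyze(Node): ...
class Route(Node): ...
class Reply(Node): ...
class Close(Node): ...


schema = WorkflowSchema(
    description="Tiny illustrative workflow",
    event_schema=dict,  # placeholder for an event model
    start=Analyze,
    nodes=[
        NodeConfig(node=Analyze, connections=[Route]),
        NodeConfig(node=Route, connections=[Reply, Close], is_router=True),
    ],
)
graph = adjacency(schema)
print(graph)
```

In this sketch, nodes that only appear as connection targets (Reply and Close) have no NodeConfig of their own, so they are absent from the adjacency keys.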
Workflow Validation
The validate.py module provides validation logic for workflow schemas, ensuring they form valid directed acyclic graphs (DAGs) and have proper routing configurations.
Validation Features:
- DAG Validation - Validates that the workflow forms a proper DAG with no cycles
- Reachability Check - Ensures all nodes are reachable from the start node
- Router Validation - Validates that only router nodes have multiple connections
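The three checks listed above can be sketched over a plain adjacency mapping. This is not the code in validate.py: validate_workflow, its parameters, and the error messages are hypothetical, and concurrent nodes are left out for brevity. Cycle detection uses a standard depth-first search with three visit states.

```python
def validate_workflow(start, connections, routers):
    """Return a list of problems; an empty list means every check passed.

    connections maps a node name to the names it connects to;
    routers is the set of node names flagged as routers.
    """
    errors = []

    # Router check: only routers may fan out to more than one node.
    for node, targets in connections.items():
        if len(targets) > 1 and node not in routers:
            errors.append(f"{node} has multiple connections but is not a router")

    all_nodes = {start} | set(connections)
    for targets in connections.values():
        all_nodes.update(targets)

    # Cycle check: reaching a node that is still "in progress" means a back edge.
    UNSEEN, IN_PROGRESS, DONE = 0, 1, 2
    state = {}

    def has_cycle(node):
        state[node] = IN_PROGRESS
        for nxt in connections.get(node, []):
            nxt_state = state.get(nxt, UNSEEN)
            if nxt_state == IN_PROGRESS:
                return True
            if nxt_state == UNSEEN and has_cycle(nxt):
                return True
        state[node] = DONE
        return False

    if any(state.get(n, UNSEEN) == UNSEEN and has_cycle(n) for n in sorted(all_nodes)):
        errors.append("workflow contains a cycle")

    # Reachability check: walk the graph from the start node.
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(connections.get(node, []))
    for node in sorted(all_nodes - seen):
        errors.append(f"{node} is unreachable from {start}")

    return errors


valid = {"Analyze": ["Router"], "Router": ["Close", "Reply"], "Reply": ["Send"]}
print(validate_workflow("Analyze", valid, routers={"Router"}))
print(validate_workflow("A", {"A": ["B"], "B": ["A"]}, routers=set()))
```

Returning a list of messages rather than raising on the first failure is a design choice of this sketch; it lets a caller report every structural problem at once.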
Workflow Example
Here’s a complete example of a workflow implementation:
from launchpad.core.schema import WorkflowSchema, NodeConfig
from launchpad.core.workflow import Workflow
from launchpad.workflows.examples.quickstart.schema import CustomerCareEventSchema
from launchpad.workflows.examples.quickstart.nodes.analyze_ticket_node import AnalyzeTicketNode
from launchpad.workflows.examples.quickstart.nodes.close_ticket_node import CloseTicketNode
from launchpad.workflows.examples.quickstart.nodes.determine_intent_ticket_node import (
    DetermineTicketIntentNode,
)
from launchpad.workflows.examples.quickstart.nodes.escalate_ticket_node import EscalateTicketNode
from launchpad.workflows.examples.quickstart.nodes.filter_spam import FilterSpamNode
from launchpad.workflows.examples.quickstart.nodes.generate_response_node import GenerateResponseNode
from launchpad.workflows.examples.quickstart.nodes.process_invoice_node import ProcessInvoiceNode
from launchpad.workflows.examples.quickstart.nodes.send_reply_node import SendReplyNode
from launchpad.workflows.examples.quickstart.nodes.ticket_router_node import TicketRouterNode
from launchpad.workflows.examples.quickstart.nodes.validate_ticket_node import ValidateTicketNode
class CustomerCareWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="Customer care ticket processing workflow",
        event_schema=CustomerCareEventSchema,
        start=AnalyzeTicketNode,
        nodes=[
            NodeConfig(
                node=AnalyzeTicketNode,
                connections=[TicketRouterNode],
                concurrent_nodes=[
                    DetermineTicketIntentNode,
                    FilterSpamNode,
                    ValidateTicketNode,
                ],
            ),
            NodeConfig(
                node=TicketRouterNode,
                connections=[
                    CloseTicketNode,
                    EscalateTicketNode,
                    GenerateResponseNode,
                    ProcessInvoiceNode,
                ],
                is_router=True,
            ),
            NodeConfig(
                node=GenerateResponseNode,
                connections=[SendReplyNode],
            ),
        ],
    )
This example demonstrates a typical workflow pattern: concurrent analysis, routing based on results, and a terminal action node for the selected path.
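The control flow of that pattern can be approximated in a few lines of plain asyncio. The sketch below does not use the node classes from the example: the three analysis coroutines, the route function, and the routing rules are invented for illustration, and only the node names in the returned path echo the schema above.

```python
import asyncio


async def determine_intent(ticket):
    # Invented rule: any mention of "invoice" counts as an invoice request.
    return {"intent": "invoice" if "invoice" in ticket["body"].lower() else "question"}


async def filter_spam(ticket):
    return {"is_spam": "win a prize" in ticket["body"].lower()}


async def validate_ticket(ticket):
    return {"is_valid": bool(ticket["body"].strip())}


def route(analysis):
    """Pick exactly one downstream path from the merged analysis results."""
    if analysis["is_spam"] or not analysis["is_valid"]:
        return "CloseTicketNode"
    if analysis["intent"] == "invoice":
        return "ProcessInvoiceNode"
    return "GenerateResponseNode"


async def handle(ticket):
    # Concurrent analysis: the three checks run together, like concurrent_nodes.
    results = await asyncio.gather(
        determine_intent(ticket), filter_spam(ticket), validate_ticket(ticket)
    )
    analysis = {}
    for result in results:
        analysis.update(result)

    # Routing: a single branch is chosen, then followed to its terminal node.
    path = [route(analysis)]
    if path[-1] == "GenerateResponseNode":
        path.append("SendReplyNode")
    return path


print(asyncio.run(handle({"body": "How do I reset my password?"})))
```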