Skip to main content
Now implement the workflow steps. Each step is implemented as a node. This example demonstrates reusable patterns you can apply across client projects.

Workflow Steps

1

Analyze ticket contents

Run three concurrent analyses:
  • Determine ticket intent
  • Check for spam
  • Validate information sufficiency
2

Route based on analysis

Make intelligent routing decisions:
  • Close spam tickets
  • Escalate urgent issues
  • Process specific requests (invoices, refunds)
  • Generate responses for general queries
3

Take action

Execute the appropriate action based on routing decision
All nodes are located in the folder: app/workflows/customer_care_workflow_nodes.

Step 1: Concurrent Analysis

AnalyzeTicketNode

Since our three analysis operations are independent, we use ConcurrentNode to run them simultaneously:
class AnalyzeTicketNode(ConcurrentNode):
    """Fan-out node that runs the ticket analyses concurrently.

    The sub-nodes are not named in this class; the workflow schema lists them
    under this node's ``parallel_nodes`` entry.
    """

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Run the configured sub-nodes concurrently, then return the context.

        Args:
            task_context: Context object for the current workflow run.

        Returns:
            The same ``task_context`` object that was passed in.
        """
        # All scheduling is delegated to the ConcurrentNode base class.
        await self.execute_nodes_concurrently(task_context)
        return task_context
Tip: Use concurrency when AI calls are independent of one another; running them in parallel reduces total processing time.

DetermineTicketIntentNode

This AgentNode uses AI to classify the ticket intent:
# Closed set of intents a ticket can be classified as; used as the type of
# DetermineTicketIntentNode.OutputType.intent.
# NOTE: documented with comments rather than a class docstring on purpose --
# pydantic may copy an Enum's docstring into the generated JSON schema, which
# would change what is sent to the model.
class CustomerIntent(str, Enum):
    GENERAL_QUESTION = "general/question"
    PRODUCT_QUESTION = "product/question"
    BILLING_INVOICE = "billing/invoice"
    REFUND_REQUEST = "refund/request"

    @property
    def escalate(self) -> bool:
        """Return True when this intent always requires escalation.

        Only ``REFUND_REQUEST`` qualifies at present.
        """
        # Enum members are singletons, so an identity check is sufficient.
        return self is CustomerIntent.REFUND_REQUEST


class DetermineTicketIntentNode(AgentNode):
    """Agent node that classifies the intent of an incoming ticket."""

    # Structured result requested from the model. Documented with comments
    # (not a docstring) so the schema generated from this model is unchanged.
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(
            description="Explain your reasoning for the intent classification"
        )
        intent: CustomerIntent
        # Constrained to the closed interval [0, 1].
        confidence: float = Field(
            description="Confidence score for the intent", ge=0, le=1
        )
        escalate: bool = Field(
            description="Flag to indicate if the ticket needs escalation"
        )

    # Ticket fields injected into the system prompt; all four are required.
    class DepsType(AgentNode.DepsType):
        from_email: str = Field(
            default=..., description="Email address of the sender"
        )
        sender: str = Field(
            default=..., description="Name or identifier of the sender"
        )
        subject: str = Field(default=..., description="Subject of the ticket")
        body: str = Field(default=..., description="The body of the ticket")

    def get_agent_config(self) -> AgentConfig:
        """Build the agent configuration for intent classification.

        Returns:
            An ``AgentConfig`` using the "ticket_analysis" prompt, this node's
            ``OutputType`` / ``DepsType``, and the Anthropic model named below.
        """
        analysis_prompt = PromptManager().get_prompt("ticket_analysis")
        return AgentConfig(
            model_provider=ModelProvider.ANTHROPIC,
            model_name="claude-3-7-sonnet-latest",
            system_prompt=analysis_prompt,
            output_type=self.OutputType,
            deps_type=self.DepsType,
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Classify the ticket held in ``task_context.event`` and save the result.

        Args:
            task_context: Context for the current run; ``event`` is read as a
                ``CustomerCareEventSchema``.

        Returns:
            The same ``task_context`` object that was passed in.
        """
        ticket: CustomerCareEventSchema = task_context.event
        ticket_fields = {
            "from_email": ticket.from_email,
            "sender": ticket.sender,
            "subject": ticket.subject,
            "body": ticket.body,
        }
        ticket_deps = self.DepsType(**ticket_fields)

        # NOTE(review): this registers a system-prompt function on every call
        # to process(). If the node/agent instance is reused across tickets the
        # registrations may accumulate -- confirm against AgentNode's lifetime.
        # The function closes over ``ticket_deps``; ``ctx`` is not read and no
        # deps are passed to ``run`` below.
        @self.agent.system_prompt
        def add_ticket_context(
            ctx: RunContext[DetermineTicketIntentNode.DepsType],
        ) -> str:
            return ticket_deps.model_dump_json(indent=2)

        serialized_ticket = ticket.model_dump_json(indent=2)
        run_result = await self.agent.run(user_prompt=serialized_ticket)
        self.save_output(run_result.output)
        return task_context

FilterSpamNode

Detects spam messages using AI:
class FilterSpamNode(AgentNode):
    """Agent node that classifies a ticket as human-written or spam."""

    # Structured result requested from the model. Documented with comments
    # (not a docstring) so the schema generated from this model is unchanged.
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(
            description="Explain your reasoning for spam detection"
        )
        # Constrained to the closed interval [0, 1].
        confidence: float = Field(
            description="Confidence score for the classification",
            ge=0,
            le=1,
        )
        # Note the polarity: True means *not* spam.
        is_human: bool = Field(
            description="True if human-written, False if spam"
        )

    def get_agent_config(self) -> AgentConfig:
        """Build the agent configuration for spam detection.

        Returns:
            An ``AgentConfig`` with an inline system prompt, this node's
            ``OutputType``, no deps type, and the Anthropic model named below.
        """
        return AgentConfig(
            model_provider=ModelProvider.ANTHROPIC,
            model_name="claude-3-7-sonnet-latest",
            system_prompt="You are a helpful assistant that filters messages...",
            output_type=self.OutputType,
            deps_type=None,
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Run spam detection on ``task_context.event`` and save the result.

        Args:
            task_context: Context for the current run; ``event`` is read as a
                ``CustomerCareEventSchema``.

        Returns:
            The same ``task_context`` object that was passed in.
        """
        ticket: CustomerCareEventSchema = task_context.event
        # The whole event, serialized as JSON, is the user prompt.
        serialized_ticket = ticket.model_dump_json()
        run_result = await self.agent.run(user_prompt=serialized_ticket)
        self.save_output(run_result.output)
        return task_context

ValidateTicketNode

Verifies if the ticket contains actionable information:
class ValidateTicketNode(AgentNode):
    """Agent node that judges whether a ticket contains actionable information."""

    # Structured result requested from the model. Documented with comments
    # (not a docstring) so the schema generated from this model is unchanged.
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(
            description="Reasoning for actionability determination"
        )
        # Constrained to the closed interval [0, 1].
        confidence: float = Field(description="Confidence score", ge=0, le=1)
        is_actionable: bool = Field(
            description="True if ticket is actionable"
        )

    def get_agent_config(self) -> AgentConfig:
        """Build the agent configuration for ticket validation.

        Returns:
            An ``AgentConfig`` with an inline system prompt, this node's
            ``OutputType``, no deps type, and the Anthropic model named below.
        """
        return AgentConfig(
            model_provider=ModelProvider.ANTHROPIC,
            model_name="claude-3-7-sonnet-latest",
            system_prompt="Review tickets for actionable information...",
            output_type=self.OutputType,
            deps_type=None,
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Validate the ticket in ``task_context.event`` and save the result.

        Args:
            task_context: Context for the current run; ``event`` is read as a
                ``CustomerCareEventSchema``.

        Returns:
            The same ``task_context`` object that was passed in.
        """
        ticket: CustomerCareEventSchema = task_context.event
        # The whole event, serialized as JSON, is the user prompt.
        serialized_ticket = ticket.model_dump_json()
        run_result = await self.agent.run(user_prompt=serialized_ticket)
        self.save_output(run_result.output)
        return task_context

Step 2: Intelligent Routing

TicketRouterNode

The router examines analysis results and selects the next action:
class TicketRouterNode(BaseRouter):
    """Router node holding the candidate routers and a fallback node."""

    def __init__(self):
        """Create the router instances and the fallback response node.

        How ``BaseRouter`` consults ``routes`` and ``fallback`` is not shown
        here -- presumably routes are tried in list order and the fallback is
        used when none matches; TODO confirm against BaseRouter.
        """
        candidate_routes = [
            CloseTicketRouter(),
            EscalationRouter(),
            InvoiceRouter(),
        ]
        self.routes = candidate_routes
        self.fallback = GenerateResponseNode()
  • CloseTicketRouter: Closes spam tickets automatically when confidence > 0.8
  • EscalationRouter: Escalates urgent or sensitive issues to human agents (refund requests, plus any ticket the intent analysis flags for escalation)
  • InvoiceRouter: Routes billing-related requests to invoice processing

Router Implementation

class CloseTicketRouter(RouterNode):
    """Router that closes tickets confidently classified as spam."""

    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        """Return a ``CloseTicketNode`` for confident spam, otherwise ``None``.

        Args:
            task_context: Context for the current run (not read directly; the
                spam verdict is fetched via ``get_output``).

        Returns:
            A new ``CloseTicketNode`` when the spam filter says the ticket is
            not human-written with confidence strictly above 0.8; else ``None``.
        """
        spam_verdict: FilterSpamNode.OutputType = self.get_output(FilterSpamNode)
        is_confident_spam = (
            not spam_verdict.is_human and spam_verdict.confidence > 0.8
        )
        return CloseTicketNode() if is_confident_spam else None

class EscalationRouter(RouterNode):
    """Router that sends tickets needing escalation to ``EscalateTicketNode``."""

    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        """Return an ``EscalateTicketNode`` when escalation is required.

        Escalation is triggered either by the intent itself
        (``CustomerIntent.escalate``) or by the ``escalate`` flag in the
        intent analysis output.

        Args:
            task_context: Context for the current run (not read directly).

        Returns:
            A new ``EscalateTicketNode``, or ``None`` when neither trigger fires.
        """
        analysis: DetermineTicketIntentNode.OutputType = self.get_output(
            DetermineTicketIntentNode
        )
        needs_escalation = analysis.intent.escalate or analysis.escalate
        return EscalateTicketNode() if needs_escalation else None

class InvoiceRouter(RouterNode):
    """Router that sends invoice requests to ``ProcessInvoiceNode``."""

    def determine_next_node(self, task_context: TaskContext) -> Optional[Node]:
        """Return a ``ProcessInvoiceNode`` for billing/invoice tickets.

        Args:
            task_context: Context for the current run (not read directly).

        Returns:
            A new ``ProcessInvoiceNode`` when the classified intent equals
            ``CustomerIntent.BILLING_INVOICE``; otherwise ``None``.
        """
        analysis: DetermineTicketIntentNode.OutputType = self.get_output(
            DetermineTicketIntentNode
        )
        is_invoice_request = analysis.intent == CustomerIntent.BILLING_INVOICE
        return ProcessInvoiceNode() if is_invoice_request else None

Step 3: Action Nodes

GenerateResponseNode

Creates AI-powered responses for customer queries:
class GenerateResponseNode(AgentNode):
    """Agent node that drafts a reply to the customer's ticket."""

    # Structured result requested from the model. Documented with comments
    # (not a docstring) so the schema generated from this model is unchanged.
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(description="The reasoning for the response")
        response: str = Field(description="The response to the ticket")
        # Constrained to the closed interval [0, 1].
        confidence: float = Field(
            description="Confidence score for response quality",
            ge=0,
            le=1,
        )

    def get_agent_config(self) -> AgentConfig:
        """Build the agent configuration for response generation.

        Returns:
            An ``AgentConfig`` using the "customer_ticket_response" prompt,
            this node's ``OutputType``, no deps type, and the Azure OpenAI
            model named below.
        """
        response_prompt = PromptManager().get_prompt("customer_ticket_response")
        return AgentConfig(
            model_provider=ModelProvider.AZURE_OPENAI,
            model_name="gpt-4o",
            system_prompt=response_prompt,
            output_type=self.OutputType,
            deps_type=None,
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Generate a reply for ``task_context.event`` and save the result.

        Args:
            task_context: Context for the current run; ``event`` is read as a
                ``CustomerCareEventSchema``.

        Returns:
            The same ``task_context`` object that was passed in.
        """
        ticket: CustomerCareEventSchema = task_context.event
        # The whole event, serialized as JSON, is the user prompt.
        serialized_ticket = ticket.model_dump_json()
        run_result = await self.agent.run(user_prompt=serialized_ticket)
        self.save_output(run_result.output)
        return task_context

SendReplyNode

Delivers the generated response:
class SendReplyNode(Node):
    """Action node for delivering the generated reply.

    As written, delivery consists only of logging the response text.
    """

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Log the response produced by ``GenerateResponseNode``.

        Args:
            task_context: Context for the current run (not read directly; the
                response is fetched via ``get_output``).

        Returns:
            The same ``task_context`` object that was passed in.
        """
        # The header is logged before the lookup, so it appears even if
        # fetching the generated output fails.
        logging.info("Sending reply:")
        generated: GenerateResponseNode.OutputType = self.get_output(
            GenerateResponseNode
        )
        reply_text = generated.response
        logging.info(reply_text)
        return task_context

Complete Workflow Schema

class CustomerCareWorkflow(Workflow):
    """Declarative wiring of the customer care ticket workflow."""

    # NOTE(review): CloseTicketNode, EscalateTicketNode, ProcessInvoiceNode and
    # SendReplyNode appear only as connection targets and have no NodeConfig of
    # their own here -- presumably they are terminal nodes; confirm.
    workflow_schema = WorkflowSchema(
        description="Customer care ticket processing workflow",
        event_schema=CustomerCareEventSchema,
        start=AnalyzeTicketNode,
        nodes=[
            # Step 1: entry point; fans out to the three analysis nodes.
            NodeConfig(
                node=AnalyzeTicketNode,
                description="Analyze ticket in parallel",
                parallel_nodes=[
                    DetermineTicketIntentNode,
                    FilterSpamNode,
                    ValidateTicketNode,
                ],
                connections=[TicketRouterNode],
            ),
            # Step 2: router; connections list every node it may hand off to.
            NodeConfig(
                node=TicketRouterNode,
                description="Route based on analysis",
                is_router=True,
                connections=[
                    CloseTicketNode,
                    EscalateTicketNode,
                    GenerateResponseNode,
                    ProcessInvoiceNode,
                ],
            ),
            # Step 3: generated responses are handed to SendReplyNode.
            NodeConfig(
                node=GenerateResponseNode,
                description="Generate AI response",
                connections=[SendReplyNode],
            ),
        ],
    )
I