Skip to main content
The AgentNode class is for tasks that use large language models. It wraps PydanticAI for streamlined configuration and execution. To implement a custom AgentNode, define two methods: Required methods:
  • get_agent_config() - Returns an AgentConfig instance describing prompt, output, model provider, and model name
  • process() - Processes a task using the configured agent
The class also supports Pydantic-based DepsType and OutputType.

AgentNode Class Structure

AgentConfig

@dataclass
class AgentConfig:
    model_provider: ModelProvider
    model_name: Union[
        OpenAIModelName, AnthropicModelName, GeminiModelName, BedrockModelName
    ]
    output_type: Any = str
    instructions: Optional[str] = None
    system_prompt: str | Sequence[str] = ()
    deps_type: Optional[Type[Any]] = None
    name: str | None = None
    model_settings: ModelSettings | None = None
    retries: int = 1
    output_retries: int | None = None
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = ()
    mcp_servers: Sequence[MCPServer] = ()
    instrument: InstrumentationSettings | bool | None = None

AgentNode Base Class

class AgentNode(Node, ABC):
    class DepsType(BaseModel):
        pass

    class OutputType(BaseModel):
        pass

    def __init__(self):
        self.__async_client = AsyncClient()
        agent_wrapper = self.get_agent_config()
        self.agent = Agent(
            model=self.__get_model_instance(
                agent_wrapper.model_provider, agent_wrapper.model_name
            ),
            output_type=agent_wrapper.output_type,
            instructions=agent_wrapper.instructions,
            system_prompt=agent_wrapper.system_prompt,
            deps_type=agent_wrapper.deps_type,
            name=agent_wrapper.name,
            model_settings=agent_wrapper.model_settings,
            retries=agent_wrapper.retries,
            output_retries=agent_wrapper.output_retries,
            tools=agent_wrapper.tools,
            mcp_servers=agent_wrapper.mcp_servers,
            instrument=agent_wrapper.instrument,
        )

    @abstractmethod
    def get_agent_config(self) -> AgentConfig:
        pass

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        pass

Using PydanticAI Agent

Since Launchpad v3.0.0, AgentNode operates in an asynchronous context. Use the run() method on the Agent to execute completions. The synchronous run_sync() method is no longer suitable here.

Running a completion

# Correct - Use async run method
result = await self.agent.run(user_prompt="Your prompt here")

# Incorrect - Don't use sync method in async context
result = self.agent.run_sync(user_prompt="Your prompt here")
For more details on the Agent, see the PydanticAI docs.

Implementation examples

Example 1: Without Dependencies

class FilterSpamNode(AgentNode):
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(
            description="Explain your reasoning for determining whether the message is written by a human or is spam generated by a bot."
        )
        confidence: float = Field(
            ge=0,
            le=1,
            description="Confidence score for the human vs spam classification.",
        )
        is_human: bool = Field(
            description="Set to True if the message appears to be written by a genuine human; False if it's most likely spam from a bot."
        )

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            system_prompt="You are a helpful assistant that filters messages to determine whether they are written by a human or are spam generated by a bot.",
            output_type=self.OutputType,
            deps_type=None,
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-4o",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        event: CustomerCareEventSchema = task_context.event
        result = await self.agent.run(
            user_prompt=event.model_dump_json(),
        )
        task_context.update_node(node_name=self.node_name, result=result)
        return task_context

Example 2: With Dependencies

class GenerationNode(AgentNode):
    class DepsType(AgentNode.DepsType):
        context: RetrievalResults

    class OutputType(AgentNode.OutputType):
        answer: str = Field(description="The answer to the query")
        sources: list[str] = Field(description="The sources used to answer the query")
        confidence: float = Field(
            description="The confidence in the answer", ge=0, le=1
        )

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            system_prompt="You are a helpful assistant that can answer questions and provide information based on the retrieved documents.",
            output_type=GenerationNode.OutputType,
            deps_type=GenerationNode.DepsType,
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-4.1",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        deps = GenerationNode.DepsType(
            context=task_context.nodes["RetrievalNode"]["results"],
        )

        @self.agent.system_prompt
        def add_rag_context(ctx: RunContext[GenerationNode.DepsType]) -> str:
            return f"Here are the documents I found for your query:\n{ctx.deps.context.model_dump_json(indent=2)}"

        result = await self.agent.run(
            user_prompt=task_context.event.query,
            deps=deps,
        )

        task_context.update_node(node_name=self.node_name, result=result)

        return task_context

Key features

  • Type-Safe Outputs - Pydantic models for consistent, validated results
  • Flexible Dependencies - Inject context for RAG patterns
  • Multiple Model Providers - OpenAI, Anthropic, etc.
  • Dynamic System Prompts - Functions can access dependencies
  • Tool Integration - Tools and MCP servers for actions beyond text generation
I