The AgentNode class is for tasks that use large language models. It wraps PydanticAI for streamlined configuration and execution.
To implement a custom AgentNode, define two methods:
Required methods:
- get_agent_config() - Returns an AgentConfig instance describing prompt, output, model provider, and model name
- process() - Processes a task using the configured agent
The class also supports Pydantic-based DepsType and OutputType inner classes for typed dependencies and structured outputs.
AgentNode Class Structure
AgentConfig
@dataclass
class AgentConfig:
    """Configuration bundle consumed by AgentNode when building its PydanticAI Agent.

    Each field maps one-to-one onto a keyword argument of ``pydantic_ai.Agent``
    (see ``AgentNode.__init__``), so field semantics follow the PydanticAI API.
    """

    # Vendor selector; combined with model_name to resolve a concrete model instance.
    model_provider: ModelProvider
    # Provider-specific model identifier (e.g. "gpt-4o" for OpenAI).
    model_name: Union[
        OpenAIModelName, AnthropicModelName, GeminiModelName, BedrockModelName
    ]
    # Expected result type of a run; defaults to plain text.
    output_type: Any = str
    # Static instructions passed to the agent (PydanticAI "instructions" channel).
    instructions: Optional[str] = None
    # One or more static system prompts; empty tuple means none.
    system_prompt: str | Sequence[str] = ()
    # Pydantic model class describing per-run dependencies, if any.
    deps_type: Optional[Type[Any]] = None
    # Optional human-readable agent name (useful for tracing/instrumentation).
    name: str | None = None
    # Provider-level generation settings (temperature, max tokens, ...).
    model_settings: ModelSettings | None = None
    # Retry count for failed model calls.
    retries: int = 1
    # Separate retry budget for output validation; None falls back to `retries`.
    output_retries: int | None = None
    # Tools the agent may call during a run.
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = ()
    # MCP servers exposing additional tools to the agent.
    mcp_servers: Sequence[MCPServer] = ()
    # Instrumentation/telemetry configuration; None uses the library default.
    instrument: InstrumentationSettings | bool | None = None
AgentNode Base Class
class AgentNode(Node, ABC):
    """Abstract base for LLM-backed workflow nodes.

    Subclasses supply an AgentConfig via ``get_agent_config()``; the
    constructor turns that config into a fully wired PydanticAI ``Agent``
    stored on ``self.agent``.
    """

    class DepsType(BaseModel):
        """Override in subclasses to declare per-run dependencies."""

    class OutputType(BaseModel):
        """Override in subclasses to declare the structured output schema."""

    def __init__(self):
        # Shared async HTTP client used by the underlying model transport.
        self.__async_client = AsyncClient()

        config = self.get_agent_config()
        # Resolve the (provider, name) pair into a concrete model object first,
        # then hand every remaining config field straight to the Agent.
        model = self.__get_model_instance(config.model_provider, config.model_name)
        self.agent = Agent(
            model=model,
            output_type=config.output_type,
            instructions=config.instructions,
            system_prompt=config.system_prompt,
            deps_type=config.deps_type,
            name=config.name,
            model_settings=config.model_settings,
            retries=config.retries,
            output_retries=config.output_retries,
            tools=config.tools,
            mcp_servers=config.mcp_servers,
            instrument=config.instrument,
        )

    @abstractmethod
    def get_agent_config(self) -> AgentConfig:
        """Return the AgentConfig describing this node's agent."""

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        """Run the node's work for one task and return the updated context."""
Using PydanticAI Agent
Since Launchpad v3.0.0, AgentNode operates in an asynchronous context. Use the run() method on the Agent to execute completions; the synchronous run_sync() method is no longer suitable here.
Running a completion
# Correct - run() is a coroutine; await it from the async process() method
result = await self.agent.run(user_prompt="Your prompt here")
# Incorrect - run_sync() would block the running event loop; never call it here
result = self.agent.run_sync(user_prompt="Your prompt here")
For more details on the Agent, see the PydanticAI documentation.
Implementation examples
Example 1: Without Dependencies
class FilterSpamNode(AgentNode):
    """Classifies a customer-care message as human-written or bot-generated spam."""

    class OutputType(AgentNode.OutputType):
        # Structured classification returned by the model.
        reasoning: str = Field(
            description="Explain your reasoning for determining whether the message is written by a human or is spam generated by a bot."
        )
        confidence: float = Field(
            description="Confidence score for the human vs spam classification.",
            ge=0,
            le=1,
        )
        is_human: bool = Field(
            description="Set to True if the message appears to be written by a genuine human; False if it's most likely spam from a bot."
        )

    def get_agent_config(self) -> AgentConfig:
        """Configure an OpenAI-backed agent with no per-run dependencies."""
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-4o",
            system_prompt="You are a helpful assistant that filters messages to determine whether they are written by a human or are spam generated by a bot.",
            output_type=self.OutputType,
            deps_type=None,
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Classify the event payload and record the result on the context."""
        event: CustomerCareEventSchema = task_context.event
        # The whole event is serialized as the prompt; the agent returns OutputType.
        classification = await self.agent.run(user_prompt=event.model_dump_json())
        task_context.update_node(node_name=self.node_name, result=classification)
        return task_context
Example 2: With Dependencies
class GenerationNode(AgentNode):
    """RAG answer-generation node: answers a query from retrieved documents."""

    class DepsType(AgentNode.DepsType):
        # Retrieved documents produced upstream by RetrievalNode.
        context: RetrievalResults

    class OutputType(AgentNode.OutputType):
        answer: str = Field(description="The answer to the query")
        sources: list[str] = Field(description="The sources used to answer the query")
        confidence: float = Field(
            description="The confidence in the answer", ge=0, le=1
        )

    def get_agent_config(self) -> AgentConfig:
        """Configure an OpenAI-backed agent with typed deps and structured output."""
        return AgentConfig(
            system_prompt="You are a helpful assistant that can answer questions and provide information based on the retrieved documents.",
            output_type=GenerationNode.OutputType,
            deps_type=GenerationNode.DepsType,
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-4.1",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        """Answer the event's query using documents retrieved by RetrievalNode."""
        deps = GenerationNode.DepsType(
            context=task_context.nodes["RetrievalNode"]["results"],
        )

        # FIX: the original registered @self.agent.system_prompt inside process(),
        # which appends a NEW prompt function to the shared agent on every call,
        # duplicating the RAG context on each subsequent run. Register the dynamic
        # prompt exactly once; it reads per-run data from ctx.deps, so later runs
        # still see their own retrieval results.
        if not getattr(self, "_rag_prompt_registered", False):

            @self.agent.system_prompt
            def add_rag_context(ctx: RunContext[GenerationNode.DepsType]) -> str:
                return f"Here are the documents I found for your query:\n{ctx.deps.context.model_dump_json(indent=2)}"

            self._rag_prompt_registered = True

        result = await self.agent.run(
            user_prompt=task_context.event.query,
            deps=deps,
        )
        task_context.update_node(node_name=self.node_name, result=result)
        return task_context
Key features
- Type-Safe Outputs - Pydantic models for consistent, validated results
- Flexible Dependencies - Inject context for RAG patterns
- Multiple Model Providers - OpenAI, Anthropic, etc.
- Dynamic System Prompts - Functions can access dependencies
- Tool Integration - Tools and MCP servers for actions beyond text generation