The AgentNode class wraps a pydantic-ai Agent so LLM calls slot into the same Chain-of-Responsibility pattern as any other node. Subclasses implement two methods:
get_agent_config() — returns an AgentConfig describing the model provider, model name, structured output type, tools, and instructions.
process() — runs the underlying Agent, validates outputs against OutputType, and saves them with save_output().
See LLM Providers for per-provider configuration details.
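As a rough illustration of how an LLM-backed node can sit in the same chain as any other node, here is a minimal, framework-free sketch. The `TaskContext`, `Node`, `UppercaseNode`, and `FakeLLMNode` classes below are hypothetical stand-ins written for this example, not the framework's real classes, and the "model call" is a plain coroutine rather than a pydantic-ai Agent.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskContext:
    """Hypothetical stand-in: carries the event plus each node's saved output."""
    event: Any
    nodes: dict[str, Any] = field(default_factory=dict)


class Node(ABC):
    """Hypothetical stand-in for the framework's Node base class."""

    def __init__(self, task_context: TaskContext) -> None:
        self.task_context = task_context

    def save_output(self, output: Any) -> None:
        # Store the output under the node's class name so later nodes can read it.
        self.task_context.nodes[type(self).__name__] = output

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext: ...


class UppercaseNode(Node):
    """An ordinary, non-LLM node."""

    async def process(self, task_context: TaskContext) -> TaskContext:
        self.save_output(task_context.event.upper())
        return task_context


class FakeLLMNode(Node):
    """Plays the role of an AgentNode; no real model is called."""

    async def _fake_model(self, prompt: str) -> str:
        await asyncio.sleep(0)  # yield to the event loop like a network call would
        return f"summary of {prompt!r}"

    async def process(self, task_context: TaskContext) -> TaskContext:
        upstream = task_context.nodes["UppercaseNode"]
        self.save_output(await self._fake_model(upstream))
        return task_context


async def run_chain(event: str) -> TaskContext:
    # Each node receives the context, adds its output, and hands it on.
    ctx = TaskContext(event=event)
    for node_cls in (UppercaseNode, FakeLLMNode):
        ctx = await node_cls(ctx).process(ctx)
    return ctx


ctx = asyncio.run(run_chain("hello"))
print(ctx.nodes)
```

The point of the sketch is that both nodes expose the same `process()` signature, so the loop driving the chain does not need to know which one talks to a model.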
AgentConfig
@dataclass
class AgentConfig:
    model_provider: ModelProvider
    model_name: Union[
        OpenAIModelName, AnthropicModelName, GeminiModelName, BedrockModelName
    ]
    output_type: Any = str
    instructions: Optional[str] = None
    deps_type: Optional[Type[Any]] = None
    name: str | None = None
    model_settings: ModelSettings | None = None
    retries: int = 1
    output_retries: int | None = None
    tools: List = field(default_factory=list)
    builtin_tools: List = field(default_factory=list)
    instrument: bool = True
Notable fields:
instructions — the system prompt for the underlying pydantic-ai Agent. You can also register per-run context via the @self.agent.instructions decorator inside process().
output_type — a Pydantic model (subclass of AgentNode.OutputType) for structured output, or str for plain text.
deps_type — a Pydantic model exposed via RunContext so tools and instruction callbacks can read dependencies the node computes at runtime.
tools, builtin_tools — pydantic-ai tool definitions.
instrument — defaults to True; set False to opt this node out of Langfuse instrumentation even when the workflow is traced.
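To make the required-versus-optional split concrete, the following sketch mirrors a subset of the fields with a standard-library dataclass. `AgentConfigSketch` and the one-member `ModelProvider` enum are stand-ins defined here for illustration only; the real `AgentConfig` has more fields and stricter types.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class ModelProvider(Enum):
    """Stand-in enum; OPENAI is the only member name shown in this page."""
    OPENAI = "openai"


@dataclass
class AgentConfigSketch:
    # No defaults: these two must always be supplied.
    model_provider: ModelProvider
    model_name: str
    # Everything below is optional and falls back to the listed default.
    output_type: Any = str
    instructions: Optional[str] = None
    retries: int = 1
    # default_factory gives every config its own list instead of a shared one.
    tools: list = field(default_factory=list)
    instrument: bool = True


# Minimal config: plain-text output, tracing on, no tools.
minimal = AgentConfigSketch(ModelProvider.OPENAI, "gpt-5.4-mini")

# Config that opts out of instrumentation and sets a system prompt.
quiet = AgentConfigSketch(
    model_provider=ModelProvider.OPENAI,
    model_name="gpt-5.4-mini",
    instructions="Classify the message.",
    instrument=False,
)

# Mutating one config's tools does not leak into the other.
minimal.tools.append("some_tool")
print(minimal.tools, quiet.tools)
```

Because `tools` and `builtin_tools` use `field(default_factory=list)`, two nodes that rely on the defaults do not end up sharing one mutable list.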
AgentNode base class
class AgentNode(Node, ABC):
    class DepsType(BaseModel):
        pass

    class OutputType(BaseModel):
        pass

    def __init__(self, task_context: TaskContext = None):
        super().__init__(task_context=task_context)
        self.__async_client = AsyncClient()
        agent_wrapper = self.get_agent_config()
        self.agent = Agent(
            model=self.__get_model_instance(
                agent_wrapper.model_provider, agent_wrapper.model_name
            ),
            output_type=agent_wrapper.output_type,
            instructions=agent_wrapper.instructions,
            deps_type=agent_wrapper.deps_type,
            name=agent_wrapper.name,
            model_settings=agent_wrapper.model_settings,
            retries=agent_wrapper.retries,
            output_retries=agent_wrapper.output_retries,
            tools=agent_wrapper.tools,
            builtin_tools=agent_wrapper.builtin_tools,
            instrument=agent_wrapper.instrument,
        )
        self.agent.instrument_all()

    @abstractmethod
    def get_agent_config(self) -> AgentConfig:
        pass

    @abstractmethod
    async def process(self, task_context: TaskContext) -> TaskContext:
        pass
AgentNode runs asynchronously. Call await self.agent.run(...) or async with self.agent.run_stream(...) — pydantic-ai’s run_sync() will not work inside the workflow’s event loop.
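The restriction on synchronous calls can be reproduced with asyncio alone. The sketch below assumes that a synchronous wrapper such as `run_sync()` needs to drive an event loop of its own; `fake_agent_run` and `fake_run_sync` are stand-ins written for this example, not pydantic-ai functions.

```python
import asyncio


async def fake_agent_run(prompt: str) -> str:
    """Stand-in for `await self.agent.run(...)`; no model is called."""
    await asyncio.sleep(0)
    return f"echo: {prompt}"


def fake_run_sync(prompt: str) -> str:
    """Stand-in for a sync wrapper that starts its own event loop."""
    coro = fake_agent_run(prompt)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # the coroutine never started; close it to avoid a warning
        raise


async def process() -> tuple[str, str]:
    # Awaiting works because we are already inside the running loop.
    awaited = await fake_agent_run("hello")
    # Starting a second loop from inside the first one is rejected by asyncio.
    try:
        fake_run_sync("hello")
        outcome = "sync call succeeded"
    except RuntimeError as exc:
        outcome = f"sync call failed: {type(exc).__name__}"
    return awaited, outcome


result = asyncio.run(process())
print(result)
```

Outside a running loop (for example in a plain script or a REPL) `fake_run_sync("hello")` works fine; it only fails when called from code that is itself being awaited, which is the situation every `process()` method is in.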
Implementation examples
Without dependencies
class FilterSpamNode(AgentNode):
    class OutputType(AgentNode.OutputType):
        reasoning: str = Field(description="Reasoning for the spam classification.")
        confidence: float = Field(ge=0, le=1)
        is_human: bool = Field(description="True if the message is from a human.")

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            instructions=(
                "You are a helpful assistant that filters messages to determine "
                "whether they are written by a human or are spam generated by a bot."
            ),
            output_type=self.OutputType,
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        event: CustomerCareEventSchema = task_context.event
        result = await self.agent.run(user_prompt=event.model_dump_json())
        self.save_output(result.output)
        return task_context
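One way to check the `process()` logic above without calling a model is to substitute a stub for the agent. The sketch below is framework-free: `SpamVerdict`, `StubResult`, `StubAgent`, and `FilterSpamSketch` are hypothetical names invented for this example, and the stub only imitates the two things `process()` relies on, an awaitable `run(user_prompt=...)` and a result with an `.output` attribute.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class SpamVerdict:
    """Plain-dataclass stand-in for FilterSpamNode.OutputType."""
    reasoning: str
    confidence: float
    is_human: bool


@dataclass
class StubResult:
    output: Any  # mirrors the `.output` attribute read in process()


class StubAgent:
    """Returns a canned result and records every prompt it receives."""

    def __init__(self, canned: Any) -> None:
        self.canned = canned
        self.prompts: list[str] = []

    async def run(self, user_prompt: str) -> StubResult:
        self.prompts.append(user_prompt)
        return StubResult(output=self.canned)


class FilterSpamSketch:
    """Hypothetical, framework-free copy of the node's process() flow."""

    def __init__(self, agent: StubAgent) -> None:
        self.agent = agent
        self.saved: Any = None

    def save_output(self, output: Any) -> None:
        self.saved = output

    async def process(self, event_json: str) -> None:
        # Same two steps as the real node: run the agent, then save its output.
        result = await self.agent.run(user_prompt=event_json)
        self.save_output(result.output)


verdict = SpamVerdict(reasoning="Greets by name.", confidence=0.9, is_human=True)
agent = StubAgent(verdict)
node = FilterSpamSketch(agent)
event_json = '{"body": "Hi Anna, has my order shipped?"}'
asyncio.run(node.process(event_json))
print(node.saved, agent.prompts)
```

The stub makes it possible to assert on what was sent to the agent and what was saved, independently of any provider credentials.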
With dependencies and dynamic instructions
Use deps_type + @self.agent.instructions to inject runtime context (for example, retrieval results in a RAG node) into the system prompt at call time:
class GenerationNode(AgentNode):
    class DepsType(AgentNode.DepsType):
        context: RetrievalResults

    class OutputType(AgentNode.OutputType):
        answer: str
        sources: list[str]
        confidence: float = Field(ge=0, le=1)

    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            instructions=(
                "You are a helpful assistant that answers questions using the "
                "retrieved documents."
            ),
            output_type=GenerationNode.OutputType,
            deps_type=GenerationNode.DepsType,
            model_provider=ModelProvider.OPENAI,
            model_name="gpt-5.4-mini",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        retrieval: RetrievalNode.OutputType = self.get_output(RetrievalNode)
        deps = GenerationNode.DepsType(context=retrieval.results)

        @self.agent.instructions
        def add_rag_context(ctx: RunContext[GenerationNode.DepsType]) -> str:
            return (
                "Here are the documents I found for your query:\n"
                f"{ctx.deps.context.model_dump_json(indent=2)}"
            )

        result = await self.agent.run(
            user_prompt=task_context.event.query,
            deps=deps,
        )
        self.save_output(result.output)
        return task_context
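The mechanism behind dynamic instructions can be pictured as a list of callbacks that are evaluated with the run's dependencies and appended to the static prompt. The sketch below is a simplified model of that idea, not pydantic-ai's implementation: `RunContextSketch`, `InstructionRegistry`, `Deps`, and the blank-line separator are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Generic, TypeVar

DepsT = TypeVar("DepsT")


@dataclass
class RunContextSketch(Generic[DepsT]):
    """Stand-in for RunContext: carries the dependencies for one run."""
    deps: DepsT


@dataclass
class InstructionRegistry(Generic[DepsT]):
    """Holds a static prompt plus callbacks that are evaluated per run."""
    static: str
    dynamic: list[Callable[[RunContextSketch[DepsT]], str]] = field(default_factory=list)

    def instructions(self, fn):
        """Decorator: register a callback and return it unchanged."""
        self.dynamic.append(fn)
        return fn

    def build_prompt(self, deps: DepsT) -> str:
        # Evaluate every callback with this run's deps, then join the parts.
        ctx = RunContextSketch(deps=deps)
        parts = [self.static] + [fn(ctx) for fn in self.dynamic]
        return "\n\n".join(parts)


@dataclass
class Deps:
    documents: list[str]


registry: InstructionRegistry[Deps] = InstructionRegistry(
    static="Answer using the retrieved documents."
)


@registry.instructions
def add_rag_context(ctx: RunContextSketch[Deps]) -> str:
    return "Documents:\n" + "\n".join(f"- {d}" for d in ctx.deps.documents)


prompt = registry.build_prompt(Deps(documents=["doc A", "doc B"]))
print(prompt)
```

In this sketch, applying the decorator again would append a second copy of the callback. If a node instance is reused across runs, it is worth checking whether registering inside `process()` has the same effect on the real Agent.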
Key features
- Type-safe outputs: Pydantic OutputType validates what the model returns.
- Flexible dependencies: DepsType + RunContext plug structured context into tools and instructions.
- Seven model providers: switch between OpenAI, Azure OpenAI, Anthropic, Google Gemini, Google Vertex AI, Bedrock, and Ollama by changing the model_provider enum value in AgentConfig, along with a matching model_name.
- Dynamic instructions: augment the system prompt per run via @self.agent.instructions.
- Tool integration: register pydantic-ai tools or built-in tools via AgentConfig.tools / builtin_tools.
- Langfuse-ready: tracing is on by default via instrument=True; the workflow decides whether traces are actually exported.
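Provider switching through a single enum value is commonly implemented as a lookup from enum member to factory. The sketch below shows that pattern under stated assumptions: it is not the body of the private `__get_model_instance` method, every member name other than `OPENAI` is a guess, the factories return plain strings instead of model objects, and `"llama3"` is only a placeholder model name.

```python
from enum import Enum
from typing import Callable


class ModelProvider(Enum):
    # Only OPENAI appears in this page; the other member names are assumed.
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    OLLAMA = "ollama"
    BEDROCK = "bedrock"  # deliberately left without a factory below


def build_openai(model_name: str) -> str:
    return f"OpenAI model <{model_name}>"


def build_anthropic(model_name: str) -> str:
    return f"Anthropic model <{model_name}>"


def build_ollama(model_name: str) -> str:
    return f"Ollama model <{model_name}>"


# One table maps each provider to the function that builds its model.
MODEL_FACTORIES: dict[ModelProvider, Callable[[str], str]] = {
    ModelProvider.OPENAI: build_openai,
    ModelProvider.ANTHROPIC: build_anthropic,
    ModelProvider.OLLAMA: build_ollama,
}


def get_model_instance(provider: ModelProvider, model_name: str) -> str:
    """Look up the factory for a provider and fail loudly if none exists."""
    factory = MODEL_FACTORIES.get(provider)
    if factory is None:
        raise ValueError(f"No factory registered for {provider.name}")
    return factory(model_name)


openai_model = get_model_instance(ModelProvider.OPENAI, "gpt-5.4-mini")
ollama_model = get_model_instance(ModelProvider.OLLAMA, "llama3")
print(openai_model, ollama_model)
```

Keeping the mapping in one table means a node's `get_agent_config()` only has to change `model_provider` and `model_name`; the rest of the node stays the same.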