Executor-level HITL pauses a workflow during a step, when the agent or team running inside the step calls a tool marked with requires_confirmation, requires_user_input, or external_execution. The pause propagates from the agent/team up to the workflow, and the user resolves it the same way they would on a standalone agent run.
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools import tool
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow


@tool(requires_confirmation=True)
def get_the_weather(city: str) -> str:
    return f"It is 70 degrees and cloudy in {city}"


weather_agent = Agent(
    name="WeatherAgent",
    model=OpenAIChat(id="gpt-5.4"),
    tools=[get_the_weather],
    db=SqliteDb(db_file="workflow.db"),
)

workflow = Workflow(
    name="WeatherWorkflow",
    db=SqliteDb(db_file="workflow.db"),
    steps=[Step(name="get_weather", agent=weather_agent)],
)

response = workflow.run("What is the weather in Tokyo?")

if response.is_paused:
    for step_req in response.step_requirements or []:
        if step_req.requires_executor_input:
            for executor_req in step_req.executor_requirements or []:
                executor_req.confirm()  # or executor_req.reject(note="...")
    response = workflow.continue_run(response)

print(response.content)
When to Use
| Use Case | Level |
|---|---|
| Approve a specific tool call (DB write, payment, email send) | Executor |
| Collect user-provided values for a tool argument | Executor |
| Defer tool execution to an external system | Executor |
| Gate the whole step before the agent runs | Step-level |
| Review the step’s final output | Output Review |
| Pick which branch a router takes | Router HITL |
Use executor-level HITL when the gate is on the tool, not on the step. The agent can call other tools freely; only the marked tool pauses execution.
Three decorator flags trigger executor-level pauses. They behave the same as on standalone agents.
| Decorator | Pauses For | Resolve With |
|---|---|---|
| @tool(requires_confirmation=True) | User approval | executor_req.confirm() or executor_req.reject(note=...) |
| @tool(requires_user_input=True, user_input_fields=[...]) | User-supplied argument values | executor_req.provide_user_input({...}) |
| @tool(external_execution=True) | Out-of-process execution; user supplies the result | Set tool_execution.result on the requirement |
from agno.tools import tool


@tool(requires_confirmation=True)
def delete_user(user_id: str) -> str:
    ...


@tool(requires_user_input=True, user_input_fields=["recipient"])
def send_money(amount: float, recipient: str, note: str) -> str:
    ...


@tool(external_execution=True)
def run_query(sql: str) -> str:
    ...
Pause Anatomy
When the agent inside a step calls a marked tool, the workflow pauses. The pause is visible at three levels:
1. The run output
run_output.is_paused # True
run_output.pause_kind # "executor"
run_output.paused_step_name # "get_weather"
run_output.step_requirements # list[StepRequirement]
2. The step requirement
Each entry in step_requirements represents one paused step. For executor pauses:
step_req.requires_executor_input # True
step_req.executor_id # agent/team id
step_req.executor_name # "WeatherAgent"
step_req.executor_type # "agent" | "team"
step_req.executor_requirements # list of tool-level requirements
3. The executor requirement
Each entry in executor_requirements is one pending tool call from the agent/team:
executor_req.tool_execution.tool_name # "get_the_weather"
executor_req.tool_execution.tool_args # {"city": "Tokyo"}
executor_req.needs_confirmation # True for requires_confirmation
executor_req.needs_user_input # True for requires_user_input
executor_req.needs_external_execution # True for external_execution
executor_req.user_input_schema # fields user must fill
Always read the last entry in step_requirements to determine the current pause state. Earlier entries are history from prior pause/resume cycles in the same run. The _active = (run_output.step_requirements or [])[-1:] pattern is required when a step has multiple HITL gates.
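The "last entry" rule can be wrapped in a small helper. The sketch below is illustrative only: it uses `SimpleNamespace` stand-ins instead of real agno objects so it runs on its own, `active_step_requirement` is a hypothetical name, and it assumes nothing beyond the attributes listed above (`is_paused`, `step_requirements`, `executor_name`).

```python
from types import SimpleNamespace
from typing import Any, Optional


def active_step_requirement(run_output: Any) -> Optional[Any]:
    """Return the step requirement for the current pause, or None.

    Hypothetical helper: only the last entry is considered, because earlier
    entries are history from prior pause/resume cycles in the same run.
    """
    if not getattr(run_output, "is_paused", False):
        return None
    requirements = run_output.step_requirements or []
    return requirements[-1] if requirements else None


# Stand-in objects that mimic the documented attributes (not agno classes).
resolved_earlier = SimpleNamespace(executor_name="WeatherAgent", requires_executor_input=True)
pending_now = SimpleNamespace(executor_name="ReportAgent", requires_executor_input=True)

paused_run = SimpleNamespace(is_paused=True, step_requirements=[resolved_earlier, pending_now])
finished_run = SimpleNamespace(is_paused=False, step_requirements=[resolved_earlier])

current = active_step_requirement(paused_run)
print(current.executor_name)
```

With a real run output, the same helper would presumably be called right after `workflow.run(...)` or `workflow.continue_run(...)`, before resolving any requirements.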
Resolving Each Type
Confirmation
for step_req in response.step_requirements or []:
    if step_req.requires_executor_input:
        for executor_req in step_req.executor_requirements or []:
            if executor_req.needs_confirmation:
                if user_approves():
                    executor_req.confirm()
                else:
                    executor_req.reject(note="User declined")

response = workflow.continue_run(response)
When rejected, the agent receives the rejection note and decides what to do next (it may try another tool, or surface the rejection in its response).
User Input
for step_req in response.step_requirements or []:
    if step_req.requires_executor_input:
        for executor_req in step_req.executor_requirements or []:
            if executor_req.needs_user_input:
                # prompt() is a placeholder for your own input-collection function.
                values = {
                    field.name: prompt(field.name)
                    for field in executor_req.user_input_schema or []
                }
                executor_req.provide_user_input(values)

response = workflow.continue_run(response)
The values overwrite the corresponding tool arguments before the tool runs.
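To make the overwrite rule concrete, the sketch below models it with plain dictionaries. It illustrates the behavior described here and is not agno's implementation; `merge_user_input` is a hypothetical name, and the argument values are made up for the example.

```python
from typing import Any, Dict


def merge_user_input(tool_args: Dict[str, Any], values: Dict[str, Any]) -> Dict[str, Any]:
    """Return the arguments the tool would receive once user input is applied.

    User-supplied values win over whatever the model proposed for the same
    argument; arguments the user did not touch are kept as they were.
    """
    merged = dict(tool_args)  # copy so the original proposal is left intact
    merged.update(values)
    return merged


# Arguments proposed by the model for send_money; "recipient" is the user-input field.
model_args = {"amount": 25.0, "recipient": None, "note": "lunch"}
user_values = {"recipient": "alice@example.com"}

final_args = merge_user_input(model_args, user_values)
print(final_args)
```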
External Execution
@tool(external_execution=True) means the agent never runs the tool itself. The workflow pauses, hands you the tool name and arguments, and you run it however you want — call a backend service, dispatch a job, hit a privileged API. Set tool_execution.result to whatever the tool was supposed to return, and the agent resumes with that result.
for step_req in response.step_requirements or []:
    if step_req.requires_executor_input:
        for executor_req in step_req.executor_requirements or []:
            if executor_req.needs_external_execution:
                tool_name = executor_req.tool_execution.tool_name
                tool_args = executor_req.tool_execution.tool_args

                # Replace this with your own dispatch logic, for example:
                #   result = my_backend.run(tool_name, **tool_args)
                #   result = subprocess.run(...).stdout
                #   result = await microservice.call(tool_name, tool_args)
                result = my_dispatcher(tool_name, tool_args)

                executor_req.tool_execution.result = result

response = workflow.continue_run(response)
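One possible shape for `my_dispatcher` is a registry that maps tool names to handler functions. The sketch below is an assumption about how you might organize your own code, not part of agno: `EXTERNAL_HANDLERS`, `external_handler`, and `run_query_externally` are hypothetical names, and the handler body is a placeholder.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical registry: tool name -> function that runs the tool outside the agent.
EXTERNAL_HANDLERS: Dict[str, Callable[..., str]] = {}


def external_handler(tool_name: str) -> Callable[[Callable[..., str]], Callable[..., str]]:
    """Register the decorated function as the external implementation of a tool."""

    def register(func: Callable[..., str]) -> Callable[..., str]:
        EXTERNAL_HANDLERS[tool_name] = func
        return func

    return register


@external_handler("run_query")
def run_query_externally(sql: str) -> str:
    # Placeholder: a real handler would call a database or backend service.
    return f"executed: {sql}"


def my_dispatcher(tool_name: str, tool_args: Optional[Dict[str, Any]]) -> str:
    """Look up the handler for tool_name and call it with the tool arguments."""
    handler = EXTERNAL_HANDLERS.get(tool_name)
    if handler is None:
        raise KeyError(f"No external handler registered for tool '{tool_name}'")
    return handler(**(tool_args or {}))


result = my_dispatcher("run_query", {"sql": "SELECT 1"})
print(result)
```

Failing loudly on an unregistered tool name keeps a missing handler from silently resuming the agent with an empty result.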
Streaming
In streaming mode the workflow yields a StepExecutorPausedEvent when the agent’s tool call pauses, and a StepExecutorContinuedEvent when execution resumes.
from agno.run.workflow import (
    StepExecutorPausedEvent,
    StepExecutorContinuedEvent,
    WorkflowCompletedEvent,
)

for event in workflow.run("What is the weather in Tokyo?", stream=True):
    if isinstance(event, StepExecutorPausedEvent):
        print(f"Paused: {event.executor_name} ({event.executor_type})")
    elif isinstance(event, StepExecutorContinuedEvent):
        print(f"Resumed: {event.executor_name}")
    elif isinstance(event, WorkflowCompletedEvent):
        print("Done")

# The full WorkflowRunOutput is persisted to the session, not yielded in the stream.
session = workflow.get_session()
run_output = session.runs[-1] if session and session.runs else None

if run_output and run_output.is_paused:
    for step_req in run_output.step_requirements or []:
        for executor_req in step_req.executor_requirements or []:
            executor_req.confirm()

    for event in workflow.continue_run(run_output, stream=True):
        if hasattr(event, "content") and event.content:
            print(event.content, end="", flush=True)
After a streaming run pauses, read the paused run from workflow.get_session().runs[-1] — step_requirements is persisted to the database, not emitted as a stream event.
Composite Steps
Executor HITL works inside any composite step that contains a Step with an agent or team. The pause propagates up through the wrapping primitive.
| Primitive | Behavior |
|---|---|
| Step | Pauses when the agent/team inside the step calls a HITL tool |
| Steps | Pauses on the inner Step whose agent/team calls a HITL tool |
| Condition | Pauses if the chosen branch contains a Step whose agent/team calls a HITL tool |
| Loop | Pauses on the iteration where the inner Step’s agent/team calls a HITL tool |
| Router | Pauses if the selected branch contains a Step whose agent/team calls a HITL tool |
from agno.workflow.condition import Condition
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow

# db, gather_data, analysis_agent, quick_summary, and generate_report are
# assumed to be defined elsewhere; analysis_agent carries the HITL tool.
workflow = Workflow(
    name="ConditionExecutorHITL",
    db=db,
    steps=[
        Step(name="gather_data", executor=gather_data),
        Condition(
            name="analysis_decision",
            evaluator=lambda step_input: True,
            steps=[Step(name="detailed_analysis", agent=analysis_agent)],
            else_steps=[Step(name="quick_summary", executor=quick_summary)],
        ),
        Step(name="report", executor=generate_report),
    ],
)
The resolution code does not change: walk step_requirements, find the entry with requires_executor_input, and resolve its executor_requirements.
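Because the walk is identical for every primitive, it can be factored into one reusable function. The sketch below is a minimal illustration under stated assumptions: `resolve_confirmations` and `FakeExecutorRequirement` are hypothetical names, and the stand-in objects only mimic the attributes and methods named on this page (`requires_executor_input`, `executor_requirements`, `needs_confirmation`, `tool_execution`, `confirm()`, `reject(note=...)`).

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


def resolve_confirmations(run_output: Any, approve: Callable[[str, dict], bool]) -> int:
    """Confirm or reject each pending confirmation; return how many were resolved."""
    resolved = 0
    for step_req in run_output.step_requirements or []:
        if not step_req.requires_executor_input:
            continue
        for executor_req in step_req.executor_requirements or []:
            if not executor_req.needs_confirmation:
                continue
            call = executor_req.tool_execution
            if approve(call.tool_name, call.tool_args):
                executor_req.confirm()
            else:
                executor_req.reject(note="User declined")
            resolved += 1
    return resolved


class FakeExecutorRequirement:
    """Stand-in for an executor requirement; records the decision it receives."""

    def __init__(self, tool_name: str, tool_args: dict) -> None:
        self.tool_execution = SimpleNamespace(tool_name=tool_name, tool_args=tool_args)
        self.needs_confirmation = True
        self.decision: Optional[str] = None

    def confirm(self) -> None:
        self.decision = "confirmed"

    def reject(self, note: Optional[str] = None) -> None:
        self.decision = f"rejected: {note}"


# A pause as it might surface from a Step nested inside a Condition branch.
analysis_pause = SimpleNamespace(
    requires_executor_input=True,
    executor_requirements=[
        FakeExecutorRequirement("delete_user", {"user_id": "42"}),
        FakeExecutorRequirement("get_the_weather", {"city": "Tokyo"}),
    ],
)
paused_run = SimpleNamespace(step_requirements=[analysis_pause])

# Example policy: approve everything except the destructive tool.
count = resolve_confirmations(paused_run, lambda name, args: name != "delete_user")
decisions = [req.decision for req in analysis_pause.executor_requirements]
print(count, decisions)
```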
Sync, Async, and Continue
| Method | Sync | Async |
|---|---|---|
| Run | workflow.run(input) | await workflow.arun(input) |
| Run streaming | workflow.run(input, stream=True) | workflow.arun(input, stream=True) |
| Continue | workflow.continue_run(run_output) | await workflow.acontinue_run(run_output) |
| Continue streaming | workflow.continue_run(run_output, stream=True) | workflow.acontinue_run(run_output, stream=True) |
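Since a run can pause more than once, an async caller may want to loop until the run is no longer paused. The sketch below shows that loop against a stand-in workflow so it runs without agno installed. `run_until_complete`, `FakeWorkflow`, and the `max_pauses` guard are hypothetical; only `arun`, `acontinue_run`, `is_paused`, and `content` come from this page.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Callable


async def run_until_complete(
    workflow: Any,
    user_input: str,
    resolve: Callable[[Any], None],
    max_pauses: int = 10,
) -> Any:
    """Run a workflow and keep resuming it until it is no longer paused."""
    response = await workflow.arun(user_input)
    pauses = 0
    while response.is_paused:
        if pauses >= max_pauses:
            raise RuntimeError(f"Workflow still paused after {max_pauses} resumes")
        resolve(response)  # confirm, reject, or fill in the pending requirements
        response = await workflow.acontinue_run(response)
        pauses += 1
    return response


class FakeWorkflow:
    """Stand-in workflow that pauses a fixed number of times before completing."""

    def __init__(self, pauses: int) -> None:
        self.remaining = pauses

    def _next(self) -> SimpleNamespace:
        if self.remaining > 0:
            self.remaining -= 1
            return SimpleNamespace(is_paused=True, content=None)
        return SimpleNamespace(is_paused=False, content="It is 70 degrees and cloudy in Tokyo")

    async def arun(self, user_input: str) -> SimpleNamespace:
        return self._next()

    async def acontinue_run(self, run_output: Any) -> SimpleNamespace:
        return self._next()


resolved_runs = []  # records each paused run handed to the resolver
final = asyncio.run(
    run_until_complete(FakeWorkflow(pauses=2), "What is the weather in Tokyo?", resolved_runs.append)
)
print(final.content)
```

The `max_pauses` guard is a design choice for the sketch: it stops a resolver that never satisfies the requirement from looping forever.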
Events
| Event | Emitted When |
|---|---|
| StepStartedEvent | Step begins executing |
| StepExecutorPausedEvent | Agent/team pauses on a HITL tool call |
| StepExecutorContinuedEvent | Agent/team resumes after the requirement is resolved |
| StepCompletedEvent | Step finishes (after resume and any further tool calls) |
| WorkflowCompletedEvent | Workflow finishes |
Cookbooks
Runnable examples in cookbook/04_workflows/08_human_in_the_loop/executor_hitl/:
| File | Demonstrates |
|---|---|
| 01_agent_confirmation.py | Agent tool with requires_confirmation |
| 02_agent_confirmation_stream.py | Same, streaming |
| 03_team_in_step.py | Team executor with HITL tool |
| 04_agent_confirmation_in_condition_step.py | Executor HITL inside a Condition |
| 05_agent_confirmation_in_loop_step.py | Executor HITL inside a Loop |
| 06_agent_confirmation_in_steps_container.py | Executor HITL inside Steps |
| 07_agent_confirmation_in_router_step.py | Executor HITL inside a Router branch |
| 08_agent_user_input_step.py | Tool with requires_user_input |
| 09_executor_continued_event.py | StepExecutorContinuedEvent lifecycle |