Merged
128 changes: 128 additions & 0 deletions docs/architecture/human-in-the-loop.md
@@ -485,3 +485,131 @@ some while rejecting others:
to the next LLM step without pausing.
7. The LLM receives the mixed results and generates an appropriate text
response (e.g., summarising what succeeded and what was rejected).

---

## Enhancement: Rejection Reasons

Add an optional free-text reason to rejections. The reason travels through
the existing HITL plumbing: the frontend includes it in the `DataPart`, and
the executor forwards it via `ToolConfirmation.payload`.

Two custom keys carry the rejection reason: one in the frontend payload and
one in the ADK payload. Parallel tool-call rejections are handled as well,
via the batch flow described above.

In the before-tool callback (`_approval.py`), we check whether a payload
exists and, if so, append the rejection reason to the callback response so
the model can see why the call was rejected.

---

## Enhancement: Ask-User Tool

Discussed in issue #1415.
Background on why this is a good idea: https://www.atcyrus.com/stories/claude-code-ask-user-question-tool-guide

### Design

A built-in `ask_user` tool that:
- Lets the model pose **one or more questions** in a single call (batched).
- Each question can include **predefined choices** and a flag for whether
multiple selections are allowed.
- The user always has the option to **type a free-text answer** alongside
(or instead of) the predefined choices.
- Uses the **same `request_confirmation` HITL plumbing** as tool approval —
no new executor logic, event converter changes, or A2A protocol extensions.

The tool is added **unconditionally** to every agent as a built-in tool,
similar to how memory tools are added.

### Tool Schema

```python
ask_user(questions=[
{
"question": "Which database should I use?",
"choices": ["PostgreSQL", "MySQL", "SQLite"],
"multiple": False # single-select (default)
},
{
"question": "Which features do you want?",
"choices": ["Auth", "Logging", "Caching"],
"multiple": True # multi-select
},
{
"question": "Any additional requirements?",
"choices": [], # free-text only
"multiple": False
}
])
```

### Tool Result (returned to model)

```python
[
{"question": "Which database should I use?", "answer": ["PostgreSQL"]},
{"question": "Which features do you want?", "answer": ["Auth", "Caching"]},
{"question": "Any additional requirements?", "answer": ["Add rate limiting"]}
]
```

### Data Flow

- A special path to process the ask-user result is added to the agent executor and the HITL converter.
- The UI is updated to recognize and render ask-user requests.
- The tool is added to every agent by default.

```plaintext
LLM generates function call: ask_user(questions=[...])
AskUserTool.run_async (first invocation)
│ tool_context.tool_confirmation is None
│ Calls tool_context.request_confirmation(hint=<question summary>)
│ Returns {"status": "pending", "questions": [...]}
ADK generates adk_request_confirmation event (built-in)
│ originalFunctionCall = {name: "ask_user", args: {questions: [...]}, id: ...}
│ long_running_tool_ids set
Event converter (existing, unchanged)
│ Converts to DataPart: {type: "function_call", is_long_running: true}
│ Sets TaskState.input_required
Executor event loop
│ Breaks on long_running_tool_ids (existing logic)
UI detects input_required + adk_request_confirmation
│ Sees originalFunctionCall.name === "ask_user"
│ Renders AskUserDisplay instead of ToolApprovalRequest card
AskUserDisplay renders:
│ For each question:
│ - Header (bold, short) + question text
│ - Choice buttons as toggleable chips (single or multi-select)
│ - Free-text input always visible below choices
│ Single Submit button at the bottom
User selects choices / types answers, clicks Submit
│ UI sends DataPart:
│ {decision_type: "approve",
│ ask_user_answers: [
│ {answer: ["PostgreSQL"]},
│ {answer: ["Auth", "Caching"]},
│ {answer: ["Add rate limiting"]}
│ ]}
Executor resume path
│ Sees decision_type: "approve" + ask_user_answers present
│ Constructs ToolConfirmation(confirmed=True,
│ payload={"answers": [...]})
ADK replays ask_user tool call
AskUserTool.run_async (second invocation)
│ tool_context.tool_confirmation.confirmed is True
│ Reads payload["answers"]
│ Zips answers with original questions
│ Returns JSON: [{question: "...", answer: [...]}, ...]
LLM receives the answers as the function response
│ Continues execution with user's input
```
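
The final zip step in the flow above can be sketched in isolation. This mirrors
the second invocation of `AskUserTool.run_async`: answers injected via
`ToolConfirmation.payload["answers"]` are paired positionally with the original
questions, and a missing answer defaults to an empty list:

```python
import json


def zip_answers(questions: list[dict], answers: list[dict]) -> str:
    """Pair each original question with the user's answer, by position.

    Returns the JSON string that is handed back to the LLM as the
    function response.
    """
    result = []
    for i, q in enumerate(questions):
        # Guard against fewer answers than questions (or a malformed entry).
        ans = answers[i]["answer"] if i < len(answers) and "answer" in answers[i] else []
        result.append({"question": q.get("question", ""), "answer": ans})
    return json.dumps(result)
```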
40 changes: 38 additions & 2 deletions python/packages/kagent-adk/src/kagent/adk/_agent_executor.py
@@ -41,8 +41,10 @@
KAGENT_HITL_DECISION_TYPE_APPROVE,
KAGENT_HITL_DECISION_TYPE_BATCH,
TaskResultAggregator,
extract_ask_user_answers_from_message,
extract_batch_decisions_from_message,
extract_decision_from_message,
extract_rejection_reasons_from_message,
get_kagent_metadata_key,
)
from kagent.core.tracing._span_processor import (
@@ -397,6 +399,28 @@ def _process_hitl_decision(
len(pending_confirmations),
)

# Check for ask-user answers — if present, build a single approved
# ToolConfirmation with the answers payload regardless of decision_type.
# The tool will use the payload and construct the user answer to the agent
ask_user_answers = extract_ask_user_answers_from_message(message)
if ask_user_answers is not None:
parts = []
for fc_id in pending_confirmations:
confirmation = ToolConfirmation(confirmed=True, payload={"answers": ask_user_answers})
parts.append(
genai_types.Part(
function_response=genai_types.FunctionResponse(
name=REQUEST_CONFIRMATION_FUNCTION_CALL_NAME,
id=fc_id,
response={"response": confirmation.model_dump_json()},
)
)
)
return parts

# Extract optional rejection reasons from the message.
rejection_reasons = extract_rejection_reasons_from_message(message)

if decision == KAGENT_HITL_DECISION_TYPE_BATCH:
# Batch mode: per-tool decisions
batch_decisions = extract_batch_decisions_from_message(message) or {}
@@ -405,7 +429,13 @@
# Look up the per-tool decision using the original tool call ID
tool_decision = batch_decisions.get(original_id, KAGENT_HITL_DECISION_TYPE_APPROVE)
confirmed = tool_decision == KAGENT_HITL_DECISION_TYPE_APPROVE
confirmation = ToolConfirmation(confirmed=confirmed)
# Attach rejection reason if provided for this specific tool
payload: dict | None = None
if not confirmed and rejection_reasons:
reason = rejection_reasons.get(original_id) if original_id else None
if reason:
payload = {"rejection_reason": reason}
confirmation = ToolConfirmation(confirmed=confirmed, payload=payload)
# Append a response for each tool call
parts.append(
genai_types.Part(
@@ -420,7 +450,13 @@
else:
# Uniform mode: same decision for all pending tools
confirmed = decision == KAGENT_HITL_DECISION_TYPE_APPROVE
confirmation = ToolConfirmation(confirmed=confirmed)
# Attach rejection reason if provided (uniform denial uses "*" sentinel)
payload = None
if not confirmed and rejection_reasons:
reason = rejection_reasons.get("*")
if reason:
payload = {"rejection_reason": reason}
confirmation = ToolConfirmation(confirmed=confirmed, payload=payload)
return [
genai_types.Part(
function_response=genai_types.FunctionResponse(
13 changes: 11 additions & 2 deletions python/packages/kagent-adk/src/kagent/adk/_approval.py
@@ -31,7 +31,7 @@ def before_tool(
tool: BaseTool,
args: dict[str, Any],
tool_context: ToolContext,
) -> dict | None:
) -> str | dict | None:
tool_name = tool.name
if tool_name not in tools_requiring_approval:
return None # No approval needed, proceed normally
@@ -42,9 +42,18 @@
logger.debug("Tool %s approved by user, proceeding", tool_name)
return None # Approved — proceed with tool execution
logger.debug("Tool %s rejected by user", tool_name)
return {"status": "rejected", "message": "Tool call was rejected by user."}
# Check for an optional rejection reason in the payload
# (the key "rejection_reason" is set by _agent_executor._process_hitl_decision)
payload = tool_context.tool_confirmation.payload or {}
reason = payload.get("rejection_reason", "") if isinstance(payload, dict) else ""
# __build_response_event wraps it as {"result": "..."} that LLM adapters expect
# ADK will skip executing the function if the before tool callback returns a response
if reason:
return f"Tool call was rejected by user. Reason: {reason}"
return "Tool call was rejected by user."

# First invocation — request confirmation and block execution
# This response is never sent to the LLM
logger.debug("Tool %s requires approval, requesting confirmation", tool_name)
tool_context.request_confirmation(
hint=f"Tool '{tool_name}' requires approval before execution.",
@@ -333,6 +333,8 @@ async def _generate_embedding_async(
litellm_model = f"ollama/{model_name}"
elif provider == "vertex_ai":
litellm_model = f"vertex_ai/{model_name}"
elif provider == "gemini":
litellm_model = f"gemini/{model_name}"

try:
is_batch = isinstance(input_data, list)
111 changes: 111 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/tools/ask_user_tool.py
@@ -0,0 +1,111 @@
"""Built-in tool for asking the user questions during agent execution.

Uses the ADK-native ToolContext.request_confirmation() mechanism so that
the standard HITL event plumbing (adk_request_confirmation, long_running_tool_ids,
executor resume path) handles the interrupt and resume transparently.
"""

from __future__ import annotations

import json
import logging
from typing import Any

from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext
from google.genai import types

logger = logging.getLogger(__name__)

# Schema for a single question object passed to ask_user.
_QUESTION_SCHEMA = types.Schema(
type=types.Type.OBJECT,
properties={
"question": types.Schema(
type=types.Type.STRING,
description="The question text to display to the user.",
),
"choices": types.Schema(
type=types.Type.ARRAY,
items=types.Schema(type=types.Type.STRING),
description=(
"Predefined answer choices shown as selectable chips. Leave empty for a free-text-only question."
),
),
"multiple": types.Schema(
type=types.Type.BOOLEAN,
description=("If true, the user can select multiple choices. Defaults to false (single-select)."),
),
},
required=["question"],
)


class AskUserTool(BaseTool):
"""Built-in tool that lets the agent ask the user one or more questions.

The tool uses the ADK ``request_confirmation`` mechanism to pause
execution and present the questions to the UI. On the resume path the
UI sends back ``ask_user_answers`` in the approval DataPart and the
executor injects them via ``ToolConfirmation.payload``.

Because the interrupt is driven by ``request_confirmation``, this tool
does *not* need to be listed in ``tools_requiring_approval``.
"""

def __init__(self) -> None:
super().__init__(
name="ask_user",
description=(
"Ask the user one or more questions and wait for their answers "
"before continuing. Use this when you need clarifying information, "
"preferences, or explicit confirmation from the user."
),
)

def _get_declaration(self) -> types.FunctionDeclaration:
return types.FunctionDeclaration(
name=self.name,
description=self.description,
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"questions": types.Schema(
type=types.Type.ARRAY,
items=_QUESTION_SCHEMA,
description="List of questions to ask the user.",
),
},
required=["questions"],
),
)

async def run_async(
self,
*,
args: dict[str, Any],
tool_context: ToolContext,
) -> Any:
questions: list[dict] = args.get("questions", [])

if tool_context.tool_confirmation is None:
# First invocation — pause execution and ask the user.
summary = "; ".join(q.get("question", "") for q in questions if q.get("question"))
tool_context.request_confirmation(hint=summary or "Questions for the user.")
logger.debug("ask_user: requesting confirmation with %d question(s)", len(questions))
return {"status": "pending", "questions": questions}

if tool_context.tool_confirmation.confirmed:
# Second invocation — the executor injected answers via payload.
payload = tool_context.tool_confirmation.payload or {}
answers: list[dict] = payload.get("answers", []) if isinstance(payload, dict) else []
result = []
for i, q in enumerate(questions):
ans = answers[i]["answer"] if i < len(answers) and "answer" in answers[i] else []
result.append({"question": q.get("question", ""), "answer": ans})
logger.debug("ask_user: returning %d answer(s)", len(result))
return json.dumps(result)

# User cancelled or rejected (should not normally happen for ask_user).
logger.debug("ask_user: confirmation not received, returning cancelled status")
return json.dumps({"status": "cancelled"})
4 changes: 4 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/types.py
@@ -18,6 +18,7 @@
from kagent.adk._mcp_toolset import KAgentMcpToolset
from kagent.adk.models._litellm import KAgentLiteLlm
from kagent.adk.sandbox_code_executer import SandboxedLocalCodeExecutor
from kagent.adk.tools.ask_user_tool import AskUserTool

from .models import AzureOpenAI as OpenAIAzure
from .models import OpenAI as OpenAINative
@@ -377,6 +378,9 @@ async def rewrite_url_to_proxy(request: httpx.Request) -> None:
code_executor = SandboxedLocalCodeExecutor() if self.execute_code else None
model = _create_llm_from_model_config(self.model)

# Add built-in ask_user tool unconditionally — every agent can ask the user questions.
tools.append(AskUserTool())

# Build before_tool_callback if any tools require approval
before_tool_callback = make_approval_callback(tools_requiring_approval) if tools_requiring_approval else None
