Source code for autogen_ext.tools.mcp._host._elicitation

import asyncio
import json
import sys
from abc import ABC, abstractmethod
from typing import TextIO

from autogen_core import (
    Component,
    ComponentBase,
)
from pydantic import BaseModel

from mcp import types as mcp_types

_ELICITATION_CHOICE_SHORTHANDS = {"a": "accept", "d": "decline", "c": "cancel"}


[docs] class Elicitor(ABC, ComponentBase[BaseModel]): """Abstract base class for handling MCP elicitation requests. Elicitors are responsible for processing elicitation requests from MCP servers, which typically involve prompting for user input, and sometimes require more structured responses. """ component_type = "mcp_elicitor"
[docs] @abstractmethod async def elicit(self, params: mcp_types.ElicitRequestParams) -> mcp_types.ElicitResult | mcp_types.ErrorData: ...
[docs] class StreamElicitor(Elicitor): """Handle MCP elicitation requests by reading/writing to TextIO streams.""" def __init__(self, read_stream: TextIO, write_stream: TextIO, timeout: float | None = None) -> None: self._read_stream = read_stream self._write_stream = write_stream self._timeout = timeout def _write(self, text: str) -> None: self._write_stream.writelines(text) self._write_stream.flush() async def _read(self) -> str: """ Await a single line from `read` without blocking the event loop. Returns the raw line including its trailing newline (if any). """ # Read one line from the provided TextIO in a worker thread coroutine = asyncio.to_thread(self._read_stream.readline) if self._timeout: coroutine = asyncio.wait_for(coroutine, self._timeout) return await coroutine
[docs] async def elicit(self, params: mcp_types.ElicitRequestParams) -> mcp_types.ElicitResult: header = "=== BEGIN MCP ELICITATION REQUEST ===" border = "=" * len(header) header = f"{border}\n{header}\n{border}" prompt = "\n".join( [ header, params.message, "Choices:", "\t[a]ccept", "\t[d]ecline", "\t[c]ancel", "Please enter one of the above options: ", ] ) self._write(prompt) try: action = await self._read() action = action.strip().lower() action = _ELICITATION_CHOICE_SHORTHANDS.get(action, action) result = mcp_types.ElicitResult.model_validate({"action": action}) if action == "accept" and params.requestedSchema: prompt = "\n".join( [ "Input Schema:", json.dumps(params.requestedSchema, indent=2), "Please enter a JSON string following the above schema: ", ] ) self._write(prompt) content = await self._read() result.content = json.loads(content) return result finally: footer = "=== END MCP ELICITATION REQUEST ===" border = "=" * len(footer) footer = f"{border}\n{footer}\n{border}" self._write(footer)
[docs] class StdioElicitorConfig(BaseModel): timeout: float | None
[docs] class StdioElicitor(StreamElicitor, Component[StdioElicitorConfig]): """Handle MCP elicitation requests by reading/writing to stdio""" component_config_schema = StdioElicitorConfig component_provider_override = "autogen_ext.tools.mcp.StdioElicitor" def __init__(self, timeout: float | None = None) -> None: super().__init__(sys.stdin, sys.stdout, timeout) @property def timeout(self) -> float | None: """Get the timeout value for elicitation operations.""" return self._timeout
[docs] def _to_config(self) -> BaseModel: return StdioElicitorConfig(timeout=self._timeout)
[docs] @classmethod def _from_config(cls, config: StdioElicitorConfig) -> "StdioElicitor": return StdioElicitor(timeout=config.timeout)