Source code for autogen_ext.tools.mcp._actor
import asyncio
import atexit
import logging
from typing import Any, Coroutine, Dict, Mapping, TypedDict

from autogen_core import Component, ComponentBase
from mcp.types import CallToolResult, ListToolsResult
from pydantic import BaseModel
from typing_extensions import Self

from ._config import McpServerParams
from ._session import create_mcp_server_session
# A coroutine that yields either a ListToolsResult or a CallToolResult when awaited.
McpResult = Coroutine[Any, Any, ListToolsResult] | Coroutine[Any, Any, CallToolResult]
# An asyncio future whose result is one of the McpResult coroutines.
McpFuture = asyncio.Future[McpResult]
class McpActorArgs(TypedDict):
name: str | None
kargs: Mapping[str, Any]
# Configuration model for McpSessionActor; it carries only the server parameters.
# Kept as a comment rather than a docstring so the model's __doc__ (which
# pydantic may surface in generated schemas -- TODO confirm) is left untouched.
class McpSessionActorConfig(BaseModel):
    # Parameters describing the MCP server to open a session against.
    server_params: McpServerParams
[docs]
# Actor that runs one MCP server session inside a background asyncio task.
#
# `initialize()` starts the task, which enters the session context and then
# consumes command dicts from an internal queue. `call()` and `close()` enqueue
# commands and wait on a per-command future for the reply.
#
# NOTE: documented with a comment rather than a class docstring so the class's
# `__doc__` stays unset, exactly as before (component tooling may read the
# docstring -- TODO confirm).
class McpSessionActor(ComponentBase[BaseModel], Component[McpSessionActorConfig]):
    component_type = "mcp_session_actor"
    component_config_schema = McpSessionActorConfig
    component_provider_override = "autogen_ext.tools.mcp.McpSessionActor"

    server_params: McpServerParams

    def __init__(self, server_params: McpServerParams) -> None:
        """Store the server parameters; no session is opened until ``initialize()``.

        Args:
            server_params: Parameters passed to ``create_mcp_server_session``
                when the actor task starts.
        """
        self.server_params: McpServerParams = server_params
        self.name = "mcp_session_actor"
        self.description = "MCP session actor"
        self._command_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
        self._actor_task: asyncio.Task[Any] | None = None
        self._shutdown_future: asyncio.Future[Any] | None = None
        self._active = False
        # Best-effort shutdown at interpreter exit. The registration keeps a
        # reference to this instance for the remaining process lifetime.
        atexit.register(self._sync_shutdown)

    async def initialize(self) -> None:
        """Start the background actor task if it is not already running."""
        if not self._active:
            self._active = True
            self._actor_task = asyncio.create_task(self._run_actor())

    async def call(self, type: str, args: McpActorArgs | None = None) -> McpFuture:
        """Send one command to the actor and wait for its reply.

        Args:
            type: ``"list_tools"``, ``"call_tool"`` or ``"shutdown"``.
            args: Required for ``"call_tool"``; must carry a non-None ``name``
                and may carry ``kargs`` (defaults to an empty dict).

        Returns:
            Whatever the actor stored in the command's future: for
            ``list_tools`` / ``call_tool`` this is the coroutine returned by the
            session method, which the caller still has to await; for
            ``shutdown`` it is the string ``"ok"``.

        Raises:
            RuntimeError: If the actor is not running, its task has already
                finished, or it stopped before handling the command.
            ValueError: If ``type`` is unknown or ``call_tool`` arguments are
                missing.
        """
        if not self._active:
            raise RuntimeError("MCP Actor not running, call initialize() first")
        task = self._actor_task
        if task is not None and task.done():
            # Task.exception() itself raises CancelledError for a cancelled
            # task, so only query it when the task was not cancelled.
            cause = None if task.cancelled() else task.exception()
            raise RuntimeError("MCP actor task crashed", cause)

        command: Dict[str, Any]
        if type in {"list_tools", "shutdown"}:
            command = {"type": type}
        elif type == "call_tool":
            if args is None:
                raise ValueError("args is required for call_tool")
            name = args.get("name", None)
            kwargs = args.get("kargs", {})
            if name is None:
                raise ValueError("name is required for call_tool")
            command = {"type": type, "name": name, "args": kwargs}
        else:
            raise ValueError(f"Unknown command type: {type}")

        fut: asyncio.Future[McpFuture] = asyncio.get_running_loop().create_future()
        command["future"] = fut
        await self._command_queue.put(command)
        return await fut

    async def close(self) -> None:
        """Ask the actor to shut down and wait until its task has finished.

        Does nothing when the actor is not running. If the actor task fails
        while the shutdown is pending, that exception is raised here.
        """
        # Keep a local reference: the actor task clears ``self._actor_task`` in
        # its ``finally`` block, possibly before this coroutine resumes, and
        # awaiting ``None`` would raise TypeError.
        task = self._actor_task
        if not self._active or task is None:
            return
        self._shutdown_future = asyncio.get_running_loop().create_future()
        await self._command_queue.put({"type": "shutdown", "future": self._shutdown_future})
        await self._shutdown_future
        await task
        self._active = False

    async def _run_actor(self) -> None:
        """Open the MCP session and serve queued commands until shutdown.

        Each command is a dict with a ``"type"`` and a ``"future"``. The
        session's ``call_tool`` / ``list_tools`` return value is stored in the
        future without being awaited here.
        """
        result: McpResult
        failure: BaseException | None = None
        try:
            async with create_mcp_server_session(self.server_params) as session:
                await session.initialize()
                while True:
                    cmd = await self._command_queue.get()
                    kind = cmd["type"]
                    future = cmd["future"]
                    if kind == "shutdown":
                        if not future.done():
                            future.set_result("ok")
                        break
                    if future.done():
                        # The waiter gave up (e.g. was cancelled). Resolving the
                        # future now would raise InvalidStateError and take the
                        # whole actor down, so skip the command.
                        continue
                    try:
                        if kind == "call_tool":
                            result = session.call_tool(name=cmd["name"], arguments=cmd["args"])
                        elif kind == "list_tools":
                            result = session.list_tools()
                        else:
                            raise ValueError(f"Unknown command type: {kind}")
                    except Exception as e:
                        future.set_exception(e)
                    else:
                        future.set_result(result)
        except Exception as e:
            failure = e
            if self._shutdown_future is not None and not self._shutdown_future.done():
                self._shutdown_future.set_exception(e)
            else:
                # Nobody is waiting on a shutdown future; log the failure
                # instead of dropping it silently.
                logging.getLogger(__name__).exception("MCP session actor task failed")
        finally:
            self._active = False
            self._actor_task = None
            self._drain_pending_commands(failure)

    def _drain_pending_commands(self, cause: BaseException | None) -> None:
        """Resolve every command still queued once the actor has stopped.

        Without this, callers whose commands were never dequeued would wait on
        their futures forever.

        Args:
            cause: The exception that stopped the actor, or ``None`` when it
                stopped without one being caught.
        """
        while True:
            try:
                cmd = self._command_queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            future = cmd.get("future")
            if future is None or future.done():
                continue
            if cause is None and cmd.get("type") == "shutdown":
                # The actor is already stopped, which is what was requested.
                future.set_result("ok")
                continue
            error = RuntimeError("MCP actor stopped before the command was processed")
            error.__cause__ = cause
            future.set_exception(error)

    def _sync_shutdown(self) -> None:
        """Synchronous ``atexit`` hook that tries to close a still-running actor."""
        if not self._active or self._actor_task is None:
            return
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop available -- interpreter is likely shutting down
            return
        if loop.is_closed():
            return
        if loop.is_running():
            loop.create_task(self.close())
        else:
            loop.run_until_complete(self.close())

    def _to_config(self) -> McpSessionActorConfig:
        """Convert the actor to its configuration representation.

        Returns:
            McpSessionActorConfig: The configuration holding this actor's
            server parameters.
        """
        return McpSessionActorConfig(server_params=self.server_params)

    @classmethod
    def _from_config(cls, config: McpSessionActorConfig) -> Self:
        """Create an actor from its configuration.

        Args:
            config (McpSessionActorConfig): The configuration to build from.

        Returns:
            McpSessionActor: A new, not yet initialized, actor instance.
        """
        return cls(server_params=config.server_params)