# Source code for autogen_ext.runtimes.grpc._worker_runtime_host

import asyncio
import logging
import signal
from typing import Optional, Sequence

from ._constants import GRPC_IMPORT_ERROR_STR
from ._type_helpers import ChannelArgumentType
from ._worker_runtime_host_servicer import GrpcWorkerAgentRuntimeHostServicer

try:
    import grpc
except ImportError as e:
    raise ImportError(GRPC_IMPORT_ERROR_STR) from e
from .protos import agent_worker_pb2_grpc

# Module-level logger; records from this module are emitted under the
# "autogen_core" logger name.
logger = logging.getLogger("autogen_core")


class GrpcWorkerAgentRuntimeHost:
    """Run a ``grpc.aio`` server that exposes the agent worker RPC service.

    On construction the host creates the server, registers a
    ``GrpcWorkerAgentRuntimeHostServicer`` on it and binds an insecure port
    at ``address``. The server is only started by :meth:`start`, which runs
    it in a background asyncio task, and is shut down by :meth:`stop` or
    :meth:`stop_when_signal`.

    Args:
        address: Address passed to ``add_insecure_port`` on the server.
        extra_grpc_config: Optional channel arguments, forwarded as the
            ``options`` argument of ``grpc.aio.server``.
    """

    def __init__(self, address: str, extra_grpc_config: Optional[ChannelArgumentType] = None) -> None:
        self._server = grpc.aio.server(options=extra_grpc_config)
        self._servicer = GrpcWorkerAgentRuntimeHostServicer()
        agent_worker_pb2_grpc.add_AgentRpcServicer_to_server(self._servicer, self._server)
        self._server.add_insecure_port(address)
        self._address = address
        # Background task running `_serve`; None while the host is not started.
        self._serve_task: asyncio.Task[None] | None = None

    async def _serve(self) -> None:
        """Start the server and block until it terminates."""
        await self._server.start()
        logger.info("Server started at %s.", self._address)
        await self._server.wait_for_termination()

    def start(self) -> None:
        """Start the server in a background task.

        Uses ``asyncio.create_task``, so it must be called while an event
        loop is running.

        Raises:
            RuntimeError: If the host has already been started.
        """
        if self._serve_task is not None:
            raise RuntimeError("Host runtime is already started.")
        self._serve_task = asyncio.create_task(self._serve())

    async def stop(self, grace: int = 5) -> None:
        """Stop the server.

        Args:
            grace: Value forwarded as ``grace`` to the server's ``stop`` call.

        Raises:
            RuntimeError: If the host has not been started.
        """
        serve_task = self._serve_task
        if serve_task is None:
            raise RuntimeError("Host runtime is not started.")

        await self._server.stop(grace=grace)
        serve_task.cancel()
        try:
            await serve_task
        except asyncio.CancelledError:
            # Expected: the serve task was cancelled just above.
            pass
        finally:
            # Reset the state even when the serve task finished with its own
            # exception (which still propagates to the caller); otherwise the
            # host would keep reporting "already started" with a dead task.
            self._serve_task = None
        logger.info("Server stopped.")

    async def stop_when_signal(
        self, grace: int = 5, signals: Sequence[signal.Signals] = (signal.SIGTERM, signal.SIGINT)
    ) -> None:
        """Stop the server when a signal is received.

        Installs a handler for each entry of ``signals`` on the running event
        loop, waits until one of them fires, then calls :meth:`stop`. The
        handlers are removed again before this coroutine returns or raises.
        ``loop.remove_signal_handler`` is used for that, so it does not
        reinstate a custom handler that was registered before this call.

        Args:
            grace: Value forwarded to :meth:`stop`.
            signals: Signals that trigger the shutdown.

        Raises:
            RuntimeError: If the host has not been started.
        """
        if self._serve_task is None:
            raise RuntimeError("Host runtime is not started.")

        # Set up signal handling for graceful shutdown.
        loop = asyncio.get_running_loop()
        shutdown_event = asyncio.Event()

        def signal_handler() -> None:
            logger.info("Received exit signal, shutting down gracefully...")
            shutdown_event.set()

        # Track what was actually installed so a failure partway through the
        # loop (or a cancellation while waiting) does not leave handlers behind.
        installed: list[signal.Signals] = []
        try:
            for sig in signals:
                loop.add_signal_handler(sig, signal_handler)
                installed.append(sig)

            # Wait for the signal to trigger the shutdown event.
            await shutdown_event.wait()

            # Shutdown the server.
            await self.stop(grace=grace)
        finally:
            # Without this, later signals would keep being routed to a handler
            # that only sets an event nobody is waiting on any more.
            for sig in installed:
                loop.remove_signal_handler(sig)