class GrpcWorkerAgentRuntimeHost:
    """Host that serves the agent RPC service from an asyncio gRPC server.

    The constructor builds a ``grpc.aio`` server, registers a
    ``GrpcWorkerAgentRuntimeHostServicer`` on it and binds an insecure port
    at ``address``. The server does not accept traffic until :meth:`start`
    is called.

    Args:
        address: Address passed to ``add_insecure_port``.
        extra_grpc_config: Optional channel arguments forwarded to
            ``grpc.aio.server`` as ``options``.
    """

    def __init__(self, address: str, extra_grpc_config: Optional[ChannelArgumentType] = None) -> None:
        self._server = grpc.aio.server(options=extra_grpc_config)
        self._servicer = GrpcWorkerAgentRuntimeHostServicer()
        agent_worker_pb2_grpc.add_AgentRpcServicer_to_server(self._servicer, self._server)
        self._server.add_insecure_port(address)
        self._address = address
        # Background task running `_serve`; None whenever the host is not started.
        self._serve_task: asyncio.Task[None] | None = None

    async def _serve(self) -> None:
        """Start the gRPC server and block until it terminates."""
        await self._server.start()
        logger.info(f"Server started at {self._address}.")
        await self._server.wait_for_termination()

    def start(self) -> None:
        """Start the server in a background task.

        Uses ``asyncio.create_task``, so it must be called while an event
        loop is running.

        Raises:
            RuntimeError: If the host has already been started.
        """
        if self._serve_task is not None:
            raise RuntimeError("Host runtime is already started.")
        self._serve_task = asyncio.create_task(self._serve())

    async def stop(self, grace: int = 5) -> None:
        """Stop the server.

        Args:
            grace: Grace period forwarded to the gRPC server's ``stop``.

        Raises:
            RuntimeError: If the host has not been started.
        """
        if self._serve_task is None:
            raise RuntimeError("Host runtime is not started.")
        await self._server.stop(grace=grace)
        serve_task = self._serve_task
        serve_task.cancel()
        try:
            await serve_task
        except asyncio.CancelledError:
            pass
        finally:
            # Reset even when the serve task finished with an error (which is
            # re-raised here); otherwise the host would stay marked as started
            # and every later start() would be rejected.
            self._serve_task = None
        logger.info("Server stopped.")

    async def stop_when_signal(
        self, grace: int = 5, signals: Sequence[signal.Signals] = (signal.SIGTERM, signal.SIGINT)
    ) -> None:
        """Stop the server when a signal is received.

        Blocks until one of ``signals`` is delivered, then calls :meth:`stop`.
        The handlers installed here are removed again before returning, also
        when the wait is cancelled or an error occurs.

        Args:
            grace: Grace period forwarded to :meth:`stop`.
            signals: Signals that trigger the shutdown.

        Raises:
            RuntimeError: If the host has not been started.
        """
        if self._serve_task is None:
            raise RuntimeError("Host runtime is not started.")

        # Set up signal handling for graceful shutdown.
        loop = asyncio.get_running_loop()
        shutdown_event = asyncio.Event()

        def signal_handler() -> None:
            logger.info("Received exit signal, shutting down gracefully...")
            shutdown_event.set()

        # Track what was actually installed so cleanup is exact even if
        # registration fails part-way through.
        installed: list[signal.Signals] = []
        try:
            for sig in signals:
                loop.add_signal_handler(sig, signal_handler)
                installed.append(sig)

            # Wait for the signal to trigger the shutdown event.
            await shutdown_event.wait()

            # Shutdown the server.
            await self.stop(grace=grace)
        finally:
            # Without this the loop keeps routing these signals to a handler
            # that only sets an event nobody waits on anymore, so e.g. Ctrl+C
            # would stop interrupting the process.
            for sig in installed:
                loop.remove_signal_handler(sig)