[docs]classGrpcWorkerAgentRuntimeHostServicer(agent_worker_pb2_grpc.AgentRpcServicer):"""A gRPC servicer that hosts message delivery service for agents."""def__init__(self)->None:self._client_id=0self._client_id_lock=asyncio.Lock()self._send_queues:Dict[int,asyncio.Queue[agent_worker_pb2.Message]]={}self._agent_type_to_client_id_lock=asyncio.Lock()self._agent_type_to_client_id:Dict[str,int]={}self._pending_responses:Dict[int,Dict[str,Future[Any]]]={}self._background_tasks:Set[Task[Any]]=set()self._subscription_manager=SubscriptionManager()self._client_id_to_subscription_id_mapping:Dict[int,set[str]]={}
[docs]asyncdefOpenChannel(# type: ignoreself,request_iterator:AsyncIterator[agent_worker_pb2.Message],context:grpc.aio.ServicerContext[agent_worker_pb2.Message,agent_worker_pb2.Message],)->Iterator[agent_worker_pb2.Message]|AsyncIterator[agent_worker_pb2.Message]:# type: ignore# Aquire the lock to get a new client id.asyncwithself._client_id_lock:self._client_id+=1client_id=self._client_id# Register the client with the server and create a send queue for the client.send_queue:asyncio.Queue[agent_worker_pb2.Message]=asyncio.Queue()self._send_queues[client_id]=send_queuelogger.info(f"Client {client_id} connected.")try:# Concurrently handle receiving messages from the client and sending messages to the client.# This task will receive messages from the client.receiving_task=asyncio.create_task(self._receive_messages(client_id,request_iterator))# Return an async generator that will yield messages from the send queue to the client.whileTrue:message=awaitsend_queue.get()# Yield the message to the client.try:yieldmessageexceptExceptionase:logger.error(f"Failed to send message to client {client_id}: {e}",exc_info=True)breaklogger.info(f"Sent message to client {client_id}: {message}")# Wait for the receiving task to finish.awaitreceiving_taskfinally:# Clean up the client connection.delself._send_queues[client_id]# Cancel pending requests sent to this client.forfutureinself._pending_responses.pop(client_id,{}).values():future.cancel()# Remove the client id from the agent type to client id mapping.awaitself._on_client_disconnect(client_id)
asyncdef_on_client_disconnect(self,client_id:int)->None:asyncwithself._agent_type_to_client_id_lock:agent_types=[agent_typeforagent_type,id_inself._agent_type_to_client_id.items()ifid_==client_id]foragent_typeinagent_types:logger.info(f"Removing agent type {agent_type} from agent type to client id mapping")delself._agent_type_to_client_id[agent_type]forsub_idinself._client_id_to_subscription_id_mapping.get(client_id,set()):logger.info(f"Client id {client_id} disconnected. Removing corresponding subscription with id {id}")awaitself._subscription_manager.remove_subscription(sub_id)logger.info(f"Client {client_id} disconnected successfully")def_raise_on_exception(self,task:Task[Any])->None:exception=task.exception()ifexceptionisnotNone:raiseexceptionasyncdef_receive_messages(self,client_id:int,request_iterator:AsyncIterator[agent_worker_pb2.Message])->None:# Receive messages from the client and process them.asyncformessageinrequest_iterator:logger.info(f"Received message from client {client_id}: {message}")oneofcase=message.WhichOneof("message")matchoneofcase:case"request":request:agent_worker_pb2.RpcRequest=message.requesttask=asyncio.create_task(self._process_request(request,client_id))self._background_tasks.add(task)task.add_done_callback(self._raise_on_exception)task.add_done_callback(self._background_tasks.discard)case"response":response:agent_worker_pb2.RpcResponse=message.responsetask=asyncio.create_task(self._process_response(response,client_id))self._background_tasks.add(task)task.add_done_callback(self._raise_on_exception)task.add_done_callback(self._background_tasks.discard)case"cloudEvent":# The proto typing doesnt resolve this oneevent=cast(cloudevent_pb2.CloudEvent,message.cloudEvent)# type: ignoretask=asyncio.create_task(self._process_event(event))self._background_tasks.add(task)task.add_done_callback(self._raise_on_exception)task.add_done_callback(self._background_tasks.discard)case"registerAgentTypeRequest":register_agent_type:agent_worker_pb2.RegisterAgentTypeRequest=message.registerAgentTypeRequesttask=asyncio.create_task(self._process_register_agent_type_request(register_agent_type,client_id))self._background_tasks.add(task)task.add_done_callback(self._raise_on_exception)task.add_done_callback(self._background_tasks.discard)case"addSubscriptionRequest":add_subscription:agent_worker_pb2.AddSubscriptionRequest=message.addSubscriptionRequesttask=asyncio.create_task(self._process_add_subscription_request(add_subscription,client_id))self._background_tasks.add(task)task.add_done_callback(self._raise_on_exception)task.add_done_callback(self._background_tasks.discard)case"registerAgentTypeResponse"|"addSubscriptionResponse":logger.warning(f"Received unexpected message type: {oneofcase}")caseNone:logger.warning("Received empty message")asyncdef_process_request(self,request:agent_worker_pb2.RpcRequest,client_id:int)->None:# Deliver the message to a client given the target agent type.asyncwithself._agent_type_to_client_id_lock:target_client_id=self._agent_type_to_client_id.get(request.target.type)iftarget_client_idisNone:logger.error(f"Agent {request.target.type} not found, failed to deliver message.")returntarget_send_queue=self._send_queues.get(target_client_id)iftarget_send_queueisNone:logger.error(f"Client {target_client_id} not found, failed to deliver message.")returnawaittarget_send_queue.put(agent_worker_pb2.Message(request=request))# Create a future to wait for the response from the target.future=asyncio.get_event_loop().create_future()self._pending_responses.setdefault(target_client_id,{})[request.request_id]=future# Create a task to wait for the response and send it back to the client.send_response_task=asyncio.create_task(self._wait_and_send_response(future,client_id))self._background_tasks.add(send_response_task)send_response_task.add_done_callback(self._raise_on_exception)send_response_task.add_done_callback(self._background_tasks.discard)asyncdef_wait_and_send_response(self,future:Future[agent_worker_pb2.RpcResponse],client_id:int)->None:response=awaitfuturemessage=agent_worker_pb2.Message(response=response)send_queue=self._send_queues.get(client_id)ifsend_queueisNone:logger.error(f"Client {client_id} not found, failed to send response message.")returnawaitsend_queue.put(message)asyncdef_process_response(self,response:agent_worker_pb2.RpcResponse,client_id:int)->None:# Setting the result of the future will send the response back to the original sender.future=self._pending_responses[client_id].pop(response.request_id)future.set_result(response)asyncdef_process_event(self,event:cloudevent_pb2.CloudEvent)->None:topic_id=TopicId(type=event.type,source=event.source)recipients=awaitself._subscription_manager.get_subscribed_recipients(topic_id)# Get the client ids of the recipients.asyncwithself._agent_type_to_client_id_lock:client_ids:Set[int]=set()forrecipientinrecipients:client_id=self._agent_type_to_client_id.get(recipient.type)ifclient_idisnotNone:client_ids.add(client_id)else:logger.error(f"Agent {recipient.type} and its client not found for topic {topic_id}.")# Deliver the event to clients.forclient_idinclient_ids:awaitself._send_queues[client_id].put(agent_worker_pb2.Message(cloudEvent=event))asyncdef_process_register_agent_type_request(self,register_agent_type_req:agent_worker_pb2.RegisterAgentTypeRequest,client_id:int)->None:# Register the agent type with the host runtime.asyncwithself._agent_type_to_client_id_lock:ifregister_agent_type_req.typeinself._agent_type_to_client_id:existing_client_id=self._agent_type_to_client_id[register_agent_type_req.type]logger.error(f"Agent type {register_agent_type_req.type} already registered with client {existing_client_id}.")success=Falseerror=f"Agent type {register_agent_type_req.type} already registered."else:self._agent_type_to_client_id[register_agent_type_req.type]=client_idsuccess=Trueerror=None# Send a response back to the client.awaitself._send_queues[client_id].put(agent_worker_pb2.Message(registerAgentTypeResponse=agent_worker_pb2.RegisterAgentTypeResponse(request_id=register_agent_type_req.request_id,success=success,error=error)))asyncdef_process_add_subscription_request(self,add_subscription_req:agent_worker_pb2.AddSubscriptionRequest,client_id:int)->None:oneofcase=add_subscription_req.subscription.WhichOneof("subscription")subscription:Subscription|None=Nonematchoneofcase:case"typeSubscription":type_subscription_msg:agent_worker_pb2.TypeSubscription=(add_subscription_req.subscription.typeSubscription)subscription=TypeSubscription(topic_type=type_subscription_msg.topic_type,agent_type=type_subscription_msg.agent_type)case"typePrefixSubscription":type_prefix_subscription_msg:agent_worker_pb2.TypePrefixSubscription=(add_subscription_req.subscription.typePrefixSubscription)subscription=TypePrefixSubscription(topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix,agent_type=type_prefix_subscription_msg.agent_type,)caseNone:logger.warning("Received empty subscription message")ifsubscriptionisnotNone:try:awaitself._subscription_manager.add_subscription(subscription)subscription_ids=self._client_id_to_subscription_id_mapping.setdefault(client_id,set())subscription_ids.add(subscription.id)success=Trueerror=NoneexceptValueErrorase:success=Falseerror=str(e)# Send a response back to the client.awaitself._send_queues[client_id].put(agent_worker_pb2.Message(addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse(request_id=add_subscription_req.request_id,success=success,error=error)))
[docs]asyncdefGetState(# type: ignoreself,request:agent_worker_pb2.AgentId,context:grpc.aio.ServicerContext[agent_worker_pb2.AgentId,agent_worker_pb2.GetStateResponse],)->agent_worker_pb2.GetStateResponse:# type: ignoreraiseNotImplementedError("Method not implemented!")
[docs]asyncdefSaveState(# type: ignoreself,request:agent_worker_pb2.AgentState,context:grpc.aio.ServicerContext[agent_worker_pb2.AgentId,agent_worker_pb2.SaveStateResponse],)->agent_worker_pb2.SaveStateResponse:# type: ignoreraiseNotImplementedError("Method not implemented!")