Source code for autogen_core.base.intervention
from typing import Any, Awaitable, Callable, Protocol, final
from autogen_core.base import AgentId
# Names exported as this module's public API (honoured by `from ... import *`).
__all__ = [
"DropMessage",
"InterventionFunction",
"InterventionHandler",
"DefaultInterventionHandler",
]
[docs]
@final
class DropMessage:
    """Marker class with no attributes or methods of its own.

    The class object itself (``type[DropMessage]``), not an instance, is what
    appears in the return annotations of the intervention hooks in this module.
    NOTE(review): presumably returning it tells the caller to discard the
    message -- confirm against the code that invokes the hooks.
    """
# Type alias: a callable taking one argument of any type and returning either
# an arbitrary value or an awaitable that resolves to the DropMessage class.
# NOTE(review): only the DropMessage branch is wrapped in Awaitable; confirm
# whether Awaitable[Any | type[DropMessage]] was the intended return type.
InterventionFunction = Callable[[Any], Any | Awaitable[type[DropMessage]]]
[docs]
class InterventionHandler(Protocol):
    """Structural interface for objects that can intercept messages.

    Each hook is a coroutine that receives a message and returns either a
    value or the ``DropMessage`` class itself.
    NOTE(review): presumably the returned value replaces the message and
    ``DropMessage`` requests that it be discarded -- confirm against the
    code that invokes these hooks.
    """

    async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]:
        """Hook for a message addressed to ``recipient``.

        Args:
            message: The message being intercepted.
            sender: Id of the sending agent, or ``None``.
            recipient: Id of the receiving agent.

        Returns:
            Any value, or the ``DropMessage`` class.
        """
        ...

    async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]:
        """Hook for a published message, which has no single recipient.

        Args:
            message: The message being intercepted.
            sender: Id of the publishing agent, or ``None``.

        Returns:
            Any value, or the ``DropMessage`` class.
        """
        ...

    async def on_response(
        self, message: Any, *, sender: AgentId, recipient: AgentId | None
    ) -> Any | type[DropMessage]:
        """Hook for a response message.

        Args:
            message: The response being intercepted.
            sender: Id of the agent that produced the response.
            recipient: Id of the agent receiving the response, or ``None``.

        Returns:
            Any value, or the ``DropMessage`` class.
        """
        ...
[docs]
class DefaultInterventionHandler(InterventionHandler):
    """Pass-through handler: every hook returns the message it was given.

    The ``sender`` and ``recipient`` arguments are accepted but not used.
    """

    async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]:
        """Return ``message`` unchanged."""
        return message

    async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]:
        """Return ``message`` unchanged."""
        return message

    async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]:
        """Return ``message`` unchanged."""
        return message