# Source code for autogen_core.base.intervention

from typing import Any, Awaitable, Callable, Protocol, final

from autogen_core.base import AgentId

# Public names exported by a star-import of this module.
__all__ = [
    "DropMessage",
    "InterventionFunction",
    "InterventionHandler",
    "DefaultInterventionHandler",
]


@final
class DropMessage:
    """Sentinel type used as the "drop" return value of intervention hooks.

    The hook signatures in this module are annotated as returning
    ``Any | type[DropMessage]``, i.e. the class object itself is the
    sentinel; it is not meant to be instantiated. The class is marked
    ``@final`` so type checkers reject subclasses.
    """
# Type alias: a callable that takes one message (any type) and returns either
# an arbitrary value or an awaitable resolving to the ``DropMessage`` class.
# NOTE(review): the awaitable branch only admits ``type[DropMessage]``;
# presumably an awaited replacement message is also intended - confirm.
InterventionFunction = Callable[[Any], Any | Awaitable[type[DropMessage]]]
class InterventionHandler(Protocol):
    """Structural interface for message intervention hooks.

    An implementation provides three coroutine methods. Each receives a
    message plus keyword-only ``sender``/``recipient`` identifiers and
    returns either a value (annotated ``Any``) or the ``DropMessage`` class.
    The code that calls these hooks is not part of this module.
    """

    async def on_send(
        self, message: Any, *, sender: AgentId | None, recipient: AgentId
    ) -> Any | type[DropMessage]:
        """Hook taking a message, an optional sender and a required recipient.

        Args:
            message: The message object; any type.
            sender: Identifier of the sender, or ``None``.
            recipient: Identifier of the recipient.

        Returns:
            A value of any type, or the ``DropMessage`` class.
        """
        ...

    async def on_publish(
        self, message: Any, *, sender: AgentId | None
    ) -> Any | type[DropMessage]:
        """Hook taking a message and an optional sender (no recipient).

        Args:
            message: The message object; any type.
            sender: Identifier of the sender, or ``None``.

        Returns:
            A value of any type, or the ``DropMessage`` class.
        """
        ...

    async def on_response(
        self, message: Any, *, sender: AgentId, recipient: AgentId | None
    ) -> Any | type[DropMessage]:
        """Hook taking a message, a required sender and an optional recipient.

        Args:
            message: The message object; any type.
            sender: Identifier of the sender.
            recipient: Identifier of the recipient, or ``None``.

        Returns:
            A value of any type, or the ``DropMessage`` class.
        """
        ...
class DefaultInterventionHandler(InterventionHandler):
    """Pass-through implementation of ``InterventionHandler``.

    Every hook returns the message it was given, unchanged. Subclasses can
    override only the hooks they need.
    """

    async def on_send(
        self, message: Any, *, sender: AgentId | None, recipient: AgentId
    ) -> Any | type[DropMessage]:
        """Return ``message`` unchanged; ``sender`` and ``recipient`` are ignored."""
        return message

    async def on_publish(
        self, message: Any, *, sender: AgentId | None
    ) -> Any | type[DropMessage]:
        """Return ``message`` unchanged; ``sender`` is ignored."""
        return message

    async def on_response(
        self, message: Any, *, sender: AgentId, recipient: AgentId | None
    ) -> Any | type[DropMessage]:
        """Return ``message`` unchanged; ``sender`` and ``recipient`` are ignored."""
        return message