Source code for autogen_core.base._serialization

import json
from dataclasses import asdict, dataclass, fields
from typing import Any, ClassVar, Dict, List, Protocol, Sequence, TypeVar, cast, get_args, get_origin, runtime_checkable

from google.protobuf.message import Message
from pydantic import BaseModel

from autogen_core.base._type_helpers import is_union

T = TypeVar("T")


[docs] class MessageSerializer(Protocol[T]): @property def data_content_type(self) -> str: ... @property def type_name(self) -> str: ...
[docs] def deserialize(self, payload: bytes) -> T: ...
[docs] def serialize(self, message: T) -> bytes: ...
@runtime_checkable class IsDataclass(Protocol): # as already noted in comments, checking for this attribute is currently # the most reliable way to ascertain that something is a dataclass __dataclass_fields__: ClassVar[Dict[str, Any]] def is_dataclass(cls: type[Any]) -> bool: return hasattr(cls, "__dataclass_fields__") def has_nested_dataclass(cls: type[IsDataclass]) -> bool: # iterate fields and check if any of them are dataclasses return any(is_dataclass(f.type) for f in cls.__dataclass_fields__.values()) def contains_a_union(cls: type[IsDataclass]) -> bool: return any(is_union(f.type) for f in cls.__dataclass_fields__.values()) def has_nested_base_model(cls: type[IsDataclass]) -> bool: for f in fields(cls): field_type = f.type # Resolve forward references and other annotations origin = get_origin(field_type) args = get_args(field_type) # If the field type is directly a subclass of BaseModel if isinstance(field_type, type) and issubclass(field_type, BaseModel): return True # If the field type is a generic type like List[BaseModel], Tuple[BaseModel, ...], etc. if origin is not None and args: for arg in args: # Recursively check the argument types if isinstance(arg, type) and issubclass(arg, BaseModel): return True elif get_origin(arg) is not None: # Handle nested generics like List[List[BaseModel]] if has_nested_base_model_in_type(arg): return True # Handle Union types elif args: for arg in args: if isinstance(arg, type) and issubclass(arg, BaseModel): return True elif get_origin(arg) is not None: if has_nested_base_model_in_type(arg): return True return False def has_nested_base_model_in_type(tp: Any) -> bool: """Helper function to check if a type or its arguments is a BaseModel subclass.""" origin = get_origin(tp) args = get_args(tp) if isinstance(tp, type) and issubclass(tp, BaseModel): return True if origin is not None and args: for arg in args: if has_nested_base_model_in_type(arg): return True return False DataclassT = TypeVar("DataclassT", bound=IsDataclass) JSON_DATA_CONTENT_TYPE = "application/json" # TODO: what's the correct content type? There seems to be some disagreement over what it should be PROTOBUF_DATA_CONTENT_TYPE = "application/x-protobuf" class DataclassJsonMessageSerializer(MessageSerializer[DataclassT]): def __init__(self, cls: type[DataclassT]) -> None: if contains_a_union(cls): raise ValueError("Dataclass has a union type, which is not supported. To use a union, use a Pydantic model") if has_nested_dataclass(cls) or has_nested_base_model(cls): raise ValueError( "Dataclass has nested dataclasses or base models, which are not supported. To use nested types, use a Pydantic model" ) self.cls = cls @property def data_content_type(self) -> str: return JSON_DATA_CONTENT_TYPE @property def type_name(self) -> str: return _type_name(self.cls) def deserialize(self, payload: bytes) -> DataclassT: message_str = payload.decode("utf-8") return self.cls(**json.loads(message_str)) def serialize(self, message: DataclassT) -> bytes: return json.dumps(asdict(message)).encode("utf-8") PydanticT = TypeVar("PydanticT", bound=BaseModel) class PydanticJsonMessageSerializer(MessageSerializer[PydanticT]): def __init__(self, cls: type[PydanticT]) -> None: self.cls = cls @property def data_content_type(self) -> str: return JSON_DATA_CONTENT_TYPE @property def type_name(self) -> str: return _type_name(self.cls) def deserialize(self, payload: bytes) -> PydanticT: message_str = payload.decode("utf-8") return self.cls.model_validate_json(message_str) def serialize(self, message: PydanticT) -> bytes: return message.model_dump_json().encode("utf-8") ProtobufT = TypeVar("ProtobufT", bound=Message) class ProtobufMessageSerializer(MessageSerializer[ProtobufT]): def __init__(self, cls: type[ProtobufT]) -> None: self.cls = cls @property def data_content_type(self) -> str: # TODO: This should be PROTOBUF_DATA_CONTENT_TYPE. There are currently # a couple of hard coded places where the system assumes the # content is JSON_DATA_CONTENT_TYPE which will need to be fixed # first. return JSON_DATA_CONTENT_TYPE @property def type_name(self) -> str: return _type_name(self.cls) def deserialize(self, payload: bytes) -> ProtobufT: ret = self.cls() ret.ParseFromString(payload) return ret def serialize(self, message: ProtobufT) -> bytes: return message.SerializeToString()
[docs] @dataclass class UnknownPayload: type_name: str data_content_type: str payload: bytes
def _type_name(cls: type[Any] | Any) -> str: if isinstance(cls, type): return cls.__name__ else: return cast(str, cls.__class__.__name__) V = TypeVar("V")
[docs] def try_get_known_serializers_for_type(cls: type[Any]) -> list[MessageSerializer[Any]]: serializers: List[MessageSerializer[Any]] = [] if issubclass(cls, BaseModel): serializers.append(PydanticJsonMessageSerializer(cls)) elif isinstance(cls, IsDataclass): serializers.append(DataclassJsonMessageSerializer(cls)) elif issubclass(cls, Message): serializers.append(ProtobufMessageSerializer(cls)) return serializers
[docs] class SerializationRegistry: def __init__(self) -> None: # type_name, data_content_type -> serializer self._serializers: dict[tuple[str, str], MessageSerializer[Any]] = {}
[docs] def add_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None: if isinstance(serializer, Sequence): for c in serializer: self.add_serializer(c) return self._serializers[(serializer.type_name, serializer.data_content_type)] = serializer
[docs] def deserialize(self, payload: bytes, *, type_name: str, data_content_type: str) -> Any: serializer = self._serializers.get((type_name, data_content_type)) if serializer is None: return UnknownPayload(type_name, data_content_type, payload) return serializer.deserialize(payload)
[docs] def serialize(self, message: Any, *, type_name: str, data_content_type: str) -> bytes: serializer = self._serializers.get((type_name, data_content_type)) if serializer is None: raise ValueError(f"Unknown type {type_name} with content type {data_content_type}") return serializer.serialize(message)
[docs] def is_registered(self, type_name: str, data_content_type: str) -> bool: return (type_name, data_content_type) in self._serializers
[docs] def type_name(self, message: Any) -> str: return _type_name(message)