Source code for autogen_core.base._topic

from dataclasses import dataclass

from typing_extensions import Self


[docs] @dataclass(eq=True, frozen=True) class TopicId: type: str """Type of the event that this topic_id contains. Adhere's to the cloud event spec. Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type """ source: str """Identifies the context in which an event happened. Adhere's to the cloud event spec. Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#source-1 """ def __str__(self) -> str: return f"{self.type}/{self.source}"
[docs] @classmethod def from_str(cls, topic_id: str) -> Self: items = topic_id.split("/", maxsplit=1) if len(items) != 2: raise ValueError(f"Invalid topic id: {topic_id}") type, source = items[0], items[1] return cls(type, source)