Source code for autogen_core._topic

import re
from dataclasses import dataclass

from typing_extensions import Self


def is_valid_topic_type(value: str) -> bool:
    return bool(re.match(r"^[\w\-\.\:\=]+\Z", value))


[docs] @dataclass(eq=True, frozen=True) class TopicId: type: str """Type of the event that this topic_id contains. Adhere's to the cloud event spec. Must match the pattern: ^[\\w\\-\\.\\:\\=]+\\Z Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type """ source: str """Identifies the context in which an event happened. Adhere's to the cloud event spec. Learn more here: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#source-1 """ def __post_init__(self) -> None: if is_valid_topic_type(self.type) is False: raise ValueError(f"Invalid topic type: {self.type}. Must match the pattern: ^[\\w\\-\\.\\:\\=]+\\Z") def __str__(self) -> str: return f"{self.type}/{self.source}"
[docs] @classmethod def from_str(cls, topic_id: str) -> Self: items = topic_id.split("/", maxsplit=1) if len(items) != 2: raise ValueError(f"Invalid topic id: {topic_id}") type, source = items[0], items[1] return cls(type, source)