Source code for microsoft.opentelemetry.a365.core.spans_scopes.output_scope

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from opentelemetry.trace import SpanKind

from microsoft.opentelemetry.a365.core.agent_details import AgentDetails
from microsoft.opentelemetry.a365.core.constants import (
    GEN_AI_CALLER_CLIENT_IP_KEY,
    GEN_AI_CONVERSATION_ID_KEY,
    GEN_AI_OUTPUT_MESSAGES_KEY,
    USER_EMAIL_KEY,
    USER_ID_KEY,
    USER_NAME_KEY,
)
from microsoft.opentelemetry.a365.core.message_utils import normalize_output_messages, serialize_messages
from microsoft.opentelemetry.a365.core.models.messages import OutputMessages
from microsoft.opentelemetry.a365.core.models.response import Response, ResponseMessagesParam
from microsoft.opentelemetry.a365.core.models.user_details import UserDetails
from microsoft.opentelemetry.a365.core.opentelemetry_scope import OpenTelemetryScope
from microsoft.opentelemetry.a365.core.request import Request
from microsoft.opentelemetry.a365.core.span_details import SpanDetails
from microsoft.opentelemetry.a365.core.utils import safe_json_dumps, validate_and_normalize_ip

OUTPUT_OPERATION_NAME = "output_messages"


[docs] class OutputScope(OpenTelemetryScope): """Provides OpenTelemetry tracing scope for output messages. Output messages are set once (via the constructor or ``record_output_messages``) rather than accumulated. For streaming scenarios, the agent developer should collect all output (e.g. via a list or string builder) and pass the final result to ``OutputScope``. """
[docs] @staticmethod def start( request: Request, response: Response, agent_details: AgentDetails, user_details: UserDetails | None = None, span_details: SpanDetails | None = None, ) -> "OutputScope": """Creates and starts a new scope for output tracing. Args: request: Request details for the output response: The response details from the agent agent_details: The details of the agent user_details: Optional human user details span_details: Optional span configuration (parent context, timing) Returns: A new OutputScope instance """ return OutputScope(request, response, agent_details, user_details, span_details)
def __init__( self, request: Request, response: Response, agent_details: AgentDetails, user_details: UserDetails | None = None, span_details: SpanDetails | None = None, ): """Initialize the output scope. Args: request: Request details for the output response: The response details from the agent agent_details: The details of the agent user_details: Optional human user details span_details: Optional span configuration (parent context, timing) """ # spanKind for OutputScope is always CLIENT resolved_span_details = ( SpanDetails( span_kind=SpanKind.CLIENT, parent_context=span_details.parent_context if span_details else None, start_time=span_details.start_time if span_details else None, end_time=span_details.end_time if span_details else None, span_links=span_details.span_links if span_details else None, ) if span_details else SpanDetails(span_kind=SpanKind.CLIENT) ) super().__init__( operation_name=OUTPUT_OPERATION_NAME, activity_name=(f"{OUTPUT_OPERATION_NAME} {agent_details.agent_id}"), agent_details=agent_details, span_details=resolved_span_details, ) self.set_tag_maybe(GEN_AI_CONVERSATION_ID_KEY, request.conversation_id) self._set_output(response.messages) # Set user details if provided if user_details: self.set_tag_maybe(USER_ID_KEY, user_details.user_id) self.set_tag_maybe(USER_EMAIL_KEY, user_details.user_email) self.set_tag_maybe(USER_NAME_KEY, user_details.user_name) self.set_tag_maybe( GEN_AI_CALLER_CLIENT_IP_KEY, validate_and_normalize_ip(user_details.user_client_ip), ) def _set_output(self, messages: ResponseMessagesParam) -> None: """Serialize and set the output messages attribute on the span.""" if isinstance(messages, dict): self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(messages)) else: normalized = normalize_output_messages(messages) wrapper = OutputMessages(messages=list(normalized.messages)) self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, serialize_messages(wrapper))
[docs] def record_output_messages(self, messages: ResponseMessagesParam) -> None: """Records the output messages for telemetry tracking. Overwrites any previously set output messages. Accepts a single string, a list of strings (auto-wrapped as OTEL OutputMessage), a versioned ``OutputMessages`` wrapper, or a ``dict[str, object]`` for tool call results (per OTEL spec). Args: messages: String(s), OutputMessages, or dict for tool call results """ self._set_output(messages)