Source code for microsoft.opentelemetry.a365.core.exporters.utils

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""Utilities for the Agent365 exporter.

Vendored from microsoft-agents-a365-observability-core exporters/utils.py.
"""

from __future__ import annotations

import json
import logging
import os
import threading
import time
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, List, Optional, TypeVar
from urllib.parse import urlparse

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanProcessor
from opentelemetry.trace import SpanKind, StatusCode

from microsoft.opentelemetry.a365.constants import (
    A365_AGENT_APP_INSTANCE_ID_ENV,
    A365_AGENTIC_USER_ID_ENV,
    A365_CLUSTER_CATEGORY_ENV,
    A365_HTTP_TIMEOUT_SECONDS,
    A365_OBSERVABILITY_DOMAIN_OVERRIDE,
    A365_SERVICE_CLIENT_ID_ENV,
    A365_SERVICE_CLIENT_SECRET_ENV,
    A365_SERVICE_TENANT_ID_ENV,
    A365_SUPPRESS_INVOKE_AGENT_INPUT_ENV,
    A365_USE_S2S_ENDPOINT_ENV,
    APPLY_GUARDRAIL_OPERATION_NAME,
    CHAT_OPERATION_NAME,
    ENABLE_A365_OBSERVABILITY_EXPORTER,
    EXECUTE_TOOL_OPERATION_NAME,
    GEN_AI_AGENT_ID_KEY,
    GEN_AI_OPERATION_NAME_KEY,
    INVOKE_AGENT_OPERATION_NAME,
    OUTPUT_MESSAGES_OPERATION_NAME,
    TENANT_ID_KEY,
)
from microsoft.opentelemetry.a365.core.inference_operation_type import InferenceOperationType

if TYPE_CHECKING:
    from microsoft.opentelemetry.a365.core.exporters.token_resolver_context import TokenResolverContext

logger = logging.getLogger(__name__)

# Maximum allowed span size in bytes (250KB)
MAX_SPAN_SIZE_BYTES = 250 * 1024

# Operation names that identify a span as eligible for export to the Agent 365
# observability ingest service. Only spans whose gen_ai.operation.name matches
# one of these values are included; all other spans are filtered out.
GEN_AI_OPERATION_NAMES: frozenset[str] = frozenset(
    {
        INVOKE_AGENT_OPERATION_NAME,
        EXECUTE_TOOL_OPERATION_NAME,
        OUTPUT_MESSAGES_OPERATION_NAME,
        CHAT_OPERATION_NAME,
        APPLY_GUARDRAIL_OPERATION_NAME,
        InferenceOperationType.CHAT.value,
    }
)


# pylint: disable=broad-exception-caught, too-many-return-statements
[docs] def hex_trace_id(value: int) -> str: """Convert a 128-bit trace ID to a 32-character hex string.""" return f"{value:032x}"
[docs] def hex_span_id(value: int) -> str: """Convert a 64-bit span ID to a 16-character hex string.""" return f"{value:016x}"
def _as_str(v: Any) -> str | None: if v is None: return None s = str(v) return s if s.strip() else None
[docs] def kind_name(kind: SpanKind) -> str: """Return span kind name (enum name or numeric).""" try: return kind.name except Exception: return str(kind)
[docs] def status_name(code: StatusCode) -> str: """Return status code name.""" try: return code.name except Exception: return str(code)
[docs] def truncate_span(span_dict: dict[str, Any]) -> dict[str, Any]: """Truncate span attributes if the serialized span exceeds MAX_SPAN_SIZE_BYTES. Removes the largest attributes first until the span fits within the limit. """ try: serialized = json.dumps(span_dict, separators=(",", ":")) current_size = len(serialized.encode("utf-8")) if current_size <= MAX_SPAN_SIZE_BYTES: return span_dict logger.warning( "Span size (%d bytes) exceeds limit (%d bytes). Truncating large payload attributes.", current_size, MAX_SPAN_SIZE_BYTES, ) truncated_span = span_dict.copy() if "attributes" in truncated_span: truncated_span["attributes"] = truncated_span["attributes"].copy() attributes = truncated_span.get("attributes", {}) truncated_keys: list[str] = [] if attributes: attr_sizes: list[tuple[str, int]] = [] for key, value in attributes.items(): try: value_size = len(json.dumps(value, separators=(",", ":")).encode("utf-8")) attr_sizes.append((key, value_size)) except Exception: attr_sizes.append((key, 0)) attr_sizes.sort(key=lambda x: x[1], reverse=True) for key, _ in attr_sizes: if key in attributes: attributes[key] = "TRUNCATED" truncated_keys.append(key) serialized = json.dumps(truncated_span, separators=(",", ":")) current_size = len(serialized.encode("utf-8")) if current_size <= MAX_SPAN_SIZE_BYTES: break if truncated_keys: logger.info("Truncated attributes: %s", ", ".join(truncated_keys)) return truncated_span except Exception as e: logger.error("Error during span truncation: %s", e) return span_dict
[docs] def filter_and_partition_by_identity( spans: Sequence[ReadableSpan], ) -> dict[tuple[str, str], list[ReadableSpan]]: """Filter export-eligible spans and partition them by (tenantId, agentId). Only spans whose ``gen_ai.operation.name`` is in ``GEN_AI_OPERATION_NAMES`` are included; non-genAI spans (e.g. HTTP, DB) and spans with other operation names are filtered out. Spans without both tenant and agent identity are also skipped. """ groups: dict[tuple[str, str], list[ReadableSpan]] = {} non_gen_ai_count = 0 missing_identity_count = 0 for sp in spans: attrs = sp.attributes or {} operation_name = _as_str(attrs.get(GEN_AI_OPERATION_NAME_KEY)) if not operation_name or operation_name not in GEN_AI_OPERATION_NAMES: non_gen_ai_count += 1 continue tenant = _as_str(attrs.get(TENANT_ID_KEY)) agent = _as_str(attrs.get(GEN_AI_AGENT_ID_KEY)) if not tenant or not agent: missing_identity_count += 1 continue key = (tenant, agent) groups.setdefault(key, []).append(sp) if non_gen_ai_count > 0: logger.debug( "[Agent365Exporter] %d spans without an eligible gen_ai.operation.name filtered out", non_gen_ai_count, ) if missing_identity_count > 0: logger.debug( "[Agent365Exporter] %d spans skipped due to missing tenant or agent ID", missing_identity_count, ) return groups
[docs] def get_validated_domain_override() -> str | None: """Get and validate the domain override from environment variable. Returns the validated domain override, or None if not set or invalid. """ domain_override = os.getenv(A365_OBSERVABILITY_DOMAIN_OVERRIDE, "").strip() if not domain_override: return None try: parsed = urlparse(domain_override) if parsed.scheme and "://" in domain_override: if parsed.scheme not in ("http", "https"): logger.warning( "Invalid domain override '%s': scheme must be http or https, got '%s'", domain_override, parsed.scheme, ) return None if not parsed.netloc: logger.warning("Invalid domain override '%s': missing hostname", domain_override) return None else: if domain_override.startswith(("http:", "https:")) and "://" not in domain_override: logger.warning( "Invalid domain override '%s': malformed URL - protocol requires '://'", domain_override, ) return None if "/" in domain_override: logger.warning( "Invalid domain override '%s': domain without protocol should not contain path separators (/)", domain_override, ) return None except Exception as e: logger.warning("Invalid domain override '%s': %s", domain_override, e) return None if domain_override.lower().startswith("http://"): logger.warning( "Domain override uses insecure HTTP. Telemetry data (including " "bearer tokens) will be transmitted in cleartext." ) return domain_override
[docs] def build_export_url(endpoint: str, agent_id: str, tenant_id: str, use_s2s_endpoint: bool = False) -> str: """Construct the full export URL from endpoint and agent ID.""" endpoint_path = ( f"/observabilityService/tenants/{tenant_id}/otlp/agents/{agent_id}/traces" if use_s2s_endpoint else f"/observability/tenants/{tenant_id}/otlp/agents/{agent_id}/traces" ) parsed = urlparse(endpoint) if parsed.scheme and "://" in endpoint: return f"{endpoint}{endpoint_path}?api-version=1" return f"https://{endpoint}{endpoint_path}?api-version=1"
[docs] def parse_retry_after(headers: Mapping[str, str]) -> float | None: """Parse the ``Retry-After`` header value. Only numeric (seconds) values are supported. HTTP-date values are ignored. """ retry_after = headers.get("Retry-After") if retry_after is None: return None try: return float(retry_after) except (ValueError, TypeError): return None
[docs] def is_agent365_exporter_enabled() -> bool: """Check if Agent365 exporter is enabled via environment variable.""" enable_exporter = os.getenv(ENABLE_A365_OBSERVABILITY_EXPORTER, "").lower() return enable_exporter in ("true", "1", "yes", "on")
# --------------------------------------------------------------------------- # Span size estimation and byte-level chunking # --------------------------------------------------------------------------- # Default upper bound on HTTP request body size in bytes. Provides ~100 KB # headroom under the A365 1 MB server limit for estimator error and JSON/ # envelope overhead (e.g. resource attributes and scope wrappers). DEFAULT_MAX_PAYLOAD_BYTES = 900_000 # Overhead constant for OTLP JSON span fixed fields (traceId, spanId, # parentSpanId, kind, timestamps, status, scope wrapper, etc.). Intentionally # generous to account for serializer variance. _SPAN_BASE_OVERHEAD = 2000 # Overhead per attribute in OTLP JSON format. Covers key/value JSON wrapping. _ATTR_OVERHEAD = 80 # Overhead per event in OTLP JSON. _EVENT_OVERHEAD = 200 # Extra bytes per character to account for JSON escaping (quotes, # backslashes, control characters, unicode escapes). A factor of 1.1 # covers typical real-world content; the base overhead constants # provide additional headroom. _JSON_ESCAPE_FACTOR = 1.1 def _utf8_len(s: str) -> int: return len(s.encode("utf-8"))
[docs] def estimate_value_bytes(value: Any) -> int: """Estimate the serialized byte size of a single attribute value in OTLP/HTTP JSON.""" if isinstance(value, str): return 40 + int(_utf8_len(value) * _JSON_ESCAPE_FACTOR) # bool is a subclass of int; check before sequence/list handling below if isinstance(value, bool): return 40 if isinstance(value, (list, tuple)): if len(value) == 0: return 60 first = value[0] if isinstance(first, str): total = 60 for s in value: total += 40 + int(_utf8_len(str(s)) * _JSON_ESCAPE_FACTOR) return total return 60 + 50 * len(value) return 40 # int/float/None/other
[docs] def estimate_span_bytes(span: dict[str, Any]) -> int: """Heuristic estimator for the serialized size of an OTLP span in HTTP JSON. Uses generous constants tuned to over-estimate by ~25-50%, providing headroom for JSON serializer variance (whitespace, enum representation, integer-as-string). """ total = _SPAN_BASE_OVERHEAD name = span.get("name") if isinstance(name, str): total += _utf8_len(name) attributes = span.get("attributes") if attributes: for key, value in attributes.items(): total += _ATTR_OVERHEAD total += _utf8_len(str(key)) total += estimate_value_bytes(value) events = span.get("events") if events: for ev in events: total += _EVENT_OVERHEAD ev_name = ev.get("name") if isinstance(ev, dict) else None if isinstance(ev_name, str): total += _utf8_len(ev_name) ev_attrs = ev.get("attributes") if isinstance(ev, dict) else None if ev_attrs: for key, value in ev_attrs.items(): total += _ATTR_OVERHEAD total += _utf8_len(str(key)) total += estimate_value_bytes(value) return total
T = TypeVar("T")
[docs] def chunk_by_size( items: Sequence[T], get_size: Callable[[T], int], max_chunk_bytes: int, ) -> list[list[T]]: """Split items into sub-batches whose cumulative estimated size stays under ``max_chunk_bytes``. Multi-item chunks are guaranteed to stay within the limit. A single item whose estimated size exceeds ``max_chunk_bytes`` forms its own one-item chunk (never silently dropped) even though that chunk exceeds the limit. Invariants: - Input order is preserved across chunks. - Empty input produces empty output. - No item is ever dropped. - No chunk is ever empty. Raises: ValueError: If ``max_chunk_bytes`` is not positive, or if ``get_size`` returns a negative value for any item. """ if max_chunk_bytes <= 0: raise ValueError(f"max_chunk_bytes must be positive, got {max_chunk_bytes}") chunks: list[list[T]] = [] current: list[T] = [] current_bytes = 0 for item in items: item_bytes = get_size(item) if item_bytes < 0: raise ValueError(f"get_size returned a negative value ({item_bytes}); sizes must be non-negative") if current and current_bytes + item_bytes > max_chunk_bytes: chunks.append(current) current = [] current_bytes = 0 current.append(item) current_bytes += item_bytes if current: chunks.append(current) return chunks
_A365_DEFAULT_SCOPE = "api://9b975845-388f-4429-889e-eab1ef63949c/Agent365.Observability.OtelWrite" def _create_fic_token_resolver(scope_override: Optional[str] = None) -> Callable[[str, str], Optional[str]]: """Create a token resolver using the FIC (Federated Identity Credential) flow. Uses MSAL's ``ConfidentialClientApplication`` with the ``fmi_path`` parameter, which is the mechanism the agents SDK uses internally. This runs synchronously, which is safe because the batch span processor calls ``export`` from a worker thread, not the async event loop. Required environment variables: - ``CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID`` - ``CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET`` - ``CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID`` - ``A365_AGENT_APP_INSTANCE_ID`` - ``A365_AGENTIC_USER_ID`` """ try: import msal except ImportError: logger.warning( "msal is not installed. Install it (`pip install msal`) to use FIC token authentication for A365 export." ) return lambda _agent_id, _tenant_id: None # type: ignore[return-value] _cache: dict[str, tuple[str, float]] = {} # key -> (token, expires_at) _lock = threading.Lock() def _resolve(agent_id: str, tenant_id: str) -> Optional[str]: cache_key = f"{tenant_id}:{agent_id}" with _lock: cached = _cache.get(cache_key) if cached is not None: token, expires_at = cached if time.time() < expires_at - 60: # 60 s buffer return token client_id = os.environ.get(A365_SERVICE_CLIENT_ID_ENV, "") client_secret = os.environ.get(A365_SERVICE_CLIENT_SECRET_ENV, "") cfg_tenant = os.environ.get(A365_SERVICE_TENANT_ID_ENV, tenant_id) instance_id = os.environ.get(A365_AGENT_APP_INSTANCE_ID_ENV, "") user_id = os.environ.get(A365_AGENTIC_USER_ID_ENV, "") if not all([client_id, client_secret, instance_id, user_id]): logger.debug( "FIC env vars incomplete — need %s, %s, %s, %s.", A365_SERVICE_CLIENT_ID_ENV, A365_SERVICE_CLIENT_SECRET_ENV, A365_AGENT_APP_INSTANCE_ID_ENV, A365_AGENTIC_USER_ID_ENV, ) return None authority = f"https://login.microsoftonline.com/{cfg_tenant}" try: # Step 1: Agent application token via fmi_path # Uses the blueprint's credentials + fmi_path=instance_id app = msal.ConfidentialClientApplication( client_id=client_id, client_credential=client_secret, authority=authority, timeout=A365_HTTP_TIMEOUT_SECONDS, ) result = app.acquire_token_for_client( scopes=["api://AzureAdTokenExchange/.default"], fmi_path=instance_id, ) if "access_token" not in result: logger.warning("FIC step 1 (app token) failed: %s", result.get("error_description", result)) return None agent_token = result["access_token"] # Step 2: Instance token (client_assertion = agent_token) instance_app = msal.ConfidentialClientApplication( client_id=instance_id, client_credential={"client_assertion": agent_token}, authority=authority, timeout=A365_HTTP_TIMEOUT_SECONDS, ) result = instance_app.acquire_token_for_client( scopes=["api://AzureAdTokenExchange/.default"], ) if "access_token" not in result: logger.warning("FIC step 2 (instance token) failed: %s", result.get("error_description", result)) return None instance_token = result["access_token"] # Step 3: User FIC token for A365 observability scope result = instance_app.acquire_token_for_client( scopes=[scope_override or _A365_DEFAULT_SCOPE], data={ "user_id": user_id, "user_federated_identity_credential": instance_token, "grant_type": "user_fic", }, ) if "access_token" not in result: logger.warning("FIC step 3 (user FIC token) failed: %s", result.get("error_description", result)) return None access_token = result["access_token"] expires_in = result.get("expires_in", 3600) with _lock: _cache[cache_key] = (access_token, time.time() + expires_in) logger.debug("FIC token acquired for agent %s, tenant %s", agent_id, tenant_id) return access_token # type: ignore[no-any-return] except Exception: logger.warning("FIC token flow failed.", exc_info=True) return None return _resolve def _create_dac_token_resolver(scope_override: Optional[str] = None) -> Callable[[str, str], Optional[str]]: """Create a token resolver backed by ``DefaultAzureCredential``. The credential is lazily initialised on first call and cached for subsequent invocations (thread-safe). """ _credential = None _lock = threading.Lock() scope = scope_override or _A365_DEFAULT_SCOPE def _resolve(_agent_id: str, _tenant_id: str) -> Optional[str]: nonlocal _credential try: from azure.identity import DefaultAzureCredential # type: ignore[import-untyped] except ImportError: logger.warning( "azure-identity is not installed. " "Install it or provide a365_token_resolver to authenticate with the A365 endpoint." ) return None with _lock: if _credential is None: _credential = DefaultAzureCredential() try: token = _credential.get_token(scope) return token.token except Exception: logger.warning("Failed to acquire A365 token via DefaultAzureCredential.", exc_info=True) return None return _resolve def _create_default_token_resolver( scope_override: Optional[str] = None, ) -> Callable[[str, str], Optional[str]]: """Create the default token resolver. Tries FIC first (if the required env vars are set), otherwise falls back to ``DefaultAzureCredential``. """ fic_available = all( [ os.environ.get(A365_SERVICE_CLIENT_ID_ENV), os.environ.get(A365_SERVICE_CLIENT_SECRET_ENV), os.environ.get(A365_AGENT_APP_INSTANCE_ID_ENV), os.environ.get(A365_AGENTIC_USER_ID_ENV), ] ) if fic_available: logger.info("FIC env vars detected \u2014 using FIC token resolver for A365.") return _create_fic_token_resolver(scope_override=scope_override) logger.info("FIC env vars not set \u2014 falling back to DefaultAzureCredential for A365.") return _create_dac_token_resolver(scope_override=scope_override)
[docs] @dataclass class A365Handlers: """Processors created for Agent365 export, mirroring ``OtlpHandlers``.""" span_processors: List[SpanProcessor] = field(default_factory=list)
[docs] def is_a365_enabled(enable_a365: bool = False) -> bool: """Determine whether Agent365 export should be enabled.""" return bool(enable_a365)
def _env_bool(name: str, default: bool = False) -> bool: """Read a boolean from an environment variable.""" val = os.environ.get(name, "").strip().lower() if not val: return default return val in ("true", "1", "yes", "on")
[docs] def create_a365_components( token_resolver: Callable[[str, str], Optional[str]] | None = None, contextual_token_resolver: Callable[[TokenResolverContext], Optional[str]] | None = None, ) -> A365Handlers: """Create Agent365 span processors ready to be added to a TracerProvider. :param token_resolver: Optional callable ``(agent_id, tenant_id) -> str | None``. When provided, it is used instead of the default ``DefaultAzureCredential`` resolver. This allows callers to supply FIC-based or other custom tokens. :param contextual_token_resolver: Optional callable ``(TokenResolverContext) -> str | None``. Provides rich context including the agentic user ID. Takes precedence over ``token_resolver`` when set. All other configuration is read from environment variables: - ``ENABLE_A365_OBSERVABILITY_EXPORTER`` -- must be true for the HTTP exporter - ``A365_CLUSTER_CATEGORY`` -- defaults to ``"prod"`` - ``A365_USE_S2S_ENDPOINT`` -- defaults to False - ``A365_SUPPRESS_INVOKE_AGENT_INPUT`` -- defaults to False """ # pylint: disable=import-outside-toplevel,cyclic-import from microsoft.opentelemetry.a365.core.exporters.enriching_span_processor import _EnrichingBatchSpanProcessor from microsoft.opentelemetry.a365.core.exporters.agent365_exporter import _Agent365Exporter from microsoft.opentelemetry.a365.core.exporters.agent365_exporter_options import Agent365ExporterOptions from microsoft.opentelemetry.a365.core.exporters.span_processor import A365SpanProcessor if contextual_token_resolver is not None: resolved_token_resolver = None elif token_resolver is None: resolved_token_resolver = _create_default_token_resolver() else: resolved_token_resolver = token_resolver cluster_category = os.environ.get(A365_CLUSTER_CATEGORY_ENV, "prod") use_s2s_endpoint = _env_bool(A365_USE_S2S_ENDPOINT_ENV) suppress_invoke_agent_input = _env_bool(A365_SUPPRESS_INVOKE_AGENT_INPUT_ENV) options = Agent365ExporterOptions( cluster_category=cluster_category, token_resolver=resolved_token_resolver, contextual_token_resolver=contextual_token_resolver, use_s2s_endpoint=use_s2s_endpoint, ) # Create the exporter (Agent365 HTTP or console fallback) has_resolver = options.token_resolver is not None or options.contextual_token_resolver is not None if is_agent365_exporter_enabled() and has_resolver: exporter = _Agent365Exporter( token_resolver=options.token_resolver, contextual_token_resolver=options.contextual_token_resolver, cluster_category=options.cluster_category, use_s2s_endpoint=options.use_s2s_endpoint, max_payload_bytes=options.max_payload_bytes, ) else: logger.warning( "ENABLE_A365_OBSERVABILITY_EXPORTER not set or no token resolver provided. " "A365 exporter will not be active." ) return A365Handlers() # Enriching batch processor wrapping the exporter batch_processor = _EnrichingBatchSpanProcessor( exporter, suppress_invoke_agent_input=suppress_invoke_agent_input, max_queue_size=options.max_queue_size, schedule_delay_millis=options.scheduled_delay_ms, export_timeout_millis=options.exporter_timeout_ms, max_export_batch_size=options.max_export_batch_size, ) # Baggage-to-span attribute propagation processor. Tenant/agent IDs # are supplied per-request via BaggageMiddleware / BaggageBuilder. baggage_processor = A365SpanProcessor() return A365Handlers(span_processors=[batch_processor, baggage_processor])