# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Middleware that creates OutputScope spans for outgoing messages."""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from microsoft_agents.activity import Activity
from microsoft_agents.hosting.core.turn_context import TurnContext
from microsoft.opentelemetry.a365.core.agent_details import AgentDetails
from microsoft.opentelemetry.a365.constants import (
CHANNEL_LINK_KEY,
CHANNEL_NAME_KEY,
)
from microsoft.opentelemetry.a365.hosting.scope_helpers.utils import resolve_sub_channel
from microsoft.opentelemetry.a365.core.models.response import Response
from microsoft.opentelemetry.a365.core.models.user_details import UserDetails
from microsoft.opentelemetry.a365.core.request import Request
from microsoft.opentelemetry.a365.core.span_details import SpanDetails
from microsoft.opentelemetry.a365.core.spans_scopes.output_scope import OutputScope
from microsoft.opentelemetry.a365.core.utils import extract_context_from_headers
# mypy: disable-error-code="call-arg"
logger = logging.getLogger(__name__)
# TurnState key for the parent trace context (W3C traceparent string).
A365_PARENT_TRACEPARENT_KEY = "A365ParentTraceparent"
def _derive_agent_details(context: TurnContext) -> AgentDetails | None:
    """Build the target agent's details from the activity recipient.

    Args:
        context: Turn context whose ``activity`` is inspected.

    Returns:
        An :class:`AgentDetails` populated from the recipient and the
        activity's agentic accessors, or ``None`` when the activity is not
        an agentic request or has no recipient. Returning ``None`` lets
        callers short-circuit instead of emitting spans with empty
        identifiers.
    """
    act = context.activity
    # The recipient is only looked up for agentic requests.
    target = getattr(act, "recipient", None) if act.is_agentic_request() else None
    if not target:
        return None

    # A missing instance id is normalised to an empty string.
    instance_id = act.get_agentic_instance_id() or ""
    details = AgentDetails(
        agent_id=instance_id,
        agent_name=getattr(target, "name", None),
        agentic_user_id=getattr(target, "aad_object_id", None),
        agentic_user_email=act.get_agentic_user(),
        agent_description=getattr(target, "role", None),
        tenant_id=getattr(target, "tenant_id", None),
    )
    return details
def _derive_user_details(context: TurnContext) -> UserDetails | None:
    """Build user identity details from the activity's ``from_property``.

    Args:
        context: Turn context whose ``activity.from_property`` is read.

    Returns:
        A :class:`UserDetails` carrying the sender's ``aad_object_id`` and
        ``name`` (each ``None`` when absent), or ``None`` when the activity
        has no sender.
    """
    sender = getattr(context.activity, "from_property", None)
    if sender:
        return UserDetails(
            user_id=getattr(sender, "aad_object_id", None),
            user_name=getattr(sender, "name", None),
        )
    return None
def _derive_conversation_id(context: TurnContext) -> str | None:
    """Return the id of the activity's conversation, if there is one.

    Args:
        context: Turn context whose ``activity.conversation`` is read.

    Returns:
        ``conversation.id``, or ``None`` when the activity has no
        conversation.
    """
    conversation = getattr(context.activity, "conversation", None)
    if not conversation:
        return None
    return conversation.id
def _derive_channel(
    context: TurnContext,
) -> dict[str, str | None]:
    """Derive the channel name and link for the current activity.

    Args:
        context: Turn context whose ``activity`` is inspected.

    Returns:
        A dict with two keys: ``"name"`` is the ``channel`` attribute of
        ``activity.channel_id`` when it has one, the ``channel_id`` itself
        when it is a plain string, and ``None`` otherwise; ``"link"`` is the
        result of ``resolve_sub_channel`` for the activity, or ``None`` when
        there is no activity.
    """
    activity = context.activity
    raw_channel = getattr(activity, "channel_id", None)

    if raw_channel is not None and hasattr(raw_channel, "channel"):
        # Structured channel id: the name lives on its ``channel`` attribute.
        name = raw_channel.channel
    elif isinstance(raw_channel, str):
        name = raw_channel
    else:
        name = None

    link = resolve_sub_channel(activity) if activity else None
    return {"name": name, "link": link}
class OutputLoggingMiddleware:
    """Middleware that creates :class:`OutputScope` spans for outgoing messages.

    Links to a parent span when :data:`A365_PARENT_TRACEPARENT_KEY` is set in
    ``turn_state``.

    **Privacy note:** Outgoing message content is captured verbatim as span
    attributes and exported to the configured telemetry backend.
    """

    async def on_turn(
        self,
        context: TurnContext,
        logic: Callable[[TurnContext], Awaitable],
    ) -> None:
        """Register the output-span send handler, then run the rest of the turn.

        When no agent details can be derived for the turn, no handler is
        registered and ``logic`` simply runs.

        Args:
            context: The current turn context.
            logic: Continuation for the remainder of the pipeline; it is
                awaited with no arguments.
        """
        agent_details = _derive_agent_details(context)
        if not agent_details:
            await logic()
            return

        user_details = _derive_user_details(context)
        conversation_id = _derive_conversation_id(context)
        channel = _derive_channel(context)

        context.on_send_activities(
            self._create_send_handler(
                context,
                agent_details,
                user_details,
                conversation_id,
                channel,
            )
        )
        await logic()

    def _create_send_handler(
        self,
        turn_context: TurnContext,
        agent_details: AgentDetails,
        user_details: UserDetails | None,
        conversation_id: str | None,
        channel: dict[str, str | None],
    ) -> Callable:
        """Create a send handler that wraps outgoing messages in OutputScope spans.

        Reads parent span ref lazily so the agent handler can set it during ``logic()``.

        Args:
            turn_context: Turn context whose ``turn_state`` is consulted for
                the parent traceparent each time the handler runs.
            agent_details: Agent details attached to every output span.
            user_details: User details attached to every output span, if any.
            conversation_id: Conversation id placed on the span's request.
            channel: Mapping with ``"name"`` and ``"link"`` entries that are
                set as span tags.

        Returns:
            An async callable ``handler(ctx, activities, send_next)``.
        """

        async def handler(
            ctx: TurnContext,
            activities: list[Activity],
            send_next: Callable,
        ) -> None:
            # Only message activities with non-empty text produce a span.
            messages = [
                a.text for a in activities if getattr(a, "type", None) == "message" and a.text
            ]
            if not messages:
                await send_next()
                return

            traceparent: str | None = turn_context.turn_state.get(A365_PARENT_TRACEPARENT_KEY)
            parent_context = None
            if traceparent:
                parent_context = extract_context_from_headers({"traceparent": traceparent})
            else:
                logger.warning(
                    "[OutputLoggingMiddleware] No traceparent in turn_state under "
                    "'%s'. OutputScope will not be linked to a parent.",
                    A365_PARENT_TRACEPARENT_KEY,
                )

            request = Request(
                conversation_id=conversation_id,
            )
            span_details = SpanDetails(parent_context=parent_context) if parent_context else None
            output_scope = OutputScope.start(
                request=request,
                response=Response(messages=messages),
                agent_details=agent_details,
                user_details=user_details,
                span_details=span_details,
            )

            try:
                # Tagging happens inside the ``try`` so that a failure here
                # still reaches ``dispose()`` instead of leaking the scope.
                output_scope.set_tag_maybe(CHANNEL_NAME_KEY, channel.get("name"))
                output_scope.set_tag_maybe(CHANNEL_LINK_KEY, channel.get("link"))
                await send_next()
            except Exception as error:
                output_scope.record_error(error)
                raise
            finally:
                output_scope.dispose()

        return handler