Session Metrics Observer
The SessionMetricsObserver collects performance metrics and statistics during constellation execution. It tracks task execution times, constellation lifecycle events, and structural modifications, and computes detailed statistics for performance analysis.
Location: galaxy/session/observers/base_observer.py
The metrics observer is essential for evaluating Galaxy performance, identifying bottlenecks, and analyzing constellation modification patterns for research and optimization.
🎯 Purpose
The Metrics Observer provides:
- Performance Tracking — Measure task and constellation execution times
- Success Rate Monitoring — Track completion and failure rates
- Modification Analytics — Monitor constellation structural changes
- Statistical Summaries — Compute aggregated metrics for analysis
🏗️ Architecture
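The observer subscribes to the session's EventBus and handles task lifecycle events (TASK_STARTED, TASK_COMPLETED, TASK_FAILED) as well as constellation events (CONSTELLATION_STARTED, CONSTELLATION_COMPLETED, CONSTELLATION_MODIFIED). Each handler updates an in-memory metrics dictionary in real time, and aggregated statistics are computed on demand when get_metrics() is called.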
📊 Metrics Collected
The observer collects metrics across three categories. Real-time metrics are updated as events arrive; computed metrics are derived when get_metrics() is called.
Task Metrics
Track individual task execution:
| Metric | Description | Type |
|---|---|---|
| task_count | Total number of tasks started | Real-time |
| completed_tasks | Number of successfully completed tasks | Real-time |
| failed_tasks | Number of failed tasks | Real-time |
| total_execution_time | Sum of all task execution times | Real-time |
| task_timings | Dict mapping task_id → timing data (start, end, duration) | Real-time |
| success_rate | completed / total tasks | Computed |
| failure_rate | failed / total tasks | Computed |
| average_task_duration | Average execution time per task | Computed |
| min_task_duration | Fastest task execution time | Computed |
| max_task_duration | Slowest task execution time | Computed |
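The computed rows are derived from the real-time counters and the per-task timing records when get_metrics() is called. The sketch below is illustrative only (the actual implementation may differ); it assumes the timing dicts hold numeric durations in seconds, as in the examples later on this page:

# Illustrative only: a rough sketch of how the computed task metrics
# can be derived from the raw counters and task_timings records.
raw = metrics_observer.metrics
durations = [t["duration"] for t in raw["task_timings"].values() if "duration" in t]
total = raw["task_count"]

task_statistics = {
    "total_tasks": total,
    "completed_tasks": raw["completed_tasks"],
    "failed_tasks": raw["failed_tasks"],
    "success_rate": raw["completed_tasks"] / total if total else 0.0,
    "failure_rate": raw["failed_tasks"] / total if total else 0.0,
    "average_task_duration": sum(durations) / len(durations) if durations else 0.0,
    "min_task_duration": min(durations) if durations else 0.0,
    "max_task_duration": max(durations) if durations else 0.0,
    "total_task_execution_time": raw["total_execution_time"],
}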
Constellation Metrics
Monitor constellation lifecycle:
| Metric | Description | Type |
|---|---|---|
| constellation_count | Total constellations processed | Real-time |
| completed_constellations | Successfully completed constellations | Real-time |
| failed_constellations | Failed constellations | Real-time |
| total_constellation_time | Total constellation execution time | Real-time |
| constellation_timings | Dict mapping constellation_id → timing data | Real-time |
| constellation_success_rate | completed / total constellations | Computed |
| average_constellation_duration | Average constellation execution time | Computed |
| min_constellation_duration | Fastest constellation | Computed |
| max_constellation_duration | Slowest constellation | Computed |
| average_tasks_per_constellation | Average number of tasks | Computed |
Modification Metrics
Track constellation structural changes:
| Metric | Description | Type |
|---|---|---|
| constellation_modifications | Dict mapping constellation_id → modification list | Real-time |
| total_modifications | Total number of modifications | Computed |
| constellations_modified | Number of constellations with modifications | Computed |
| average_modifications_per_constellation | Average modifications per constellation | Computed |
| max_modifications_for_single_constellation | Most-modified constellation | Computed |
| most_modified_constellation | ID of most-modified constellation | Computed |
| modification_types_breakdown | Count by modification type | Computed |
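These rows can be reconstructed from the stored modification records. The following is a rough sketch of the aggregation (not the library's exact code):

from collections import Counter

# Illustrative only: deriving the modification statistics from the raw
# constellation_modifications mapping (constellation_id -> [records]).
mods = metrics_observer.metrics["constellation_modifications"]
per_constellation = {cid: len(records) for cid, records in mods.items()}

modification_statistics = {
    "total_modifications": sum(per_constellation.values()),
    "constellations_modified": sum(1 for n in per_constellation.values() if n > 0),
    "average_modifications_per_constellation": (
        sum(per_constellation.values()) / len(per_constellation)
        if per_constellation else 0.0
    ),
    "max_modifications_for_single_constellation": max(per_constellation.values(), default=0),
    "most_modified_constellation": max(per_constellation, key=per_constellation.get, default=None),
    "modifications_per_constellation": per_constellation,
    "modification_types_breakdown": dict(Counter(
        record.get("modification_type", "unknown")
        for records in mods.values()
        for record in records
    )),
}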
💻 Implementation
Initialization
from galaxy.session.observers import SessionMetricsObserver
import logging
# Create metrics observer
metrics_observer = SessionMetricsObserver(
session_id="galaxy_session_20231113",
logger=logging.getLogger(__name__)
)
# Subscribe to event bus
from galaxy.core.events import get_event_bus
event_bus = get_event_bus()
event_bus.subscribe(metrics_observer)
Constructor Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| session_id | str | Yes | Unique identifier for the session |
| logger | logging.Logger | No | Logger instance (creates default if None) |
Internal Metrics Structure
The observer maintains a comprehensive metrics dictionary:
self.metrics: Dict[str, Any] = {
"session_id": session_id,
# Task metrics
"task_count": 0,
"completed_tasks": 0,
"failed_tasks": 0,
"total_execution_time": 0.0,
"task_timings": {}, # task_id -> {start, end, duration}
# Constellation metrics
"constellation_count": 0,
"completed_constellations": 0,
"failed_constellations": 0,
"total_constellation_time": 0.0,
"constellation_timings": {}, # constellation_id -> timing data
# Modification tracking
"constellation_modifications": {} # constellation_id -> [modifications]
}
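The raw dictionary is also reachable directly as metrics_observer.metrics (as used in the memory-management tip below), which can be handy for quick inspection mid-session; for analysis, prefer get_metrics(), which layers the computed statistics on top of these raw values:

# Peek at the raw counters while a session is still running.
raw = metrics_observer.metrics
print(f"{raw['completed_tasks']}/{raw['task_count']} tasks completed so far")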
🔄 Event Processing
Task Event Handling
The observer tracks task lifecycle events:
Processing Logic:
def _handle_task_started(self, event: TaskEvent) -> None:
"""Handle TASK_STARTED event."""
self.metrics["task_count"] += 1
self.metrics["task_timings"][event.task_id] = {
"start": event.timestamp
}
def _handle_task_completed(self, event: TaskEvent) -> None:
"""Handle TASK_COMPLETED event."""
self.metrics["completed_tasks"] += 1
if event.task_id in self.metrics["task_timings"]:
duration = (
event.timestamp -
self.metrics["task_timings"][event.task_id]["start"]
)
self.metrics["task_timings"][event.task_id]["duration"] = duration
self.metrics["task_timings"][event.task_id]["end"] = event.timestamp
self.metrics["total_execution_time"] += duration
def _handle_task_failed(self, event: TaskEvent) -> None:
"""Handle TASK_FAILED event."""
self.metrics["failed_tasks"] += 1
# Also calculate duration for failed tasks
if event.task_id in self.metrics["task_timings"]:
duration = (
event.timestamp -
self.metrics["task_timings"][event.task_id]["start"]
)
self.metrics["task_timings"][event.task_id]["duration"] = duration
self.metrics["total_execution_time"] += duration
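Durations are plain timestamp differences, so they are in whatever unit the event timestamps use; the examples on this page treat them as seconds. A small worked example with hypothetical values:

# Hypothetical event timestamps, in seconds.
start, end = 100.0, 102.5

# TASK_STARTED stores the start; TASK_COMPLETED / TASK_FAILED compute the difference.
duration = end - start   # 2.5 -> stored as task_timings[task_id]["duration"]
# total_execution_time accumulates this duration for completed and failed tasks alike.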
Constellation Event Handling
Tracks constellation lifecycle and modifications:
def _handle_constellation_started(self, event: ConstellationEvent) -> None:
"""Handle CONSTELLATION_STARTED event."""
self.metrics["constellation_count"] += 1
constellation_id = event.constellation_id
constellation = event.data.get("constellation")
# Store initial statistics
self.metrics["constellation_timings"][constellation_id] = {
"start_time": event.timestamp,
"initial_statistics": (
constellation.get_statistics() if constellation else {}
),
"processing_start_time": event.data.get("processing_start_time"),
"processing_end_time": event.data.get("processing_end_time"),
"processing_duration": event.data.get("processing_duration"),
}
def _handle_constellation_completed(self, event: ConstellationEvent) -> None:
"""Handle CONSTELLATION_COMPLETED event."""
self.metrics["completed_constellations"] += 1
constellation_id = event.constellation_id
constellation = event.data.get("constellation")
# Calculate duration and store final statistics
duration = (
event.timestamp -
self.metrics["constellation_timings"][constellation_id]["start_time"]
if constellation_id in self.metrics["constellation_timings"]
else None
)
if constellation_id in self.metrics["constellation_timings"]:
self.metrics["constellation_timings"][constellation_id].update({
"end_time": event.timestamp,
"duration": duration,
"final_statistics": (
constellation.get_statistics() if constellation else {}
),
})
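After CONSTELLATION_STARTED and CONSTELLATION_COMPLETED have both been handled, a constellation's entry combines the fields from both handlers. An illustrative entry (hypothetical values, statistics snapshots omitted):

{
    "start_time": 100.0,
    "initial_statistics": {},            # constellation.get_statistics() at start
    "processing_start_time": 99.5,
    "processing_end_time": 100.0,
    "processing_duration": 0.5,
    "end_time": 130.5,
    "duration": 30.5,                    # end_time - start_time
    "final_statistics": {},              # constellation.get_statistics() at completion
}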
Modification Tracking
Tracks constellation structural changes with detailed change detection:
def _handle_constellation_modified(self, event: ConstellationEvent) -> None:
"""Handle CONSTELLATION_MODIFIED event."""
constellation_id = event.constellation_id
# Initialize modifications list if needed
if constellation_id not in self.metrics["constellation_modifications"]:
self.metrics["constellation_modifications"][constellation_id] = []
if hasattr(event, "data") and event.data:
old_constellation = event.data.get("old_constellation")
new_constellation = event.data.get("new_constellation")
# Calculate changes using VisualizationChangeDetector
changes = None
if old_constellation and new_constellation:
changes = VisualizationChangeDetector.calculate_constellation_changes(
old_constellation, new_constellation
)
# Store modification record
modification_record = {
"timestamp": event.timestamp,
"modification_type": event.data.get("modification_type", "unknown"),
"on_task_id": event.data.get("on_task_id", []),
"changes": changes,
"new_statistics": (
new_constellation.get_statistics() if new_constellation else {}
),
"processing_start_time": event.data.get("processing_start_time"),
"processing_end_time": event.data.get("processing_end_time"),
"processing_duration": event.data.get("processing_duration"),
}
self.metrics["constellation_modifications"][constellation_id].append(
modification_record
)
📖 API Reference
Constructor
def __init__(self, session_id: str, logger: Optional[logging.Logger] = None)
Initialize the metrics observer.
Parameters:
- session_id — Unique identifier for the session
- logger — Optional logger instance (creates default if None)
get_metrics()
def get_metrics(self) -> Dict[str, Any]
Get collected metrics with computed statistics.
Returns:
Dictionary containing:
- All raw metrics (counts, timings, etc.)
- task_statistics — Computed task metrics
- constellation_statistics — Computed constellation metrics
- modification_statistics — Computed modification metrics
Example:
# After constellation execution
metrics = metrics_observer.get_metrics()
# Access task statistics
print(f"Total tasks: {metrics['task_statistics']['total_tasks']}")
print(f"Success rate: {metrics['task_statistics']['success_rate']:.2%}")
print(f"Avg duration: {metrics['task_statistics']['average_task_duration']:.2f}s")
# Access constellation statistics
print(f"Total constellations: {metrics['constellation_statistics']['total_constellations']}")
print(f"Avg tasks per constellation: {metrics['constellation_statistics']['average_tasks_per_constellation']:.1f}")
# Access modification statistics
print(f"Total modifications: {metrics['modification_statistics']['total_modifications']}")
print(f"Modification types: {metrics['modification_statistics']['modification_types_breakdown']}")
📊 Computed Statistics
The observer computes three categories of statistics:
Task Statistics
{
"total_tasks": 10,
"completed_tasks": 8,
"failed_tasks": 2,
"success_rate": 0.8,
"failure_rate": 0.2,
"average_task_duration": 2.5,
"min_task_duration": 0.5,
"max_task_duration": 5.2,
"total_task_execution_time": 25.0
}
Constellation Statistics
{
"total_constellations": 1,
"completed_constellations": 1,
"failed_constellations": 0,
"success_rate": 1.0,
"average_constellation_duration": 30.5,
"min_constellation_duration": 30.5,
"max_constellation_duration": 30.5,
"total_constellation_time": 30.5,
"average_tasks_per_constellation": 10.0
}
Modification Statistics
{
"total_modifications": 3,
"constellations_modified": 1,
"average_modifications_per_constellation": 3.0,
"max_modifications_for_single_constellation": 3,
"most_modified_constellation": "const_123",
"modifications_per_constellation": {
"const_123": 3
},
"modification_types_breakdown": {
"add_tasks": 2,
"modify_dependencies": 1
}
}
🔍 Usage Examples
Example 1: Basic Metrics Collection
import asyncio
from galaxy.core.events import get_event_bus
from galaxy.session.observers import SessionMetricsObserver
async def collect_metrics():
"""Collect and display metrics for constellation execution."""
# Create and subscribe metrics observer
metrics_observer = SessionMetricsObserver(session_id="demo_session")
event_bus = get_event_bus()
event_bus.subscribe(metrics_observer)
# Execute constellation (orchestrator will publish events)
await orchestrator.execute_constellation(constellation)
# Retrieve metrics
metrics = metrics_observer.get_metrics()
# Display summary
print("\n=== Execution Summary ===")
print(f"Session: {metrics['session_id']}")
print(f"Tasks: {metrics['task_count']} total, "
f"{metrics['completed_tasks']} completed, "
f"{metrics['failed_tasks']} failed")
print(f"Total execution time: {metrics['total_execution_time']:.2f}s")
# Display task statistics
task_stats = metrics['task_statistics']
print(f"\nTask Success Rate: {task_stats['success_rate']:.1%}")
print(f"Average Task Duration: {task_stats['average_task_duration']:.2f}s")
print(f"Fastest Task: {task_stats['min_task_duration']:.2f}s")
print(f"Slowest Task: {task_stats['max_task_duration']:.2f}s")
# Clean up
event_bus.unsubscribe(metrics_observer)
asyncio.run(collect_metrics())
Example 2: Performance Analysis
def analyze_performance(metrics_observer: SessionMetricsObserver):
"""Analyze performance metrics and identify bottlenecks."""
metrics = metrics_observer.get_metrics()
task_timings = metrics['task_timings']
# Find slowest tasks
sorted_tasks = sorted(
task_timings.items(),
key=lambda x: x[1].get('duration', 0),
reverse=True
)
print("\n=== Top 5 Slowest Tasks ===")
for task_id, timing in sorted_tasks[:5]:
duration = timing.get('duration', 0)
print(f"{task_id}: {duration:.2f}s")
# Analyze modification patterns
mod_stats = metrics['modification_statistics']
if mod_stats['total_modifications'] > 0:
print(f"\n=== Modification Analysis ===")
print(f"Total Modifications: {mod_stats['total_modifications']}")
print(f"Average per Constellation: "
f"{mod_stats['average_modifications_per_constellation']:.1f}")
print(f"Most Modified: {mod_stats['most_modified_constellation']}")
print("\nModification Types:")
for mod_type, count in mod_stats['modification_types_breakdown'].items():
print(f" {mod_type}: {count}")
Example 3: Export Metrics to JSON
import json
from pathlib import Path
def export_metrics(metrics_observer: SessionMetricsObserver, output_path: str):
"""Export metrics to JSON file for analysis."""
metrics = metrics_observer.get_metrics()
# Convert to JSON-serializable format
output_data = {
"session_id": metrics["session_id"],
"task_statistics": metrics["task_statistics"],
"constellation_statistics": metrics["constellation_statistics"],
"modification_statistics": metrics["modification_statistics"],
"raw_metrics": {
"task_count": metrics["task_count"],
"completed_tasks": metrics["completed_tasks"],
"failed_tasks": metrics["failed_tasks"],
"total_execution_time": metrics["total_execution_time"],
"constellation_count": metrics["constellation_count"],
}
}
# Write to file
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w') as f:
json.dump(output_data, f, indent=2)
print(f"Metrics exported to: {output_file}")
🎓 Best Practices
1. Session ID Naming
Use descriptive session IDs for easier analysis:
# ✅ Good: Descriptive session ID
session_id = f"galaxy_session_{task_type}_{timestamp}"
# ❌ Bad: Generic session ID
session_id = "session_1"
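One possible way to build such an ID (a convention suggestion, not required by the library):

from datetime import datetime, timezone

task_type = "web_research"  # hypothetical label for the workload
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
session_id = f"galaxy_session_{task_type}_{timestamp}"
# e.g. "galaxy_session_web_research_20231113_142530"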
2. Metrics Export
Export metrics immediately after execution:
try:
    await orchestrator.execute_constellation(constellation)
finally:
    # Always export metrics, even if execution failed
    export_metrics(metrics_observer, "results/metrics.json")
3. Memory Management
Clear large timing dictionaries for long-running sessions:
# After processing metrics
metrics_observer.metrics["task_timings"].clear()
metrics_observer.metrics["constellation_timings"].clear()
🔗 Related Documentation
- Observer System Overview — Architecture and design
- Event System Core — Event types and EventBus
Additional Resources
For information on constellation execution and orchestration, see the constellation orchestrator documentation in galaxy/constellation/orchestrator/.
📋 Summary
The Session Metrics Observer:
- Collects comprehensive performance metrics
- Tracks task and constellation execution times
- Monitors modification patterns
- Computes statistical summaries
- Exports data for analysis
This observer is essential for performance evaluation, bottleneck identification, and research analysis of Galaxy's constellation execution.