Observer System — Overview

The Observer System in UFO Galaxy implements an event-driven architecture that enables real-time monitoring, visualization, and coordination of constellation execution. It provides a decoupled, extensible mechanism for components to react to system events without tight coupling.

The system implements the classic Observer Pattern (also known as Publish-Subscribe), enabling loose coupling between event producers and consumers. This allows the system to be extended with new observers without modifying existing code.


🎯 Purpose and Design Goals

The observer system serves several critical functions in the Galaxy framework:

  1. Real-time Monitoring — Track task execution, constellation lifecycle, and system events
  2. Visualization — Provide live updates for DAG topology and execution progress
  3. Metrics Collection — Gather performance statistics and execution data
  4. Synchronization — Coordinate between agent modifications and orchestrator execution
  5. Agent Output Handling — Display agent responses and actions in real-time

🏗️ Architecture Overview

The observer system consists of three main layers:

graph TB subgraph "Event Publishers" A1[Orchestrator] A2[Agent] A3[Device Manager] end subgraph "Event Bus Layer" B[EventBus<br/>Singleton] end subgraph "Observer Layer" C1[ConstellationProgressObserver] C2[SessionMetricsObserver] C3[DAGVisualizationObserver] C4[ConstellationModificationSynchronizer] C5[AgentOutputObserver] end subgraph "Handler Layer" D1[TaskVisualizationHandler] D2[ConstellationVisualizationHandler] end A1 -->|publish events| B A2 -->|publish events| B A3 -->|publish events| B B -->|notify| C1 B -->|notify| C2 B -->|notify| C3 B -->|notify| C4 B -->|notify| C5 C3 -->|delegate| D1 C3 -->|delegate| D2 style B fill:#4a90e2,stroke:#333,stroke-width:3px,color:#fff style C1 fill:#66bb6a,stroke:#333,stroke-width:2px style C2 fill:#66bb6a,stroke:#333,stroke-width:2px style C3 fill:#66bb6a,stroke:#333,stroke-width:2px style C4 fill:#ffa726,stroke:#333,stroke-width:2px style C5 fill:#66bb6a,stroke:#333,stroke-width:2px

Architecture Layers:

Layer Component Responsibility
Event Publishers Orchestrator, Agent, Device Manager Generate events during system operation
Event Bus EventBus singleton Central message broker, manages subscriptions and routing
Observers 5 specialized observers React to specific event types and perform actions
Handlers Task & Constellation handlers Delegate visualization logic for specific components

📊 Core Components

Event System Core

The foundation of the observer system consists of:

Component Location Description
EventBus galaxy/core/events.py Central message broker managing subscriptions
EventType galaxy/core/events.py Enumeration of all system event types
Event Classes galaxy/core/events.py Base (Event) and specialized (TaskEvent, ConstellationEvent, AgentEvent, DeviceEvent) event data structures
Interfaces galaxy/core/events.py IEventObserver, IEventPublisher contracts

For detailed documentation of the event system core components, see the Event System Core documentation.

Observer Implementations

Five specialized observers handle different aspects of system monitoring:

Observer File Location Primary Role Key Features
ConstellationProgressObserver galaxy/session/observers/base_observer.py Task progress tracking Queues completion events for agent, coordinates task lifecycle
SessionMetricsObserver galaxy/session/observers/base_observer.py Performance metrics Collects timing, success rates, modification statistics
DAGVisualizationObserver galaxy/session/observers/dag_visualization_observer.py Real-time visualization Displays constellation topology and execution flow
ConstellationModificationSynchronizer galaxy/session/observers/constellation_sync_observer.py Modification coordination Prevents race conditions between agent and orchestrator
AgentOutputObserver galaxy/session/observers/agent_output_observer.py Agent interaction display Shows agent responses and actions in real-time

🔄 Event Flow

The following diagram illustrates how events flow through the system:

sequenceDiagram participant O as Orchestrator participant EB as EventBus participant CPO as ProgressObserver participant SMO as MetricsObserver participant DVO as VisualizationObserver participant A as Agent O->>EB: publish(TASK_STARTED) EB->>CPO: on_event(event) EB->>SMO: on_event(event) EB->>DVO: on_event(event) Note over DVO: Display task start Note over SMO: Increment task count O->>EB: publish(TASK_COMPLETED) EB->>CPO: on_event(event) EB->>SMO: on_event(event) EB->>DVO: on_event(event) CPO->>A: add_task_completion_event() Note over A: Process result,<br/>modify constellation A->>EB: publish(CONSTELLATION_MODIFIED) EB->>SMO: on_event(event) EB->>DVO: on_event(event) Note over DVO: Display updated DAG Note over SMO: Track modification

The event flow demonstrates how a single action (task completion) triggers multiple observers, each performing their specialized function without interfering with others.


📋 Event Types

The system defines the following event types:

Task Events

Track individual task execution lifecycle:

Event Type Trigger Data Includes
TASK_STARTED Task begins execution task_id, status, constellation_id
TASK_COMPLETED Task finishes successfully task_id, result, execution_time, newly_ready_tasks
TASK_FAILED Task encounters error task_id, error, retry_info

Constellation Events

Monitor constellation-level operations:

Event Type Trigger Data Includes
CONSTELLATION_STARTED Constellation begins processing constellation, initial_statistics, processing_time
CONSTELLATION_COMPLETED All tasks finished constellation, final_statistics, execution_time
CONSTELLATION_FAILED Constellation encounters error constellation, error
CONSTELLATION_MODIFIED Structure changed by agent old_constellation, new_constellation, on_task_id, modification_type, changes

Agent Events

Display agent interactions:

Event Type Trigger Data Includes
AGENT_RESPONSE Agent generates response agent_name, agent_type, response_data
AGENT_ACTION Agent executes action agent_name, action_type, actions

Device Events

Monitor device status (used by client):

Event Type Trigger Data Includes
DEVICE_CONNECTED Device joins pool device_id, device_status, device_info
DEVICE_DISCONNECTED Device leaves pool device_id, device_status
DEVICE_STATUS_CHANGED Device state changes device_id, device_status, all_devices

🚀 Usage Example

Here's a complete example showing how observers are initialized and used in a Galaxy session:

from galaxy.core.events import get_event_bus, EventType
from galaxy.session.observers import (
    ConstellationProgressObserver,
    SessionMetricsObserver,
    DAGVisualizationObserver,
    ConstellationModificationSynchronizer,
    AgentOutputObserver
)

# Get the global event bus
event_bus = get_event_bus()

# 1. Create progress observer for agent coordination
progress_observer = ConstellationProgressObserver(agent=constellation_agent)
event_bus.subscribe(progress_observer)

# 2. Create metrics observer for performance tracking
metrics_observer = SessionMetricsObserver(
    session_id="my_session",
    logger=logger
)
event_bus.subscribe(metrics_observer)

# 3. Create visualization observer for real-time display
viz_observer = DAGVisualizationObserver(enable_visualization=True)
event_bus.subscribe(viz_observer)

# 4. Create synchronizer to prevent race conditions
synchronizer = ConstellationModificationSynchronizer(
    orchestrator=orchestrator,
    logger=logger
)
event_bus.subscribe(synchronizer)

# 5. Create agent output observer for displaying interactions
agent_output_observer = AgentOutputObserver(presenter_type="rich")
event_bus.subscribe(agent_output_observer)

# Execute constellation
await orchestrator.execute_constellation(constellation)

# Retrieve collected metrics
metrics = metrics_observer.get_metrics()
print(f"Tasks completed: {metrics['completed_tasks']}")
print(f"Total execution time: {metrics['total_execution_time']:.2f}s")
print(f"Modifications: {metrics['constellation_modifications']}")

🔑 Key Benefits

1. Decoupling

Events decouple components — publishers don't need to know about observers:

  • Orchestrator publishes task events without knowing who's listening
  • Agent publishes modification events without coordinating with orchestrator
  • New observers can be added without changing existing code

2. Extensibility

Add custom observers for new functionality:

from galaxy.core.events import IEventObserver, Event, EventType

class CustomMetricsObserver(IEventObserver):
    """Custom observer for domain-specific metrics."""

    def __init__(self):
        self.custom_metrics = {}

    async def on_event(self, event: Event) -> None:
        if event.event_type == EventType.TASK_COMPLETED:
            # Collect custom metrics
            task_type = event.data.get("task_type")
            if task_type not in self.custom_metrics:
                self.custom_metrics[task_type] = []

            self.custom_metrics[task_type].append({
                "duration": event.data.get("execution_time"),
                "result": event.result
            })

# Subscribe to specific events
event_bus = get_event_bus()
custom_observer = CustomMetricsObserver()
event_bus.subscribe(custom_observer, {EventType.TASK_COMPLETED})

3. Concurrent Execution

All observers are notified concurrently using asyncio.gather():

  • No observer blocks another
  • Exceptions in one observer don't affect others
  • Efficient parallel processing

4. Type-Safe Event Handling

Specialized event classes provide type safety:

async def on_event(self, event: Event) -> None:
    if isinstance(event, TaskEvent):
        # TaskEvent-specific handling
        task_id = event.task_id  # Type-safe access
        status = event.status

    elif isinstance(event, ConstellationEvent):
        # ConstellationEvent-specific handling
        constellation_id = event.constellation_id
        state = event.constellation_state

📚 Component Documentation

Explore detailed documentation for each observer:

Additional Observers

Documentation for ConstellationProgressObserver, DAGVisualizationObserver, ConstellationModificationSynchronizer, and AgentOutputObserver is available in their source code files. These observers handle task progress tracking, real-time visualization, modification synchronization, and agent output display respectively.



💡 Best Practices

Observer Lifecycle Management

Properly manage observer subscriptions to prevent memory leaks:

# Subscribe observers
observers = [progress_observer, metrics_observer, viz_observer]
for observer in observers:
    event_bus.subscribe(observer)

try:
    # Execute constellation
    await orchestrator.execute_constellation(constellation)
finally:
    # Clean up observers
    for observer in observers:
        event_bus.unsubscribe(observer)

Event-Specific Subscription

Subscribe only to relevant events for efficiency:

# Instead of subscribing to all events
event_bus.subscribe(observer)  # Receives ALL events

# Subscribe to specific event types
event_bus.subscribe(observer, {
    EventType.TASK_COMPLETED,
    EventType.TASK_FAILED,
    EventType.CONSTELLATION_MODIFIED
})

Error Handling in Observers

Always handle exceptions gracefully:

async def on_event(self, event: Event) -> None:
    try:
        # Process event
        await self._handle_event(event)
    except Exception as e:
        self.logger.error(f"Error processing event: {e}")
        # Don't re-raise - let other observers continue

🎓 Summary

The Observer System provides a robust, event-driven foundation for monitoring and coordinating Galaxy's constellation execution:

  • Event Bus acts as central message broker
  • 5 specialized observers handle different aspects of monitoring
  • Loose coupling enables extensibility and maintainability
  • Concurrent execution ensures efficient event processing
  • Type-safe events provide clear contracts and error prevention

For implementation details of specific observers, refer to the individual component documentation pages linked above.