Observer System — Overview
The Observer System in UFO Galaxy implements an event-driven architecture that enables real-time monitoring, visualization, and coordination of constellation execution. It provides a decoupled, extensible mechanism for components to react to system events without tight coupling.
The system implements the classic Observer Pattern (also known as Publish-Subscribe), enabling loose coupling between event producers and consumers. This allows the system to be extended with new observers without modifying existing code.
🎯 Purpose and Design Goals
The observer system serves several critical functions in the Galaxy framework:
- Real-time Monitoring — Track task execution, constellation lifecycle, and system events
- Visualization — Provide live updates for DAG topology and execution progress
- Metrics Collection — Gather performance statistics and execution data
- Synchronization — Coordinate between agent modifications and orchestrator execution
- Agent Output Handling — Display agent responses and actions in real-time
🏗️ Architecture Overview
The observer system consists of three main layers:
Architecture Layers:
| Layer | Component | Responsibility |
|---|---|---|
| Event Publishers | Orchestrator, Agent, Device Manager | Generate events during system operation |
| Event Bus | EventBus singleton |
Central message broker, manages subscriptions and routing |
| Observers | 5 specialized observers | React to specific event types and perform actions |
| Handlers | Task & Constellation handlers | Delegate visualization logic for specific components |
📊 Core Components
Event System Core
The foundation of the observer system consists of:
| Component | Location | Description |
|---|---|---|
| EventBus | galaxy/core/events.py |
Central message broker managing subscriptions |
| EventType | galaxy/core/events.py |
Enumeration of all system event types |
| Event Classes | galaxy/core/events.py |
Base (Event) and specialized (TaskEvent, ConstellationEvent, AgentEvent, DeviceEvent) event data structures |
| Interfaces | galaxy/core/events.py |
IEventObserver, IEventPublisher contracts |
For detailed documentation of the event system core components, see the Event System Core documentation.
Observer Implementations
Five specialized observers handle different aspects of system monitoring:
| Observer | File Location | Primary Role | Key Features |
|---|---|---|---|
| ConstellationProgressObserver | galaxy/session/observers/base_observer.py |
Task progress tracking | Queues completion events for agent, coordinates task lifecycle |
| SessionMetricsObserver | galaxy/session/observers/base_observer.py |
Performance metrics | Collects timing, success rates, modification statistics |
| DAGVisualizationObserver | galaxy/session/observers/dag_visualization_observer.py |
Real-time visualization | Displays constellation topology and execution flow |
| ConstellationModificationSynchronizer | galaxy/session/observers/constellation_sync_observer.py |
Modification coordination | Prevents race conditions between agent and orchestrator |
| AgentOutputObserver | galaxy/session/observers/agent_output_observer.py |
Agent interaction display | Shows agent responses and actions in real-time |
🔄 Event Flow
The following diagram illustrates how events flow through the system:
The event flow demonstrates how a single action (task completion) triggers multiple observers, each performing their specialized function without interfering with others.
📋 Event Types
The system defines the following event types:
Task Events
Track individual task execution lifecycle:
| Event Type | Trigger | Data Includes |
|---|---|---|
TASK_STARTED |
Task begins execution | task_id, status, constellation_id |
TASK_COMPLETED |
Task finishes successfully | task_id, result, execution_time, newly_ready_tasks |
TASK_FAILED |
Task encounters error | task_id, error, retry_info |
Constellation Events
Monitor constellation-level operations:
| Event Type | Trigger | Data Includes |
|---|---|---|
CONSTELLATION_STARTED |
Constellation begins processing | constellation, initial_statistics, processing_time |
CONSTELLATION_COMPLETED |
All tasks finished | constellation, final_statistics, execution_time |
CONSTELLATION_FAILED |
Constellation encounters error | constellation, error |
CONSTELLATION_MODIFIED |
Structure changed by agent | old_constellation, new_constellation, on_task_id, modification_type, changes |
Agent Events
Display agent interactions:
| Event Type | Trigger | Data Includes |
|---|---|---|
AGENT_RESPONSE |
Agent generates response | agent_name, agent_type, response_data |
AGENT_ACTION |
Agent executes action | agent_name, action_type, actions |
Device Events
Monitor device status (used by client):
| Event Type | Trigger | Data Includes |
|---|---|---|
DEVICE_CONNECTED |
Device joins pool | device_id, device_status, device_info |
DEVICE_DISCONNECTED |
Device leaves pool | device_id, device_status |
DEVICE_STATUS_CHANGED |
Device state changes | device_id, device_status, all_devices |
🚀 Usage Example
Here's a complete example showing how observers are initialized and used in a Galaxy session:
from galaxy.core.events import get_event_bus, EventType
from galaxy.session.observers import (
ConstellationProgressObserver,
SessionMetricsObserver,
DAGVisualizationObserver,
ConstellationModificationSynchronizer,
AgentOutputObserver
)
# Get the global event bus
event_bus = get_event_bus()
# 1. Create progress observer for agent coordination
progress_observer = ConstellationProgressObserver(agent=constellation_agent)
event_bus.subscribe(progress_observer)
# 2. Create metrics observer for performance tracking
metrics_observer = SessionMetricsObserver(
session_id="my_session",
logger=logger
)
event_bus.subscribe(metrics_observer)
# 3. Create visualization observer for real-time display
viz_observer = DAGVisualizationObserver(enable_visualization=True)
event_bus.subscribe(viz_observer)
# 4. Create synchronizer to prevent race conditions
synchronizer = ConstellationModificationSynchronizer(
orchestrator=orchestrator,
logger=logger
)
event_bus.subscribe(synchronizer)
# 5. Create agent output observer for displaying interactions
agent_output_observer = AgentOutputObserver(presenter_type="rich")
event_bus.subscribe(agent_output_observer)
# Execute constellation
await orchestrator.execute_constellation(constellation)
# Retrieve collected metrics
metrics = metrics_observer.get_metrics()
print(f"Tasks completed: {metrics['completed_tasks']}")
print(f"Total execution time: {metrics['total_execution_time']:.2f}s")
print(f"Modifications: {metrics['constellation_modifications']}")
🔑 Key Benefits
1. Decoupling
Events decouple components — publishers don't need to know about observers:
- Orchestrator publishes task events without knowing who's listening
- Agent publishes modification events without coordinating with orchestrator
- New observers can be added without changing existing code
2. Extensibility
Add custom observers for new functionality:
from galaxy.core.events import IEventObserver, Event, EventType
class CustomMetricsObserver(IEventObserver):
"""Custom observer for domain-specific metrics."""
def __init__(self):
self.custom_metrics = {}
async def on_event(self, event: Event) -> None:
if event.event_type == EventType.TASK_COMPLETED:
# Collect custom metrics
task_type = event.data.get("task_type")
if task_type not in self.custom_metrics:
self.custom_metrics[task_type] = []
self.custom_metrics[task_type].append({
"duration": event.data.get("execution_time"),
"result": event.result
})
# Subscribe to specific events
event_bus = get_event_bus()
custom_observer = CustomMetricsObserver()
event_bus.subscribe(custom_observer, {EventType.TASK_COMPLETED})
3. Concurrent Execution
All observers are notified concurrently using asyncio.gather():
- No observer blocks another
- Exceptions in one observer don't affect others
- Efficient parallel processing
4. Type-Safe Event Handling
Specialized event classes provide type safety:
async def on_event(self, event: Event) -> None:
if isinstance(event, TaskEvent):
# TaskEvent-specific handling
task_id = event.task_id # Type-safe access
status = event.status
elif isinstance(event, ConstellationEvent):
# ConstellationEvent-specific handling
constellation_id = event.constellation_id
state = event.constellation_state
📚 Component Documentation
Explore detailed documentation for each observer:
- Session Metrics Observer — Performance metrics and statistics collection
- Event System Core — Event bus, event types, and interfaces
Additional Observers
Documentation for ConstellationProgressObserver, DAGVisualizationObserver, ConstellationModificationSynchronizer, and AgentOutputObserver is available in their source code files. These observers handle task progress tracking, real-time visualization, modification synchronization, and agent output display respectively.
🔗 Related Documentation
- Constellation Orchestrator — Event publishers for task execution
- Constellation Agent — Event publishers for agent operations
- Performance Metrics — How metrics are collected and analyzed
- Event-Driven Coordination — Deep dive into event system architecture
💡 Best Practices
Observer Lifecycle Management
Properly manage observer subscriptions to prevent memory leaks:
# Subscribe observers
observers = [progress_observer, metrics_observer, viz_observer]
for observer in observers:
event_bus.subscribe(observer)
try:
# Execute constellation
await orchestrator.execute_constellation(constellation)
finally:
# Clean up observers
for observer in observers:
event_bus.unsubscribe(observer)
Event-Specific Subscription
Subscribe only to relevant events for efficiency:
# Instead of subscribing to all events
event_bus.subscribe(observer) # Receives ALL events
# Subscribe to specific event types
event_bus.subscribe(observer, {
EventType.TASK_COMPLETED,
EventType.TASK_FAILED,
EventType.CONSTELLATION_MODIFIED
})
Error Handling in Observers
Always handle exceptions gracefully:
async def on_event(self, event: Event) -> None:
try:
# Process event
await self._handle_event(event)
except Exception as e:
self.logger.error(f"Error processing event: {e}")
# Don't re-raise - let other observers continue
🎓 Summary
The Observer System provides a robust, event-driven foundation for monitoring and coordinating Galaxy's constellation execution:
- Event Bus acts as central message broker
- 5 specialized observers handle different aspects of monitoring
- Loose coupling enables extensibility and maintainability
- Concurrent execution ensures efficient event processing
- Type-safe events provide clear contracts and error prevention
For implementation details of specific observers, refer to the individual component documentation pages linked above.