Constellation Manager
Overview
The ConstellationManager is a companion component to the TaskConstellationOrchestrator that handles device assignment, resource management, and constellation lifecycle tracking. While the orchestrator focuses on execution flow and coordination, the manager provides the infrastructure for device operations and state management.
This separation of concerns follows the Single Responsibility Principle: orchestration logic remains independent of device management details.
Architecture
Learn more about the orchestrator architecture and asynchronous scheduling.
Core Responsibilities
The ConstellationManager handles four primary responsibilities:
1. Device Assignment
Assigns tasks to appropriate devices using configurable strategies:
| Strategy | Description | Use Case |
|---|---|---|
| Round Robin | Distributes tasks evenly across devices | Load balancing for homogeneous devices |
| Capability Match | Matches task requirements to device capabilities | Heterogeneous device types (Windows, Android, iOS) |
| Load Balance | Assigns to device with lowest current load | Dynamic workload distribution |
2. Resource Management
Tracks and manages constellation resources:
- Device availability and status
- Constellation registration and metadata
- Device utilization statistics
- Assignment validation
3. Lifecycle Management
Manages constellation lifecycle:
- Registration when orchestration begins
- Metadata tracking during execution
- Unregistration after completion
- Status querying
4. Validation
Validates device assignments against constraints:
- All tasks have assigned devices
- Assigned devices exist and are connected
- Device capabilities match task requirements
Device Assignment Strategies
Round Robin
Distributes tasks cyclically across available devices:
async def _assign_round_robin(
self,
constellation: TaskConstellation,
available_devices: List[Dict[str, Any]],
preferences: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""Round robin device assignment strategy."""
assignments = {}
device_index = 0
for task_id, task in constellation.tasks.items():
# Check preferences first
if preferences and task_id in preferences:
preferred_device = preferences[task_id]
if any(d["device_id"] == preferred_device for d in available_devices):
assignments[task_id] = preferred_device
continue
# Round robin assignment
device = available_devices[device_index % len(available_devices)]
assignments[task_id] = device["device_id"]
device_index += 1
return assignments
Characteristics:
- Fairness: Each device gets approximately equal number of tasks
- Simplicity: No complex decision-making
- Overhead: O(N) where N = number of tasks
- Best for: Homogeneous devices with similar capabilities
Example:
# 3 devices, 7 tasks
Task 1 → Device A
Task 2 → Device B
Task 3 → Device C
Task 4 → Device A
Task 5 → Device B
Task 6 → Device C
Task 7 → Device A
Capability Match
Matches tasks to devices based on device type and capabilities:
async def _assign_capability_match(
self,
constellation: TaskConstellation,
available_devices: List[Dict[str, Any]],
preferences: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""Capability-based device assignment strategy."""
assignments = {}
for task_id, task in constellation.tasks.items():
# Check preferences first
if preferences and task_id in preferences:
preferred_device = preferences[task_id]
if any(d["device_id"] == preferred_device for d in available_devices):
assignments[task_id] = preferred_device
continue
# Find devices matching task requirements
matching_devices = []
if task.device_type:
matching_devices = [
d for d in available_devices
if d.get("device_type") == task.device_type.value
]
# Fall back to any available device if no matches
if not matching_devices:
matching_devices = available_devices
# Choose first matching device
if matching_devices:
assignments[task_id] = matching_devices[0]["device_id"]
return assignments
Characteristics:
- Type-aware: Respects task's
device_typerequirement - Fallback: Uses any device if no type match found
- Overhead: O(N × D) where N = tasks, D = devices
- Best for: Heterogeneous device ecosystems
Example:
# Mixed device types
Task A (requires Windows) → Windows Device 1
Task B (requires Android) → Android Device 1
Task C (requires Windows) → Windows Device 1
Task D (no requirement) → Any available device
Load Balance
Assigns tasks to minimize device load:
async def _assign_load_balance(
self,
constellation: TaskConstellation,
available_devices: List[Dict[str, Any]],
preferences: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""Load-balanced device assignment strategy."""
assignments = {}
device_load = {d["device_id"]: 0 for d in available_devices}
for task_id, task in constellation.tasks.items():
# Check preferences first
if preferences and task_id in preferences:
preferred_device = preferences[task_id]
if any(d["device_id"] == preferred_device for d in available_devices):
assignments[task_id] = preferred_device
device_load[preferred_device] += 1
continue
# Find device with lowest load
min_load_device = min(device_load.keys(), key=lambda d: device_load[d])
assignments[task_id] = min_load_device
device_load[min_load_device] += 1
return assignments
Characteristics:
- Balanced: Minimizes maximum device load
- Dynamic: Adapts to varying task counts
- Overhead: O(N × log D) with priority queue optimization
- Best for: Constellations with varying task complexity
Example:
# 2 devices, 5 tasks with varying complexity
Task 1 (simple) → Device A [load: 1]
Task 2 (complex) → Device B [load: 1]
Task 3 (simple) → Device A [load: 2]
Task 4 (simple) → Device B [load: 2]
Task 5 (complex) → Device A [load: 3]
Constellation Lifecycle Management
Registration
Register a constellation for management:
def register_constellation(
self,
constellation: TaskConstellation,
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""Register a constellation for management."""
constellation_id = constellation.constellation_id
self._managed_constellations[constellation_id] = constellation
self._constellation_metadata[constellation_id] = metadata or {}
if self._logger:
self._logger.info(
f"Registered constellation '{constellation.name}' ({constellation_id})"
)
return constellation_id
Purpose: Track active constellations and their metadata
Metadata examples:
metadata = {
"user_id": "user123",
"session_id": "session_456",
"priority": "high",
"created_by": "automation_pipeline",
}
Status Querying
Get detailed status of a managed constellation:
async def get_constellation_status(
self, constellation_id: str
) -> Optional[Dict[str, Any]]:
"""Get detailed status of a managed constellation."""
constellation = self._managed_constellations.get(constellation_id)
if not constellation:
return None
metadata = self._constellation_metadata.get(constellation_id, {})
return {
"constellation_id": constellation_id,
"name": constellation.name,
"state": constellation.state.value,
"statistics": constellation.get_statistics(),
"ready_tasks": [task.task_id for task in constellation.get_ready_tasks()],
"running_tasks": [task.task_id for task in constellation.get_running_tasks()],
"completed_tasks": [task.task_id for task in constellation.get_completed_tasks()],
"failed_tasks": [task.task_id for task in constellation.get_failed_tasks()],
"metadata": metadata,
}
Returns:
{
"constellation_id": "constellation_20251106_143052_a1b2c3d4",
"name": "Multi-Device Data Collection",
"state": "executing",
"statistics": {
"total_tasks": 10,
"task_status_counts": {
"completed": 3,
"running": 2,
"pending": 5
},
"parallelism_ratio": 2.5
},
"ready_tasks": ["task_6", "task_7"],
"running_tasks": ["task_4", "task_5"],
"completed_tasks": ["task_1", "task_2", "task_3"],
"failed_tasks": [],
"metadata": {
"user_id": "user123",
"priority": "high"
}
}
Unregistration
Remove a constellation from management:
def unregister_constellation(self, constellation_id: str) -> bool:
"""Unregister a constellation from management."""
if constellation_id in self._managed_constellations:
constellation = self._managed_constellations[constellation_id]
del self._managed_constellations[constellation_id]
del self._constellation_metadata[constellation_id]
if self._logger:
self._logger.info(
f"Unregistered constellation '{constellation.name}' ({constellation_id})"
)
return True
return False
Purpose: Clean up resources after orchestration completes
Device Operations
Getting Available Devices
Retrieve list of connected devices:
async def get_available_devices(self) -> List[Dict[str, Any]]:
"""Get list of available devices from device manager."""
if not self._device_manager:
return []
try:
connected_device_ids = self._device_manager.get_connected_devices()
devices = []
for device_id in connected_device_ids:
device_info = self._device_manager.device_registry.get_device_info(
device_id
)
if device_info:
devices.append({
"device_id": device_id,
"device_type": getattr(device_info, "device_type", "unknown"),
"capabilities": getattr(device_info, "capabilities", []),
"status": "connected",
"metadata": getattr(device_info, "metadata", {}),
})
return devices
except Exception as e:
if self._logger:
self._logger.error(f"Failed to get available devices: {e}")
return []
Returns:
[
{
"device_id": "windows_main",
"device_type": "windows",
"capabilities": ["file_ops", "browser", "office"],
"status": "connected",
"metadata": {"os_version": "Windows 11"}
},
{
"device_id": "android_pixel",
"device_type": "android",
"capabilities": ["touch", "camera", "gps"],
"status": "connected",
"metadata": {"android_version": "14"}
}
]
Device Assignment
Automatically assign devices to all tasks:
async def assign_devices_automatically(
self,
constellation: TaskConstellation,
strategy: str = "round_robin",
device_preferences: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""Automatically assign devices to tasks in a constellation."""
if not self._device_manager:
raise ValueError("Device manager not available for device assignment")
available_devices = await self._get_available_devices()
if not available_devices:
raise ValueError("No available devices for assignment")
if self._logger:
self._logger.info(
f"Assigning devices to constellation '{constellation.name}' "
f"using strategy '{strategy}'"
)
# Select strategy
if strategy == "round_robin":
assignments = await self._assign_round_robin(
constellation, available_devices, device_preferences
)
elif strategy == "capability_match":
assignments = await self._assign_capability_match(
constellation, available_devices, device_preferences
)
elif strategy == "load_balance":
assignments = await self._assign_load_balance(
constellation, available_devices, device_preferences
)
else:
raise ValueError(f"Unknown assignment strategy: {strategy}")
# Apply assignments to tasks
for task_id, device_id in assignments.items():
task = constellation.get_task(task_id)
if task:
task.target_device_id = device_id
if self._logger:
self._logger.info(f"Assigned {len(assignments)} tasks to devices")
return assignments
Manual Reassignment
Reassign a single task to a different device:
def reassign_task_device(
self,
constellation: TaskConstellation,
task_id: str,
new_device_id: str,
) -> bool:
"""Reassign a task to a different device."""
task = constellation.get_task(task_id)
if not task:
return False
old_device_id = task.target_device_id
task.target_device_id = new_device_id
if self._logger:
self._logger.info(
f"Reassigned task '{task_id}' from device '{old_device_id}' "
f"to '{new_device_id}'"
)
return True
Validation
Assignment Validation
Validate that all tasks have valid device assignments:
def validate_constellation_assignments(
self, constellation: TaskConstellation
) -> tuple[bool, List[str]]:
"""Validate that all tasks have valid device assignments."""
errors = []
for task_id, task in constellation.tasks.items():
if not task.target_device_id:
errors.append(f"Task '{task_id}' has no device assignment")
is_valid = len(errors) == 0
if self._logger:
if is_valid:
self._logger.info(
f"All tasks in constellation '{constellation.name}' have "
f"valid assignments"
)
else:
self._logger.warning(
f"Constellation '{constellation.name}' has {len(errors)} "
f"assignment errors"
)
return is_valid, errors
Device Information
Get device information for a specific task:
def get_task_device_info(
self, constellation: TaskConstellation, task_id: str
) -> Optional[Dict[str, Any]]:
"""Get device information for a specific task."""
task = constellation.get_task(task_id)
if not task or not task.target_device_id:
return None
# Get device info from device manager
if self._device_manager:
try:
device_info = self._device_manager.device_registry.get_device_info(
task.target_device_id
)
if device_info:
return {
"device_id": task.target_device_id,
"device_type": getattr(device_info, "device_type", "unknown"),
"capabilities": getattr(device_info, "capabilities", []),
"metadata": getattr(device_info, "metadata", {}),
}
except Exception as e:
if self._logger:
self._logger.error(
f"Failed to get device info for task '{task_id}': {e}"
)
return None
Utilization Tracking
Device Utilization Statistics
Get device utilization across constellation:
def get_device_utilization(
self, constellation: TaskConstellation
) -> Dict[str, int]:
"""Get device utilization statistics for a constellation."""
utilization = {}
for task in constellation.tasks.values():
if task.target_device_id:
utilization[task.target_device_id] = (
utilization.get(task.target_device_id, 0) + 1
)
return utilization
Example output:
{
"windows_main": 5,
"android_pixel": 3,
"ios_iphone": 2
}
Listing All Constellations
List all managed constellations:
def list_constellations(self) -> List[Dict[str, Any]]:
"""List all managed constellations with basic information."""
result = []
for constellation_id, constellation in self._managed_constellations.items():
metadata = self._constellation_metadata.get(constellation_id, {})
result.append({
"constellation_id": constellation_id,
"name": constellation.name,
"state": constellation.state.value,
"task_count": constellation.task_count,
"dependency_count": constellation.dependency_count,
"metadata": metadata,
})
return result
Usage Patterns
Basic Setup
from galaxy.constellation.orchestrator import ConstellationManager
from galaxy.client.device_manager import ConstellationDeviceManager
# Create device manager
device_manager = ConstellationDeviceManager()
# Create constellation manager
manager = ConstellationManager(device_manager, enable_logging=True)
# Register constellation
constellation_id = manager.register_constellation(
constellation,
metadata={"priority": "high"}
)
Automatic Assignment
# Assign devices using capability matching
assignments = await manager.assign_devices_automatically(
constellation,
strategy="capability_match"
)
print(f"Assigned {len(assignments)} tasks")
With Preferences
# Specify preferred devices for specific tasks
preferences = {
"critical_task_1": "windows_main",
"gpu_task_2": "windows_gpu",
}
assignments = await manager.assign_devices_automatically(
constellation,
strategy="load_balance",
device_preferences=preferences
)
Manual Override
# Reassign specific task
manager.reassign_task_device(
constellation,
task_id="task_5",
new_device_id="android_backup"
)
Validation
# Validate assignments before orchestration
is_valid, errors = manager.validate_constellation_assignments(constellation)
if not is_valid:
print(f"Validation errors: {errors}")
# Fix assignments...
Monitoring
# Check constellation status during execution
status = await manager.get_constellation_status(constellation_id)
print(f"State: {status['state']}")
print(f"Running tasks: {len(status['running_tasks'])}")
print(f"Completed tasks: {len(status['completed_tasks'])}")
# Get device utilization
utilization = manager.get_device_utilization(constellation)
for device_id, task_count in utilization.items():
print(f"{device_id}: {task_count} tasks")
Integration with Orchestrator
The orchestrator uses the manager internally:
class TaskConstellationOrchestrator:
def __init__(self, device_manager, enable_logging=True):
self._device_manager = device_manager
self._constellation_manager = ConstellationManager(
device_manager, enable_logging
)
async def orchestrate_constellation(self, constellation, ...):
# Use manager for assignment
await self._constellation_manager.assign_devices_automatically(
constellation, assignment_strategy
)
# Use manager for validation
is_valid, errors = self._constellation_manager \
.validate_constellation_assignments(constellation)
if not is_valid:
raise ValueError(f"Device assignment validation failed: {errors}")
# Continue orchestration...
Related Documentation
- Overview - Orchestrator architecture and design
- Asynchronous Scheduling - Task execution model
- Consistency Guarantees - Device assignment validation
- API Reference - Complete API documentation