ConstellationEditor — Interactive DAG Editor
📋 Overview
ConstellationEditor provides a high-level, command pattern-based interface for safe and comprehensive TaskConstellation manipulation. It offers undo/redo capabilities, batch operations, validation, and observer patterns for building, modifying, and managing complex workflow DAGs interactively.
The editor uses the Command Pattern to encapsulate all operations as reversible command objects, enabling undo/redo with full command history, transactional safety with atomic operations, complete operation tracking for auditability, and easy extensibility for new command types.
Usage in Galaxy: The ConstellationEditor is primarily used by the Constellation Agent to programmatically build task workflows, but can also be used directly for manual constellation creation and debugging.
🏗️ Architecture
Core Components
graph TD
A[ConstellationEditor] -->|manages| B[TaskConstellation]
A -->|uses| C[CommandInvoker]
C -->|executes| D[Commands]
D -->|modifies| B
A -->|notifies| E[Observers]
style A fill:#87CEEB
style B fill:#90EE90
style C fill:#FFD700
style D fill:#FFB6C1
style E fill:#DDA0DD
| Component |
Purpose |
| ConstellationEditor |
High-level interface for constellation editing |
| CommandInvoker |
Manages command execution, history, undo/redo |
| Commands |
Encapsulated operations (Add, Remove, Update, etc.) |
| Observers |
Callback functions notified on changes |
💻 Basic Usage
Creating an Editor
from galaxy.constellation import TaskConstellation
from galaxy.constellation.editor import ConstellationEditor
# Create editor with new constellation
editor = ConstellationEditor()
# Create editor with existing constellation
existing = TaskConstellation(name="my_workflow")
editor = ConstellationEditor(
constellation=existing,
enable_history=True, # Enable undo/redo
max_history_size=100 # Keep last 100 commands
)
# Access constellation
print(f"Editing: {editor.constellation.name}")
🎯 Task Operations
Adding Tasks
from galaxy.constellation import TaskStar
# Method 1: Add existing TaskStar
task = TaskStar(
task_id="fetch_data",
description="Download dataset from S3",
target_device_id="linux_server_1"
)
added_task = editor.add_task(task)
# Method 2: Add from dictionary
task_dict = {
"task_id": "preprocess",
"description": "Clean and normalize data",
"target_device_id": "linux_server_2",
"timeout": 300.0
}
added_task = editor.add_task(task_dict)
# Method 3: Create and add in one step
task = editor.create_and_add_task(
task_id="train_model",
description="Train neural network on preprocessed data",
name="Model Training",
target_device_id="gpu_server",
priority="HIGH",
timeout=3600.0,
retry_count=2
)
Updating Tasks
# Update task properties
updated_task = editor.update_task(
task_id="train_model",
description="Train BERT model on preprocessed text data",
timeout=7200.0,
priority="CRITICAL"
)
# Update with task_data
editor.update_task(
task_id="train_model",
task_data={
"model_type": "BERT",
"epochs": 10,
"batch_size": 32
}
)
Removing Tasks
# Remove task (also removes related dependencies)
removed_id = editor.remove_task("preprocess")
print(f"Removed task: {removed_id}")
Querying Tasks
# Get specific task
task = editor.get_task("fetch_data")
# List all tasks
all_tasks = editor.list_tasks()
for task in all_tasks:
print(f"{task.name}: {task.status.value}")
# Get ready tasks
ready = editor.get_ready_tasks()
🔗 Dependency Operations
Adding Dependencies
from galaxy.constellation import TaskStarLine
# Method 1: Add existing TaskStarLine
dep = TaskStarLine.create_success_only(
from_task_id="fetch_data",
to_task_id="preprocess",
description="Preprocess after successful download"
)
added_dep = editor.add_dependency(dep)
# Method 2: Add from dictionary
dep_dict = {
"from_task_id": "preprocess",
"to_task_id": "train_model",
"dependency_type": "SUCCESS_ONLY",
"condition_description": "Train on preprocessed data"
}
added_dep = editor.add_dependency(dep_dict)
# Method 3: Create and add in one step
dep = editor.create_and_add_dependency(
from_task_id="train_model",
to_task_id="evaluate_model",
dependency_type="UNCONDITIONAL",
condition_description="Evaluate after training completes"
)
Updating Dependencies
# Update dependency properties
updated_dep = editor.update_dependency(
dependency_id=dep.line_id,
dependency_type="CONDITIONAL",
condition_description="Evaluate only if training accuracy > 90%"
)
Removing Dependencies
# Remove dependency
removed_id = editor.remove_dependency(dep.line_id)
Querying Dependencies
# Get specific dependency
dep = editor.get_dependency(dep_id)
# List all dependencies
all_deps = editor.list_dependencies()
# Get dependencies for specific task
task_deps = editor.get_task_dependencies("train_model")
🔄 Undo/Redo Operations
Basic Undo/Redo
# Add a task
task = editor.create_and_add_task(
task_id="test_task",
description="Run unit tests"
)
# Oops, didn't mean to add that
if editor.can_undo():
editor.undo()
print("✅ Task addition undone")
# Actually, let's keep it
if editor.can_redo():
editor.redo()
print("✅ Task addition redone")
Checking Undo/Redo Availability
# Check if undo/redo is available
print(f"Can undo: {editor.can_undo()}")
print(f"Can redo: {editor.can_redo()}")
# Get description of what would be undone/redone
if editor.can_undo():
print(f"Undo: {editor.get_undo_description()}")
if editor.can_redo():
print(f"Redo: {editor.get_redo_description()}")
Command History
# Get command history
history = editor.get_history()
for i, cmd_desc in enumerate(history):
print(f"{i+1}. {cmd_desc}")
# Example output:
# 1. Add task: fetch_data
# 2. Add task: preprocess
# 3. Add dependency: fetch_data → preprocess
# 4. Update task: preprocess
# Clear history (cannot undo after this)
editor.clear_history()
🏗️ Bulk Operations
Building from Configuration
from galaxy.agents.schema import TaskConstellationSchema
# Build constellation from schema
config = TaskConstellationSchema(
name="ml_pipeline",
tasks=[
{
"task_id": "fetch",
"description": "Fetch data",
"target_device_id": "server_1"
},
{
"task_id": "process",
"description": "Process data",
"target_device_id": "server_2"
}
],
dependencies=[
{
"from_task_id": "fetch",
"to_task_id": "process",
"dependency_type": "SUCCESS_ONLY"
}
]
)
constellation = editor.build_constellation(
config=config,
clear_existing=True # Clear current constellation first
)
Building from Lists
# Build from task and dependency lists
tasks = [
{
"task_id": "a",
"description": "Task A",
"target_device_id": "device_1"
},
{
"task_id": "b",
"description": "Task B",
"target_device_id": "device_2"
}
]
dependencies = [
{
"from_task_id": "a",
"to_task_id": "b",
"dependency_type": "UNCONDITIONAL"
}
]
constellation = editor.build_from_tasks_and_dependencies(
tasks=tasks,
dependencies=dependencies,
clear_existing=True,
metadata={"version": "1.0", "author": "system"}
)
Clearing Constellation
# Remove all tasks and dependencies
cleared = editor.clear_constellation()
print(f"Constellation cleared: {cleared.task_count == 0}")
💾 File Operations
Saving Constellation
# Save to JSON file
file_path = editor.save_constellation("my_workflow.json")
print(f"Saved to: {file_path}")
Loading Constellation
# Load from JSON file
loaded = editor.load_constellation("my_workflow.json")
print(f"Loaded: {loaded.name}")
print(f"Tasks: {loaded.task_count}")
print(f"Dependencies: {loaded.dependency_count}")
Loading from Data
# Load from dictionary
data = {
"name": "test_workflow",
"tasks": {...},
"dependencies": {...}
}
constellation = editor.load_from_dict(data)
# Load from JSON string
json_string = '{"name": "workflow", "tasks": {...}}'
constellation = editor.load_from_json_string(json_string)
🔍 Validation and Analysis
DAG Validation
# Validate constellation structure
is_valid, errors = editor.validate_constellation()
if not is_valid:
print("❌ Validation errors:")
for error in errors:
print(f" - {error}")
else:
print("✅ Constellation is valid")
# Check for cycles
if editor.has_cycles():
print("❌ Constellation contains cycles")
Topological Analysis
# Get topological order
try:
order = editor.get_topological_order()
print(f"Execution order: {' → '.join(order)}")
except ValueError as e:
print(f"Cannot get order: {e}")
Statistics
# Get comprehensive statistics
stats = editor.get_statistics()
print(f"Constellation: {stats['constellation_id']}")
print(f"Tasks: {stats['total_tasks']}")
print(f"Dependencies: {stats['total_dependencies']}")
print(f"Longest path: {stats['longest_path_length']}")
print(f"Max width: {stats['max_width']}")
print(f"Parallelism ratio: {stats['parallelism_ratio']:.2f}")
# Editor-specific stats
print(f"Commands executed: {stats['editor_execution_count']}")
print(f"History size: {stats['editor_history_size']}")
print(f"Can undo: {stats['editor_can_undo']}")
print(f"Can redo: {stats['editor_can_redo']}")
👀 Observer Pattern
Adding Observers
# Define observer callback
def on_change(editor, command, result):
print(f"Operation: {command}")
print(f"Result: {result}")
print(f"Constellation state: {editor.constellation.state.value}")
# Add observer
editor.add_observer(on_change)
# Now all operations trigger the observer
task = editor.create_and_add_task(
task_id="observed_task",
description="This triggers the observer"
)
# Output:
# Operation: add_task
# Result: <TaskStar object>
# Constellation state: ready
Removing Observers
# Remove specific observer
editor.remove_observer(on_change)
# Operations no longer trigger this observer
Multiple Observers
def log_observer(editor, command, result):
with open("constellation_log.txt", "a") as f:
f.write(f"{command}: {result}\n")
def metrics_observer(editor, command, result):
stats = editor.get_statistics()
print(f"Current metrics: P={stats['parallelism_ratio']:.2f}")
# Add multiple observers
editor.add_observer(log_observer)
editor.add_observer(metrics_observer)
# All observers are notified on each operation
🎨 Advanced Features
Batch Operations
# Execute multiple operations in sequence
operations = [
lambda e: e.create_and_add_task("task_a", "Task A"),
lambda e: e.create_and_add_task("task_b", "Task B"),
lambda e: e.create_and_add_dependency("task_a", "task_b", "UNCONDITIONAL"),
]
results = editor.batch_operations(operations)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Operation {i+1} failed: {result}")
else:
print(f"Operation {i+1} succeeded: {result}")
Creating Subgraphs
# Extract subgraph with specific tasks
task_ids = ["fetch_data", "preprocess", "train_model"]
subgraph_editor = editor.create_subgraph(task_ids)
print(f"Subgraph tasks: {subgraph_editor.constellation.task_count}")
print(f"Subgraph deps: {subgraph_editor.constellation.dependency_count}")
# Subgraph includes only dependencies between included tasks
Merging Constellations
# Create two separate workflows
editor1 = ConstellationEditor()
editor1.create_and_add_task("task_a", "Task A from editor1")
editor2 = ConstellationEditor()
editor2.create_and_add_task("task_b", "Task B from editor2")
# Merge editor2 into editor1 with prefix
editor1.merge_constellation(
other_editor=editor2,
prefix="imported_"
)
# editor1 now contains: task_a, imported_task_b
🛡️ Error Handling
Validation Errors
try:
# Try to add task with duplicate ID
editor.create_and_add_task("existing_id", "Duplicate task")
except Exception as e:
print(f"❌ Error: {e}")
# Can undo to previous valid state
if editor.can_undo():
editor.undo()
Cyclic Dependency Detection
# Create cycle: A → B → C → A
editor.create_and_add_task("a", "Task A")
editor.create_and_add_task("b", "Task B")
editor.create_and_add_task("c", "Task C")
editor.create_and_add_dependency("a", "b", "UNCONDITIONAL")
editor.create_and_add_dependency("b", "c", "UNCONDITIONAL")
try:
# This creates a cycle
editor.create_and_add_dependency("c", "a", "UNCONDITIONAL")
except Exception as e:
print(f"❌ Cycle detected: {e}")
# Undo the failed operation
# (Actually, the operation fails before execution, so nothing to undo)
📊 Complete Example Workflow
from galaxy.constellation.editor import ConstellationEditor
# Create editor
editor = ConstellationEditor(enable_history=True)
# Build ML training pipeline
# Step 1: Add tasks
fetch = editor.create_and_add_task(
task_id="fetch_data",
description="Download dataset from S3",
target_device_id="linux_server_1",
timeout=300.0
)
preprocess = editor.create_and_add_task(
task_id="preprocess",
description="Clean and normalize data",
target_device_id="linux_server_2",
timeout=600.0
)
train = editor.create_and_add_task(
task_id="train_model",
description="Train BERT model",
target_device_id="gpu_server_a100",
priority="HIGH",
timeout=7200.0,
retry_count=2
)
evaluate = editor.create_and_add_task(
task_id="evaluate",
description="Evaluate model on test set",
target_device_id="linux_server_3"
)
# Step 2: Add dependencies
editor.create_and_add_dependency(
"fetch_data", "preprocess", "SUCCESS_ONLY"
)
editor.create_and_add_dependency(
"preprocess", "train_model", "SUCCESS_ONLY"
)
editor.create_and_add_dependency(
"train_model", "evaluate", "UNCONDITIONAL"
)
# Step 3: Validate
is_valid, errors = editor.validate_constellation()
assert is_valid, f"Validation failed: {errors}"
# Step 4: Analyze
stats = editor.get_statistics()
print(f"Pipeline: {stats['total_tasks']} tasks, {stats['total_dependencies']} dependencies")
print(f"Critical path: {stats['longest_path_length']}")
print(f"Parallelism: {stats['parallelism_ratio']:.2f}")
# Step 5: Save
editor.save_constellation("ml_training_pipeline.json")
# Step 6: Execute (via orchestrator)
constellation = editor.constellation
# Pass to ConstellationOrchestrator for distributed execution
# See: ../constellation_orchestrator/overview.md for execution details
For details on executing the built constellation, see the Constellation Orchestrator documentation.
🎯 Best Practices
Editor Usage Guidelines
- Enable history: Always enable undo/redo for interactive editing sessions
- Validate frequently: Run
validate_constellation() after major structural changes
- Use observers: Add observers for logging, metrics tracking, or UI updates
- Batch operations: Use
batch_operations() for multiple related changes to improve efficiency
- Save incrementally: Create constellation checkpoints during complex editing workflows
Command Pattern Benefits
The command pattern architecture provides several key advantages:
- Undo/Redo: Full operation history with rollback capabilities
- Audit trail: Every change is recorded and traceable
- Transaction safety: Operations are atomic and validated
- Extensibility: New operation types can be added easily
Common Pitfalls
- Forgetting to validate: Always validate before passing to orchestrator for execution
- Clearing history prematurely: Cannot undo operations after calling
clear_history()
- Modifying running constellations: Editor operations will fail if constellation is currently executing
- Ignoring observer errors: Observers should handle their own exceptions to avoid breaking the editor
📚 Command Registry
Available Commands
# List all available commands
commands = editor.list_available_commands()
for name, metadata in commands.items():
print(f"{name}: {metadata['description']}")
print(f" Category: {metadata['category']}")
# Get command categories
categories = editor.get_command_categories()
print(f"Categories: {categories}")
# Get metadata for specific command
metadata = editor.get_command_metadata("add_task")
print(metadata)
Executing Commands by Name
# Execute command using registry
result = editor.execute_command_by_name(
"add_task",
task_data={"task_id": "new_task", "description": "New task"}
)
# This is equivalent to:
# editor.add_task({"task_id": "new_task", "description": "New task"})
- TaskStar — Individual tasks that can be edited and managed
- TaskStarLine — Dependencies between tasks that define execution order
- TaskConstellation — The constellation DAG being edited
- Overview — Task Constellation framework overview
📚 API Reference
Constructor
ConstellationEditor(
constellation: Optional[TaskConstellation] = None,
enable_history: bool = True,
max_history_size: int = 100
)
Task Operations
| Method |
Description |
add_task(task) |
Add task (TaskStar or dict), returns TaskStar |
create_and_add_task(task_id, description, name, **kwargs) |
Create and add new task, returns TaskStar |
update_task(task_id, **updates) |
Update task properties, returns updated TaskStar |
remove_task(task_id) |
Remove task and related dependencies, returns removed task ID (str) |
get_task(task_id) |
Get task by ID, returns Optional[TaskStar] |
list_tasks() |
Get all tasks, returns List[TaskStar] |
Dependency Operations
| Method |
Description |
add_dependency(dependency) |
Add dependency (TaskStarLine or dict), returns TaskStarLine |
create_and_add_dependency(from_id, to_id, type, **kwargs) |
Create and add dependency, returns TaskStarLine |
update_dependency(dependency_id, **updates) |
Update dependency properties, returns updated TaskStarLine |
remove_dependency(dependency_id) |
Remove dependency, returns removed dependency ID (str) |
get_dependency(dependency_id) |
Get dependency by ID, returns Optional[TaskStarLine] |
list_dependencies() |
Get all dependencies, returns List[TaskStarLine] |
get_task_dependencies(task_id) |
Get dependencies for specific task, returns List[TaskStarLine] |
Bulk Operations
| Method |
Description |
build_constellation(config, clear_existing) |
Build constellation from TaskConstellationSchema |
build_from_tasks_and_dependencies(tasks, deps, ...) |
Build constellation from task and dependency lists (returns TaskConstellation) |
clear_constellation() |
Remove all tasks and dependencies from constellation |
batch_operations(operations) |
Execute multiple operations in sequence, returning list of results |
File Operations
| Method |
Description |
save_constellation(file_path) |
Save constellation to JSON file, returns file path |
load_constellation(file_path) |
Load constellation from JSON file, returns TaskConstellation |
load_from_dict(data) |
Load constellation from dictionary, returns TaskConstellation |
load_from_json_string(json_string) |
Load constellation from JSON string, returns TaskConstellation |
History Operations
| Method |
Description |
undo() |
Undo last command, returns True if successful, False if no undo available |
redo() |
Redo next command, returns True if successful, False if no redo available |
can_undo() |
Check if undo is available (returns bool) |
can_redo() |
Check if redo is available (returns bool) |
get_undo_description() |
Get description of operation that would be undone (returns Optional[str]) |
get_redo_description() |
Get description of operation that would be redone (returns Optional[str]) |
clear_history() |
Clear command history (no return value) |
get_history() |
Get list of command descriptions (returns List[str]) |
Validation
| Method |
Description |
validate_constellation() |
Validate DAG structure, returns tuple of (is_valid: bool, errors: List[str]) |
has_cycles() |
Check for cycles in the DAG, returns bool |
get_topological_order() |
Get topological ordering of tasks, returns List[str] of task IDs |
get_ready_tasks() |
Get tasks ready to execute (no pending dependencies), returns List[TaskStar] |
get_statistics() |
Get comprehensive constellation and editor statistics, returns Dict[str, Any] |
Observers
| Method |
Description |
add_observer(observer) |
Add change observer callable that receives (editor, command, result) |
remove_observer(observer) |
Remove previously added observer |
Advanced
| Method |
Description |
create_subgraph(task_ids) |
Extract subgraph with specific tasks |
merge_constellation(other_editor, prefix) |
Merge another constellation with optional ID prefix |
display_constellation(mode) |
Display visualization (modes: 'overview', 'topology', 'details', 'execution') |
For interactive web-based visualization and editing, see the Galaxy WebUI.
ConstellationEditor — Safe, interactive, and reversible constellation manipulation