Module tinytroupe.validation.simulation_validator
Simulation experiment empirical validation mechanisms for TinyTroupe.
This module provides tools to validate simulation experiment results against empirical control data, supporting both statistical hypothesis testing and semantic validation approaches. This is distinct from LLM-based evaluations, focusing on data-driven validation against known empirical benchmarks.
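For a quick orientation, a minimal usage sketch (the dataset contents below are made up for illustration):

from tinytroupe.validation.simulation_validator import validate_simulation_experiment_empirically

# Illustrative data: one per-agent metric observed empirically and in simulation.
empirical = {"name": "Survey", "key_results": {"purchase_rate": [0.61, 0.55, 0.70, 0.64]},
             "result_types": {"purchase_rate": "per_agent"}}
simulated = {"name": "Simulation", "key_results": {"purchase_rate": [0.58, 0.66, 0.59, 0.71]},
             "result_types": {"purchase_rate": "per_agent"}}

result = validate_simulation_experiment_empirically(empirical, simulated,
                                                    validation_types=["statistical"])
print(result.overall_score, result.summary)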
Expand source code
"""
Simulation experiment empirical validation mechanisms for TinyTroupe.
This module provides tools to validate simulation experiment results against empirical control data,
supporting both statistical hypothesis testing and semantic validation approaches.
This is distinct from LLM-based evaluations, focusing on data-driven validation
against known empirical benchmarks.
"""
from typing import Dict, List, Optional, Union, Any
import json
from datetime import datetime
from pydantic import BaseModel, Field
from tinytroupe.experimentation.statistical_tests import StatisticalTester
from tinytroupe.utils.semantics import compute_semantic_proximity
# TODO Work-in-Progress below
class SimulationExperimentDataset(BaseModel):
"""
Represents a dataset from a simulation experiment or empirical study.
This contains data that can be used for validation, including quantitative metrics
and qualitative agent justifications from simulation experiments or empirical studies.
Attributes:
name: Optional name for the dataset
description: Optional description of the dataset
key_results: Map from result names to their values (numbers, proportions, booleans, etc.)
result_types: Map indicating whether each result is "aggregate" or "per_agent"
agent_names: Optional list of agent names (can be referenced by index in results)
agent_justifications: List of justifications (with optional agent references)
justification_summary: Optional summary of all agent justifications
"""
name: Optional[str] = None
description: Optional[str] = None
key_results: Dict[str, Union[float, int, bool, List[Union[float, int, bool, None]], None]] = Field(default_factory=dict)
result_types: Dict[str, str] = Field(default_factory=dict, description="Map from result name to 'aggregate' or 'per_agent'")
agent_names: Optional[List[Optional[str]]] = Field(None, description="Optional list of agent names for reference (can contain None for unnamed agents)")
agent_justifications: List[Union[str, Dict[str, Union[str, int]]]] = Field(
default_factory=list,
description="List of justifications as strings or dicts with optional 'agent_name'/'agent_index' and 'justification'"
)
justification_summary: Optional[str] = None
class Config:
"""Pydantic configuration."""
extra = "forbid" # Prevent accidental extra fields
validate_assignment = True # Validate on assignment after creation
def get_agent_name(self, index: int) -> Optional[str]:
"""Get agent name by index, if available."""
if self.agent_names and 0 <= index < len(self.agent_names):
agent_name = self.agent_names[index]
return agent_name if agent_name is not None else None
return None
def get_agent_data(self, metric_name: str, agent_index: int) -> Optional[Union[float, int, bool]]:
"""Get a specific agent's data for a given metric. Returns None for missing data."""
if metric_name not in self.key_results:
return None
metric_data = self.key_results[metric_name]
# Check if it's per-agent data
if self.result_types.get(metric_name) == "per_agent" and isinstance(metric_data, list):
if 0 <= agent_index < len(metric_data):
return metric_data[agent_index] # This can be None for missing data
return None
def get_all_agent_data(self, metric_name: str) -> Dict[str, Union[float, int, bool]]:
"""Get all agents' data for a given metric as a dictionary mapping agent names/indices to values."""
if metric_name not in self.key_results:
return {}
metric_data = self.key_results[metric_name]
result = {}
# For per-agent data, create mapping
if self.result_types.get(metric_name) == "per_agent" and isinstance(metric_data, list):
for i, value in enumerate(metric_data):
agent_name = self.get_agent_name(i) or f"Agent_{i}"
# Only include non-None values in the result
if value is not None:
result[agent_name] = value
# For aggregate data, return single value
elif self.result_types.get(metric_name) == "aggregate":
result["aggregate"] = metric_data
return result
def get_valid_agent_data(self, metric_name: str) -> List[Union[float, int, bool]]:
"""Get only valid (non-None) values for a per-agent metric."""
if metric_name not in self.key_results:
return []
metric_data = self.key_results[metric_name]
if self.result_types.get(metric_name) == "per_agent" and isinstance(metric_data, list):
return [value for value in metric_data if value is not None]
return []
def validate_data_consistency(self) -> List[str]:
"""Validate that per-agent data is consistent across metrics and with agent names."""
errors = []
warnings = []
# Check per-agent metrics have consistent lengths
per_agent_lengths = []
per_agent_metrics = []
for metric_name, result_type in self.result_types.items():
if result_type == "per_agent" and metric_name in self.key_results:
metric_data = self.key_results[metric_name]
if isinstance(metric_data, list):
per_agent_lengths.append(len(metric_data))
per_agent_metrics.append(metric_name)
else:
errors.append(f"Metric '{metric_name}' marked as per_agent but is not a list")
# Check all per-agent metrics have same length
if per_agent_lengths and len(set(per_agent_lengths)) > 1:
errors.append(f"Per-agent metrics have inconsistent lengths: {dict(zip(per_agent_metrics, per_agent_lengths))}")
# Check agent_names length matches per-agent data length
if self.agent_names and per_agent_lengths:
agent_count = len(self.agent_names)
data_length = per_agent_lengths[0] if per_agent_lengths else 0
if agent_count != data_length:
errors.append(f"agent_names length ({agent_count}) doesn't match per-agent data length ({data_length})")
# Check for None values in agent_names and provide warnings
if self.agent_names:
none_indices = [i for i, name in enumerate(self.agent_names) if name is None]
if none_indices:
warnings.append(f"agent_names contains None values at indices: {none_indices}")
# Check for None values in per-agent data and provide info
for metric_name in per_agent_metrics:
if metric_name in self.key_results:
metric_data = self.key_results[metric_name]
none_indices = [i for i, value in enumerate(metric_data) if value is None]
if none_indices:
warnings.append(f"Metric '{metric_name}' has missing data (None) at indices: {none_indices}")
# Return errors and warnings combined
return errors + [f"WARNING: {warning}" for warning in warnings]
def get_justification_text(self, justification_item: Union[str, Dict[str, Union[str, int]]]) -> str:
"""Extract justification text from various formats."""
if isinstance(justification_item, str):
return justification_item
elif isinstance(justification_item, dict):
return justification_item.get("justification", "")
return ""
def get_justification_agent_reference(self, justification_item: Union[str, Dict[str, Union[str, int]]]) -> Optional[str]:
"""Get agent reference from justification, returning name if available."""
if isinstance(justification_item, dict):
# Direct agent name
if "agent_name" in justification_item:
return justification_item["agent_name"]
# Agent index reference
elif "agent_index" in justification_item:
return self.get_agent_name(justification_item["agent_index"])
return None
class SimulationExperimentEmpiricalValidationResult(BaseModel):
"""
Contains the results of a simulation experiment validation against empirical data.
This represents the outcome of validating simulation experiment data
against empirical benchmarks, using statistical and semantic methods.
Attributes:
validation_type: Type of validation performed
control_name: Name of the control/empirical dataset
treatment_name: Name of the treatment/simulation experiment dataset
statistical_results: Results from statistical tests (if performed)
semantic_results: Results from semantic proximity analysis (if performed)
overall_score: Overall validation score (0.0 to 1.0)
summary: Summary of validation findings
timestamp: When the validation was performed
"""
validation_type: str
control_name: str
treatment_name: str
statistical_results: Optional[Dict[str, Any]] = None
semantic_results: Optional[Dict[str, Any]] = None
overall_score: Optional[float] = Field(None, ge=0.0, le=1.0, description="Overall validation score between 0.0 and 1.0")
summary: str = ""
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
class Config:
"""Pydantic configuration."""
extra = "forbid"
validate_assignment = True
class SimulationExperimentEmpiricalValidator:
"""
A validator for comparing simulation experiment data against empirical control data.
This validator performs data-driven validation using statistical hypothesis testing
and semantic proximity analysis of agent justifications. It is designed to validate
simulation experiment results against known empirical benchmarks, distinct from LLM-based evaluations.
"""
def __init__(self):
"""Initialize the simulation experiment empirical validator."""
pass
def validate(self,
control: SimulationExperimentDataset,
treatment: SimulationExperimentDataset,
validation_types: List[str] = ["statistical", "semantic"],
significance_level: float = 0.05,
output_format: str = "values") -> Union[SimulationExperimentEmpiricalValidationResult, str]:
"""
Validate a simulation experiment dataset against an empirical control dataset.
Args:
control: The control/empirical reference dataset
treatment: The treatment/simulation experiment dataset to validate
validation_types: List of validation types to perform ("statistical", "semantic")
significance_level: Significance level for statistical tests
output_format: "values" for SimulationExperimentEmpiricalValidationResult object, "report" for markdown report
Returns:
SimulationExperimentEmpiricalValidationResult object or markdown report string
"""
result = SimulationExperimentEmpiricalValidationResult(
validation_type=", ".join(validation_types),
control_name=control.name or "Control",
treatment_name=treatment.name or "Treatment"
)
# Perform statistical validation
if "statistical" in validation_types:
result.statistical_results = self._perform_statistical_validation(
control, treatment, significance_level
)
# Perform semantic validation
if "semantic" in validation_types:
result.semantic_results = self._perform_semantic_validation(
control, treatment
)
# Calculate overall score and summary
result.overall_score = self._calculate_overall_score(result)
result.summary = self._generate_summary(result)
if output_format == "report":
return self._generate_markdown_report(result)
else:
return result
def _perform_statistical_validation(self,
control: SimulationExperimentDataset,
treatment: SimulationExperimentDataset,
significance_level: float) -> Dict[str, Any]:
"""Perform statistical hypothesis testing on simulation experiment key results."""
if not control.key_results or not treatment.key_results:
return {"error": "No key results available for statistical testing"}
try:
# Prepare data for StatisticalTester
control_data = {"control": {}}
treatment_data = {"treatment": {}}
# Convert single values to lists if needed and find common metrics
common_metrics = set(control.key_results.keys()) & set(treatment.key_results.keys())
for metric in common_metrics:
control_value = control.key_results[metric]
treatment_value = treatment.key_results[metric]
# Convert single values to lists and filter out None values
if not isinstance(control_value, list):
control_value = [control_value] if control_value is not None else []
else:
control_value = [v for v in control_value if v is not None]
if not isinstance(treatment_value, list):
treatment_value = [treatment_value] if treatment_value is not None else []
else:
treatment_value = [v for v in treatment_value if v is not None]
# Only include metrics that have valid data points
if len(control_value) > 0 and len(treatment_value) > 0:
control_data["control"][metric] = control_value
treatment_data["treatment"][metric] = treatment_value
if not common_metrics:
return {"error": "No common metrics found between control and treatment"}
# Run statistical tests
tester = StatisticalTester(control_data, treatment_data)
test_results = tester.run_test(
test_type="welch_t_test",
alpha=significance_level
)
return {
"common_metrics": list(common_metrics),
"test_results": test_results,
"significance_level": significance_level
}
except Exception as e:
return {"error": f"Statistical testing failed: {str(e)}"}
def _perform_semantic_validation(self,
control: SimulationExperimentDataset,
treatment: SimulationExperimentDataset) -> Dict[str, Any]:
"""Perform semantic proximity analysis on simulation experiment agent justifications."""
results = {
"individual_comparisons": [],
"summary_comparison": None,
"average_proximity": None
}
# Compare individual justifications if available
if control.agent_justifications and treatment.agent_justifications:
proximities = []
for i, control_just in enumerate(control.agent_justifications):
for j, treatment_just in enumerate(treatment.agent_justifications):
control_text = control.get_justification_text(control_just)
treatment_text = treatment.get_justification_text(treatment_just)
if control_text and treatment_text:
proximity_result = compute_semantic_proximity(
control_text,
treatment_text,
context="Comparing agent justifications from simulation experiments"
)
# Get agent references (names or indices)
control_agent_ref = control.get_justification_agent_reference(control_just) or f"Agent_{i}"
treatment_agent_ref = treatment.get_justification_agent_reference(treatment_just) or f"Agent_{j}"
comparison = {
"control_agent": control_agent_ref,
"treatment_agent": treatment_agent_ref,
"proximity_score": proximity_result["proximity_score"],
"justification": proximity_result["justification"]
}
results["individual_comparisons"].append(comparison)
proximities.append(proximity_result["proximity_score"])
if proximities:
results["average_proximity"] = sum(proximities) / len(proximities)
# Compare summary justifications if available
if control.justification_summary and treatment.justification_summary:
summary_proximity = compute_semantic_proximity(
control.justification_summary,
treatment.justification_summary,
context="Comparing summary justifications from simulation experiments"
)
results["summary_comparison"] = summary_proximity
return results
def _calculate_overall_score(self, result: SimulationExperimentEmpiricalValidationResult) -> float:
"""Calculate an overall simulation experiment empirical validation score based on statistical and semantic results."""
scores = []
# Statistical component based on effect sizes
if result.statistical_results and "test_results" in result.statistical_results:
test_results = result.statistical_results["test_results"]
effect_sizes = []
for treatment_name, treatment_results in test_results.items():
for metric, metric_result in treatment_results.items():
# Extract effect size based on test type
effect_size = self._extract_effect_size(metric_result)
if effect_size is not None:
effect_sizes.append(effect_size)
if effect_sizes:
# Convert effect sizes to similarity scores (closer to 0 = more similar)
# Use inverse transformation: similarity = 1 / (1 + |effect_size|)
similarity_scores = [1.0 / (1.0 + abs(es)) for es in effect_sizes]
statistical_score = sum(similarity_scores) / len(similarity_scores)
scores.append(statistical_score)
# Semantic component
if result.semantic_results:
semantic_scores = []
# Average proximity from individual comparisons
if result.semantic_results.get("average_proximity") is not None:
semantic_scores.append(result.semantic_results["average_proximity"])
# Summary proximity
if result.semantic_results.get("summary_comparison"):
semantic_scores.append(result.semantic_results["summary_comparison"]["proximity_score"])
if semantic_scores:
scores.append(sum(semantic_scores) / len(semantic_scores))
return sum(scores) / len(scores) if scores else 0.0
def _generate_summary(self, result: SimulationExperimentEmpiricalValidationResult) -> str:
"""Generate a text summary of the simulation experiment empirical validation results."""
summary_parts = []
if result.statistical_results:
if "error" in result.statistical_results:
summary_parts.append(f"Statistical validation: {result.statistical_results['error']}")
else:
test_results = result.statistical_results.get("test_results", {})
effect_sizes = []
significant_tests = 0
total_tests = 0
for treatment_results in test_results.values():
for metric_result in treatment_results.values():
total_tests += 1
if metric_result.get("significant", False):
significant_tests += 1
# Collect effect sizes
effect_size = self._extract_effect_size(metric_result)
if effect_size is not None:
effect_sizes.append(abs(effect_size))
if effect_sizes:
avg_effect_size = sum(effect_sizes) / len(effect_sizes)
summary_parts.append(
f"Statistical validation: {significant_tests}/{total_tests} tests significant, "
f"average effect size: {avg_effect_size:.3f}"
)
else:
summary_parts.append(
f"Statistical validation: {significant_tests}/{total_tests} tests showed significant differences"
)
if result.semantic_results:
avg_proximity = result.semantic_results.get("average_proximity")
if avg_proximity is not None:
summary_parts.append(
f"Semantic validation: Average proximity score of {avg_proximity:.3f}"
)
summary_comparison = result.semantic_results.get("summary_comparison")
if summary_comparison:
summary_parts.append(
f"Summary proximity: {summary_comparison['proximity_score']:.3f}"
)
if result.overall_score is not None:
summary_parts.append(f"Overall validation score: {result.overall_score:.3f}")
return "; ".join(summary_parts) if summary_parts else "No validation results available"
def _generate_markdown_report(self, result: SimulationExperimentEmpiricalValidationResult) -> str:
"""Generate a comprehensive markdown report for simulation experiment empirical validation."""
overall_score_str = f"{result.overall_score:.3f}" if result.overall_score is not None else "N/A"
report = f"""# Simulation Experiment Empirical Validation Report
**Validation Type:** {result.validation_type}
**Control/Empirical:** {result.control_name}
**Treatment/Simulation:** {result.treatment_name}
**Timestamp:** {result.timestamp}
**Overall Score:** {overall_score_str}
## Summary
{result.summary}
"""
# Statistical Results Section
if result.statistical_results:
report += "## Statistical Validation\n\n"
if "error" in result.statistical_results:
report += f"**Error:** {result.statistical_results['error']}\n\n"
else:
stats = result.statistical_results
report += f"**Common Metrics:** {', '.join(stats.get('common_metrics', []))}\n\n"
report += f"**Significance Level:** {stats.get('significance_level', 'N/A')}\n\n"
test_results = stats.get("test_results", {})
if test_results:
report += "### Test Results\n\n"
for treatment_name, treatment_results in test_results.items():
report += f"#### {treatment_name}\n\n"
for metric, metric_result in treatment_results.items():
report += f"**{metric}:**\n\n"
significant = metric_result.get("significant", False)
p_value = metric_result.get("p_value", "N/A")
test_type = metric_result.get("test_type", "N/A")
effect_size = self._extract_effect_size(metric_result)
# Get the appropriate statistic based on test type
statistic = "N/A"
if "t_statistic" in metric_result:
statistic = metric_result["t_statistic"]
elif "u_statistic" in metric_result:
statistic = metric_result["u_statistic"]
elif "f_statistic" in metric_result:
statistic = metric_result["f_statistic"]
elif "chi2_statistic" in metric_result:
statistic = metric_result["chi2_statistic"]
status = "✅ Significant" if significant else "❌ Not Significant"
report += f"- **{test_type}:** {status}\n"
report += f" - p-value: {p_value}\n"
report += f" - statistic: {statistic}\n"
if effect_size is not None:
effect_interpretation = self._interpret_effect_size(abs(effect_size))
report += f" - effect size: {effect_size:.3f} ({effect_interpretation})\n"
report += "\n"
# Semantic Results Section
if result.semantic_results:
report += "## Semantic Validation\n\n"
semantic = result.semantic_results
# Individual comparisons
individual_comps = semantic.get("individual_comparisons", [])
if individual_comps:
report += "### Individual Agent Comparisons\n\n"
for comp in individual_comps:
score = comp["proximity_score"]
control_agent = comp["control_agent"]
treatment_agent = comp["treatment_agent"]
justification = comp["justification"]
report += f"**{control_agent} vs {treatment_agent}:** {score:.3f}\n\n"
report += f"{justification}\n\n"
avg_proximity = semantic.get("average_proximity")
if avg_proximity:
report += f"**Average Proximity Score:** {avg_proximity:.3f}\n\n"
# Summary comparison
summary_comp = semantic.get("summary_comparison")
if summary_comp:
report += "### Summary Comparison\n\n"
report += f"**Proximity Score:** {summary_comp['proximity_score']:.3f}\n\n"
report += f"**Justification:** {summary_comp['justification']}\n\n"
return report
def _extract_effect_size(self, metric_result: Dict[str, Any]) -> Optional[float]:
"""Extract effect size from statistical test result, regardless of test type."""
# Cohen's d for t-tests (most common)
if "effect_size" in metric_result:
return metric_result["effect_size"]
# For tests that don't provide Cohen's d, calculate standardized effect size
test_type = metric_result.get("test_type", "").lower()
if "t-test" in test_type:
# For t-tests, effect_size should be Cohen's d
return metric_result.get("effect_size", 0.0)
elif "mann-whitney" in test_type:
# For Mann-Whitney, use Common Language Effect Size (CLES)
# Convert CLES to a Cohen's d-like measure with a simple linear rescaling
cles = metric_result.get("effect_size", 0.5)
# CLES of 0.5 = no effect; values farther from 0.5 indicate larger effects
return 2 * (cles - 0.5)
elif "anova" in test_type:
# For ANOVA, use eta-squared and convert to Cohen's d equivalent
eta_squared = metric_result.get("effect_size", 0.0)
# Convert eta-squared to Cohen's d: d = 2 * sqrt(eta^2 / (1 - eta^2))
if eta_squared > 0 and eta_squared < 1:
return 2 * (eta_squared / (1 - eta_squared)) ** 0.5
return 0.0
elif "chi-square" in test_type:
# For Chi-square, use Cramer's V and convert to Cohen's d equivalent
cramers_v = metric_result.get("effect_size", 0.0)
# Rough conversion: d ≈ 2 * Cramer's V
return 2 * cramers_v
# Fallback: try to calculate from means and standard deviations
if all(k in metric_result for k in ["control_mean", "treatment_mean", "control_std", "treatment_std"]):
control_mean = metric_result["control_mean"]
treatment_mean = metric_result["treatment_mean"]
control_std = metric_result["control_std"]
treatment_std = metric_result["treatment_std"]
# Calculate pooled standard deviation
pooled_std = ((control_std ** 2 + treatment_std ** 2) / 2) ** 0.5
if pooled_std > 0:
return abs(treatment_mean - control_mean) / pooled_std
# If all else fails, return 0 (no effect)
return 0.0
def _interpret_effect_size(self, effect_size: float) -> str:
"""Provide interpretation of effect size magnitude (Cohen's conventions)."""
if effect_size < 0.2:
return "negligible"
elif effect_size < 0.5:
return "small"
elif effect_size < 0.8:
return "medium"
else:
return "large"
def validate_simulation_experiment_empirically(control_data: Dict[str, Any],
treatment_data: Dict[str, Any],
validation_types: List[str] = ["statistical", "semantic"],
significance_level: float = 0.05,
output_format: str = "values") -> Union[SimulationExperimentEmpiricalValidationResult, str]:
"""
Convenience function to validate simulation experiment data against empirical control data.
This performs data-driven validation using statistical and semantic methods,
distinct from LLM-based evaluations.
Args:
control_data: Dictionary containing control/empirical data
treatment_data: Dictionary containing treatment/simulation experiment data
validation_types: List of validation types to perform
significance_level: Significance level for statistical tests
output_format: "values" for SimulationExperimentEmpiricalValidationResult object, "report" for markdown report
Returns:
SimulationExperimentEmpiricalValidationResult object or markdown report string
"""
# Use Pydantic's built-in parsing instead of from_dict
control_dataset = SimulationExperimentDataset.parse_obj(control_data)
treatment_dataset = SimulationExperimentDataset.parse_obj(treatment_data)
validator = SimulationExperimentEmpiricalValidator()
return validator.validate(
control_dataset,
treatment_dataset,
validation_types=validation_types,
significance_level=significance_level,
output_format=output_format
)
Functions
def validate_simulation_experiment_empirically(control_data: Dict[str, Any], treatment_data: Dict[str, Any], validation_types: List[str] = ['statistical', 'semantic'], significance_level: float = 0.05, output_format: str = 'values') ‑> Union[SimulationExperimentEmpiricalValidationResult, str]
-
Convenience function to validate simulation experiment data against empirical control data.
This performs data-driven validation using statistical and semantic methods, distinct from LLM-based evaluations.
Args
control_data
- Dictionary containing control/empirical data
treatment_data
- Dictionary containing treatment/simulation experiment data
validation_types
- List of validation types to perform
significance_level
- Significance level for statistical tests
output_format
- "values" for SimulationExperimentEmpiricalValidationResult object, "report" for markdown report
Returns
SimulationExperimentEmpiricalValidationResult object or markdown report string
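A sketch of the expected dictionary structure, including per-agent justifications (all values are illustrative; the semantic comparison assumes the semantic-proximity utilities are configured):

from tinytroupe.validation.simulation_validator import validate_simulation_experiment_empirically

control_data = {
    "name": "Empirical study",
    "key_results": {"satisfaction": [4.1, 3.8, 4.4]},
    "result_types": {"satisfaction": "per_agent"},
    "agent_names": ["Alice", "Bob", None],
    "agent_justifications": [
        {"agent_index": 0, "justification": "Liked the product overall."},
        "Found it too expensive.",
    ],
    "justification_summary": "Mostly positive, with price concerns.",
}
treatment_data = {
    "name": "Simulation run",
    "key_results": {"satisfaction": [4.0, 3.9, 4.2]},
    "result_types": {"satisfaction": "per_agent"},
    "agent_justifications": ["Positive about quality.", "Worried about cost."],
    "justification_summary": "Positive overall, price is a recurring worry.",
}

result = validate_simulation_experiment_empirically(control_data, treatment_data)
print(result.semantic_results["average_proximity"])
print(result.summary)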
Classes
class SimulationExperimentDataset (**data: Any)
-
Represents a dataset from a simulation experiment or empirical study.
This contains data that can be used for validation, including quantitative metrics and qualitative agent justifications from simulation experiments or empirical studies.
Attributes
name
- Optional name for the dataset
description
- Optional description of the dataset
key_results
- Map from result names to their values (numbers, proportions, booleans, etc.)
result_types
- Map indicating whether each result is "aggregate" or "per_agent"
agent_names
- Optional list of agent names (can be referenced by index in results)
agent_justifications
- List of justifications (with optional agent references)
justification_summary
- Optional summary of all agent justifications
Create a new model by parsing and validating input data from keyword arguments. Raises ValidationError if the input data cannot be validated to form a valid model. __init__ uses __pydantic_self__ instead of the more common self for the first arg to allow self as a field name.
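A small sketch of constructing a dataset directly and reading per-agent values (metric names and numbers are made up):

from tinytroupe.validation.simulation_validator import SimulationExperimentDataset

ds = SimulationExperimentDataset(
    name="Pilot run",
    key_results={"conversion": [0.2, None, 0.35]},
    result_types={"conversion": "per_agent"},
    agent_names=["Ana", None, "Cleo"],
)

ds.get_agent_name(0)                   # 'Ana'
ds.get_agent_data("conversion", 1)     # None (missing value)
ds.get_all_agent_data("conversion")    # {'Ana': 0.2, 'Cleo': 0.35}
ds.get_valid_agent_data("conversion")  # [0.2, 0.35]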
Ancestors
- pydantic.main.BaseModel
Class variables
var Config
-
Pydantic configuration.
var agent_justifications : List[Union[str, Dict[str, Union[str, int]]]]
var agent_names : Optional[List[Optional[str]]]
var description : Optional[str]
var justification_summary : Optional[str]
var key_results : Dict[str, Union[float, int, bool, List[Union[float, int, bool, None]], None]]
var model_config
var model_fields
var name : Optional[str]
var result_types : Dict[str, str]
Methods
def get_agent_data(self, metric_name: str, agent_index: int) ‑> Optional[Union[float, int, bool]]
-
Get a specific agent's data for a given metric. Returns None for missing data.
def get_agent_name(self, index: int) ‑> Optional[str]
-
Get agent name by index, if available.
def get_all_agent_data(self, metric_name: str) ‑> Dict[str, Union[float, int, bool]]
-
Get all agents' data for a given metric as a dictionary mapping agent names/indices to values.
def get_justification_agent_reference(self, justification_item: Union[str, Dict[str, Union[str, int]]]) ‑> Optional[str]
-
Get agent reference from justification, returning name if available.
def get_justification_text(self, justification_item: Union[str, Dict[str, Union[str, int]]]) ‑> str
-
Extract justification text from various formats.
def get_valid_agent_data(self, metric_name: str) ‑> List[Union[float, int, bool]]
-
Get only valid (non-None) values for a per-agent metric.
def validate_data_consistency(self) ‑> List[str]
-
Validate that per-agent data is consistent across metrics and with agent names.
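For instance, a dataset whose per-agent metrics disagree in length and contain missing values would report roughly the following (example made up):

from tinytroupe.validation.simulation_validator import SimulationExperimentDataset

ds = SimulationExperimentDataset(
    key_results={"a": [1, 2, None], "b": [1, 2]},
    result_types={"a": "per_agent", "b": "per_agent"},
    agent_names=["X", "Y", "Z"],
)
for issue in ds.validate_data_consistency():
    print(issue)
# Per-agent metrics have inconsistent lengths: {'a': 3, 'b': 2}
# WARNING: Metric 'a' has missing data (None) at indices: [2]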
class SimulationExperimentEmpiricalValidationResult (**data: Any)
-
Contains the results of a simulation experiment validation against empirical data.
This represents the outcome of validating simulation experiment data against empirical benchmarks, using statistical and semantic methods.
Attributes
validation_type
- Type of validation performed
control_name
- Name of the control/empirical dataset
treatment_name
- Name of the treatment/simulation experiment dataset
statistical_results
- Results from statistical tests (if performed)
semantic_results
- Results from semantic proximity analysis (if performed)
overall_score
- Overall validation score (0.0 to 1.0)
summary
- Summary of validation findings
timestamp
- When the validation was performed
Create a new model by parsing and validating input data from keyword arguments. Raises ValidationError if the input data cannot be validated to form a valid model. __init__ uses __pydantic_self__ instead of the more common self for the first arg to allow self as a field name.
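A sketch of constructing the result model by hand, just to illustrate its fields and constraints; in normal use the validator builds it for you (values are placeholders):

from tinytroupe.validation.simulation_validator import SimulationExperimentEmpiricalValidationResult

r = SimulationExperimentEmpiricalValidationResult(
    validation_type="statistical, semantic",
    control_name="Empirical study",
    treatment_name="Simulation run",
    overall_score=0.82,
    summary="Placeholder summary",
)
print(r.timestamp)       # ISO-8601 string, filled in automatically
print(r.overall_score)   # validated to lie between 0.0 and 1.0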
Ancestors
- pydantic.main.BaseModel
Class variables
var Config
-
Pydantic configuration.
var control_name : str
var model_config
var model_fields
var overall_score : Optional[float]
var semantic_results : Optional[Dict[str, Any]]
var statistical_results : Optional[Dict[str, Any]]
var summary : str
var timestamp : str
var treatment_name : str
var validation_type : str
class SimulationExperimentEmpiricalValidator
-
A validator for comparing simulation experiment data against empirical control data.
This validator performs data-driven validation using statistical hypothesis testing and semantic proximity analysis of agent justifications. It is designed to validate simulation experiment results against known empirical benchmarks, distinct from LLM-based evaluations.
Initialize the simulation experiment empirical validator.
Methods
def validate(self, control: SimulationExperimentDataset, treatment: SimulationExperimentDataset, validation_types: List[str] = ['statistical', 'semantic'], significance_level: float = 0.05, output_format: str = 'values') ‑> Union[SimulationExperimentEmpiricalValidationResult, str]
-
Validate a simulation experiment dataset against an empirical control dataset.
Args
control
- The control/empirical reference dataset
treatment
- The treatment/simulation experiment dataset to validate
validation_types
- List of validation types to perform ("statistical", "semantic")
significance_level
- Significance level for statistical tests
output_format
- "values" for SimulationExperimentEmpiricalValidationResult object, "report" for markdown report
Returns
SimulationExperimentEmpiricalValidationResult object or markdown report string