Module tinytroupe.experimentation.statistical_tests

import numpy as np
import scipy.stats as stats
from typing import Dict, List, Union, Callable, Any, Optional

from tinytroupe.experimentation import logger


class StatisticalTester:
    """
    A class to perform statistical tests on experiment results. To do so, a control is defined, and then one or 
    more treatments are compared to the control. The class supports various statistical tests, including t-tests,
    Mann-Whitney U tests, and ANOVA. The user can specify the type of test to run, the significance level, and
    the specific metrics to analyze. The results of the tests are returned in a structured format.
    """

    def __init__(self, control_experiment_data: Dict[str, Dict[str, list]], 
                 treatments_experiment_data: Dict[str, Dict[str, list]],
                 results_key: Optional[str] = None):
        """
        Initialize with experiment results.
        
        Args:
            control_experiment_data (dict): Dictionary containing control experiment results, with the
                single control experiment ID as key and a dict of metric names to lists of values as value.
                e.g., {"control_exp": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4], ...}}
            treatments_experiment_data (dict): Dictionary containing treatment experiment results, with keys 
                as experiment IDs and values as dicts of metric names to lists of values.
                e.g., {"exp1": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}, 
                       "exp2": {"metric1": [0.5, 0.6], "metric2": [0.7, 0.8]}, ...}
            results_key (str, optional): If provided, each experiment's data is taken from this key of
                its dict, e.g., {"exp1": {"results": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}}}.
        """
        
        # if results_key is provided, use it to extract the relevant data from the control and treatment data
        # e.g., {"exp1": {"results": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}}}
        if results_key:
            control_experiment_data = {k: v[results_key] for k, v in control_experiment_data.items()}
            treatments_experiment_data = {k: v[results_key] for k, v in treatments_experiment_data.items()}
            
        self.control_experiment_data = control_experiment_data
        self.treatments_experiment_data = treatments_experiment_data
        
        # Validate input data
        self._validate_input_data()
    
    def _validate_input_data(self):
        """Validate the input data formats and structure."""
        # Check that control and treatments are dictionaries
        if not isinstance(self.control_experiment_data, dict):
            raise TypeError("Control experiment data must be a dictionary")
        if not isinstance(self.treatments_experiment_data, dict):
            raise TypeError("Treatments experiment data must be a dictionary")
        
        # Check that control has at least one experiment
        if not self.control_experiment_data:
            raise ValueError("Control experiment data cannot be empty")

        # Check only one control
        if len(self.control_experiment_data) > 1:
            raise ValueError("Only one control experiment is allowed")
            
        # Validate control experiment structure
        for control_id, control_metrics in self.control_experiment_data.items():
            if not isinstance(control_metrics, dict):
                raise TypeError(f"Metrics for control experiment '{control_id}' must be a dictionary")
            
            # Check that the metrics dictionary is not empty
            if not control_metrics:
                raise ValueError(f"Control experiment '{control_id}' has no metrics")
                
            # Validate that metric values are lists
            for metric, values in control_metrics.items():
                if not isinstance(values, list):
                    raise TypeError(f"Values for metric '{metric}' in control experiment '{control_id}' must be a list")
        
        # Check treatments have at least one experiment
        if not self.treatments_experiment_data:
            raise ValueError("Treatments experiment data cannot be empty")
        
        # Gather all control metrics once, for overlap checking
        all_control_metrics = set()
        for control_metrics in self.control_experiment_data.values():
            all_control_metrics.update(control_metrics.keys())

        # Validate treatment experiment structure
        for treatment_id, treatment_data in self.treatments_experiment_data.items():
            if not isinstance(treatment_data, dict):
                raise TypeError(f"Data for treatment '{treatment_id}' must be a dictionary")
                
            # Check that the metrics dictionary is not empty
            if not treatment_data:
                raise ValueError(f"Treatment '{treatment_id}' has no metrics")
            
            # Check if there's any overlap between control and treatment metrics
            common_metrics = all_control_metrics.intersection(treatment_data.keys())
            if not common_metrics:
                logger.warning(f"Treatment '{treatment_id}' has no metrics in common with the control experiment")
            
            # Check that treatment metrics are lists
            for metric, values in treatment_data.items():
                if not isinstance(values, list):
                    raise TypeError(f"Values for metric '{metric}' in treatment '{treatment_id}' must be a list")
    
    def run_test(self, 
                test_type: str="welch_t_test", 
                alpha: float = 0.05, 
                **kwargs) -> Dict[str, Dict[str, Any]]:
        """
        Run the specified statistical test on the control and treatments data.

        Args:
            test_type (str): Type of statistical test to run. 
                Options: 't_test', 'welch_t_test', 'mann_whitney', 'anova', 'chi_square'
            alpha (float): Significance level, defaults to 0.05
            **kwargs: Additional arguments for specific test types.

        Returns:
            dict: Dictionary containing the results of the statistical tests for each treatment (vs the one control).
                Each key is the treatment ID and each value is a dictionary with test results.
        """
        supported_tests = {
            't_test': self._run_t_test,
            'welch_t_test': self._run_welch_t_test,
            'mann_whitney': self._run_mann_whitney,
            'anova': self._run_anova,
            'chi_square': self._run_chi_square
        }
        
        if test_type not in supported_tests:
            raise ValueError(f"Unsupported test type: {test_type}. Supported types: {list(supported_tests.keys())}")
            
        results = {}
        for control_id, control_data in self.control_experiment_data.items():
            # Get all metrics from the control data
            metrics = set(control_data.keys())
            for treatment_id, treatment_data in self.treatments_experiment_data.items():
                results[treatment_id] = {}
                
                for metric in metrics:
                    # Skip metrics not in treatment data
                    if metric not in treatment_data:
                        logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'")
                        continue
                    
                    control_values = control_data[metric]
                    treatment_values = treatment_data[metric]
                    
                    # Skip if either control or treatment has no values
                    if len(control_values) == 0 or len(treatment_values) == 0:
                        logger.warning(f"Skipping metric '{metric}' for treatment '{treatment_id}' due to empty values")
                        continue
                    
                    # Run the selected test and convert to JSON serializable types
                    test_result = supported_tests[test_type](control_values, treatment_values, alpha, **kwargs)
                    results[treatment_id][metric] = convert_to_serializable(test_result)
        
        return results
    
    def _run_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
        """Run Student's t-test (equal variance assumed)."""
        # Convert to numpy arrays for calculations
        control = np.array(control_values, dtype=float)
        treatment = np.array(treatment_values, dtype=float)
        
        # Calculate basic statistics
        control_mean = np.mean(control)
        treatment_mean = np.mean(treatment)
        mean_diff = treatment_mean - control_mean
        
        # Run the t-test
        t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=True)
        
        # Calculate confidence interval
        control_std = np.std(control, ddof=1)
        treatment_std = np.std(treatment, ddof=1)
        pooled_std = np.sqrt(((len(control) - 1) * control_std**2 + 
                              (len(treatment) - 1) * treatment_std**2) / 
                            (len(control) + len(treatment) - 2))
        
        se = pooled_std * np.sqrt(1/len(control) + 1/len(treatment))
        critical_value = stats.t.ppf(1 - alpha/2, len(control) + len(treatment) - 2)
        margin_error = critical_value * se
        ci_lower = mean_diff - margin_error
        ci_upper = mean_diff + margin_error
        
        # Determine if the result is significant
        significant = p_value < alpha
        
        return {
            'test_type': 'Student t-test (equal variance)',
            'control_mean': control_mean,
            'treatment_mean': treatment_mean,
            'mean_difference': mean_diff,
            'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'),
            't_statistic': t_stat,
            'p_value': p_value,
            'confidence_interval': (ci_lower, ci_upper),
            'confidence_level': 1 - alpha,
            'significant': significant,
            'control_sample_size': len(control),
            'treatment_sample_size': len(treatment),
            'control_std': control_std,
            'treatment_std': treatment_std,
            'effect_size': cohen_d(control, treatment)
        }
    
    def _run_welch_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
        """Run Welch's t-test (unequal variance)."""
        # Convert to numpy arrays for calculations
        control = np.array(control_values, dtype=float)
        treatment = np.array(treatment_values, dtype=float)
        
        # Calculate basic statistics
        control_mean = np.mean(control)
        treatment_mean = np.mean(treatment)
        mean_diff = treatment_mean - control_mean
        
        # Run Welch's t-test
        t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=False)
        
        # Calculate confidence interval (for Welch's t-test)
        control_var = np.var(control, ddof=1)
        treatment_var = np.var(treatment, ddof=1)
        
        # Calculate effective degrees of freedom (Welch-Satterthwaite equation)
        v_num = (control_var/len(control) + treatment_var/len(treatment))**2
        v_denom = (control_var/len(control))**2/(len(control)-1) + (treatment_var/len(treatment))**2/(len(treatment)-1)
        df = v_num / v_denom if v_denom > 0 else float('inf')
        
        se = np.sqrt(control_var/len(control) + treatment_var/len(treatment))
        critical_value = stats.t.ppf(1 - alpha/2, df)
        margin_error = critical_value * se
        ci_lower = mean_diff - margin_error
        ci_upper = mean_diff + margin_error
        
        control_std = np.std(control, ddof=1)
        treatment_std = np.std(treatment, ddof=1)
        
        # Determine if the result is significant
        significant = p_value < alpha
        
        return {
            'test_type': 'Welch t-test (unequal variance)',
            'control_mean': control_mean,
            'treatment_mean': treatment_mean,
            'mean_difference': mean_diff,
            'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'),
            't_statistic': t_stat,
            'p_value': p_value,
            'confidence_interval': (ci_lower, ci_upper),
            'confidence_level': 1 - alpha,
            'significant': significant,
            'degrees_of_freedom': df,
            'control_sample_size': len(control),
            'treatment_sample_size': len(treatment),
            'control_std': control_std,
            'treatment_std': treatment_std,
            'effect_size': cohen_d(control, treatment)
        }
    
    def _run_mann_whitney(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
        """Run Mann-Whitney U test (non-parametric test)."""
        # Convert to numpy arrays
        control = np.array(control_values, dtype=float)
        treatment = np.array(treatment_values, dtype=float)
        
        # Calculate basic statistics
        control_median = np.median(control)
        treatment_median = np.median(treatment)
        median_diff = treatment_median - control_median
        
        # Run the Mann-Whitney U test
        u_stat, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided')
        
        # Calculate common language effect size
        # (probability that a randomly selected value from treatment is greater than control)
        count = 0
        for tc in treatment:
            for cc in control:
                if tc > cc:
                    count += 1
        cles = count / (len(treatment) * len(control))
        
        # Calculate approximate confidence interval using bootstrap
        try:
            from scipy.stats import bootstrap
            
            def median_diff_func(x, y):
                # Keep the same sign convention as median_diff above: treatment - control
                return np.median(y) - np.median(x)
            
            res = bootstrap((control, treatment), median_diff_func, 
                            confidence_level=1-alpha, 
                            n_resamples=1000,
                            random_state=42)
            ci_lower, ci_upper = res.confidence_interval
        except (ImportError, ValueError):
            # If bootstrap is unavailable (old SciPy) or rejects the statistic, return None for the CI
            ci_lower, ci_upper = None, None
            logger.warning("Bootstrap confidence interval could not be computed, skipping it")
        
        # Determine if the result is significant
        significant = p_value < alpha
        
        return {
            'test_type': 'Mann-Whitney U test',
            'control_median': control_median,
            'treatment_median': treatment_median,
            'median_difference': median_diff,
            'percent_change': (median_diff / control_median * 100) if control_median != 0 else float('inf'),
            'u_statistic': u_stat,
            'p_value': p_value,
            'confidence_interval': (ci_lower, ci_upper) if ci_lower is not None else None,
            'confidence_level': 1 - alpha,
            'significant': significant,
            'control_sample_size': len(control),
            'treatment_sample_size': len(treatment),
            'effect_size': cles
        }
    
    def _run_anova(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
        """Run one-way ANOVA test."""
        # For ANOVA, we typically need multiple groups, but we can still run it with just two
        # Convert to numpy arrays
        control = np.array(control_values, dtype=float)
        treatment = np.array(treatment_values, dtype=float)
        
        # Run one-way ANOVA
        f_stat, p_value = stats.f_oneway(control, treatment)
        
        # Calculate effect size (eta-squared)
        total_values = np.concatenate([control, treatment])
        grand_mean = np.mean(total_values)
        
        ss_total = np.sum((total_values - grand_mean) ** 2)
        ss_between = (len(control) * (np.mean(control) - grand_mean) ** 2 + 
                     len(treatment) * (np.mean(treatment) - grand_mean) ** 2)
        
        eta_squared = ss_between / ss_total if ss_total > 0 else 0
        
        # Determine if the result is significant
        significant = p_value < alpha
        
        return {
            'test_type': 'One-way ANOVA',
            'f_statistic': f_stat,
            'p_value': p_value,
            'significant': significant,
            'control_sample_size': len(control),
            'treatment_sample_size': len(treatment),
            'effect_size': eta_squared,
            'effect_size_type': 'eta_squared'
        }
    
    def _run_chi_square(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]:
        """Run Chi-square test for categorical data."""
        # For chi-square, we assume the values represent counts in different categories
        # Convert to numpy arrays
        control = np.array(control_values, dtype=float)
        treatment = np.array(treatment_values, dtype=float)
        
        # Check if the arrays are the same length (same number of categories)
        if len(control) != len(treatment):
            raise ValueError("Control and treatment must have the same number of categories for chi-square test")
        
        # Run chi-square test
        contingency_table = np.vstack([control, treatment])
        chi2_stat, p_value, dof, expected = stats.chi2_contingency(contingency_table)
        
        # Calculate Cramer's V as effect size
        n = np.sum(contingency_table)
        min_dim = min(contingency_table.shape) - 1
        cramers_v = np.sqrt(chi2_stat / (n * min_dim)) if n * min_dim > 0 else 0
        
        # Determine if the result is significant
        significant = p_value < alpha
        
        return {
            'test_type': 'Chi-square test',
            'chi2_statistic': chi2_stat,
            'p_value': p_value,
            'degrees_of_freedom': dof,
            'significant': significant,
            'effect_size': cramers_v,
            'effect_size_type': 'cramers_v'
        }
    
    def check_assumptions(self, metric: str) -> Dict[str, Dict[str, Any]]:
        """
        Check statistical assumptions for the given metric across all treatments.
        
        Args:
            metric (str): The metric to check assumptions for.
            
        Returns:
            dict: Dictionary with results of assumption checks for each treatment.
        """
        # The control dict maps the single control experiment ID to its metrics
        control_metrics = next(iter(self.control_experiment_data.values()))
        if metric not in control_metrics:
            raise ValueError(f"Metric '{metric}' not found in control data")
            
        results = {}
        control_values = np.array(control_metrics[metric], dtype=float)
        
        # Check normality of control
        control_shapiro = stats.shapiro(control_values)
        control_normality = {
            'test': 'Shapiro-Wilk',
            'statistic': control_shapiro[0],
            'p_value': control_shapiro[1],
            'normal': control_shapiro[1] >= 0.05
        }
        
        for treatment_id, treatment_data in self.treatments_experiment_data.items():
            if metric not in treatment_data:
                logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'")
                continue
                
            treatment_values = np.array(treatment_data[metric], dtype=float)
            
            # Check normality of treatment
            treatment_shapiro = stats.shapiro(treatment_values)
            treatment_normality = {
                'test': 'Shapiro-Wilk',
                'statistic': treatment_shapiro[0],
                'p_value': treatment_shapiro[1],
                'normal': treatment_shapiro[1] >= 0.05
            }
            
            # Check homogeneity of variance
            levene_test = stats.levene(control_values, treatment_values)
            variance_homogeneity = {
                'test': 'Levene',
                'statistic': levene_test[0],
                'p_value': levene_test[1],
                'equal_variance': levene_test[1] >= 0.05
            }
            
            # Store results and convert to JSON serializable types
            results[treatment_id] = convert_to_serializable({
                'control_normality': control_normality,
                'treatment_normality': treatment_normality,
                'variance_homogeneity': variance_homogeneity,
                'recommended_test': self._recommend_test(control_normality['normal'], 
                                                       treatment_normality['normal'],
                                                       variance_homogeneity['equal_variance'])
            })
            
        return results
    
    def _recommend_test(self, control_normal: bool, treatment_normal: bool, equal_variance: bool) -> str:
        """Recommend a statistical test based on assumption checks."""
        if control_normal and treatment_normal:
            if equal_variance:
                return 't_test'
            else:
                return 'welch_t_test'
        else:
            return 'mann_whitney'


def cohen_d(x: Union[list, np.ndarray], y: Union[list, np.ndarray]) -> float:
    """
    Calculate Cohen's d effect size for two samples.
    
    Args:
        x: First sample
        y: Second sample
        
    Returns:
        float: Cohen's d effect size
    """
    nx = len(x)
    ny = len(y)
    
    # Convert to numpy arrays
    x = np.array(x, dtype=float)
    y = np.array(y, dtype=float)
    
    # Calculate means
    mx = np.mean(x)
    my = np.mean(y)
    
    # Calculate standard deviations
    sx = np.std(x, ddof=1)
    sy = np.std(y, ddof=1)
    
    # Pooled standard deviation
    pooled_sd = np.sqrt(((nx - 1) * sx**2 + (ny - 1) * sy**2) / (nx + ny - 2))
    
    # Cohen's d
    return (my - mx) / pooled_sd if pooled_sd > 0 else 0


def convert_to_serializable(obj):
    """
    Convert NumPy types to native Python types recursively to ensure JSON serialization works.
    
    Args:
        obj: Any object that might contain NumPy types
        
    Returns:
        Object with NumPy types converted to Python native types
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (np.number, np.bool_)):
        return obj.item()
    elif isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    elif isinstance(obj, tuple):
        return tuple(convert_to_serializable(i) for i in obj)
    else:
        return obj

Functions

def cohen_d(x: Union[list, numpy.ndarray], y: Union[list, numpy.ndarray]) -> float

Calculate Cohen's d effect size for two samples.

Args

x
    First sample
y
    Second sample

Returns

float
    Cohen's d effect size
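As a quick illustration (the sample values below are invented; the sign is positive when the second sample's mean is higher):

control = [0.10, 0.12, 0.11, 0.13]
treatment = [0.15, 0.16, 0.14, 0.17]

# d = (mean(treatment) - mean(control)) / pooled_sd
d = cohen_d(control, treatment)
print(round(d, 2))  # roughly 3.1 for these made-up values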
def convert_to_serializable(obj)

Convert NumPy types to native Python types recursively to ensure JSON serialization works.

Args

obj
    Any object that might contain NumPy types

Returns

Object with NumPy types converted to Python native types
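For instance, a result dict containing NumPy scalars and arrays becomes something json.dumps accepts (a small sketch with invented values):

import json
import numpy as np

raw = {
    'p_value': np.float64(0.032),
    'significant': np.bool_(True),
    'samples': np.array([1, 2, 3]),
}

clean = convert_to_serializable(raw)
print(json.dumps(clean))  # {"p_value": 0.032, "significant": true, "samples": [1, 2, 3]}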


Classes

class StatisticalTester (control_experiment_data: Dict[str, Dict[str, list]], treatments_experiment_data: Dict[str, Dict[str, list]], results_key: Optional[str] = None)

A class to perform statistical tests on experiment results. To do so, a control is defined, and then one or more treatments are compared to the control. The class supports various statistical tests, including t-tests, Mann-Whitney U tests, and ANOVA. The user can specify the type of test to run, the significance level, and the specific metrics to analyze. The results of the tests are returned in a structured format.

Initialize with experiment results.

Args

control_experiment_data : dict
    Dictionary containing control experiment results, with the single control experiment ID as key and a dict of metric names to lists of values as value, e.g., {"control_exp": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4], ...}}
treatments_experiment_data : dict
    Dictionary containing treatment experiment results, with keys as experiment IDs and values as dicts of metric names to lists of values, e.g., {"exp1": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}, "exp2": {"metric1": [0.5, 0.6], "metric2": [0.7, 0.8]}, ...}
results_key : str, optional
    If provided, each experiment's data is taken from this key of its dict, e.g., {"exp1": {"results": {"metric1": [0.1, 0.2]}}}.
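A minimal usage sketch (the experiment IDs, metric name, and values below are invented for illustration):

control_data = {"control": {"conversion": [0.10, 0.12, 0.11, 0.13, 0.12]}}
treatments_data = {
    "exp_a": {"conversion": [0.15, 0.16, 0.14, 0.17, 0.15]},
    "exp_b": {"conversion": [0.11, 0.12, 0.10, 0.13, 0.11]},
}

tester = StatisticalTester(control_data, treatments_data)
results = tester.run_test(test_type="welch_t_test", alpha=0.05)

# Each entry holds the fields returned by the corresponding _run_* helper
print(results["exp_a"]["conversion"]["significant"])
print(results["exp_a"]["conversion"]["percent_change"])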

Methods

def check_assumptions(self, metric: str) -> Dict[str, Dict[str, Any]]

Check statistical assumptions for the given metric across all treatments.

Args

metric : str
    The metric to check assumptions for.

Returns

dict
    Dictionary with results of assumption checks for each treatment.
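For example, the recommendation can be fed back into run_test (continuing the invented data from the class sketch above):

checks = tester.check_assumptions("conversion")
for treatment_id, check in checks.items():
    # 't_test' when both samples look normal with equal variances,
    # 'welch_t_test' for unequal variances, 'mann_whitney' otherwise
    print(treatment_id, check["recommended_test"])

results = tester.run_test(test_type=checks["exp_a"]["recommended_test"])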
def run_test(self, test_type: str = 'welch_t_test', alpha: float = 0.05, **kwargs) -> Dict[str, Dict[str, Any]]

Run the specified statistical test on the control and treatments data.

Args

test_type : str
    Type of statistical test to run. Options: 't_test', 'welch_t_test', 'mann_whitney', 'anova', 'chi_square'
alpha : float
    Significance level, defaults to 0.05
**kwargs
    Additional arguments for specific test types.

Returns

dict
    Dictionary containing the results of the statistical tests for each treatment (vs the one control). Each key is the treatment ID and each value is a dictionary with test results.
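A sketch of inspecting one result entry (field names follow the dict returned by the corresponding _run_* helper; data as in the class sketch above):

results = tester.run_test(test_type="mann_whitney", alpha=0.05)
entry = results["exp_a"]["conversion"]

print(entry["test_type"])            # 'Mann-Whitney U test'
print(entry["p_value"], entry["significant"])
print(entry["confidence_interval"])  # bootstrap CI for the median difference, or None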