Source code for vivainsights.create_lorenz

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
This module calculates the Gini coefficient and plots the Lorenz curve for a given metric.
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import Optional, Tuple

from matplotlib.lines import Line2D

# Try vivainsights color, fall back to the hex
try:
    from vivainsights.color_codes import Colors
    _HIGHLIGHT = Colors.HIGHLIGHT_NEGATIVE.value
except Exception:
    _HIGHLIGHT = "#fe7f4f"

# Header layout constants
_TITLE_Y   = 0.955
_SUB_Y     = 0.915
_RULE_Y    = 0.900
_TOP_LIMIT = 0.84   # push axes down to leave room for header

def _retitle_left(fig, title_text, subtitle_text=None, left=0.01):
    """Left-aligned figure-level title/subtitle; hide any axes/supertitle."""
    # Remove any axes titles and a prior suptitle
    for ax in fig.get_axes():
        try: ax.set_title("")
        except Exception: pass
    if getattr(fig, "_suptitle", None) is not None:
        fig._suptitle.set_visible(False)

    fig.text(left, _TITLE_Y, title_text, ha="left", fontsize=13, weight="bold", alpha=.8)
    if subtitle_text:
        fig.text(left, _SUB_Y, subtitle_text, ha="left", fontsize=11, alpha=.8)

def _add_header_decoration(fig, color=_HIGHLIGHT, y=_RULE_Y):
    """Orange rule + small box under the subtitle, drawn on a top overlay."""
    overlay = fig.add_axes([0, 0, 1, 1], frameon=False, zorder=10)
    overlay.set_axis_off()
    overlay.add_line(Line2D([0.01, 1.0], [y, y], transform=overlay.transAxes,
                            color=color, linewidth=1.2))
    overlay.add_patch(plt.Rectangle((0.01, y), 0.03, -0.015,
                                    transform=overlay.transAxes,
                                    facecolor=color, linewidth=0))

def _reserve_header_space(fig, top=_TOP_LIMIT):
    """Move the plot area down so it doesn't overlap the header."""
    try:
        if hasattr(fig, "get_constrained_layout") and fig.get_constrained_layout():
            fig.set_constrained_layout(False)
    except Exception:
        pass
    fig.subplots_adjust(top=top)


[docs] def get_value_proportion(df, population_share): """ Calculate the proportion of total values (e.g., income, email sent) that corresponds to a given cumulative share of the population. Parameters: df (pd.DataFrame): DataFrame containing cumulative population and value proportions. population_share (float): The cumulative share of the population (between 0 and 1). Returns: float: The proportion of total values corresponding to the given population share. Raises: ValueError: If population_share is not between 0 and 1. """ if population_share < 0 or population_share > 1: raise ValueError("Population share must be between 0 and 1") # Find the row in the DataFrame where the cumulative population meets or exceeds the input share closest_row = df[df['cum_population'] >= population_share].iloc[0] return closest_row['cum_values_prop']
[docs] def compute_gini(x): """ Description ----------- Compute the Gini coefficient, a measure of statistical dispersion to represent inequality. Parameters ---------- x (list, np.ndarray, pd.Series): A numeric vector representing values (e.g., income, emails sent). Returns ------- float: The Gini coefficient for the given vector of values. Raises ------ ValueError: If input is not a numeric vector. """ if not isinstance(x, (list, np.ndarray, pd.Series)): raise ValueError("Input must be a numeric vector") # Sort the values in ascending order x = np.sort(x) n = len(x) # Calculate the Gini coefficient using the formula gini = (2 * np.sum((np.arange(1, n + 1)) * x) - (n + 1) * np.sum(x)) / (n * np.sum(x)) return gini
[docs] def create_lorenz(data, metric, return_type="plot",figsize: Optional[Tuple[float, float]] = None ): """ Name ---- create_lorenz Description ----------- Calculate and return the Lorenz curve and Gini coefficient for a given metric. Parameters ---------- data (pd.DataFrame): DataFrame containing the data to analyze. metric (str): The column name in the DataFrame representing the values to analyze. return_type (str): The type of output to return: - "gini": returns the Gini coefficient. - "table": returns a DataFrame of cumulative population and value shares. - "plot" (default): displays a Lorenz curve plot with the Gini coefficient. Returns ------- float/pd.DataFrame/None: Depending on return_type: - "gini": returns the Gini coefficient. - "table": returns a DataFrame of population and value shares. - "plot": displays the Lorenz curve plot Raises ------ ValueError: If the metric is not found in the DataFrame, or if an invalid return_type is specified. Examples -------- Using `pq_data` from `vi.load_pq_data()`, which returns a DataFrame with an "Emails_sent" column. >>> import vivainsights as vi >>> # Compute the Gini coefficient: >>> vi.create_lorenz(data=vi.load_pq_data(), metric="Emails_sent", return_type="gini") >>> # Compute the underlying table for the Lorenz curve: >>> vi.create_lorenz(data=vi.load_pq_data(), metric="Emails_sent", return_type="table") >>> # Plot the Lorenz curve >>> vi.create_lorenz(data=vi.load_pq_data(), metric="Emails_sent", return_type="plot") """ if metric not in data.columns: raise ValueError(f"Metric '{metric}' not found in data.") n = data[metric] # Create a DataFrame sorted by the metric to analyze lorenz_df = pd.DataFrame({ 'n': n }).sort_values(by='n') # Calculate cumulative sums for values and population lorenz_df['cum_values'] = lorenz_df['n'].cumsum() lorenz_df['cum_population'] = np.cumsum(np.ones(len(lorenz_df))) / len(lorenz_df) lorenz_df['cum_values_prop'] = lorenz_df['cum_values'] / lorenz_df['n'].sum() if return_type == "plot": gini_coef = compute_gini(n) fig, ax = plt.subplots(figsize=figsize if figsize else (8, 6)) # Lorenz curve + equality line ax.plot(lorenz_df['cum_population'], lorenz_df['cum_values_prop'], color='#C75B7A') ax.plot([0, 1], [0, 1], linestyle='dashed', color='darkgrey') # Axes labels and limits ax.set_xlabel("Cumulative Share of Population") ax.set_ylabel("Cumulative Share of Values") ax.set_xlim([0, 1]); ax.set_ylim([0, 1]) ax.grid(True) # Gini annotation inside axes ax.annotate(f"Gini coefficient: {round(gini_coef, 2)}", xy=(0.5, 0.1), xycoords='axes fraction') # --- Header (figure-level, left-aligned) + orange rule/box --- title = f"Lorenz curve for {metric}" subtitle = f"% of population sharing % of {metric}" _retitle_left(fig, title, subtitle, left=0.01) _add_header_decoration(fig) # draws at _RULE_Y below subtitle _reserve_header_space(fig) # shifts Axes down; avoids overlap return fig elif return_type == "gini": # Return the Gini coefficient return compute_gini(n) elif return_type == "table": # Create and return a table of cumulative population and value shares population_shares = np.arange(0, 1.1, 0.1) return pd.DataFrame({ 'population_share': population_shares, 'value_share': [get_value_proportion(lorenz_df, x) for x in population_shares] }) else: raise ValueError("Invalid return type. Choose 'gini', 'table', or 'plot'.")