# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Segment employees into usage-based groups from collaboration metrics.
"""
__all__ = ['identify_usage_segments', 'plot_ts_us']
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from vivainsights.identify_habit import identify_habit
[docs]
def identify_usage_segments(data, metric=None, metric_str=None, version="12w", return_type="data",
threshold=None, width=None, max_window=None, power_thres=None):
"""Segment employees into usage-based groups.
Classifies employees as Power User, Habitual User, Novice User, Low
User, or Non-user based on rolling averages and habit detection.
Parameters
----------
data : pandas.DataFrame
Person query data. Must include ``PersonId`` and ``MetricDate``.
metric : str or None, default None
Single metric column to classify.
metric_str : list of str or None, default None
Multiple metric columns to aggregate before classification.
Provide exactly one of *metric* or *metric_str*.
version : str or None, default "12w"
``"12w"`` for 12-week rolling, ``"4w"`` for 4-week rolling, or
``None`` for custom parameters.
return_type : str, default "data"
``"data"`` for a classified DataFrame, ``"plot"`` for a stacked
bar chart, or ``"table"`` for a summary pivot table.
threshold : int or None, default None
Habit identification threshold (required when ``version=None``).
width : int or None, default None
Habit width parameter (required when ``version=None``).
max_window : int or None, default None
Habit window parameter (required when ``version=None``).
power_thres : float or None, default None
Power-user threshold (required when ``version=None``).
Returns
-------
pandas.DataFrame or matplotlib.figure.Figure
Classified data, a stacked bar chart, or a summary table
depending on *return_type*.
Examples
--------
Classify usage segments using the 12-week preset:
>>> import vivainsights as vi
>>> pq_data = vi.load_pq_data()
>>> vi.identify_usage_segments(pq_data, metric="Emails_sent", version="12w")
Return a stacked bar chart:
>>> vi.identify_usage_segments(pq_data, metric="Emails_sent", version="12w", return_type="plot")
Return a summary table:
>>> vi.identify_usage_segments(pq_data, metric="Emails_sent", version="12w", return_type="table")
Use a metric string instead of a column name:
>>> vi.identify_usage_segments(pq_data, metric_str="Emails_sent", version="4w")
"""
if metric is None and metric_str is None:
raise ValueError("Please provide either a metric or a metric_str.")
if metric is not None and metric_str is not None:
raise ValueError("Please provide either a metric or a metric_str, not both.")
# Validate version and custom parameters
if version is None:
if any(param is None for param in [threshold, width, max_window, power_thres]):
raise ValueError("When version=None, all of threshold, width, max_window, and power_thres must be provided.")
elif version not in ["12w", "4w"]:
raise ValueError("version must be '12w', '4w', or None.")
# Validate return_type
if return_type not in ["data", "plot", "table"]:
raise ValueError("return_type must be 'data', 'plot', or 'table'.")
# Prepare the target metric
if metric is not None:
data["target_metric"] = data[metric]
else:
data["target_metric"] = data[metric_str].sum(axis=1, skipna=True)
# Create rolling averages based on version or custom parameters
data = data.sort_values(by=["PersonId", "MetricDate"])
if version is None:
# Custom parameters provided
# Create rolling average based on max_window
data[f"target_metric_l{max_window}w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=max_window, min_periods=1).mean()
)
# Print diagnostic message
print(f"Usage segments generated with custom parameters:")
print(f" - threshold: {threshold}")
print(f" - width: {width}")
print(f" - max_window: {max_window}")
print(f" - power_thres: {power_thres}")
# Identify habit with custom parameters
habit_custom = identify_habit(data, metric="target_metric", threshold=threshold, width=width, max_window=max_window, return_type="data")
habit_custom = habit_custom.rename(columns={"IsHabit": "IsHabitCustom"})[["PersonId", "MetricDate", "IsHabitCustom"]]
# Merge habit back into the main dataset
data = data.merge(habit_custom, on=["PersonId", "MetricDate"], how="left")
# Define custom usage segments
data["UsageSegments"] = np.select(
[
(data["IsHabitCustom"] == True) & (data[f"target_metric_l{max_window}w"] >= power_thres),
(data["IsHabitCustom"] == True),
(data[f"target_metric_l{max_window}w"] >= 1),
(data[f"target_metric_l{max_window}w"] > 0),
(data[f"target_metric_l{max_window}w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
else:
# Use existing 12w and 4w logic
data["target_metric_l12w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=12, min_periods=1).mean()
)
data["target_metric_l4w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=4, min_periods=1).mean()
)
# Identify habits
habit_12w = identify_habit(data, metric="target_metric", threshold=1, width=9, max_window=12, return_type="data")
habit_4w = identify_habit(data, metric="target_metric", threshold=1, width=4, max_window=4, return_type="data")
habit_12w = habit_12w.rename(columns={"IsHabit": "IsHabit12w"})[["PersonId", "MetricDate", "IsHabit12w"]]
habit_4w = habit_4w.rename(columns={"IsHabit": "IsHabit4w"})[["PersonId", "MetricDate", "IsHabit4w"]]
# Merge habits back into the main dataset
data = data.merge(habit_12w, on=["PersonId", "MetricDate"], how="left")
data = data.merge(habit_4w, on=["PersonId", "MetricDate"], how="left")
# Define usage segments
data["UsageSegments_12w"] = np.select(
[
(data["IsHabit12w"] == True) & (data["target_metric_l12w"] >= 15),
(data["IsHabit12w"] == True),
(data["target_metric_l12w"] >= 1),
(data["target_metric_l12w"] > 0),
(data["target_metric_l12w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
data["UsageSegments_4w"] = np.select(
[
(data["IsHabit4w"] == True) & (data["target_metric_l4w"] >= 15),
(data["IsHabit4w"] == True),
(data["target_metric_l4w"] >= 1),
(data["target_metric_l4w"] > 0),
(data["target_metric_l4w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
if return_type == "data":
return data
elif return_type == "plot":
if version == "12w":
return plot_ts_us(data, cus="UsageSegments_12w", caption="Usage Segments - 12 weeks")
elif version == "4w":
return plot_ts_us(data, cus="UsageSegments_4w", caption="Usage Segments - 4 weeks")
elif version is None:
custom_caption = f"Usage Segments - Custom (threshold={threshold}, width={width}, max_window={max_window}w, power_thres={power_thres})"
return plot_ts_us(data, cus="UsageSegments", caption=custom_caption)
else:
raise ValueError("Please provide either `12w`, `4w`, or None to `version`.")
elif return_type == "table":
# Create summary table with MetricDate as rows and segments as columns
if version == "12w":
segment_col = "UsageSegments_12w"
print("Usage segments generated with 12-week parameters: threshold=1, width=9, max_window=12, power_thres=15")
elif version == "4w":
segment_col = "UsageSegments_4w"
print("Usage segments generated with 4-week parameters: threshold=1, width=4, max_window=4, power_thres=15")
elif version is None:
segment_col = "UsageSegments"
# Diagnostic message already printed above
else:
raise ValueError("Please provide either `12w`, `4w`, or None to `version`.")
# Create pivot table
summary_table = (
data.groupby(["MetricDate", segment_col])
.size()
.reset_index(name="count")
)
summary_table = summary_table.pivot(index="MetricDate", columns=segment_col, values="count").fillna(0)
# Ensure all usage segment categories are present
expected_segments = ["Non-user", "Low User", "Novice User", "Habitual User", "Power User"]
for segment in expected_segments:
if segment not in summary_table.columns:
summary_table[segment] = 0
# Reorder columns to match expected order
summary_table = summary_table[expected_segments]
return summary_table
else:
raise ValueError("Invalid return_type. Choose 'data', 'plot', or 'table'.")
[docs]
def plot_ts_us(data, cus, caption,figsize=None):
"""Plot usage segments over time as a stacked bar chart.
Parameters
----------
data : pandas.DataFrame
Dataset with usage segments and a ``MetricDate`` column.
cus : str
Column name containing usage segment classifications.
caption : str
Caption text displayed below the chart.
figsize : tuple or None, default None
Figure size ``(width, height)`` in inches. Defaults to ``(8, 6)``.
Returns
-------
matplotlib.figure.Figure
A stacked bar plot of usage segments over time.
"""
# Group data and calculate proportions
data = data.groupby(["MetricDate", cus]).size().reset_index(name="count")
data["proportion"] = data.groupby("MetricDate")["count"].transform(lambda x: x / x.sum())
# Pivot data for stacked bar plot
pivot_data = data.pivot(index="MetricDate", columns=cus, values="proportion").fillna(0)
# Define the order of categories and corresponding colors (reversed order for stacking)
category_order = ["Non-user", "Low User", "Novice User", "Habitual User", "Power User"]
colors = ["grey", "#808080", "#80baea", "#1c66b0", "#0c336e"]
# Ensure all categories are present in the data
for category in category_order:
if category not in pivot_data.columns:
pivot_data[category] = 0
# Reorder columns to match the desired category order
pivot_data = pivot_data[category_order]
# Plot the stacked bar chart
fig, ax = plt.subplots(figsize=figsize if figsize else (8, 6))
pivot_data.plot(kind="bar", stacked=True, color=colors, ax=ax)
# Customize the plot
ax.set_title("Usage Segments", fontsize=14, fontweight="bold")
ax.set_xlabel("Date", fontsize=12)
ax.set_ylabel("Proportion of Users", fontsize=12)
ax.legend(title="Usage Segment", fontsize=10)
ax.set_xticks(range(len(pivot_data.index)))
ax.set_xticklabels(pivot_data.index.strftime("%Y-%m-%d"), rotation=45, ha="right")
ax.text(
0, -0.15, caption, transform=ax.transAxes, fontsize=10, alpha=0.7, ha="left"
)
plt.tight_layout()
return fig