# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from vivainsights.identify_habit import identify_habit
[docs]
def identify_usage_segments(data, metric=None, metric_str=None, version="12w", return_type="data",
threshold=None, width=None, max_window=None, power_thres=None):
"""
Identify usage segments based on a metric.
Parameters
----------
data : pandas.DataFrame
A dataset containing the metric to be classified. Must include 'PersonId' and 'MetricDate' columns.
metric : str, optional
Name of the metric column to classify.
metric_str : list of str, optional
List of metric columns to aggregate for classification.
version : str, optional
Version of classification: "12w" (12-week rolling average), "4w" (4-week rolling average), or None for custom parameters. Default is "12w".
return_type : str, optional
What to return: "data" (default), "plot", or "table".
threshold : int, optional
Threshold for habit identification. Required when version=None.
width : int, optional
Width parameter for habit identification. Required when version=None.
max_window : int, optional
Maximum window for habit identification. Required when version=None.
power_thres : float, optional
Power user threshold for usage segment classification. Required when version=None.
Returns
-------
pandas.DataFrame or matplotlib.figure.Figure
Depending on `return_type`, returns a DataFrame with usage segments, a plot visualizing the segments over time, or a summary table.
Examples
--------
>>> import vivainsights as vi
>>> pq_data = vi.load_pq_data()
# Example usage with a single metric column
>>> vi.identify_usage_segments(data=pq_data, metric="Emails_sent", version="12w", return_type="data")
# Example usage with multiple metric columns
>>> result = vi.identify_usage_segments(
>>> data=pq_data,
>>> metric_str=[
>>> "Copilot_actions_taken_in_Teams",
>>> "Copilot_actions_taken_in_Outlook",
>>> "Copilot_actions_taken_in_Excel",
>>> "Copilot_actions_taken_in_Word",
>>> "Copilot_actions_taken_in_Powerpoint"
>>> ],
>>> version="4w",
>>> return_type="plot"
>>> )
>>> result.show()
# Example usage with custom parameters
>>> result = vi.identify_usage_segments(
>>> data=pq_data,
>>> metric="Emails_sent",
>>> version=None,
>>> threshold=2,
>>> width=5,
>>> max_window=8,
>>> power_thres=20,
>>> return_type="table"
>>> )
"""
if metric is None and metric_str is None:
raise ValueError("Please provide either a metric or a metric_str.")
if metric is not None and metric_str is not None:
raise ValueError("Please provide either a metric or a metric_str, not both.")
# Validate version and custom parameters
if version is None:
if any(param is None for param in [threshold, width, max_window, power_thres]):
raise ValueError("When version=None, all of threshold, width, max_window, and power_thres must be provided.")
elif version not in ["12w", "4w"]:
raise ValueError("version must be '12w', '4w', or None.")
# Validate return_type
if return_type not in ["data", "plot", "table"]:
raise ValueError("return_type must be 'data', 'plot', or 'table'.")
# Prepare the target metric
if metric is not None:
data["target_metric"] = data[metric]
else:
data["target_metric"] = data[metric_str].sum(axis=1, skipna=True)
# Create rolling averages based on version or custom parameters
data = data.sort_values(by=["PersonId", "MetricDate"])
if version is None:
# Custom parameters provided
# Create rolling average based on max_window
data[f"target_metric_l{max_window}w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=max_window, min_periods=1).mean()
)
# Print diagnostic message
print(f"Usage segments generated with custom parameters:")
print(f" - threshold: {threshold}")
print(f" - width: {width}")
print(f" - max_window: {max_window}")
print(f" - power_thres: {power_thres}")
# Identify habit with custom parameters
habit_custom = identify_habit(data, metric="target_metric", threshold=threshold, width=width, max_window=max_window, return_type="data")
habit_custom = habit_custom.rename(columns={"IsHabit": "IsHabitCustom"})[["PersonId", "MetricDate", "IsHabitCustom"]]
# Merge habit back into the main dataset
data = data.merge(habit_custom, on=["PersonId", "MetricDate"], how="left")
# Define custom usage segments
data["UsageSegments"] = np.select(
[
(data["IsHabitCustom"] == True) & (data[f"target_metric_l{max_window}w"] >= power_thres),
(data["IsHabitCustom"] == True),
(data[f"target_metric_l{max_window}w"] >= 1),
(data[f"target_metric_l{max_window}w"] > 0),
(data[f"target_metric_l{max_window}w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
else:
# Use existing 12w and 4w logic
data["target_metric_l12w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=12, min_periods=1).mean()
)
data["target_metric_l4w"] = data.groupby("PersonId")["target_metric"].transform(
lambda x: x.rolling(window=4, min_periods=1).mean()
)
# Identify habits
habit_12w = identify_habit(data, metric="target_metric", threshold=1, width=9, max_window=12, return_type="data")
habit_4w = identify_habit(data, metric="target_metric", threshold=1, width=4, max_window=4, return_type="data")
habit_12w = habit_12w.rename(columns={"IsHabit": "IsHabit12w"})[["PersonId", "MetricDate", "IsHabit12w"]]
habit_4w = habit_4w.rename(columns={"IsHabit": "IsHabit4w"})[["PersonId", "MetricDate", "IsHabit4w"]]
# Merge habits back into the main dataset
data = data.merge(habit_12w, on=["PersonId", "MetricDate"], how="left")
data = data.merge(habit_4w, on=["PersonId", "MetricDate"], how="left")
# Define usage segments
data["UsageSegments_12w"] = np.select(
[
(data["IsHabit12w"] == True) & (data["target_metric_l12w"] >= 15),
(data["IsHabit12w"] == True),
(data["target_metric_l12w"] >= 1),
(data["target_metric_l12w"] > 0),
(data["target_metric_l12w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
data["UsageSegments_4w"] = np.select(
[
(data["IsHabit4w"] == True) & (data["target_metric_l4w"] >= 15),
(data["IsHabit4w"] == True),
(data["target_metric_l4w"] >= 1),
(data["target_metric_l4w"] > 0),
(data["target_metric_l4w"] == 0),
],
["Power User", "Habitual User", "Novice User", "Low User", "Non-user"],
default=None,
)
if return_type == "data":
return data
elif return_type == "plot":
if version == "12w":
return plot_ts_us(data, cus="UsageSegments_12w", caption="Usage Segments - 12 weeks")
elif version == "4w":
return plot_ts_us(data, cus="UsageSegments_4w", caption="Usage Segments - 4 weeks")
elif version is None:
custom_caption = f"Usage Segments - Custom (threshold={threshold}, width={width}, max_window={max_window}w, power_thres={power_thres})"
return plot_ts_us(data, cus="UsageSegments", caption=custom_caption)
else:
raise ValueError("Please provide either `12w`, `4w`, or None to `version`.")
elif return_type == "table":
# Create summary table with MetricDate as rows and segments as columns
if version == "12w":
segment_col = "UsageSegments_12w"
print("Usage segments generated with 12-week parameters: threshold=1, width=9, max_window=12, power_thres=15")
elif version == "4w":
segment_col = "UsageSegments_4w"
print("Usage segments generated with 4-week parameters: threshold=1, width=4, max_window=4, power_thres=15")
elif version is None:
segment_col = "UsageSegments"
# Diagnostic message already printed above
else:
raise ValueError("Please provide either `12w`, `4w`, or None to `version`.")
# Create pivot table
summary_table = (
data.groupby(["MetricDate", segment_col])
.size()
.reset_index(name="count")
)
summary_table = summary_table.pivot(index="MetricDate", columns=segment_col, values="count").fillna(0)
# Ensure all usage segment categories are present
expected_segments = ["Non-user", "Low User", "Novice User", "Habitual User", "Power User"]
for segment in expected_segments:
if segment not in summary_table.columns:
summary_table[segment] = 0
# Reorder columns to match expected order
summary_table = summary_table[expected_segments]
return summary_table
else:
raise ValueError("Invalid return_type. Choose 'data', 'plot', or 'table'.")
[docs]
def plot_ts_us(data, cus, caption):
"""
Plot usage segments over time.
Parameters
----------
data : pandas.DataFrame
A dataset containing the usage segments and 'MetricDate' column.
cus : str
Column name containing the usage segment classifications.
caption : str
Caption for the plot.
Returns
-------
matplotlib.figure.Figure
A stacked bar plot of usage segments over time.
"""
# Group data and calculate proportions
data = data.groupby(["MetricDate", cus]).size().reset_index(name="count")
data["proportion"] = data.groupby("MetricDate")["count"].transform(lambda x: x / x.sum())
# Pivot data for stacked bar plot
pivot_data = data.pivot(index="MetricDate", columns=cus, values="proportion").fillna(0)
# Define the order of categories and corresponding colors (reversed order for stacking)
category_order = ["Non-user", "Low User", "Novice User", "Habitual User", "Power User"]
colors = ["grey", "#808080", "#80baea", "#1c66b0", "#0c336e"]
# Ensure all categories are present in the data
for category in category_order:
if category not in pivot_data.columns:
pivot_data[category] = 0
# Reorder columns to match the desired category order
pivot_data = pivot_data[category_order]
# Plot the stacked bar chart
fig, ax = plt.subplots(figsize=(10, 6))
pivot_data.plot(kind="bar", stacked=True, color=colors, ax=ax)
# Customize the plot
ax.set_title("Usage Segments", fontsize=14, fontweight="bold")
ax.set_xlabel("Date", fontsize=12)
ax.set_ylabel("Proportion of Users", fontsize=12)
ax.legend(title="Usage Segment", fontsize=10)
ax.set_xticks(range(len(pivot_data.index)))
ax.set_xticklabels(pivot_data.index.strftime("%Y-%m-%d"), rotation=45, ha="right")
ax.text(
0, -0.15, caption, transform=ax.transAxes, fontsize=10, alpha=0.7, ha="left"
)
plt.tight_layout()
return fig