# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Perform person-to-person network analysis and visualization.
"""
__all__ = ['network_p2p']
import vivainsights as vi
import pandas as pd
import igraph as ig
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.legend_handler import HandlerTuple
import matplotlib.lines as mlines
from matplotlib.backends.backend_pdf import PdfPages
import random
from sklearn.preprocessing import minmax_scale
import warnings
import time
def network_p2p(data,
                hrvar = "Organization",
                return_type = "plot",
                centrality = None,
                community = None,
                weight = None,
                comm_args = None,
                layout = "mds",
                path = "",
                style = "igraph",
                bg_fill = "#FFFFFF",
                font_col = "grey20",
                legend_pos = "best",
                palette = "rainbow",
                node_alpha = 0.7,
                edge_alpha = 1,
                edge_col = "#777777",
                node_sizes = [1, 20],
                node_scale = 1,
                seed = 1,
                legend_ncols = 0,
                figsize: tuple = None
                ):
    """
    Return a network plot given a data frame containing a person-to-person query.

    Parameters
    ----------
    data : pandas.DataFrame
        Data frame containing a person-to-person query.
    hrvar : str
        Label for the HR attribute. Defaults to ``"Organization"``.
    return_type : str
        Type of output to return. Valid values:

        - ``"plot"`` (default): matplotlib Figure.
        - ``"plot-pdf"``: save network plot as PDF.
        - ``"sankey"``: sankey plot of communities x HR attribute.
        - ``"table"``: vertex summary table.
        - ``"data"``: vertex-level DataFrame.
        - ``"network"``: igraph object.
    centrality : str, optional
        Centrality measure used to scale node sizes. Valid values:
        ``"betweenness"``, ``"closeness"``, ``"degree"``, ``"eigenvector"``,
        ``"pagerank"``. When ``None`` (default), nodes are uniform size.
    community : str, optional
        Community detection algorithm. Valid values:
        ``"multilevel"``, ``"leiden"``, ``"edge_betweenness"``,
        ``"fastgreedy"``, ``"infomap"``, ``"label_propagation"``,
        ``"leading_eigenvector"``, ``"optimal_modularity"``,
        ``"spinglass"``, ``"walk_trap"``. Defaults to ``None``.
    weight : str, optional
        Column to use as edge weights. ``None`` creates an unweighted graph.
    comm_args : dict, optional
        Keyword arguments passed to igraph's clustering algorithm.
    layout : str
        Node placement algorithm. Defaults to ``"mds"``.
    path : str
        File path for PDF output. Defaults to an auto-generated name.
    style : str
        Legacy argument from the R implementation; only ``"igraph"`` is
        supported and any other value is overridden.
    bg_fill : str
        Background fill colour. Defaults to ``"#FFFFFF"``.
    font_col : str
        Font colour. Defaults to ``"grey20"``.
    legend_pos : str
        Legend position (e.g., ``"best"``, ``"upper left"``).
    palette : str
        Colour palette name. Defaults to ``"rainbow"`` (currently the only
        supported value).
    node_alpha : float
        Node transparency (0-1). Defaults to 0.7.
    edge_alpha : float
        Edge transparency (0-1). Defaults to 1.
    edge_col : str
        Edge colour. Defaults to ``"#777777"``.
    node_sizes : list of int
        Two-element list ``[min, max]`` for rescaling node sizes when
        ``centrality`` is set. Defaults to ``[1, 20]``.
    node_scale : float
        Multiplier applied to node sizes. Defaults to 1.
    seed : int
        Random seed for community detection reproducibility.
    legend_ncols : int
        ``0`` for horizontal legend, ``1`` for vertical.
    figsize : tuple, optional
        Figure size as ``(width, height)`` in inches. Defaults to ``(8, 6)``.

    Returns
    -------
    matplotlib.figure.Figure, pandas.DataFrame, or igraph.Graph
        Output depends on ``return_type``:

        - ``"plot"``: matplotlib Figure.
        - ``"plot-pdf"``: saves PDF and returns ``None``.
        - ``"sankey"``: sankey plot Figure.
        - ``"table"``: vertex summary DataFrame.
        - ``"data"``: vertex-level DataFrame.
        - ``"network"``: igraph object.

    Examples
    --------
    >>> import vivainsights as vi
    >>> sample_data = vi.p2p_data_sim()
    >>> vi.network_p2p(data=sample_data, return_type="plot")
    >>>
    >>> # Community detection with custom resolution
    >>> vi.network_p2p(
    ...     data=sample_data,
    ...     community="leiden",
    ...     comm_args={"resolution": 0.01},
    ...     return_type="table",
    ... )
    >>>
    >>> # Centrality-based node sizing
    >>> vi.network_p2p(data=sample_data, centrality="betweenness", return_type="table")
    """
    # Only set a default output path if the user did not provide one;
    # the community name is appended for traceability of PDF outputs.
    if path == "":
        path = "p2p" + ("" if community is None else "_" + community)

    # `style` is a placeholder as only igraph is supported.
    # It is a legacy argument kept from the R implementation.
    style = "igraph"

    if len(node_sizes) != 2:
        raise ValueError("`node_sizes` must be of length 2")

    # Edge data frame: use a constant weight of 1 when no weight column is given
    if weight is None:
        edges = (
            data.assign(NoWeight = 1)
            .loc[:, ["PrimaryCollaborator_PersonId", "SecondaryCollaborator_PersonId", "NoWeight"]]
            .rename(columns = {"NoWeight": "weight"})
        )
    else:
        edges = data.loc[:, ["PrimaryCollaborator_PersonId", "SecondaryCollaborator_PersonId", weight]]

    pc_hrvar = "PrimaryCollaborator_" + hrvar
    sc_hrvar = "SecondaryCollaborator_" + hrvar

    # TieOrigin = PrimaryCollaborator: vertex metadata for tie origins
    tieOrigin = (
        edges[["PrimaryCollaborator_PersonId"]].drop_duplicates()
        .merge(data[["PrimaryCollaborator_PersonId", pc_hrvar]], on = "PrimaryCollaborator_PersonId", how = "left")
        .rename(columns = {"PrimaryCollaborator_PersonId": "node"})
        .assign(**{hrvar: lambda row: row[pc_hrvar]})  # normalise the HR column name
        .drop(columns = [pc_hrvar])
    )

    # TieDest = SecondaryCollaborator: vertex metadata for tie destinations
    tieDest = (
        edges[["SecondaryCollaborator_PersonId"]].drop_duplicates()
        .merge(data[["SecondaryCollaborator_PersonId", sc_hrvar]], on = "SecondaryCollaborator_PersonId", how = "left")
        .rename(columns = {"SecondaryCollaborator_PersonId": "node"})
        .assign(**{hrvar: lambda row: row[sc_hrvar]})
        .drop(columns = [sc_hrvar])
    )

    # Vertices data frame to provide meta-data
    vert_ft = pd.concat([tieOrigin, tieDest]).drop_duplicates()

    # Create igraph object
    g_raw = ig.Graph.TupleList(edges.itertuples(index=False), directed=True, weights=True)

    # Assign vertex attributes - HR attribute and node
    # NOTE(review): this assumes igraph's vertex order matches the order of
    # first appearance in `vert_ft` — true for TupleList construction.
    g_raw.vs[hrvar] = vert_ft[hrvar].tolist()
    g_raw.vs["node"] = vert_ft["node"].tolist()

    # Assign weights
    g_raw.es["weight"] = edges["weight"]

    # Allowed community detection algorithms
    valid_comm = ["leiden", "multilevel", "edge_betweenness", "fastgreedy", "infomap",
                  "label_propagation", "leading_eigenvector", "optimal_modularity",
                  "spinglass", "walk_trap"]

    # Finalise `g` object; if community detection is selected, the communities
    # are appended here as a "cluster" vertex attribute.
    if community is None:
        # NOT simplified as simplification may remove too many edges
        g = g_raw
        v_attr = hrvar
    elif community in valid_comm:
        # python-igraph draws on Python's `random` module, so seeding here
        # makes community detection reproducible
        random.seed(seed)
        g_ud = g_raw.as_undirected()  # community algorithms require undirected graphs
        comm_func = getattr(ig.Graph, "community_" + community)
        if comm_args is None:
            comm_args = {}
        comm_out = comm_func(graph = g_ud, **comm_args)
        # NOT simplified as simplification may remove too many edges
        g = g_ud
        g.vs["cluster"] = [str(member) for member in comm_out.membership]
        v_attr = "cluster"  # name of the grouping vertex attribute
    else:
        raise ValueError("Please enter a valid input for `community`.")

    # Centrality calculations ------------------------
    valid_cent = ["betweenness", "closeness", "degree", "eigenvector", "pagerank"]

    if centrality in valid_cent:
        # Attach all centrality measures as vertex attributes
        g = vi.network_summary(g, return_type = "network")
        # Map the chosen centrality onto [node_sizes[0], node_sizes[1]],
        # then divide by 100 to match igraph's plotting units.
        # (Previous implementation computed (max-min)*scaled + (max-min)^2,
        # which ignored the requested minimum size.)
        scaled_sizes = minmax_scale(g.vs[centrality], feature_range = (node_sizes[0], node_sizes[1]))
        g.vs["node_size"] = [size / 100 for size in scaled_sizes]
    elif centrality is None:
        # Uniform node size when centrality is not calculated
        # (`style` is always "igraph"; the legacy "ggraph" branch is gone)
        g.vs["node_size"] = [0.08] * g.vcount()
    else:
        raise ValueError("Please enter a valid input for `centrality`.")

    # Common area -----------------------------------
    # Vertex table: merge HR attribute (and cluster/centrality when available)
    vert_ft = vert_ft.rename(columns = {"node": "name"})
    if centrality is not None:
        cent_cols = {measure: g.vs[measure] for measure in valid_cent}
        if community is None:
            vert_tb = pd.DataFrame({"name": g.vs["name"], **cent_cols})
        else:
            vert_tb = pd.DataFrame({"name": g.vs["name"], "cluster": g.vs[v_attr], **cent_cols})
    else:
        if community is None:
            vert_tb = pd.DataFrame({"name": g.vs["name"]})
        else:
            vert_tb = pd.DataFrame({"name": g.vs["name"], "cluster": g.vs[v_attr]})
    vert_tb = vert_tb.merge(vert_ft, on = "name", how = "left").drop_duplicates()

    out_path = path + "_" + time.strftime("%y%m%d_%H%M%S") + ".pdf"

    # Return outputs ---------------------------------------
    if return_type in ["plot", "plot-pdf"]:

        def rainbow(n):
            # n random hex colours (non-deterministic unless `random` is pre-seeded)
            return [f"#{random.randint(0, 0xFFFFFF):06x}" for _ in range(n)]

        vert_tb = vert_tb.drop_duplicates()

        # Resolve the palette via explicit lookup rather than eval() on a
        # user-supplied string (eval is unsafe and fails opaquely).
        palette_funcs = {"rainbow": rainbow}
        if palette not in palette_funcs:
            raise ValueError("Please enter a valid input for `palette`.")

        # Colour lookup table keyed by the grouping attribute
        # NOTE(review): currently unused by the igraph renderer below, which
        # colours nodes from the tab20 colormap; kept for parity/validation.
        colour_tb = (
            pd.DataFrame({v_attr: g.vs[v_attr]})
            .assign(colour = palette_funcs[palette](len(vert_tb)))
        )
        colour_v = (
            pd.DataFrame({v_attr: g.vs[v_attr]})
            .merge(colour_tb, on = v_attr, how = "left")
            .loc[:, "colour"]
        )

        if style == "igraph":
            g.vs["frame_color"] = None
            g.es["width"] = 1

            # Internal basic plotting function used inside `network_p2p()`
            def plot_basic_graph(lpos = legend_pos, pdf = False, node_scale = node_scale):
                fig, ax = plt.subplots(figsize = figsize if figsize else (8, 6))
                plt.rcParams["figure.facecolor"] = bg_fill
                layout_func = getattr(ig.Graph, f"layout_{layout}")

                unique_values = list(set(g.vs[v_attr]))
                # tab20 has 20 colours; wrap the index so that more than 20
                # categories reuse colours instead of all collapsing onto the
                # colormap's "over" colour.
                base_cmap = plt.get_cmap("tab20")
                cmap = mcolors.ListedColormap([base_cmap(i % 20) for i in range(len(unique_values))])
                value_to_index = {value: i for i, value in enumerate(unique_values)}

                # Legend proxy artists, one marker per category
                handles = [
                    mlines.Line2D([0], [0], marker = "o", color = "w", label = value,
                                  markerfacecolor = cmap(i), markersize = 5)
                    for i, value in enumerate(unique_values)
                ]
                labels = list(unique_values)

                # Set node colours from the category of each vertex
                for i, value in enumerate(g.vs[v_attr]):
                    g.vs[i]["color"] = cmap(value_to_index[value])

                # Scale the size of the nodes
                g.vs["node_size"] = [x * node_scale for x in g.vs["node_size"]]

                ig.plot(
                    g,
                    layout = layout_func(g),
                    target = ax,
                    vertex_label = None,
                    vertex_size = g.vs["node_size"],
                    edge_arrow_mode = "0",
                    edge_arrow_size = 0,
                    edge_color = "#adadad",
                )

                # Number of legend columns: auto-fit when legend_ncols == 0
                if legend_ncols == 0:
                    if len(handles) <= 10:
                        leg_cols = len(handles)
                    elif 10 < len(handles) <= 20:
                        leg_cols = len(handles) // 2
                    else:
                        leg_cols = len(handles) // 4
                        warnings.warn("There are over 20 unique node categories. Consider changing your grouping variable, merging existing groups, or tweaking algorithm parameters (if applicable).", UserWarning)
                else:
                    leg_cols = 1

                plt.legend(
                    handles = handles,
                    labels = labels,
                    handler_map = {tuple: HandlerTuple(ndivide = 20)},
                    loc = lpos,  # fixed: previously ignored `lpos` in favour of the closure variable
                    edgecolor = edge_col,
                    frameon = True,
                    markerscale = 1,
                    fontsize = 5,
                    labelcolor = "grey",
                    ncols = leg_cols
                )
                return fig

            if return_type == "plot":
                return plot_basic_graph(lpos = legend_pos)
            elif return_type == "plot-pdf":
                with PdfPages(out_path) as pdf:
                    pdf.savefig(plot_basic_graph(pdf = True))
                print(f"Saved to {out_path}.")
        else:
            raise ValueError("Invalid input for `style`.")
    elif return_type == "data":
        vert_tb = vert_tb.reset_index(drop = True)
        return vert_tb
    elif return_type == "network":
        return g
    elif return_type == "sankey":
        if community is None:
            raise ValueError("Note: no sankey return option is available if `None` is selected at `community`. Please specify a valid community detection algorithm.")
        elif community in valid_comm:
            # Return the figure so the documented contract holds
            return vi.create_sankey(
                data = vert_tb.groupby([hrvar, "cluster"]).size().reset_index(name = "n"),
                var1 = hrvar,
                var2 = "cluster"
            )
    elif return_type == "table":
        if community is None:
            if centrality is None:
                vert_tb = vert_tb.groupby(hrvar).size().reset_index(name = "n")
            else:
                vert_tb = vert_tb.groupby(hrvar).agg(
                    n = ("betweenness", "size"),
                    betweenness = ("betweenness", "mean"),
                    closeness = ("closeness", "mean"),
                    degree = ("degree", "mean"),
                    eigenvector = ("eigenvector", "mean"),
                    pagerank = ("pagerank", "mean")
                )
        elif community in valid_comm:
            if centrality is None:
                vert_tb = vert_tb.groupby([hrvar, "cluster"]).size().reset_index(name = "n")
            else:
                vert_tb = vert_tb.groupby([hrvar, "cluster"]).agg(
                    n = ("betweenness", "size"),
                    betweenness = ("betweenness", "mean"),
                    closeness = ("closeness", "mean"),
                    degree = ("degree", "mean"),
                    eigenvector = ("eigenvector", "mean"),
                    pagerank = ("pagerank", "mean")
                )
        return vert_tb
    else:
        raise ValueError("invalid input for `return_type`.")