import cProfile
import json
import math
import os
import site
from collections import OrderedDict, deque
from io import StringIO
from pstats import SortKey, Stats
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
[docs]
class ProfileNode:
"""
Graph structure to represent a profiling.
:param filename: filename
:param line: line number
:param func_name: function name
:param nc1: number of calls 1
:param nc2: number of calls 2
:param tin: time spent in the function
:param tout: time spent in the function and in the sub functions
"""
def __init__(
self,
filename: str,
line: int,
func_name: str,
nc1: int,
nc2: int,
tin: float,
tall: float,
):
if "method 'disable' of '_lsprof.Profiler'" in func_name:
raise RuntimeError(f"Function not allowed in the profiling: {func_name!r}.")
self.filename = filename
self.line = line
self.func_name = func_name
self.nc1 = nc1
self.nc2 = nc2
self.tin = tin
self.tall = tall
self.called_by = []
self.calls_to = []
self.calls_to_elements = []
[docs]
def add_called_by(self, pnode: "ProfileNode"):
"This function is called by these lines."
self.called_by.append(pnode)
[docs]
def add_calls_to(self, pnode: "ProfileNode", time_elements):
"This function calls this node."
self.calls_to.append(pnode)
self.calls_to_elements.append(time_elements)
@staticmethod
def _key(filename: str, line: int, fct: Callable) -> str:
key = "%s:%d:%s" % (filename, line, fct)
return key
@property
def key(self):
"Returns `file:line`."
return ProfileNode._key(self.filename, self.line, self.func_name)
[docs]
def get_root(self):
"Returns the root of the graph."
done = set()
def _get_root(node, stor=None):
if stor is not None:
stor.append(node)
if not node.called_by:
return node
if len(node.called_by) == 1:
return _get_root(node.called_by[0], stor=stor)
res = None
for ct in node.called_by:
k = id(node), id(ct)
if k in done:
continue
res = ct
break
if res is None:
# All paths have been explored and no entry point was found.
# Choosing the most consuming function.
return None
done.add((id(node), id(res)))
return _get_root(res, stor=stor)
root = _get_root(self)
if root is None:
candidates = []
_get_root(self, stor=candidates)
tall = [(n.tall, n) for n in candidates]
tall.sort()
root = tall[-1][-1]
return root
def __repr__(self) -> str:
"usual"
return "%s(%r, %r, %r, %r, %r, %r, %r) # %d-%d" % (
self.__class__.__name__,
self.filename,
self.line,
self.func_name,
self.nc1,
self.nc2,
self.tin,
self.tall,
len(self.called_by),
len(self.calls_to),
)
def __iter__(self):
"Returns all nodes in the graph."
done = set()
stack = deque()
stack.append(self)
while len(stack) > 0:
node = stack.popleft()
if node.key in done:
continue
yield node
done.add(node.key)
stack.extend(node.calls_to)
_modules_ = {
"~",
"subprocess.py",
"posixpath.py",
"os.py",
"<frozen importlib._bootstrap>",
"inspect.py",
"version.py",
"typing.py",
"warnings.py",
"errors.py",
"numbers.py",
"ast.py",
"threading.py",
"_collections_abc.py",
"datetime.py",
"abc.py",
"argparse.py",
"__future__.py",
"functools.py",
"six.py",
"sre_parse.py",
"contextlib.py",
" _globals.py",
"_ios.py",
"types.py",
}
[docs]
@staticmethod
def filter_node_(node, info=None) -> bool:
"""
Filters out node to be displayed by default.
:param node: node
:param info: if the node is called by a function,
this dictionary can be used to overwrite the attributes
held by the node
:return: boolean (True to keep, False to forget)
"""
if node.filename in ProfileNode._modules_:
if info is None:
if node.nc1 <= 10 and node.nc2 <= 10 and node.tall <= 1e-4:
return False
else:
if info["nc1"] <= 10 and info["nc2"] <= 10 and info["tall"] <= 1e-4:
return False
return True
[docs]
def as_dict(self, filter_node=None, sort_key=SortKey.LINE):
"""
Renders the results of a profiling interpreted with
function @fn profile2graph. It can then be loaded with
a dataframe.
:param filter_node: display only the nodes for which
this function returns True, if None, the default function
removes built-in function with small impact
:param sort_key: sort sub nodes by...
:return: rows
"""
def sort_key_line(dr):
if isinstance(dr, tuple):
return (dr[0].filename, dr[0].line)
return (dr.filename, dr.line)
def sort_key_tin(dr):
if isinstance(dr, tuple):
return -dr[1][2]
return -dr.tin
def sort_key_tall(dr):
if isinstance(dr, tuple):
return -dr[1][3]
return -dr.tall
if sort_key == SortKey.LINE:
sortk = sort_key_line
elif sort_key == SortKey.CUMULATIVE:
sortk = sort_key_tall
elif sort_key == SortKey.TIME:
sortk = sort_key_tin
else:
raise NotImplementedError(
f"Unable to sort subcalls with this key {sort_key!r}."
)
def depth_first(node, roots_keys, indent=0):
text = {
"fct": node.func_name,
"where": node.key,
"nc1": node.nc1,
"nc2": node.nc2,
"tin": node.tin,
"tall": node.tall,
"indent": indent,
"ncalls": len(node.calls_to),
"debug": "A",
}
yield text
for n, nel in sorted(zip(node.calls_to, node.calls_to_elements), key=sortk):
if n.key in roots_keys:
text = {
"fct": n.func_name,
"where": n.key,
"nc1": nel[0],
"nc2": nel[1],
"tin": nel[2],
"tall": nel[3],
"indent": indent + 1,
"ncalls": len(n.calls_to),
"more": "+",
"debug": "B",
}
if filter_node is not None and not filter_node(n, info=text):
continue
yield text
else:
if filter_node is not None and not filter_node(n):
continue
for t in depth_first(n, roots_keys, indent + 1):
yield t
if filter_node is None:
filter_node = ProfileNode.filter_node_
nodes = list(self)
roots = [node for node in nodes if len(node.called_by) != 1]
roots_key = {r.key: r for r in roots}
rows = []
for root in sorted(roots, key=sortk):
if filter_node is not None and not filter_node(root):
continue
rows.extend(depth_first(root, roots_key))
return rows
[docs]
def to_text(self, filter_node=None, sort_key=SortKey.LINE, fct_width=60) -> str:
"""
Prints the profiling to text.
:param filter_node: display only the nodes for which
this function returns True, if None, the default function
removes built-in function with small impact
:param sort_key: sort sub nodes by...
:return: rows
"""
def align_text(text, size):
if size <= 0:
return text
if len(text) <= size:
return text + " " * (size - len(text))
h = size // 2 - 1
return text[:h] + "..." + text[-h + 1 :]
dicts = self.as_dict(filter_node=filter_node, sort_key=sort_key)
max_nc = max(max(_["nc1"] for _ in dicts), max(_["nc2"] for _ in dicts))
dg = int(math.log(max_nc) / math.log(10) + 1.5)
line_format = (
"{indent}{fct} -- {nc1: %dd} {nc2: %dd} -- {tin:1.5f} {tall:1.5f}"
" -- {name} ({fct2})" % (dg, dg)
)
text = []
for row in dicts:
line = line_format.format(
indent=" " * (row["indent"] * 4),
fct=align_text(row["fct"], fct_width - row["indent"] * 4),
nc1=row["nc1"],
nc2=row["nc2"],
tin=row["tin"],
tall=row["tall"],
name=row["where"],
fct2=row["fct"],
)
if row.get("more", "") == "+":
line += " +++"
text.append(line)
return "\n".join(text)
[docs]
def to_json(
self, filter_node=None, sort_key=SortKey.LINE, as_str=True, **kwargs
) -> Union[str, Dict[str, Any]]:
"""
Renders the results of a profiling interpreted with
function @fn profile2graph as :epkg:`JSON`.
:param filter_node: display only the nodes for which
this function returns True, if None, the default function
removes built-in function with small impact
:param sort_key: sort sub nodes by...
:param as_str: converts the json into a string
:param kwargs: see :func:`json.dumps`
:return: rows
"""
def sort_key_line(dr):
if isinstance(dr, tuple):
return (dr[0].filename, dr[0].line)
return (dr.filename, dr.line)
def sort_key_tin(dr):
if isinstance(dr, tuple):
return -dr[1][2]
return -dr.tin
def sort_key_tall(dr):
if isinstance(dr, tuple):
return -dr[1][3]
return -dr.tall
if sort_key == SortKey.LINE:
sortk = sort_key_line
elif sort_key == SortKey.CUMULATIVE:
sortk = sort_key_tall
elif sort_key == SortKey.TIME:
sortk = sort_key_tin
else:
raise NotImplementedError(
f"Unable to sort subcalls with this key {sort_key!r}."
)
def walk(node, roots_keys, indent=0):
item = {
"details": {
"fct": node.func_name,
"where": node.key,
"nc1": node.nc1,
"nc2": node.nc2,
"tin": node.tin,
"tall": node.tall,
"indent": indent,
"ncalls": len(node.calls_to),
}
}
child = OrderedDict()
for n, nel in sorted(zip(node.calls_to, node.calls_to_elements), key=sortk):
key = (nel[0], f"{nel[3]:1.5f}:{n.func_name}")
if n.key in roots_keys:
details = {
"fct": n.func_name,
"where": n.key,
"nc1": nel[0],
"nc2": nel[1],
"tin": nel[2],
"tall": nel[3],
"indent": indent,
"ncalls": len(node.calls_to),
}
if filter_node is not None and not filter_node(n, info=details):
continue
child[key] = {"details": details}
else:
if filter_node is not None and not filter_node(n):
continue
child[key] = walk(n, roots_key, indent + 1)
if child:
mx = max(_[0] for _ in child)
dg = int(math.log(mx) / math.log(10) + 1.5)
form = f"%-{dg}d-%s"
child = OrderedDict((form % k, v) for k, v in child.items())
item["calls"] = child
return item
if filter_node is None:
filter_node = ProfileNode.filter_node_
nodes = list(self)
roots = [node for node in nodes if len(node.called_by) != 1]
roots_key = {r.key: r for r in roots}
rows = OrderedDict()
for root in sorted(roots, key=sortk):
if filter_node is not None and not filter_node(root):
continue
key = (root.nc1, f"{root.tall:1.5f}:::{root.func_name}")
rows[key] = walk(root, roots_key)
mx = max(_[0] for _ in rows)
dg = int(math.log(mx) / math.log(10) + 1.5)
form = f"%-{dg}d-%s"
rows = OrderedDict((form % k, v) for k, v in rows.items())
if as_str:
return json.dumps({"profile": rows}, **kwargs)
return {"profile": rows}
def _process_pstats(
ps: Stats,
clean_text: Optional[Callable] = None,
verbose: bool = False,
) -> List[Dict[str, Any]]:
"""
Converts class `Stats <https://docs.python.org/3/library/
profile.html#pstats.Stats>`_ into something
readable for a dataframe.
:param ps: instance of type :func:`pstats.Stats`
:param clean_text: function to clean function names
:param verbose: change verbosity
:return: list of rows
"""
if clean_text is None:
clean_text = lambda x: x # noqa: E731
def add_rows(rows, d):
tt1, tt2 = 0, 0
for k, v in d.items():
stin = 0
stall = 0
if verbose:
print(
"[pstats] %s=%r"
% ((clean_text(k[0].replace("\\", "/")),) + k[1:], v)
)
if len(v) < 5:
continue
row = {
"file": "%s:%d" % (clean_text(k[0].replace("\\", "/")), k[1]),
"fct": k[2],
"ncalls1": v[0],
"ncalls2": v[1],
"tin": v[2],
"tall": v[3],
}
stin += v[2]
stall += v[3]
if len(v) == 5:
t1, t2 = add_rows(rows, v[-1])
stin += t1
stall += t2
row["cum_tin"] = stin
row["cum_tall"] = stall
rows.append(row)
tt1 += stin
tt2 += stall
return tt1, tt2
rows = []
add_rows(rows, ps.stats)
return rows
def profile2df(
ps: Stats,
as_df: bool = True,
clean_text: Optional[Callable] = None,
verbose: bool = False,
):
"""
Converts profiling statistics into a Dataframe.
:param ps: an instance of `pstats
<https://docs.python.org/3/library/profile.html#pstats.Stats>`_
:param as_df: returns the results as a dataframe (True)
or a list of dictionaries (False)
:param clean_text: function to clean function names
:param verbose: verbosity
:return: a DataFrame
::
import pstats
from onnxrt_backend_dev.monitoring.profiling import profile2df
ps = pstats.Stats('bench_ortmodule_nn_gpu6.prof')
df = profile2df(pd)
print(df)
"""
rows = _process_pstats(ps, clean_text, verbose=verbose)
if not as_df:
return rows
import pandas
df = pandas.DataFrame(rows)
df = df[["fct", "file", "ncalls1", "ncalls2", "tin", "cum_tin", "tall", "cum_tall"]]
df = (
df.groupby(["fct", "file"], as_index=False)
.sum()
.sort_values("cum_tall", ascending=False)
.reset_index(drop=True)
)
return df.copy()
[docs]
def profile(
fct: Callable,
sort: str = "cumulative",
rootrem: Optional[str] = None,
as_df: bool = False,
return_results: bool = False,
**kwargs,
) -> Union[Tuple[Stats, Any], Tuple[Stats, Any, Any]]:
"""
Profiles the execution of a function.
:param fct: function to profile
:param sort: see `sort_stats <https://docs.python.org/3/library/
profile.html#pstats.Stats.sort_stats>`_
:param rootrem: root to remove in filenames
:param as_df: return the results as a dataframe and not text
:param return_results: if True, return results as well
(in the first position)
:param kwargs: additional parameters used to create the profiler,
see :epkg:`cProfile.Profile`
:return: raw results, statistics text dump (or dataframe is *as_df* is True)
.. plot::
import matplotlib.pyplot as plt
from onnxrt_backend_dev.monitoring.profiling import profile
def subf(x):
return sum(x)
def fctm():
x1 = subf([1, 2, 3])
x2 = subf([1, 2, 3, 4])
return x1 + x2
pr, df = profile(lambda: [fctm() for i in range(0, 1000)], as_df=True)
ax = df[['namefct', 'cum_tall']].head(n=15).set_index(
'namefct').plot(kind='bar', figsize=(8, 3), rot=30)
ax.set_title("example of a graph")
for la in ax.get_xticklabels():
la.set_horizontalalignment('right');
plt.show()
"""
pr = cProfile.Profile(**kwargs)
pr.enable()
fct_res = fct()
pr.disable()
s = StringIO()
ps = Stats(pr, stream=s).sort_stats(sort)
ps.print_stats()
res = s.getvalue()
try:
pack = site.getsitepackages()
except AttributeError:
import numpy
pack = os.path.normpath(
os.path.abspath(os.path.join(os.path.dirname(numpy.__file__), ".."))
)
pack = [pack]
pack_ = os.path.normpath(os.path.join(pack[-1], ".."))
def clean_text(res):
res = res.replace(pack[-1], "site-packages")
res = res.replace(pack_, "lib")
if rootrem is not None:
if isinstance(rootrem, str):
res = res.replace(rootrem, "")
else:
for sub in rootrem:
if isinstance(sub, str):
res = res.replace(sub, "")
elif isinstance(sub, tuple) and len(sub) == 2:
res = res.replace(sub[0], sub[1])
else:
raise TypeError(
f"rootrem must contains strings or tuple not {rootrem!r}."
)
return res
if as_df:
def better_name(row):
if len(row["fct"]) > 15:
return f"{row['file'].split(':')[-1]}-{row['fct']}"
name = row["file"].replace("\\", "/")
return f"{name.split('/')[-1]}-{row['fct']}"
rows = _process_pstats(ps, clean_text)
import pandas
df = pandas.DataFrame(rows)
df = df[
[
"fct",
"file",
"ncalls1",
"ncalls2",
"tin",
"cum_tin",
"tall",
"cum_tall",
]
]
df["namefct"] = df.apply(lambda row: better_name(row), axis=1)
df = (
df.groupby(["namefct", "file"], as_index=False)
.sum()
.sort_values("cum_tall", ascending=False)
.reset_index(drop=True)
)
if return_results:
return fct_res, ps, df
return ps, df
res = clean_text(res)
if return_results:
return fct_res, ps, res
return ps, res
[docs]
def profile2graph(
ps: Stats,
clean_text: Optional[Callable] = None,
verbose: bool = False,
) -> Tuple[Any, Dict[Any, ProfileNode]]:
"""
Converts profiling statistics into a graphs.
:param ps: an instance of `pstats
<https://docs.python.org/3/library/profile.html#pstats.Stats>`_
:param clean_text: function to clean function names
:param verbose: verbosity
:return: an instance of :class:`ProfileNode
<onnxrt_backend_dev.monitoring.profiling.ProfileNode>`
:epkg:`pyinstrument` has a nice display to show
time spent and call stack at the same time. This function
tries to replicate that display based on the results produced
by module :mod:`cProfile`. Here is an example.
.. runpython::
:showcode:
import time
from onnxrt_backend_dev.monitoring.profiling import profile, profile2graph
def fct0(t):
time.sleep(t)
def fct1(t):
time.sleep(t)
def fct2():
fct1(0.1)
fct1(0.01)
def fct3():
fct0(0.2)
fct1(0.5)
def fct4():
fct2()
fct3()
ps = profile(fct4)[0]
root, nodes = profile2graph(ps, clean_text=lambda x: x.split("/")[-1])
text = root.to_text()
print(text)
"""
if clean_text is None:
clean_text = lambda x: x # noqa: E731
nodes = {}
for k, v in ps.stats.items():
if verbose:
print(f"[pstats] {k}={v!r}")
if len(v) < 5:
continue
if k[0] == "~" and len(v) == 0:
# raw function never called by another
continue
if "method 'disable' of '_lsprof.Profiler'" in k[2]:
continue
node = ProfileNode(
filename=clean_text(k[0].replace("\\", "/")),
line=k[1],
func_name=k[2],
nc1=v[0],
nc2=v[1],
tin=v[2],
tall=v[3],
)
if node.key in nodes:
raise RuntimeError(f"Key {node.key!r} is already present, node={node!r}.")
nodes[node.key] = node
for k, v in ps.stats.items():
if "method 'disable' of '_lsprof.Profiler'" in k[2]:
continue
filename = clean_text(k[0].replace("\\", "/"))
ks = ProfileNode._key(filename, k[1], k[2])
node = nodes[ks]
sublist = v[4]
for f, vv in sublist.items():
if "method 'disable' of '_lsprof.Profiler'" in f[2]:
continue
name = clean_text(f[0].replace("\\", "/"))
key = ProfileNode._key(name, f[1], f[2])
if key not in nodes:
raise RuntimeError(
"Unable to find key %r into\n%s" % (key, "\n".join(sorted(nodes)))
)
if k[0] == "~" and len(v) == 0:
continue
child = nodes[key]
node.add_called_by(child)
child.add_calls_to(node, vv)
for k, v in nodes.items():
root = v.get_root()
break
return root, nodes