import sys
from timeit import Timer
from typing import Any, Callable, Dict, Optional, Union
import numpy
[docs]
def measure_time(
stmt: Union[str, Callable],
context: Optional[Dict[str, Any]] = None,
repeat: int = 10,
number: int = 50,
warmup: int = 1,
div_by_number: bool = True,
max_time: Optional[float] = None,
) -> Dict[str, Union[str, int, float]]:
"""
Measures a statement and returns the results as a dictionary.
:param stmt: string or callable
:param context: variable to know in a dictionary
:param repeat: average over *repeat* experiment
:param number: number of executions in one row
:param warmup: number of iteration to do before starting the
real measurement
:param div_by_number: divide by the number of executions
:param max_time: execute the statement until the total goes
beyond this time (approximatively), *repeat* is ignored,
*div_by_number* must be set to True
:return: dictionary
.. runpython::
:showcode:
from pprint import pprint
from math import cos
from onnxrt_backend_dev.monitoring.benchmark import measure_time
res = measure_time(lambda: cos(0.5))
pprint(res)
See `Timer.repeat <https://docs.python.org/3/library/
timeit.html?timeit.Timer.repeat>`_
for a better understanding of parameter *repeat* and *number*.
The function returns a duration corresponding to
*number* times the execution of the main statement.
"""
if not callable(stmt) and not isinstance(stmt, str):
raise TypeError(
f"stmt is not callable or a string but is of type {type(stmt)!r}."
)
if context is None:
context = {}
if isinstance(stmt, str):
tim = Timer(stmt, globals=context)
else:
tim = Timer(stmt)
if warmup > 0:
warmup_time = tim.timeit(warmup)
else:
warmup_time = 0
if max_time is not None:
if not div_by_number:
raise ValueError(
"div_by_number must be set to True of max_time is defined."
)
i = 1
total_time = 0.0
results = []
while True:
for j in (1, 2):
number = i * j
time_taken = tim.timeit(number)
results.append((number, time_taken))
total_time += time_taken
if total_time >= max_time:
break
if total_time >= max_time:
break
ratio = (max_time - total_time) / total_time
ratio = max(ratio, 1)
i = int(i * ratio)
res = numpy.array(results)
tw = res[:, 0].sum()
ttime = res[:, 1].sum()
mean = ttime / tw
ave = res[:, 1] / res[:, 0]
dev = (((ave - mean) ** 2 * res[:, 0]).sum() / tw) ** 0.5
mes = dict(
average=mean,
deviation=dev,
min_exec=numpy.min(ave),
max_exec=numpy.max(ave),
repeat=1,
number=tw,
ttime=ttime,
)
else:
res = numpy.array(tim.repeat(repeat=repeat, number=number))
if div_by_number:
res /= number
mean = numpy.mean(res)
dev = numpy.mean(res**2)
dev = (dev - mean**2) ** 0.5
mes = dict(
average=mean,
deviation=dev,
min_exec=numpy.min(res),
max_exec=numpy.max(res),
repeat=repeat,
number=number,
ttime=res.sum(),
)
if "values" in context:
if hasattr(context["values"], "shape"):
mes["size"] = context["values"].shape[0]
else:
mes["size"] = len(context["values"])
else:
mes["context_size"] = sys.getsizeof(context)
mes["warmup_time"] = warmup_time
return mes