metrics.py
These classes provide tools to automate wall-time measurement and metrics logging.
MetricsLogger
Base class for handling metrics logging; backend-specific subclasses (MLFlow, AzureML) implement the actual drivers.
Source code in src/common/metrics.py
class MetricsLogger():
"""
Class for handling metrics logging in MLFlow.
"""
def __init__(self, session_name=None, metrics_prefix=None):
self._metrics_prefix = metrics_prefix
self._session_name = session_name
self._logger = logging.getLogger(__name__)
###############################
### DRIVER SPECIFIC METHODS ###
###############################
def open(self):
self._logger.info(f"Initializing {self.__class__.__name__} [session='{self._session_name}', metrics_prefix={self._metrics_prefix}]")
def close(self):
self._logger.info(f"Finalizing {self.__class__.__name__} [session='{self._session_name}', metrics_prefix={self._metrics_prefix}]")
def log_metric(self, key, value, step=None, metric_type=MetricType.ONETIME_METRIC):
self._logger.info(f"{self.__class__.__name__}.log_metric({key},{value}, step={step}) [session={self._session_name}]")
def log_figure(self, figure, artifact_file):
self._logger.info(f"{self.__class__.__name__}.log_figure(*figure*, {artifact_file}) [session={self._session_name}]")
def log_artifact(self, local_path, artifact_path=None):
self._logger.info(f"{self.__class__.__name__}.log_artifact({local_path}, {artifact_path}) [session={self._session_name}]")
def log_artifacts(self, local_dir, artifact_path=None):
self._logger.info(f"{self.__class__.__name__}.log_artifacts({local_dir}, {artifact_path}) [session={self._session_name}]")
def set_properties(self, **kwargs):
self._logger.info(f"{self.__class__.__name__}.set_properties({kwargs}) [session={self._session_name}]")
def log_parameters(self, **kwargs):
self._logger.info(f"{self.__class__.__name__}.log_parameters({kwargs}) [session={self._session_name}]")
###############################
### GENERIC UTILITY METHODS ###
###############################
@classmethod
def _remove_non_allowed_chars(cls, name_string):
""" Removes chars not allowed for metric keys """
return re.sub(r'[^a-zA-Z0-9_\-\.\ \/]', '', name_string)
def set_platform_properties(self):
""" Capture platform sysinfo and record as properties. """
self.set_properties(
machine=platform.machine(),
processor=platform.processor(),
architecture="-".join(platform.architecture()),
platform=platform.platform(),
system=platform.system(),
system_version=platform.version(),
cpu_count=os.cpu_count(),
cpu_frequency=round(psutil.cpu_freq().current),
system_memory=round((psutil.virtual_memory().total) / (1024*1024*1024))
)
def set_properties_from_json(self, json_string):
""" Set properties/tags for the session from a json_string.
Args:
            json_string (str): a string parsable as JSON, containing a dict.
"""
try:
json_dict = json.loads(json_string)
        except Exception:
            raise ValueError(f"During parsing of JSON properties '{json_string}', an exception occurred: {traceback.format_exc()}")
if not isinstance(json_dict, dict):
raise ValueError(f"Provided JSON properties should be a dict, instead it was {str(type(json_dict))}: {json_string}")
properties_dict = dict(
[
                (k, str(v))  # coerce every value into a string
for k,v in json_dict.items()
]
)
self.set_properties(**properties_dict)
def log_time_block(self, metric_name, step=None):
""" [Proxy] Use in a `with` statement to measure execution time of a code block.
Uses LogTimeBlock.
Example
-------
```python
with LogTimeBlock("my_perf_metric_name"):
print("(((sleeping for 1 second)))")
time.sleep(1)
```
"""
# see class below with proper __enter__ and __exit__
return LogTimeBlock(metric_name, step=step, metrics_logger=self)
def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs=1000000.0):
"""Logs prediction latencies (for inferencing) with lots of fancy metrics and plots.
Args:
time_per_batch_list (List[float]): time per inferencing batch
batch_lengths (Union[List[int],int]): length of each batch (List or constant)
factor_to_usecs (float): factor to apply to time_per_batch to convert to microseconds
"""
if isinstance(batch_length, list):
sum_batch_lengths = sum(batch_length)
else:
sum_batch_lengths = batch_length*len(time_per_batch)
# log metadata
self.log_metric("prediction_batches", len(time_per_batch))
self.log_metric("prediction_queries", sum_batch_lengths)
if len(time_per_batch) > 0:
self.log_metric("prediction_latency_avg", (sum(time_per_batch) * factor_to_usecs)/sum_batch_lengths) # usecs
# if there's more than 1 batch, compute percentiles
if len(time_per_batch) > 1:
import numpy as np
import matplotlib.pyplot as plt
plt.switch_backend('agg')
# latency per batch
batch_run_times = np.array(time_per_batch) * factor_to_usecs
self.log_metric("batch_latency_p50_usecs", np.percentile(batch_run_times, 50))
self.log_metric("batch_latency_p75_usecs", np.percentile(batch_run_times, 75))
self.log_metric("batch_latency_p90_usecs", np.percentile(batch_run_times, 90))
self.log_metric("batch_latency_p95_usecs", np.percentile(batch_run_times, 95))
self.log_metric("batch_latency_p99_usecs", np.percentile(batch_run_times, 99))
            # show the distribution of batch latencies
            fig, ax = plt.subplots(1)
            ax.hist(batch_run_times, bins=100)
            ax.set_title("Latency-per-batch histogram (log scale)")
            plt.xlabel("usecs")
            plt.ylabel("occurrence")
            plt.yscale('log')
# record in mlflow
self.log_figure(fig, "batch_latency_log_histogram.png")
# latency per query
if isinstance(batch_length, list):
prediction_latencies = np.array(time_per_batch) * factor_to_usecs / np.array(batch_length)
else:
prediction_latencies = np.array(time_per_batch) * factor_to_usecs / batch_length
self.log_metric("prediction_latency_p50_usecs", np.percentile(prediction_latencies, 50))
self.log_metric("prediction_latency_p75_usecs", np.percentile(prediction_latencies, 75))
self.log_metric("prediction_latency_p90_usecs", np.percentile(prediction_latencies, 90))
self.log_metric("prediction_latency_p95_usecs", np.percentile(prediction_latencies, 95))
self.log_metric("prediction_latency_p99_usecs", np.percentile(prediction_latencies, 99))
            # show the distribution of prediction latencies
            fig, ax = plt.subplots(1)
            ax.hist(prediction_latencies, bins=100)
            ax.set_title("Latency-per-prediction histogram (log scale)")
            plt.xlabel("usecs")
            plt.ylabel("occurrence")
            plt.yscale('log')
# record in mlflow
self.log_figure(fig, "prediction_latency_log_histogram.png")
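Since the base class only writes to the local Python logger, it can serve as a safe console-only driver; a minimal sketch (session name and metric values are illustrative):

```python
import logging
logging.basicConfig(level=logging.INFO)

# the base class only logs to the console, so this acts as a "dry run" driver
logger = MetricsLogger(session_name="dry-run", metrics_prefix="train/")
logger.open()
logger.log_metric("epoch_loss", 0.42, step=1)
logger.set_platform_properties()  # records machine, cpu and memory info
logger.close()
```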
set_platform_properties()
Capture platform sysinfo and record as properties.
Source code in src/common/metrics.py
def set_platform_properties(self):
""" Capture platform sysinfo and record as properties. """
self.set_properties(
machine=platform.machine(),
processor=platform.processor(),
architecture="-".join(platform.architecture()),
platform=platform.platform(),
system=platform.system(),
system_version=platform.version(),
cpu_count=os.cpu_count(),
cpu_frequency=round(psutil.cpu_freq().current),
system_memory=round((psutil.virtual_memory().total) / (1024*1024*1024))
)
set_properties_from_json(json_string)
Set properties/tags for the session from a json_string.
Parameters:

Name | Type | Description | Default
---|---|---|---
`json_string` | `str` | a string parsable as JSON, containing a dict | *required*
Source code in src/common/metrics.py
def set_properties_from_json(self, json_string):
""" Set properties/tags for the session from a json_string.
Args:
            json_string (str): a string parsable as JSON, containing a dict.
"""
try:
json_dict = json.loads(json_string)
        except Exception:
            raise ValueError(f"During parsing of JSON properties '{json_string}', an exception occurred: {traceback.format_exc()}")
if not isinstance(json_dict, dict):
raise ValueError(f"Provided JSON properties should be a dict, instead it was {str(type(json_dict))}: {json_string}")
properties_dict = dict(
[
                (k, str(v))  # coerce every value into a string
for k,v in json_dict.items()
]
)
self.set_properties(**properties_dict)
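For instance, properties passed to a script as a JSON string on the command line can be recorded in one call; a short sketch with illustrative values:

```python
logger = MetricsLogger(session_name="example")
# every value is coerced to a string before being recorded as a property/tag
logger.set_properties_from_json('{"git_commit": "abc123", "num_workers": 4}')
```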
log_time_block(metric_name, step=None)
[Proxy] Use in a `with` statement to measure execution time of a code block.
Uses LogTimeBlock.
Example
with LogTimeBlock("my_perf_metric_name"):
    print("(((sleeping for 1 second)))")
    time.sleep(1)
Source code in src/common/metrics.py
def log_time_block(self, metric_name, step=None):
""" [Proxy] Use in a `with` statement to measure execution time of a code block.
Uses LogTimeBlock.
Example
-------
```python
with LogTimeBlock("my_perf_metric_name"):
print("(((sleeping for 1 second)))")
time.sleep(1)
```
"""
# see class below with proper __enter__ and __exit__
return LogTimeBlock(metric_name, step=step, metrics_logger=self)
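Unlike instantiating LogTimeBlock directly, the proxy form wires the measured time back into the logger that created it; a minimal sketch:

```python
import time

logger = MetricsLogger(session_name="example")
# the elapsed time is logged under the key "data_loading" when the block exits
with logger.log_time_block("data_loading"):
    time.sleep(1)
```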
log_inferencing_latencies(time_per_batch, batch_length=1, factor_to_usecs=1000000.0)
Logs prediction (inferencing) latencies as percentile metrics and histogram plots.
Parameters:

Name | Type | Description | Default
---|---|---|---
`time_per_batch` | `List[float]` | time per inferencing batch | *required*
`batch_length` | `Union[List[int], int]` | length of each batch (list or constant) | `1`
`factor_to_usecs` | `float` | factor to apply to time_per_batch to convert to microseconds | `1000000.0`
Source code in src/common/metrics.py
def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs=1000000.0):
"""Logs prediction latencies (for inferencing) with lots of fancy metrics and plots.
Args:
time_per_batch_list (List[float]): time per inferencing batch
batch_lengths (Union[List[int],int]): length of each batch (List or constant)
factor_to_usecs (float): factor to apply to time_per_batch to convert to microseconds
"""
if isinstance(batch_length, list):
sum_batch_lengths = sum(batch_length)
else:
sum_batch_lengths = batch_length*len(time_per_batch)
# log metadata
self.log_metric("prediction_batches", len(time_per_batch))
self.log_metric("prediction_queries", sum_batch_lengths)
if len(time_per_batch) > 0:
self.log_metric("prediction_latency_avg", (sum(time_per_batch) * factor_to_usecs)/sum_batch_lengths) # usecs
# if there's more than 1 batch, compute percentiles
if len(time_per_batch) > 1:
import numpy as np
import matplotlib.pyplot as plt
plt.switch_backend('agg')
# latency per batch
batch_run_times = np.array(time_per_batch) * factor_to_usecs
self.log_metric("batch_latency_p50_usecs", np.percentile(batch_run_times, 50))
self.log_metric("batch_latency_p75_usecs", np.percentile(batch_run_times, 75))
self.log_metric("batch_latency_p90_usecs", np.percentile(batch_run_times, 90))
self.log_metric("batch_latency_p95_usecs", np.percentile(batch_run_times, 95))
self.log_metric("batch_latency_p99_usecs", np.percentile(batch_run_times, 99))
            # show the distribution of batch latencies
            fig, ax = plt.subplots(1)
            ax.hist(batch_run_times, bins=100)
            ax.set_title("Latency-per-batch histogram (log scale)")
            plt.xlabel("usecs")
            plt.ylabel("occurrence")
            plt.yscale('log')
# record in mlflow
self.log_figure(fig, "batch_latency_log_histogram.png")
# latency per query
if isinstance(batch_length, list):
prediction_latencies = np.array(time_per_batch) * factor_to_usecs / np.array(batch_length)
else:
prediction_latencies = np.array(time_per_batch) * factor_to_usecs / batch_length
self.log_metric("prediction_latency_p50_usecs", np.percentile(prediction_latencies, 50))
self.log_metric("prediction_latency_p75_usecs", np.percentile(prediction_latencies, 75))
self.log_metric("prediction_latency_p90_usecs", np.percentile(prediction_latencies, 90))
self.log_metric("prediction_latency_p95_usecs", np.percentile(prediction_latencies, 95))
self.log_metric("prediction_latency_p99_usecs", np.percentile(prediction_latencies, 99))
            # show the distribution of prediction latencies
            fig, ax = plt.subplots(1)
            ax.hist(prediction_latencies, bins=100)
            ax.set_title("Latency-per-prediction histogram (log scale)")
            plt.xlabel("usecs")
            plt.ylabel("occurrence")
            plt.yscale('log')
# record in mlflow
self.log_figure(fig, "prediction_latency_log_histogram.png")
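A sketch of how measured batch times might feed this method, assuming `logger`, `model`, and `batches` already exist and times are measured in seconds (matching the default microseconds factor):

```python
import time

time_per_batch = []
for batch in batches:  # hypothetical iterable of inferencing batches
    start = time.perf_counter()
    model.predict(batch)  # hypothetical model call
    time_per_batch.append(time.perf_counter() - start)

# per-batch lengths let the method compute true per-query latencies
logger.log_inferencing_latencies(time_per_batch, batch_length=[len(b) for b in batches])
```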
MLFlowMetricsLogger
Bases: MetricsLogger
Class for handling metrics logging in MLFlow.
Source code in src/common/metrics.py
class MLFlowMetricsLogger(MetricsLogger):
"""
Class for handling metrics logging in MLFlow.
"""
###############################
### MLFLOW SPECIFIC METHODS ###
###############################
_initialized = False
def open(self):
"""Opens the MLFlow session."""
if not MLFlowMetricsLogger._initialized:
super().open()
mlflow.start_run()
MLFlowMetricsLogger._initialized = True
def close(self):
"""Close the MLFlow session."""
if MLFlowMetricsLogger._initialized:
super().close()
mlflow.end_run()
MLFlowMetricsLogger._initialized = False
else:
self._logger.warning(f"Call to finalize MLFLOW [session='{self._session_name}'] that was never initialized.")
def log_metric(self, key, value, step=None, metric_type=MetricType.ONETIME_METRIC):
"""Logs a metric key/value pair.
Args:
key (str): metric key
value (str): metric value
step (int): which step to log this metric? (see mlflow.log_metric())
metric_type (int): type of the metric
"""
super().log_metric(key, value, step=step, metric_type=metric_type)
if self._metrics_prefix:
key = self._metrics_prefix + key
key = self._remove_non_allowed_chars(key)
# NOTE: there's a limit to the name of a metric
if len(key) > 50:
key = key[:50]
        if metric_type == MetricType.PERF_INTERVAL_METRIC:
            pass # for now, do not process those
else:
try:
mlflow.log_metric(key, value, step=step)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log metric using MLFLOW due to exception:\n{traceback.format_exc()}")
def log_figure(self, figure, artifact_file):
"""Logs a figure using mlflow
Args:
figure (Union[matplotlib.figure.Figure, plotly.graph_objects.Figure]): figure to log
artifact_file (str): name of file to record
"""
super().log_figure(figure, artifact_file)
try:
mlflow.log_figure(figure, artifact_file)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log figure using MLFLOW due to exception:\n{traceback.format_exc()}")
def log_artifact(self, local_path, artifact_path=None):
"""Logs an artifact
Args:
local_path (str): Path to the file to write.
artifact_path (str): If provided, the directory in artifact_uri to write to.
"""
super().log_artifact(local_path, artifact_path=artifact_path)
try:
mlflow.log_artifact(local_path, artifact_path=artifact_path)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log artifact using MLFLOW due to exception:\n{traceback.format_exc()}")
def log_artifacts(self, local_dir, artifact_path=None):
"""Logs an artifact
Args:
local_dir (str): Path to the directory of files to write.
artifact_path (str): If provided, the directory in artifact_uri to write to.
"""
super().log_artifacts(local_dir, artifact_path=artifact_path)
try:
mlflow.log_artifacts(local_dir, artifact_path=artifact_path)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log artifacts using MLFLOW due to exception:\n{traceback.format_exc()}")
def set_properties(self, **kwargs):
"""Set properties/tags for the session.
Args:
            kwargs (dict): any keyword argument will be passed as tags to MLFlow
"""
super().set_properties(**kwargs)
try:
mlflow.set_tags(kwargs)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not set properties using MLFLOW due to exception:\n{traceback.format_exc()}")
def log_parameters(self, **kwargs):
""" Logs parameters to MLFlow.
Args:
kwargs (dict): any keyword arguments will be passed as parameters to MLFlow
"""
super().log_parameters(**kwargs)
# NOTE: to avoid mlflow exception when value length is too long (ex: label_gain)
for key,value in kwargs.items():
if isinstance(value, str) and len(value) > 255:
self._logger.warning(f"parameter {key} (str) could not be logged, value length {len(value)} > 255")
else:
try:
mlflow.log_param(key,value)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log parameter using MLFLOW due to exception:\n{traceback.format_exc()}")
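End to end, this driver wraps a single MLFlow run per process (guarded by the class-level `_initialized` flag); a minimal sketch with illustrative names:

```python
logger = MLFlowMetricsLogger(session_name="train", metrics_prefix="train/")
logger.open()                             # starts the MLFlow run (once per process)
logger.log_parameters(lr=0.01, epochs=10)
logger.log_metric("loss", 0.42, step=1)   # recorded under the key "train/loss"
logger.close()                            # ends the MLFlow run
```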
open()
Opens the MLFlow session.
Source code in src/common/metrics.py
def open(self):
"""Opens the MLFlow session."""
if not MLFlowMetricsLogger._initialized:
super().open()
mlflow.start_run()
MLFlowMetricsLogger._initialized = True
close()
Close the MLFlow session.
Source code in src/common/metrics.py
def close(self):
"""Close the MLFlow session."""
if MLFlowMetricsLogger._initialized:
super().close()
mlflow.end_run()
MLFlowMetricsLogger._initialized = False
else:
self._logger.warning(f"Call to finalize MLFLOW [session='{self._session_name}'] that was never initialized.")
log_metric(key, value, step=None, metric_type=MetricType.ONETIME_METRIC)
Logs a metric key/value pair.
Parameters:

Name | Type | Description | Default
---|---|---|---
`key` | `str` | metric key | *required*
`value` | `str` | metric value | *required*
`step` | `int` | which step to log this metric? (see mlflow.log_metric()) | `None`
`metric_type` | `int` | type of the metric | `MetricType.ONETIME_METRIC`
Source code in src/common/metrics.py
def log_metric(self, key, value, step=None, metric_type=MetricType.ONETIME_METRIC):
"""Logs a metric key/value pair.
Args:
key (str): metric key
value (str): metric value
step (int): which step to log this metric? (see mlflow.log_metric())
metric_type (int): type of the metric
"""
super().log_metric(key, value, step=step, metric_type=metric_type)
if self._metrics_prefix:
key = self._metrics_prefix + key
key = self._remove_non_allowed_chars(key)
# NOTE: there's a limit to the name of a metric
if len(key) > 50:
key = key[:50]
        if metric_type == MetricType.PERF_INTERVAL_METRIC:
            pass # for now, do not process those
else:
try:
mlflow.log_metric(key, value, step=step)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log metric using MLFLOW due to exception:\n{traceback.format_exc()}")
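Note that keys are prefixed, sanitized, and capped at 50 characters before reaching MLFlow; a quick illustration with hypothetical names:

```python
logger = MLFlowMetricsLogger(metrics_prefix="eval/")
logger.open()
# prefixed to "eval/roc_auc (val)", then "(" and ")" are stripped by
# _remove_non_allowed_chars, giving the final key "eval/roc_auc val"
logger.log_metric("roc_auc (val)", 0.91)
```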
log_figure(figure, artifact_file)
Logs a figure using mlflow
Parameters:

Name | Type | Description | Default
---|---|---|---
`figure` | `Union[matplotlib.figure.Figure, plotly.graph_objects.Figure]` | figure to log | *required*
`artifact_file` | `str` | name of file to record | *required*
Source code in src/common/metrics.py
def log_figure(self, figure, artifact_file):
"""Logs a figure using mlflow
Args:
figure (Union[matplotlib.figure.Figure, plotly.graph_objects.Figure]): figure to log
artifact_file (str): name of file to record
"""
super().log_figure(figure, artifact_file)
try:
mlflow.log_figure(figure, artifact_file)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log figure using MLFLOW due to exception:\n{traceback.format_exc()}")
log_artifact(local_path, artifact_path=None)
Logs an artifact
Parameters:

Name | Type | Description | Default
---|---|---|---
`local_path` | `str` | Path to the file to write. | *required*
`artifact_path` | `str` | If provided, the directory in artifact_uri to write to. | `None`
Source code in src/common/metrics.py
def log_artifact(self, local_path, artifact_path=None):
"""Logs an artifact
Args:
local_path (str): Path to the file to write.
artifact_path (str): If provided, the directory in artifact_uri to write to.
"""
super().log_artifact(local_path, artifact_path=artifact_path)
try:
mlflow.log_artifact(local_path, artifact_path=artifact_path)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log artifact using MLFLOW due to exception:\n{traceback.format_exc()}")
log_artifacts(local_dir, artifact_path=None)
Logs a directory of artifacts
Parameters:

Name | Type | Description | Default
---|---|---|---
`local_dir` | `str` | Path to the directory of files to write. | *required*
`artifact_path` | `str` | If provided, the directory in artifact_uri to write to. | `None`
Source code in src/common/metrics.py
def log_artifacts(self, local_dir, artifact_path=None):
"""Logs an artifact
Args:
local_dir (str): Path to the directory of files to write.
artifact_path (str): If provided, the directory in artifact_uri to write to.
"""
super().log_artifacts(local_dir, artifact_path=artifact_path)
try:
mlflow.log_artifacts(local_dir, artifact_path=artifact_path)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log artifacts using MLFLOW due to exception:\n{traceback.format_exc()}")
set_properties(**kwargs)
Set properties/tags for the session.
Parameters:

Name | Type | Description | Default
---|---|---|---
`kwargs` | `dict` | any keyword argument will be passed as tags to MLFlow | `{}`
Source code in src/common/metrics.py
def set_properties(self, **kwargs):
"""Set properties/tags for the session.
Args:
            kwargs (dict): any keyword argument will be passed as tags to MLFlow
"""
super().set_properties(**kwargs)
try:
mlflow.set_tags(kwargs)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not set properties using MLFLOW due to exception:\n{traceback.format_exc()}")
log_parameters(**kwargs)
Logs parameters to MLFlow.
Parameters:

Name | Type | Description | Default
---|---|---|---
`kwargs` | `dict` | any keyword arguments will be passed as parameters to MLFlow | `{}`
Source code in src/common/metrics.py
def log_parameters(self, **kwargs):
""" Logs parameters to MLFlow.
Args:
kwargs (dict): any keyword arguments will be passed as parameters to MLFlow
"""
super().log_parameters(**kwargs)
# NOTE: to avoid mlflow exception when value length is too long (ex: label_gain)
for key,value in kwargs.items():
if isinstance(value, str) and len(value) > 255:
self._logger.warning(f"parameter {key} (str) could not be logged, value length {len(value)} > 255")
else:
try:
mlflow.log_param(key,value)
except mlflow.exceptions.MlflowException:
self._logger.critical(f"Could not log parameter using MLFLOW due to exception:\n{traceback.format_exc()}")
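String values longer than 255 characters are skipped with a warning instead of raising inside MLFlow; a short sketch, assuming an opened MLFlowMetricsLogger named `logger` (parameter names illustrative):

```python
logger.log_parameters(
    learning_rate=0.05,
    # a string this long (> 255 chars) is skipped with a warning, not logged
    label_gain=",".join(str(i) for i in range(200)),
)
```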
AzureMLRunMetricsLogger
Bases: MetricsLogger
Class for handling metrics logging using AzureML Run
Source code in src/common/metrics.py
class AzureMLRunMetricsLogger(MetricsLogger):
"""
Class for handling metrics logging using AzureML Run
"""
def __init__(self, session_name=None, metrics_prefix=None):
super().__init__(
session_name = session_name,
metrics_prefix = metrics_prefix
)
self._aml_run = None
def open(self):
"""Opens the AzureML run session."""
super().open()
try:
from azureml.core.run import Run
self._aml_run = Run.get_context()
if "_OfflineRun" in str(type(self._aml_run)):
self._logger.warning(f"Running offline, will not report any AzureML metrics")
self._aml_run = None
        except BaseException:
self._logger.warning(f"Run get_context() failed due to exception: {traceback.format_exc()}".replace("\n", "--"))
self._aml_run = None
def close(self):
"""Close the AzureML session."""
super().close()
if self._aml_run:
self._aml_run.flush()
else:
self._logger.warning(f"Call to finalize AzureML Run [session='{self._session_name}'] that was never initialized.")
def log_metric(self, key, value, step=None, metric_type=MetricType.ONETIME_METRIC):
"""Logs a metric key/value pair.
Args:
key (str): metric key
value (str): metric value
step (int): which step to log this metric? (see mlflow.log_metric())
metric_type (int): type of the metric
"""
super().log_metric(key, value, step=step, metric_type=metric_type)
if self._metrics_prefix:
key = self._metrics_prefix + key
key = self._remove_non_allowed_chars(key)
# NOTE: there's a limit to the name of a metric
if len(key) > 50:
key = key[:50]
        if metric_type == MetricType.PERF_INTERVAL_METRIC:
            pass # for now, do not process those
else:
if self._aml_run:
self._aml_run.log_row(key, key=value, step=step)
def set_properties(self, **kwargs):
"""Set properties/tags for the session.
Args:
            kwargs (dict): any keyword argument will be added as properties on the AzureML run
"""
super().set_properties(**kwargs)
if self._aml_run:
self._aml_run.add_properties(kwargs)
def log_parameters(self, **kwargs):
""" Logs parameters to MLFlow.
Args:
kwargs (dict): any keyword arguments will be passed as parameters to MLFlow
"""
super().log_parameters(**kwargs)
        # NOTE: to avoid exceptions when value length is too long (ex: label_gain)
for key,value in kwargs.items():
if isinstance(value, str) and len(value) > 255:
self._logger.warning(f"parameter {key} (str) could not be logged, value length {len(value)} > 255")
else:
if self._aml_run:
self._aml_run.set_tags({key:value})
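This driver degrades gracefully outside an AzureML job: offline runs are detected in open() and all reporting is skipped; a minimal sketch:

```python
logger = AzureMLRunMetricsLogger(session_name="train")
logger.open()                            # Run.get_context(); offline runs disable reporting
logger.log_metric("loss", 0.42, step=1)
logger.close()                           # flushes the AzureML run, if any
```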
open()
Opens the AzureML run session.
Source code in src/common/metrics.py
def open(self):
"""Opens the AzureML run session."""
super().open()
try:
from azureml.core.run import Run
self._aml_run = Run.get_context()
if "_OfflineRun" in str(type(self._aml_run)):
self._logger.warning(f"Running offline, will not report any AzureML metrics")
self._aml_run = None
        except BaseException:
self._logger.warning(f"Run get_context() failed due to exception: {traceback.format_exc()}".replace("\n", "--"))
self._aml_run = None
close()
Close the AzureML session.
Source code in src/common/metrics.py
def close(self):
"""Close the AzureML session."""
super().close()
if self._aml_run:
self._aml_run.flush()
else:
self._logger.warning(f"Call to finalize AzureML Run [session='{self._session_name}'] that was never initialized.")
log_metric(key, value, step=None, metric_type=MetricType.ONETIME_METRIC)
Logs a metric key/value pair.
Parameters:

Name | Type | Description | Default
---|---|---|---
`key` | `str` | metric key | *required*
`value` | `str` | metric value | *required*
`step` | `int` | which step to log this metric? (see mlflow.log_metric()) | `None`
`metric_type` | `int` | type of the metric | `MetricType.ONETIME_METRIC`
Source code in src/common/metrics.py
def log_metric(self, key, value, step=None, metric_type=MetricType.ONETIME_METRIC):
"""Logs a metric key/value pair.
Args:
key (str): metric key
value (str): metric value
step (int): which step to log this metric? (see mlflow.log_metric())
metric_type (int): type of the metric
"""
super().log_metric(key, value, step=step, metric_type=metric_type)
if self._metrics_prefix:
key = self._metrics_prefix + key
key = self._remove_non_allowed_chars(key)
# NOTE: there's a limit to the name of a metric
if len(key) > 50:
key = key[:50]
        if metric_type == MetricType.PERF_INTERVAL_METRIC:
            pass # for now, do not process those
else:
if self._aml_run:
self._aml_run.log_row(key, key=value, step=step)
set_properties(**kwargs)
Set properties/tags for the session.
Parameters:

Name | Type | Description | Default
---|---|---|---
`kwargs` | `dict` | any keyword argument will be added as properties on the AzureML run | `{}`
Source code in src/common/metrics.py
def set_properties(self, **kwargs):
"""Set properties/tags for the session.
Args:
            kwargs (dict): any keyword argument will be added as properties on the AzureML run
"""
super().set_properties(**kwargs)
if self._aml_run:
self._aml_run.add_properties(kwargs)
log_parameters(**kwargs)
Logs parameters as tags on the AzureML run.
Parameters:

Name | Type | Description | Default
---|---|---|---
`kwargs` | `dict` | any keyword arguments will be set as tags on the AzureML run | `{}`
Source code in src/common/metrics.py
def log_parameters(self, **kwargs):
""" Logs parameters to MLFlow.
Args:
kwargs (dict): any keyword arguments will be passed as parameters to MLFlow
"""
super().log_parameters(**kwargs)
        # NOTE: to avoid exceptions when value length is too long (ex: label_gain)
for key,value in kwargs.items():
if isinstance(value, str) and len(value) > 255:
self._logger.warning(f"parameter {key} (str) could not be logged, value length {len(value)} > 255")
else:
if self._aml_run:
self._aml_run.set_tags({key:value})
LogTimeBlock
Bases: object
This class should be used to time a code block. The time diff is computed from `__enter__` to `__exit__`.
Example
with LogTimeBlock("my_perf_metric_name"):
    print("(((sleeping for 1 second)))")
    time.sleep(1)
Source code in src/common/metrics.py
class LogTimeBlock(object):
""" This class should be used to time a code block.
The time diff is computed from __enter__ to __exit__.
Example
-------
```python
with LogTimeBlock("my_perf_metric_name"):
print("(((sleeping for 1 second)))")
time.sleep(1)
```
"""
def __init__(self, name, **kwargs):
"""
Constructs the LogTimeBlock.
Args:
name (str): key for the time difference (for storing as metric)
kwargs (dict): any keyword will be added as properties to metrics for logging (work in progress)
"""
# kwargs
self.tags = kwargs.get('tags', None)
self.step = kwargs.get('step', None)
self.metrics_logger = kwargs.get('metrics_logger', None)
# internal variables
self.name = name
self.start_time = None
self._logger = logging.getLogger(__name__)
def __enter__(self):
""" Starts the timer, gets triggered at beginning of code block """
self.start_time = time.time() # starts "timer"
def __exit__(self, exc_type, value, traceback):
""" Stops the timer and stores accordingly
gets triggered at beginning of code block.
Note:
arguments are by design for with statements.
"""
run_time = time.time() - self.start_time # stops "timer"
        self._logger.info(f"--- time elapsed: {self.name} = {run_time:.2f} s" + (f" [tags: {self.tags}]" if self.tags else ""))
if self.metrics_logger:
self.metrics_logger.log_metric(self.name, run_time, step=self.step)
else:
MetricsLogger().log_metric(self.name, run_time, step=self.step)
__init__(name, **kwargs)
Constructs the LogTimeBlock.
Parameters:

Name | Type | Description | Default
---|---|---|---
`name` | `str` | key for the time difference (for storing as metric) | *required*
`kwargs` | `dict` | any keyword will be added as properties to metrics for logging (work in progress) | `{}`
Source code in src/common/metrics.py
def __init__(self, name, **kwargs):
"""
Constructs the LogTimeBlock.
Args:
name (str): key for the time difference (for storing as metric)
kwargs (dict): any keyword will be added as properties to metrics for logging (work in progress)
"""
# kwargs
self.tags = kwargs.get('tags', None)
self.step = kwargs.get('step', None)
self.metrics_logger = kwargs.get('metrics_logger', None)
# internal variables
self.name = name
self.start_time = None
self._logger = logging.getLogger(__name__)
__enter__()
Starts the timer, gets triggered at beginning of code block
Source code in src/common/metrics.py
def __enter__(self):
""" Starts the timer, gets triggered at beginning of code block """
self.start_time = time.time() # starts "timer"
__exit__(exc_type, value, traceback)
Stops the timer and stores the metric accordingly; gets triggered at the end of the code block.
Note
arguments are by design for with statements.
Source code in src/common/metrics.py
def __exit__(self, exc_type, value, traceback):
""" Stops the timer and stores accordingly
gets triggered at beginning of code block.
Note:
arguments are by design for with statements.
"""
run_time = time.time() - self.start_time # stops "timer"
        self._logger.info(f"--- time elapsed: {self.name} = {run_time:.2f} s" + (f" [tags: {self.tags}]" if self.tags else ""))
if self.metrics_logger:
self.metrics_logger.log_metric(self.name, run_time, step=self.step)
else:
MetricsLogger().log_metric(self.name, run_time, step=self.step)
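When a metrics_logger is passed in, the elapsed time is recorded through it rather than through a throwaway base logger; a sketch using the optional kwargs (the training function is hypothetical):

```python
logger = MLFlowMetricsLogger(session_name="train")
logger.open()
# `tags` annotate the console log line; `step` is forwarded to log_metric
with LogTimeBlock("epoch_time", step=3, tags={"phase": "train"}, metrics_logger=logger):
    train_one_epoch()  # hypothetical training step
logger.close()
```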