components.py
This module contains a class that structures and standardizes all scripts in the lightgbm-benchmark repository. It factors out the duplicate code that handles the usual routines of every script: logging init, MLFlow init, system properties logging, etc.
RunnableScript
This class factors out the duplicate code that handles the usual routines of every script in the lightgbm-benchmark repo: logging init, MLFlow init, system properties logging, etc.
Source code in src/common/components.py
class RunnableScript():
"""
This class factors out the duplicate code that handles the usual routines
of every script in the lightgbm-benchmark repo: logging init, MLFlow init,
system properties logging, etc.
"""
def __init__(self, task, framework, framework_version, metrics_prefix=None):
""" Generic initialization for all script classes.
Args:
task (str): name of task in the pipeline/benchmark (ex: train, score)
framework (str): name of ML framework
framework_version (str): a version of this framework
metrics_prefix (str): any prefix to add to this script's metrics
"""
self.task = task
self.framework = framework
self.framework_version = framework_version
self.metrics_prefix = metrics_prefix
self.logger = logging.getLogger(f"{framework}.{task}")
# default metrics logger is just stdout print
self.metrics_logger = MetricsLogger(
f"{framework}.{task}",
metrics_prefix=self.metrics_prefix
)
self.perf_report_collector = None
@classmethod
def get_arg_parser(cls, parser=None):
"""Adds component/module arguments to a given argument parser.
Args:
parser (argparse.ArgumentParser): an argument parser instance
Returns:
ArgumentParser: the argument parser instance
Notes:
if parser is None, creates a new parser instance
"""
# add arguments that are specific to the module
if parser is None:
parser = argparse.ArgumentParser()
# add generic arguments here
group_general = parser.add_argument_group(f"General parameters [{__name__}:{cls.__name__}]")
group_general.add_argument(
"--verbose",
required=False,
default=False,
type=strtobool, # use this for bool args, do not use action='store_true'
help="set True to show DEBUG logs",
)
group_general.add_argument(
"--custom_properties",
required=False,
default=None,
type=str,
help="provide custom properties as json dict",
)
group_general.add_argument(
"--disable_perf_metrics",
required=False,
default=False,
type=strtobool,
help="disable performance metrics (default: False)",
)
group_general.add_argument(
"--metrics_driver",
required=False,
default="mlflow",
choices=['mlflow', 'azureml'],
type=str,
help="which class to use to report metrics mlflow or azureml",
)
return parser
def initialize_run(self, args):
"""Initialize the component run, opens/setups what needs to be"""
self.logger.info("Initializing script run...")
# initializes reporting of metrics
if args.metrics_driver == 'mlflow':
self.metrics_logger = MLFlowMetricsLogger(
f"{self.framework}.{self.task}",
metrics_prefix=self.metrics_prefix
)
elif args.metrics_driver == 'azureml':
self.metrics_logger = AzureMLRunMetricsLogger(
f"{self.framework}.{self.task}",
metrics_prefix=self.metrics_prefix
)
else:
# use default metrics_logger (stdout print)
pass
# open mlflow
self.metrics_logger.open()
# record properties of the run
self.metrics_logger.set_properties(
task = self.task,
framework = self.framework,
framework_version = self.framework_version
)
# if provided some custom_properties by the outside orchestrator
if args.custom_properties:
self.metrics_logger.set_properties_from_json(args.custom_properties)
# add properties about environment of this script
self.metrics_logger.set_platform_properties()
# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()
def run(self, args, logger, metrics_logger, unknown_args):
"""The run function of your script. You are required to override this method
with your own implementation.
Args:
args (argparse.Namespace): command line arguments provided to script
logger (logging.Logger): a logger initialized for this script
metrics_logger (common.metrics.MetricLogger): to report metrics for this script, already initialized for MLFlow
unknown_args (list[str]): list of arguments not recognized during argparse
"""
raise NotImplementedError(f"run() method from class {self.__class__.__name__} hasn't actually been implemented.")
def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing script run...")
if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=0)
plotter.report_nodes_perf()
# write perf record as artifact
self.metrics_logger.log_artifact(plotter.save_to())
# close mlflow
self.metrics_logger.close()
####################
### MAIN METHODS ###
####################
@classmethod
def initialize_root_logger(cls):
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
@classmethod
def parse_class_arguments(cls, cli_args=None):
logger = logging.getLogger()
# show the command used to run
if cli_args:
logger.info(f"Running main() with specific cli args: {cli_args}")
else:
logger.info(f"Running main() with sys.argv={sys.argv}")
# construct arg parser
parser = cls.get_arg_parser()
# if argument parsing fails, this will raise; unknown arguments are returned separately
args, unknown_args = parser.parse_known_args(cli_args)
logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
return args, unknown_args
def _main_run_hook(self, args, unknown_args):
"""Run function called from main()"""
self.run(args, self.logger, self.metrics_logger, unknown_args=unknown_args)
@classmethod
def main(cls, cli_args=None):
""" Component main function, it is not recommended to override this method.
It parses arguments and executes run() with the right arguments.
Args:
cli_args (List[str], optional): list of args to feed script, useful for debugging. Defaults to None.
"""
cls.initialize_root_logger()
args, unknown_args = cls.parse_class_arguments(cli_args)
# create script instance, initialize mlflow
script_instance = cls()
script_instance.initialize_run(args)
# catch run function exceptions to properly finalize run (kill/join threads)
try:
# run the class run method (passthrough)
script_instance._main_run_hook(args, unknown_args)
except BaseException as e:
logging.critical(f"Exception occured during run():\n{traceback.format_exc()}")
script_instance.finalize_run(args)
raise e
# close mlflow
script_instance.finalize_run(args)
# return for unit tests
return script_instance
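For illustration, here is a minimal sketch of how a benchmark script could subclass RunnableScript. The SampleScript class, the --message argument, the metric name, and the import path are hypothetical (they assume the common package is importable), and metrics_logger.log_metric is assumed to exist on the metrics logger classes.

```python
# hypothetical example, not part of the repository
from common.components import RunnableScript


class SampleScript(RunnableScript):
    def __init__(self):
        super().__init__(
            task="sample",               # name of the task in the benchmark
            framework="demo_framework",  # hypothetical framework name
            framework_version="0.0.1",
        )

    @classmethod
    def get_arg_parser(cls, parser=None):
        # add the generic arguments (verbose, custom_properties, ...) first
        parser = super().get_arg_parser(parser)
        group = parser.add_argument_group("Sample parameters")
        group.add_argument("--message", required=False, default="hello", type=str)
        return parser

    def run(self, args, logger, metrics_logger, unknown_args):
        # called by main() with an initialized logger and metrics logger
        logger.info(f"message={args.message}")
        metrics_logger.log_metric("sample_metric", 1.0)  # assumed MetricsLogger method


if __name__ == "__main__":
    SampleScript.main()
```

main() takes care of root logger setup, argument parsing, initialize_run(), the call to run(), and finalize_run(), so the subclass only provides the pieces above.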
__init__(task, framework, framework_version, metrics_prefix=None)
Generic initialization for all script classes.
Parameters:
Name | Type | Description | Default
---|---|---|---
task | str | name of task in the pipeline/benchmark (ex: train, score) | required
framework | str | name of ML framework | required
framework_version | str | a version of this framework | required
metrics_prefix | str | any prefix to add to this script's metrics | None
Source code in src/common/components.py
def __init__(self, task, framework, framework_version, metrics_prefix=None):
""" Generic initialization for all script classes.
Args:
task (str): name of task in the pipeline/benchmark (ex: train, score)
framework (str): name of ML framework
framework_version (str): a version of this framework
metrics_prefix (str): any prefix to add to this script's metrics
"""
self.task = task
self.framework = framework
self.framework_version = framework_version
self.metrics_prefix = metrics_prefix
self.logger = logging.getLogger(f"{framework}.{task}")
# default metrics logger is just stdout print
self.metrics_logger = MetricsLogger(
f"{framework}.{task}",
metrics_prefix=self.metrics_prefix
)
self.perf_report_collector = None
get_arg_parser(parser=None)
classmethod
Adds component/module arguments to a given argument parser.
Parameters:
Name | Type | Description | Default
---|---|---|---
parser | argparse.ArgumentParser | an argument parser instance | None
Returns:
Type | Description
---|---
ArgumentParser | the argument parser instance
Notes
if parser is None, creates a new parser instance
Source code in src/common/components.py
@classmethod
def get_arg_parser(cls, parser=None):
"""Adds component/module arguments to a given argument parser.
Args:
parser (argparse.ArgumentParser): an argument parser instance
Returns:
ArgumentParser: the argument parser instance
Notes:
if parser is None, creates a new parser instance
"""
# add arguments that are specific to the module
if parser is None:
parser = argparse.ArgumentParser()
# add generic arguments here
group_general = parser.add_argument_group(f"General parameters [{__name__}:{cls.__name__}]")
group_general.add_argument(
"--verbose",
required=False,
default=False,
type=strtobool, # use this for bool args, do not use action='store_true'
help="set True to show DEBUG logs",
)
group_general.add_argument(
"--custom_properties",
required=False,
default=None,
type=str,
help="provide custom properties as json dict",
)
group_general.add_argument(
"--disable_perf_metrics",
required=False,
default=False,
type=strtobool,
help="disable performance metrics (default: False)",
)
group_general.add_argument(
"--metrics_driver",
required=False,
default="mlflow",
choices=['mlflow', 'azureml'],
type=str,
help="which class to use to report metrics mlflow or azureml",
)
return parser
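The comment in the source prefers strtobool over action='store_true' so that an orchestrator can pass an explicit value such as --verbose True or --verbose False on the command line. A small standard-library-only sketch of that pattern (strtobool returns 1 or 0):

```python
# standalone sketch of the strtobool pattern used for boolean arguments
import argparse
from distutils.util import strtobool

parser = argparse.ArgumentParser()
parser.add_argument("--verbose", required=False, default=False, type=strtobool)

print(parser.parse_args(["--verbose", "True"]).verbose)   # 1 (truthy)
print(parser.parse_args(["--verbose", "False"]).verbose)  # 0 (falsy)
```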
initialize_run(args)
Initialize the component run, open/set up what is needed
Source code in src/common/components.py
def initialize_run(self, args):
"""Initialize the component run, opens/setups what needs to be"""
self.logger.info("Initializing script run...")
# initializes reporting of metrics
if args.metrics_driver == 'mlflow':
self.metrics_logger = MLFlowMetricsLogger(
f"{self.framework}.{self.task}",
metrics_prefix=self.metrics_prefix
)
elif args.metrics_driver == 'azureml':
self.metrics_logger = AzureMLRunMetricsLogger(
f"{self.framework}.{self.task}",
metrics_prefix=self.metrics_prefix
)
else:
# use default metrics_logger (stdout print)
pass
# open mlflow
self.metrics_logger.open()
# record properties of the run
self.metrics_logger.set_properties(
task = self.task,
framework = self.framework,
framework_version = self.framework_version
)
# if provided some custom_properties by the outside orchestrator
if args.custom_properties:
self.metrics_logger.set_properties_from_json(args.custom_properties)
# add properties about environment of this script
self.metrics_logger.set_platform_properties()
# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()
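As an illustration of these generic arguments, an orchestrator (or a developer debugging locally) could select the metrics driver and attach custom properties when invoking a script. This sketch reuses the hypothetical SampleScript from the earlier example; the JSON property keys are also hypothetical.

```python
# hypothetical invocation, reusing the SampleScript sketch from above
SampleScript.main(cli_args=[
    "--metrics_driver", "mlflow",
    "--custom_properties", '{"benchmark_name": "demo-benchmark", "variant": "cpu"}',
    "--verbose", "True",
])
```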
run(args, logger, metrics_logger, unknown_args)
The run function of your script. You are required to override this method with your own implementation.
Parameters:
Name | Type | Description | Default
---|---|---|---
args | argparse.Namespace | command line arguments provided to script | required
logger | logging.Logger | a logger initialized for this script | required
metrics_logger | common.metrics.MetricLogger | to report metrics for this script, already initialized for MLFlow | required
unknown_args | list[str] | list of arguments not recognized during argparse | required
Source code in src/common/components.py
def run(self, args, logger, metrics_logger, unknown_args):
"""The run function of your script. You are required to override this method
with your own implementation.
Args:
args (argparse.Namespace): command line arguments provided to script
logger (logging.Logger): a logger initialized for this script
metrics_logger (common.metrics.MetricLogger): to report metrics for this script, already initialized for MLFlow
unknown_args (list[str]): list of arguments not recognized during argparse
"""
raise NotImplementedError(f"run() method from class {self.__class__.__name__} hasn't actually been implemented.")
finalize_run(args)
Finalize the run, close what needs to be closed
Source code in src/common/components.py
def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing script run...")
if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=0)
plotter.report_nodes_perf()
# write perf record as artifact
self.metrics_logger.log_artifact(plotter.save_to())
# close mlflow
self.metrics_logger.close()
main(cli_args=None)
classmethod
Component main function; it is not recommended to override this method. It parses arguments and executes run() with the right arguments.
Parameters:
Name | Type | Description | Default
---|---|---|---
cli_args | List[str] | list of args to feed script, useful for debugging. Defaults to None. | None
Source code in src/common/components.py
@classmethod
def main(cls, cli_args=None):
""" Component main function, it is not recommended to override this method.
It parses arguments and executes run() with the right arguments.
Args:
cli_args (List[str], optional): list of args to feed script, useful for debugging. Defaults to None.
"""
cls.initialize_root_logger()
args, unknown_args = cls.parse_class_arguments(cli_args)
# create script instance, initialize mlflow
script_instance = cls()
script_instance.initialize_run(args)
# catch run function exceptions to properly finalize run (kill/join threads)
try:
# run the class run method (passthrough)
script_instance._main_run_hook(args, unknown_args)
except BaseException as e:
logging.critical(f"Exception occured during run():\n{traceback.format_exc()}")
script_instance.finalize_run(args)
raise e
# close mlflow
script_instance.finalize_run(args)
# return for unit tests
return script_instance
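Because main() returns the script instance, a unit test can drive the whole lifecycle through cli_args and then inspect the resulting state. This pytest-style sketch again assumes the hypothetical SampleScript from the earlier example:

```python
# hypothetical pytest-style test, reusing the SampleScript sketch from above
def test_sample_script_runs_end_to_end():
    script_instance = SampleScript.main(cli_args=[
        "--message", "unit-test",
        "--disable_perf_metrics", "True",
    ])
    # main() returns the instance so the test can inspect it
    assert script_instance.task == "sample"
    assert script_instance.framework == "demo_framework"
```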