
perf.py

Each component in this repository reports performance metrics to MLflow.

The list of available metrics is detailed below:

| Metric | Level | Description |
| --- | --- | --- |
| cpu_avg_utilization_over20_pct | One value per node | Percentage of job time during which the node's average CPU utilization (across all CPUs) is at or above 20%. |
| cpu_avg_utilization_over40_pct | One value per node | Percentage of job time during which the node's average CPU utilization is at or above 40%. |
| cpu_avg_utilization_over80_pct | One value per node | Percentage of job time during which the node's average CPU utilization is at or above 80%. |
| cpu_avg_utilization_at100_pct | One value per node | Percentage of job time during which the node's average CPU utilization is at 100% (all CPUs fully utilized). |
| cpu_avg_utilization_pct | One value per node | Average CPU utilization across all CPUs over the entire job. |
| max_t_cpu_pct_per_cpu_avg | One value per node | Maximum over time of the per-sample average CPU utilization. |
| max_t_cpu_pct_per_cpu_max | One value per node | Maximum over time of the per-sample maximum CPU utilization. |
| max_t_cpu_pct_per_cpu_min | One value per node | Maximum over time of the per-sample minimum CPU utilization. |
| node_cpu_hours | One value per node | Job elapsed time (in hours) * number of CPUs. |
| node_unused_cpu_hours | One value per node | node_cpu_hours * (1 - cpu_avg_utilization_pct / 100). |
| max_t_mem_percent | One value per node | Maximum memory utilization over the entire job time. |
| max_t_disk_usage_percent | One value per node | Maximum disk usage over the entire job time. |
| total_disk_io_read_mb | One value per node | Total disk data read in MB (cumulative counter, max value at end of job). |
| total_disk_io_write_mb | One value per node | Total disk data written in MB (cumulative counter, max value at end of job). |
| total_net_io_lo_sent_mb | One value per node | Total data sent on the loopback device in MB (max value at end of job). |
| total_net_io_ext_sent_mb | One value per node | Total data sent on external devices in MB (max value at end of job). |
| total_net_io_lo_recv_mb | One value per node | Total data received on the loopback device in MB (max value at end of job). |
| total_net_io_ext_recv_mb | One value per node | Total data received on external devices in MB (max value at end of job). |
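
For example, the threshold metrics are the fraction of perf samples whose per-sample average CPU utilization is at or above the threshold, expressed in percent. A minimal sketch of the computation performed by report_nodes_perf (the sample values are made up):

cpu_avg_utilization = [10.0, 35.0, 55.0, 85.0]  # hypothetical per-sample averages over a job

# fraction of samples at or above 40%, expressed in percent
over40_pct = sum(u >= 40.0 for u in cpu_avg_utilization) / len(cpu_avg_utilization) * 100.0
print(over40_pct)  # 50.0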

Helpers for reporting performance metrics (CPU/memory utilization). These still need to be wired into the rest of the code.
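
A minimal end-to-end usage sketch, assuming the module is importable as common.perf and that a metrics_logger object exposing log_metric(name, value, step) is available (for instance a thin wrapper around mlflow.log_metric); do_actual_work and metrics_logger are illustrative placeholders:

from common.perf import PerformanceMetricsCollector, PerfReportPlotter

collector = PerformanceMetricsCollector(max_length=1000)
collector.start()

do_actual_work()  # placeholder for the component's workload

collector.finalize()

plotter = PerfReportPlotter(metrics_logger)
plotter.add_perf_reports(collector.perf_reports, node=0)
plotter.report_nodes_perf()  # logs one value per node for each metric
report_path = plotter.save_to()  # optionally dump the raw reports to a json file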

PerformanceReportingThread

Bases: threading.Thread

Thread to report performance (cpu/mem/net)

Source code in src/common/perf.py
class PerformanceReportingThread(threading.Thread):
    """Thread to report performance (cpu/mem/net)"""
    def __init__(self,
                 initial_time_increment=1.0,
                 cpu_interval=1.0,
                 callback_on_loop=None,
                 callback_on_exit=None):
        """Constructor

        Args:
            initial_time_increment (float): how much time to sleep between perf readings
            cpu_interval (float): interval to capture cpu utilization
            callback_on_loop (func): function to call when a perf reading is issued
            callback_on_exit (func): function to call when thread is finalized
        """
        threading.Thread.__init__(self)
        self.killed = False # flag, set to True to kill from the inside

        self.logger = logging.getLogger(__name__)

        # time between perf reports
        self.time_increment = initial_time_increment
        self.cpu_interval = cpu_interval

        # set callbacks
        self.callback_on_loop = callback_on_loop
        self.callback_on_exit = callback_on_exit


    #####################
    ### RUN FUNCTIONS ###
    #####################

    def run(self):
        """Run function of the thread, while(True)"""
        while not(self.killed):
            if self.time_increment >= self.cpu_interval: # cpu_percent.interval already consumes 1sec
                time.sleep(self.time_increment - self.cpu_interval) # will double every time report_store_max_length is reached
            self._run_loop()

        if self.callback_on_exit:
            self.callback_on_exit()

    def _run_loop(self):
        """What to run within the while(not(killed))"""
        perf_report = {}

        # CPU UTILIZATION
        cpu_utilization = psutil.cpu_percent(interval=self.cpu_interval, percpu=True) # blocks for cpu_interval seconds
        perf_report["cpu_pct_per_cpu_avg"] = sum(cpu_utilization) / len(cpu_utilization)
        perf_report["cpu_pct_per_cpu_min"] = min(cpu_utilization)
        perf_report["cpu_pct_per_cpu_max"] = max(cpu_utilization)

        # MEM UTILIZATION
        perf_report["mem_percent"] = psutil.virtual_memory().percent

        # DISK UTILIZATION
        perf_report["disk_usage_percent"] = psutil.disk_usage('/').percent
        perf_report["disk_io_read_mb"] = (psutil.disk_io_counters(perdisk=False).read_bytes / (1024 * 1024))
        perf_report["disk_io_write_mb"] = (psutil.disk_io_counters(perdisk=False).write_bytes / (1024 * 1024))

        # NET I/O SEND/RECV
        net_io_counters = psutil.net_io_counters(pernic=True)
        net_io_lo_identifiers = []
        net_io_ext_identifiers = []

        for key in net_io_counters:
            if 'loopback' in key.lower():
                net_io_lo_identifiers.append(key)
            elif key.lower() == 'lo':
                net_io_lo_identifiers.append(key)
            else:
                net_io_ext_identifiers.append(key)

        lo_sent_mb = sum(
            [
                net_io_counters.get(key).bytes_sent
                for key in net_io_lo_identifiers
            ]
        ) / (1024 * 1024)

        ext_sent_mb = sum(
            [
                net_io_counters.get(key).bytes_sent
                for key in net_io_ext_identifiers
            ]
        ) / (1024 * 1024)

        lo_recv_mb = sum(
            [
                net_io_counters.get(key).bytes_recv
                for key in net_io_lo_identifiers
            ]
        ) / (1024 * 1024)

        ext_recv_mb = sum(
            [
                net_io_counters.get(key).bytes_recv
                for key in net_io_ext_identifiers
            ]
        ) / (1024 * 1024)

        perf_report["net_io_lo_sent_mb"] = lo_sent_mb
        perf_report["net_io_ext_sent_mb"] = ext_sent_mb
        perf_report["net_io_lo_recv_mb"] = lo_recv_mb
        perf_report["net_io_ext_recv_mb"] = ext_recv_mb

        # add a timestamp
        perf_report["timestamp"] = time.time()

        # END OF REPORT
        if self.callback_on_loop:
            self.callback_on_loop(perf_report)

    def finalize(self):
        """Ask the thread to finalize (clean)"""
        self.killed = True
        self.join()

__init__(initial_time_increment=1.0, cpu_interval=1.0, callback_on_loop=None, callback_on_exit=None)

Constructor

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| initial_time_increment | float | how much time to sleep between perf readings | 1.0 |
| cpu_interval | float | interval to capture cpu utilization | 1.0 |
| callback_on_loop | func | function to call when a perf reading is issued | None |
| callback_on_exit | func | function to call when thread is finalized | None |
Source code in src/common/perf.py
def __init__(self,
             initial_time_increment=1.0,
             cpu_interval=1.0,
             callback_on_loop=None,
             callback_on_exit=None):
    """Constructor

    Args:
        initial_time_increment (float): how much time to sleep between perf readings
        cpu_interval (float): interval to capture cpu utilization
        callback_on_loop (func): function to call when a perf reading is issued
        callback_on_exit (func): function to call when thread is finalized
    """
    threading.Thread.__init__(self)
    self.killed = False # flag, set to True to kill from the inside

    self.logger = logging.getLogger(__name__)

    # time between perf reports
    self.time_increment = initial_time_increment
    self.cpu_interval = cpu_interval

    # set callbacks
    self.callback_on_loop = callback_on_loop
    self.callback_on_exit = callback_on_exit

run()

Run function of the thread; loops until killed

Source code in src/common/perf.py
def run(self):
    """Run function of the thread, while(True)"""
    while not(self.killed):
        if self.time_increment >= self.cpu_interval: # cpu_percent.interval already consumes 1sec
            time.sleep(self.time_increment - self.cpu_interval) # will double every time report_store_max_length is reached
        self._run_loop()

    if self.callback_on_exit:
        self.callback_on_exit()

finalize()

Ask the thread to stop and wait for it to finish (clean shutdown)

Source code in src/common/perf.py
def finalize(self):
    """Ask the thread to finalize (clean)"""
    self.killed = True
    self.join()
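
The thread can also be used on its own; a minimal sketch with a callback that just prints part of each report (print_report is an illustrative name):

def print_report(perf_report):
    # called once per reporting period with a dict of perf values
    print(perf_report["cpu_pct_per_cpu_avg"], perf_report["mem_percent"])

thread = PerformanceReportingThread(
    initial_time_increment=5.0,  # one report roughly every 5 seconds
    cpu_interval=1.0,
    callback_on_loop=print_report,
)
thread.start()
# ... run the workload ...
thread.finalize()  # sets killed=True and joins the thread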

PerformanceMetricsCollector

Collects performance metrics from PerformanceReportingThread. Limits all values to a maximum length.

Source code in src/common/perf.py
class PerformanceMetricsCollector():
    """Collects performance metrics from PerformanceReportingThread
    Limits all values to a maximum length"""
    def __init__(self, max_length=1000):
        """Constructor

        Args:
            max_length (int): maximum number of perf reports to keep
        """
        self.logger = logging.getLogger(__name__)

        # create a thread to generate reports regularly
        self.report_thread = PerformanceReportingThread(
            initial_time_increment=1.0,
            cpu_interval=1.0,
            callback_on_loop=self.append_perf_metrics
        )

        self.perf_reports = [] # internal storage
        self.perf_reports_freqs = 1 # keep only 1 report out of this many (doubles when the store is trimmed)
        self.perf_reports_counter = 0 # how many reports we had so far

        self.max_length = (max_length//2 + max_length%2) * 2 # has to be divisible by 2


    def start(self):
        """Start collector perf metrics (start internal thread)"""
        self.logger.info(f"Starting perf metric collector (max_length={self.max_length})")
        self.report_thread.start()

    def finalize(self):
        """Stop collector perf metrics (stop internal thread)"""
        self.logger.info(f"Finalizing perf metric collector (length={len(self.perf_reports)})")
        self.report_thread.finalize()

    def append_perf_metrics(self, perf_metrics):
        """Add a perf metric report to the internal storage"""
        self.perf_reports_counter += 1

        if (self.perf_reports_counter % self.perf_reports_freqs):
            # if we've decided to skip this one
            return

        self.perf_reports.append(perf_metrics)

        if len(self.perf_reports) > self.max_length:
            # trim the report by half
            self.perf_reports = [
                self.perf_reports[i]
                for i in range(0, self.max_length, 2)
            ]
            self.perf_reports_freqs *= 2 # from now on, keep only 1 report out of perf_reports_freqs
            self.logger.warning(f"Perf report store reached max, increasing freq to {self.perf_reports_freqs}")

__init__(max_length=1000)

Constructor

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_length | int | maximum number of perf reports to keep | 1000 |
Source code in src/common/perf.py
def __init__(self, max_length=1000):
    """Constructor

    Args:
        max_length (int): maximum number of perf reports to keep
    """
    self.logger = logging.getLogger(__name__)

    # create a thread to generate reports regularly
    self.report_thread = PerformanceReportingThread(
        initial_time_increment=1.0,
        cpu_interval=1.0,
        callback_on_loop=self.append_perf_metrics
    )

    self.perf_reports = [] # internal storage
    self.perf_reports_freqs = 1 # keep only 1 report out of this many (doubles when the store is trimmed)
    self.perf_reports_counter = 0 # how many reports we had so far

    self.max_length = (max_length//2 + max_length%2) * 2 # has to be divisible by 2

start()

Start collecting perf metrics (starts the internal thread)

Source code in src/common/perf.py
def start(self):
    """Start collector perf metrics (start internal thread)"""
    self.logger.info(f"Starting perf metric collector (max_length={self.max_length})")
    self.report_thread.start()

finalize()

Stop collecting perf metrics (stops the internal thread)

Source code in src/common/perf.py
def finalize(self):
    """Stop collector perf metrics (stop internal thread)"""
    self.logger.info(f"Finalizing perf metric collector (length={len(self.perf_reports)})")
    self.report_thread.finalize()

append_perf_metrics(perf_metrics)

Add a perf metric report to the internal storage

Source code in src/common/perf.py
def append_perf_metrics(self, perf_metrics):
    """Add a perf metric report to the internal storage"""
    self.perf_reports_counter += 1

    if (self.perf_reports_counter % self.perf_reports_freqs):
        # if we've decided to skip this one
        return

    self.perf_reports.append(perf_metrics)

    if len(self.perf_reports) > self.max_length:
        # trim the report by half
        self.perf_reports = [
            self.perf_reports[i]
            for i in range(0, self.max_length, 2)
        ]
        self.perf_reports_freqs *= 2 # from now on, keep only 1 report out of perf_reports_freqs
        self.logger.warning(f"Perf report store reached max, increasing freq to {self.perf_reports_freqs}")
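
To illustrate the trimming behavior: once the store exceeds max_length, every other report is dropped and the keep frequency doubles, so memory stays bounded while the retained reports remain roughly evenly spaced in time. A small sketch feeding fake reports directly, without starting the internal thread (the dict content is irrelevant here):

collector = PerformanceMetricsCollector(max_length=4)
for i in range(8):
    collector.append_perf_metrics({"sample": i})

print(len(collector.perf_reports))   # 4: trimmed to half on overflow, then refilled
print(collector.perf_reports_freqs)  # 2: only 1 report out of 2 is now kept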

PerfReportPlotter

Aggregates perf reports collected from all nodes and logs summary metrics

Source code in src/common/perf.py
class PerfReportPlotter():
    """Once collected all perf reports from all nodes"""
    def __init__(self, metrics_logger):
        self.all_reports = {}
        self.metrics_logger = metrics_logger

    def save_to(self, perf_report_file_path=None):
        """Saves all reports into a json file"""
        # if no file path provided, create a temp file
        if perf_report_file_path is None:
            perf_report_file_path = tempfile.NamedTemporaryFile(suffix=".json").name

        with open(perf_report_file_path, "w") as out_file:
            out_file.write(json.dumps(self.all_reports, indent="    "))

        return perf_report_file_path

    def add_perf_reports(self, perf_reports, node):
        """Add a set of reports from a given node"""
        self.all_reports[node] = perf_reports

    def report_nodes_perf(self):
        """Log summary metrics: one value per node for each metric"""
        for node in self.all_reports:
            # CPU UTILIZATION
            cpu_avg_utilization = [ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]

            self.metrics_logger.log_metric(
                "max_t_(cpu_pct_per_cpu_avg)",
                max(cpu_avg_utilization),
                step=node
            )
            self.metrics_logger.log_metric(
                "cpu_avg_utilization_pct",
                sum(cpu_avg_utilization)/len(cpu_avg_utilization),
                step=node
            )
            self.metrics_logger.log_metric(
                "cpu_avg_utilization_at100_pct",
                sum( [ utilization >= 100.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
                step=node
            )
            self.metrics_logger.log_metric(
                "cpu_avg_utilization_over80_pct",
                sum( [ utilization >= 80.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
                step=node
            )
            self.metrics_logger.log_metric(
                "cpu_avg_utilization_over40_pct",
                sum( [ utilization >= 40.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
                step=node
            )
            self.metrics_logger.log_metric(
                "cpu_avg_utilization_over20_pct",
                sum( [ utilization >= 20.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
                step=node
            )
            self.metrics_logger.log_metric(
                "max_t_(cpu_pct_per_cpu_min)",
                max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "max_t_(cpu_pct_per_cpu_max)",
                max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
                step=node
            )

            # "CPU HOURS"
            job_internal_cpu_hours = (time.time() - self.all_reports[node][0]["timestamp"]) * psutil.cpu_count() / 60 / 60
            self.metrics_logger.log_metric(
                "node_cpu_hours",
                job_internal_cpu_hours,
                step=node
            )
            self.metrics_logger.log_metric(
                "node_unused_cpu_hours",
                job_internal_cpu_hours * (100.0 - sum(cpu_avg_utilization)/len(cpu_avg_utilization)) / 100.0,
                step=node
            )

            # MEM
            self.metrics_logger.log_metric(
                "max_t_(mem_percent)",
                max([ report["mem_percent"] for report in self.all_reports[node] ]),
                step=node
            )

            # DISK
            self.metrics_logger.log_metric(
                "max_t_disk_usage_percent",
                max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "total_disk_io_read_mb",
                max([ report["disk_io_read_mb"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "total_disk_io_write_mb",
                max([ report["disk_io_write_mb"] for report in self.all_reports[node] ]),
                step=node
            )

            # NET I/O
            self.metrics_logger.log_metric(
                "total_net_io_lo_sent_mb",
                max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "total_net_io_ext_sent_mb",
                max([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "total_net_io_lo_recv_mb",
                max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
                step=node
            )
            self.metrics_logger.log_metric(
                "total_net_io_ext_recv_mb",
                max([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ]),
                step=node
            )
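
For reference, node_cpu_hours is the elapsed job time (from the first report's timestamp to now) in hours, multiplied by the CPU count, and node_unused_cpu_hours scales it by the unused fraction. A worked example with made-up numbers:

elapsed_seconds = 7200  # 2 hours between the first perf report and now
cpu_count = 16

node_cpu_hours = elapsed_seconds * cpu_count / 60 / 60  # 32.0
# with an average utilization of 25%, three quarters of those hours are unused
node_unused_cpu_hours = node_cpu_hours * (100.0 - 25.0) / 100.0  # 24.0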

save_to(perf_report_file_path=None)

Saves all reports into a json file

Source code in src/common/perf.py
def save_to(self, perf_report_file_path=None):
    """Saves all reports into a json file"""
    # if no file path provided, create a temp file
    if perf_report_file_path is None:
        perf_report_file_path = tempfile.NamedTemporaryFile(suffix=".json").name

    with open(perf_report_file_path, "w") as out_file:
        out_file.write(json.dumps(self.all_reports, indent="    "))

    return perf_report_file_path
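
The resulting file maps each node key to its list of perf reports; a sketch of reading it back (report_path is the value returned by save_to; note that json stringifies integer node keys):

import json

with open(report_path, "r") as in_file:
    all_reports = json.load(in_file)

# e.g. the memory utilization timeline of node 0 (added with node=0)
mem_timeline = [report["mem_percent"] for report in all_reports["0"]]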

add_perf_reports(perf_reports, node)

Add a set of reports from a given node

Source code in src/common/perf.py
def add_perf_reports(self, perf_reports, node):
    """Add a set of reports from a given node"""
    self.all_reports[node] = perf_reports