Coverage for mlos_bench/mlos_bench/schedulers/sync_scheduler.py: 88%
33 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6A simple single-threaded synchronous optimization loop implementation.
7"""
9import logging
10from datetime import datetime
12from pytz import UTC
14from mlos_bench.environments.status import Status
15from mlos_bench.schedulers.base_scheduler import Scheduler
16from mlos_bench.storage.base_storage import Storage
18_LOG = logging.getLogger(__name__)
21class SyncScheduler(Scheduler):
22 """
23 A simple single-threaded synchronous optimization loop implementation.
24 """
26 def start(self) -> None:
27 """
28 Start the optimization loop.
29 """
30 super().start()
32 is_warm_up = self.optimizer.supports_preload
33 if not is_warm_up:
34 _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)
36 not_done = True
37 while not_done:
38 _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
39 self._run_schedule(is_warm_up)
40 not_done = self._schedule_new_optimizer_suggestions()
41 is_warm_up = False
43 def run_trial(self, trial: Storage.Trial) -> None:
44 """
45 Set up and run a single trial. Save the results in the storage.
46 """
47 super().run_trial(trial)
49 if not self.environment.setup(trial.tunables, trial.config(self.global_config)):
50 _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables)
51 # FIXME: Use the actual timestamp from the environment.
52 _LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED)
53 trial.update(Status.FAILED, datetime.now(UTC))
54 return
56 (status, timestamp, results) = self.environment.run() # Block and wait for the final result.
57 _LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results)
59 # In async mode (TODO), poll the environment for status and telemetry
60 # and update the storage with the intermediate results.
61 (_status, _timestamp, telemetry) = self.environment.status()
63 # Use the status and timestamp from `.run()` as it is the final status of the experiment.
64 # TODO: Use the `.status()` output in async mode.
65 trial.update_telemetry(status, timestamp, telemetry)
67 trial.update(status, timestamp, results)
68 _LOG.info("QUEUE: Update trial results: %s :: %s %s", trial, status, results)