Coverage for mlos_bench/mlos_bench/schedulers/sync_scheduler.py: 88%

33 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 00:35 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6A simple single-threaded synchronous optimization loop implementation. 

7""" 

8 

9import logging 

10from datetime import datetime 

11 

12from pytz import UTC 

13 

14from mlos_bench.environments.status import Status 

15from mlos_bench.schedulers.base_scheduler import Scheduler 

16from mlos_bench.storage.base_storage import Storage 

17 

18_LOG = logging.getLogger(__name__) 

19 

20 

class SyncScheduler(Scheduler):
    """
    Scheduler that drives the optimization loop synchronously in a single thread.
    """

    def start(self) -> None:
        """
        Run the optimization loop until no new suggestions get scheduled.

        Every pass runs the current schedule and then requests new optimizer
        suggestions; the loop ends once that request returns a falsy value.
        """
        super().start()

        warm_up = self.optimizer.supports_preload
        if not warm_up:
            _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer)

        while True:
            _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id)
            self._run_schedule(warm_up)
            # Only the very first pass may be flagged as a warm-up pass.
            warm_up = False
            if not self._schedule_new_optimizer_suggestions():
                break

    def run_trial(self, trial: Storage.Trial) -> None:
        """
        Set up the environment for one trial, run it, and store the outcome.

        When the environment setup fails, the trial is recorded as FAILED with
        the current UTC time and the method returns without running anything.
        """
        super().run_trial(trial)

        setup_ok = self.environment.setup(trial.tunables, trial.config(self.global_config))
        if not setup_ok:
            _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables)
            # FIXME: Use the actual timestamp from the environment.
            _LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED)
            trial.update(Status.FAILED, datetime.now(UTC))
            return

        # Block and wait for the final result.
        final_status, final_timestamp, final_results = self.environment.run()
        _LOG.info("Results: %s :: %s\n%s", trial.tunables, final_status, final_results)

        # In async mode (TODO), poll the environment for status and telemetry
        # and update the storage with the intermediate results.
        _polled_status, _polled_timestamp, telemetry_data = self.environment.status()

        # The status and timestamp reported by `.run()` are the final ones for
        # the experiment, so they are stored alongside the telemetry as well.
        # TODO: Use the `.status()` output in async mode.
        trial.update_telemetry(final_status, final_timestamp, telemetry_data)

        trial.update(final_status, final_timestamp, final_results)
        _LOG.info(
            "QUEUE: Update trial results: %s :: %s %s",
            trial,
            final_status,
            final_results,
        )