Coverage for mlos_bench/mlos_bench/environments/local/local_env.py: 88%
127 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Scheduler-side benchmark environment to run scripts locally.
7"""
9import json
10import logging
11import sys
13from datetime import datetime
14from tempfile import TemporaryDirectory
15from contextlib import nullcontext
17from types import TracebackType
18from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
19from typing_extensions import Literal
21import pandas
23from mlos_bench.environments.status import Status
24from mlos_bench.environments.base_environment import Environment
25from mlos_bench.environments.script_env import ScriptEnv
26from mlos_bench.services.base_service import Service
27from mlos_bench.services.types.local_exec_type import SupportsLocalExec
28from mlos_bench.tunables.tunable import TunableValue
29from mlos_bench.tunables.tunable_groups import TunableGroups
30from mlos_bench.util import datetime_parser, path_join
32_LOG = logging.getLogger(__name__)
class LocalEnv(ScriptEnv):
    # pylint: disable=too-many-instance-attributes
    """
    Scheduler-side Environment that runs scripts locally.

    The environment owns a temporary working directory for the duration of its
    context (``with`` block). The setup and run scripts are executed with that
    directory as their working directory, and the optional parameter/results/
    telemetry files are resolved relative to it.
    """

    def __init__(self,
                 *,
                 name: str,
                 config: dict,
                 global_config: Optional[dict] = None,
                 tunables: Optional[TunableGroups] = None,
                 service: Optional[Service] = None):
        """
        Create a new environment for local execution.

        Parameters
        ----------
        name: str
            Human-readable name of the environment.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `LocalEnv` must also have at least some of the following parameters:
            {setup, run, teardown, dump_params_file, read_results_file}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed in into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).
        """
        super().__init__(name=name, config=config, global_config=global_config,
                         tunables=tunables, service=service)

        assert self._service is not None and isinstance(self._service, SupportsLocalExec), \
            "LocalEnv requires a service that supports local execution"
        self._local_exec_service: SupportsLocalExec = self._service

        # Both are set in `__enter__` and reset to None in `__exit__`.
        self._temp_dir: Optional[str] = None
        self._temp_dir_context: Union[TemporaryDirectory, nullcontext, None] = None

        # Optional output files written by `setup()` (relative to the temp dir).
        self._dump_params_file: Optional[str] = self.config.get("dump_params_file")
        self._dump_meta_file: Optional[str] = self.config.get("dump_meta_file")

        # Optional input files read by `run()` and `status()`, respectively.
        self._read_results_file: Optional[str] = self.config.get("read_results_file")
        self._read_telemetry_file: Optional[str] = self.config.get("read_telemetry_file")

    def __enter__(self) -> Environment:
        """
        Enter the context of the benchmarking environment.

        Creates the temporary directory context (from the optional "temp_dir"
        config entry) before entering the parent context.

        Returns
        -------
        env : Environment
            Whatever the parent's `__enter__` returns.
        """
        assert self._temp_dir is None and self._temp_dir_context is None
        temp_dir_context = self._local_exec_service.temp_dir_context(self.config.get("temp_dir"))
        self._temp_dir_context = temp_dir_context
        self._temp_dir = temp_dir_context.__enter__()
        try:
            return super().__enter__()
        except BaseException:
            # The parent failed to enter, so `__exit__` will never be called for us:
            # release the temp dir and reset the state here to avoid leaking it.
            self._temp_dir = None
            self._temp_dir_context = None
            temp_dir_context.__exit__(*sys.exc_info())
            raise

    def __exit__(self, ex_type: Optional[Type[BaseException]],
                 ex_val: Optional[BaseException],
                 ex_tb: Optional[TracebackType]) -> Literal[False]:
        """
        Exit the context of the benchmarking environment.

        Leaves the temporary directory context first, then the parent context.
        The parent context is exited even if the temp directory cleanup raises.
        """
        assert not (self._temp_dir is None or self._temp_dir_context is None)
        temp_dir_context = self._temp_dir_context
        # Reset the state up front so that it is consistent even if the cleanup fails.
        self._temp_dir = None
        self._temp_dir_context = None
        try:
            temp_dir_context.__exit__(ex_type, ex_val, ex_tb)
        finally:
            # No `return` inside `finally`: that would swallow a cleanup exception.
            is_suppressed = super().__exit__(ex_type, ex_val, ex_tb)
        return is_suppressed

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Check if the environment is ready and set up the application
        and benchmarks, if necessary.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. In a local environment these could be used to prepare a config
            file on the scheduler prior to transferring it to the remote environment,
            for instance.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if operation is successful, false otherwise.
        """
        if not super().setup(tunables, global_config):
            return False

        _LOG.info("Set up the environment locally: '%s' at %s", self, self._temp_dir)
        assert self._temp_dir is not None

        if self._dump_params_file:
            fname = path_join(self._temp_dir, self._dump_params_file)
            _LOG.debug("Dump tunables to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_tunables:
                # json.dump(self._params, fh_tunables)  # Tunables *and* const_args
                json.dump(self._tunable_params.get_param_values(), fh_tunables)

        if self._dump_meta_file:
            fname = path_join(self._temp_dir, self._dump_meta_file)
            _LOG.debug("Dump tunables metadata to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_meta:
                # Only the tunables that actually have metadata are written.
                json.dump({
                    tunable.name: tunable.meta
                    for (tunable, _group) in self._tunable_params if tunable.meta
                }, fh_meta)

        if self._script_setup:
            (return_code, _output) = self._local_exec(self._script_setup, self._temp_dir)
            self._is_ready = bool(return_code == 0)
        else:
            self._is_ready = True

        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Run a script in the local scheduler environment.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not SUCCEEDED.
            If run script is a benchmark, then the score is usually expected to
            be in the `score` field.

        Raises
        ------
        ValueError
            If the results file is neither in the long (metric, value) format
            nor a single-row wide table.
        """
        (status, timestamp, _) = result = super().run()
        if not status.is_ready():
            return result

        assert self._temp_dir is not None

        stdout_data: Dict[str, TunableValue] = {}
        if self._script_run:
            (return_code, output) = self._local_exec(self._script_run, self._temp_dir)
            if return_code != 0:
                return (Status.FAILED, timestamp, None)
            stdout_data = self._extract_stdout_results(output.get("stdout", ""))

        # FIXME: We should not be assuming that the only output file type is a CSV.
        if not self._read_results_file:
            _LOG.debug("Not reading the data at: %s", self)
            return (Status.SUCCEEDED, timestamp, stdout_data)

        data = self._normalize_columns(pandas.read_csv(
            self._config_loader_service.resolve_path(
                self._read_results_file, extra_paths=[self._temp_dir]),
            index_col=False,
        ))

        _LOG.debug("Read data:\n%s", data)
        if list(data.columns) == ["metric", "value"]:
            _LOG.info("Local results have (metric,value) header and %d rows: assume long format", len(data))
            # Pivot the long table into a single wide row: one column per metric.
            data = pandas.DataFrame([data.value.to_list()], columns=data.metric.to_list())
            # Try to convert string metrics to numbers.
            data = data.apply(pandas.to_numeric, errors='coerce').fillna(data)  # type: ignore[assignment]  # (false positive)
        elif len(data) == 1:
            _LOG.info("Local results have 1 row: assume wide format")
        else:
            raise ValueError(f"Invalid data format: {data}")

        # Values from the results file override same-named values parsed from stdout.
        stdout_data.update(data.iloc[-1].to_dict())
        _LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
        return (Status.SUCCEEDED, timestamp, stdout_data)

    @staticmethod
    def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
        """
        Strip trailing spaces from column names (Windows only).

        The data frame is modified in place and also returned for convenience.
        On other platforms it is returned unchanged.
        """
        # Windows cmd interpretation of > redirect symbols can leave trailing spaces in
        # the final column, which leads to misnamed columns.
        # For now, we simply strip trailing spaces from column names to account for that.
        if sys.platform == 'win32':
            data.rename(str.rstrip, axis='columns', inplace=True)
        return data

    def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
        """
        Check the status of the environment and read the telemetry data, if any.

        The telemetry CSV file must have exactly three columns
        (timestamp, metric, value); the header row is optional.

        Returns
        -------
        (status, timestamp, telemetry) : (Status, datetime, list)
            3-tuple of (status, timestamp, telemetry) values, where `telemetry`
            is a list of (timestamp, metric, value) triplets. The list is empty
            if the environment is not ready, no telemetry file is configured,
            or the telemetry file is not found.

        Raises
        ------
        ValueError
            If the telemetry file does not have exactly three columns.
        """
        (status, timestamp, _) = super().status()
        if not (self._is_ready and self._read_telemetry_file):
            return (status, timestamp, [])

        assert self._temp_dir is not None
        try:
            fname = self._config_loader_service.resolve_path(
                self._read_telemetry_file, extra_paths=[self._temp_dir])

            # TODO: Use the timestamp of the CSV file as our status timestamp?

            # FIXME: We should not be assuming that the only output file type is a CSV.

            data = self._normalize_columns(
                pandas.read_csv(fname, index_col=False))

            # Validate the shape before parsing anything, so that a malformed file
            # is reported as such rather than as a timestamp parsing error.
            expected_col_names = ["timestamp", "metric", "value"]
            if len(data.columns) != len(expected_col_names):
                raise ValueError(f'Telemetry data must have columns {expected_col_names}')

            if list(data.columns) != expected_col_names:
                # Assume no header - this is ok for telemetry data.
                data = pandas.read_csv(
                    fname, index_col=False, names=expected_col_names)

            # Parse the timestamps once, on the final version of the data.
            data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

        except FileNotFoundError as ex:
            _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
            return (status, timestamp, [])

        _LOG.debug("Read telemetry data:\n%s", data)
        col_dtypes: Mapping[int, Type] = {0: datetime}
        return (status, timestamp, [
            (pandas.Timestamp(ts).to_pydatetime(), metric, value)
            for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
        ])

    def teardown(self) -> None:
        """
        Clean up the local environment.

        Runs the teardown script, if any, and then the parent's teardown.
        The script's return code is logged but otherwise ignored.
        """
        if self._script_teardown:
            _LOG.info("Local teardown: %s", self)
            # No `cwd` is passed here, unlike in `setup()` and `run()`.
            (return_code, _output) = self._local_exec(self._script_teardown)
            _LOG.info("Local teardown complete: %s :: %s", self, return_code)
        super().teardown()

    def _local_exec(self, script: Iterable[str], cwd: Optional[str] = None) -> Tuple[int, dict]:
        """
        Execute a script locally in the scheduler environment.

        Parameters
        ----------
        script : Iterable[str]
            Lines of the script to run locally.
            Treat every line as a separate command to run.
        cwd : Optional[str]
            Work directory to run the script at.

        Returns
        -------
        (return_code, output) : (int, dict)
            Return code of the script and a dict with stdout/stderr. Return code = 0 if successful.
        """
        env_params = self._get_env_params()
        # NOTE(review): the environment parameters are logged at INFO level;
        # confirm that they can never carry secrets from the global config.
        _LOG.info("Run script locally on: %s at %s with env %s", self, cwd, env_params)
        (return_code, stdout, stderr) = self._local_exec_service.local_exec(
            script, env=env_params, cwd=cwd)
        if return_code != 0:
            _LOG.warning("ERROR: Local script returns code %d stderr:\n%s", return_code, stderr)
        return (return_code, {"stdout": stdout, "stderr": stderr})