Coverage for mlos_bench/mlos_bench/environments/local/local_env.py: 96%
125 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Scheduler-side benchmark environment to run scripts locally.
8TODO: Reference the script_env.py file for the base class.
9"""
11import json
12import logging
13import sys
14from contextlib import nullcontext
15from datetime import datetime
16from tempfile import TemporaryDirectory
17from types import TracebackType
18from typing import (
19 Any,
20 Dict,
21 Iterable,
22 List,
23 Literal,
24 Mapping,
25 Optional,
26 Tuple,
27 Type,
28 Union,
29)
31import pandas
33from mlos_bench.environments.base_environment import Environment
34from mlos_bench.environments.script_env import ScriptEnv
35from mlos_bench.environments.status import Status
36from mlos_bench.services.base_service import Service
37from mlos_bench.services.types.local_exec_type import SupportsLocalExec
38from mlos_bench.tunables.tunable import TunableValue
39from mlos_bench.tunables.tunable_groups import TunableGroups
40from mlos_bench.util import datetime_parser, path_join
# Module-level logger named after this module; used by LocalEnv for all its log output.
42_LOG = logging.getLogger(__name__)
class LocalEnv(ScriptEnv):
    # pylint: disable=too-many-instance-attributes
    """
    Scheduler-side Environment that runs scripts locally.

    The environment is a context manager: on entry it obtains a working directory
    from the local execution service, and `setup`/`run` execute their scripts there.
    Optionally it dumps the tunable values (and their metadata) to JSON files
    before the setup script runs, and reads the benchmark results and telemetry
    back from CSV files afterwards.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        config: dict,
        global_config: Optional[dict] = None,
        tunables: Optional[TunableGroups] = None,
        service: Optional[Service] = None,
    ):
        """
        Create a new environment for local execution.

        Parameters
        ----------
        name: str
            Human-readable name of the environment.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `LocalEnv` must also have at least some of the following parameters:
            {setup, run, teardown, dump_params_file, read_results_file}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed in into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).
            It must support local execution (`SupportsLocalExec`).
        """
        super().__init__(
            name=name,
            config=config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        assert self._service is not None and isinstance(
            self._service, SupportsLocalExec
        ), "LocalEnv requires a service that supports local execution"
        self._local_exec_service: SupportsLocalExec = self._service

        # Working directory and its context manager; both are set only while
        # the environment is inside its `with` context.
        self._temp_dir: Optional[str] = None
        self._temp_dir_context: Union[TemporaryDirectory, nullcontext, None] = None

        # Files to write before running the setup script (relative to the temp dir).
        self._dump_params_file: Optional[str] = self.config.get("dump_params_file")
        self._dump_meta_file: Optional[str] = self.config.get("dump_meta_file")

        # Files to read the results and the telemetry from.
        self._read_results_file: Optional[str] = self.config.get("read_results_file")
        self._read_telemetry_file: Optional[str] = self.config.get("read_telemetry_file")

    def __enter__(self) -> Environment:
        """
        Enter the context of the benchmarking environment.

        Obtains the working directory from the local execution service (using the
        optional "temp_dir" config entry) and then enters the parent context.
        If the parent fails to enter, the working directory is released again.
        """
        assert self._temp_dir is None and self._temp_dir_context is None
        self._temp_dir_context = self._local_exec_service.temp_dir_context(
            self.config.get("temp_dir"),
        )
        self._temp_dir = self._temp_dir_context.__enter__()
        try:
            return super().__enter__()
        except BaseException:
            # Do not leak the directory (or leave stale state behind) when the
            # parent context cannot be entered: reset first, then release.
            temp_dir_context = self._temp_dir_context
            self._temp_dir = None
            self._temp_dir_context = None
            temp_dir_context.__exit__(*sys.exc_info())
            raise

    def __exit__(
        self,
        ex_type: Optional[Type[BaseException]],
        ex_val: Optional[BaseException],
        ex_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """
        Exit the context of the benchmarking environment.

        Releases the working directory and leaves the parent context. The parent
        context is left even if releasing the directory raises, so the environment
        never remains half-open.
        """
        assert not (self._temp_dir is None or self._temp_dir_context is None)
        # Reset the state up front so that a failing cleanup cannot leave the
        # environment looking as if it was still inside its context.
        temp_dir_context = self._temp_dir_context
        self._temp_dir = None
        self._temp_dir_context = None
        try:
            temp_dir_context.__exit__(ex_type, ex_val, ex_tb)
        finally:
            result = super().__exit__(ex_type, ex_val, ex_tb)
        return result

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Check if the environment is ready and set up the application and benchmarks, if
        necessary.

        Writes the optional tunables / tunables-metadata JSON files into the working
        directory and then runs the setup script (if any) in that directory.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. In a local environment these could be used to prepare a config
            file on the scheduler prior to transferring it to the remote environment,
            for instance.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if operation is successful, false otherwise.
        """
        if not super().setup(tunables, global_config):
            return False

        _LOG.info("Set up the environment locally: '%s' at %s", self, self._temp_dir)
        assert self._temp_dir is not None

        if self._dump_params_file:
            fname = path_join(self._temp_dir, self._dump_params_file)
            _LOG.debug("Dump tunables to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_tunables:
                # Only the tunable values are dumped here, not the const_args.
                json.dump(self._tunable_params.get_param_values(), fh_tunables)

        if self._dump_meta_file:
            fname = path_join(self._temp_dir, self._dump_meta_file)
            _LOG.debug("Dump tunables metadata to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_meta:
                # Tunables without metadata are omitted from the file.
                json.dump(
                    {
                        tunable.name: tunable.meta
                        for (tunable, _group) in self._tunable_params
                        if tunable.meta
                    },
                    fh_meta,
                )

        if self._script_setup:
            (return_code, _output) = self._local_exec(self._script_setup, self._temp_dir)
            self._is_ready = bool(return_code == 0)
        else:
            self._is_ready = True

        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Run a script in the local scheduler environment.

        The results are collected from the stdout of the run script and, if
        "read_results_file" is configured, from that CSV file as well; values
        from the file override the ones extracted from stdout.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime.datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not COMPLETED.
            If run script is a benchmark, then the score is usually expected to
            be in the `score` field.

        Raises
        ------
        ValueError
            If the results file is neither in the long (metric, value) format
            nor a single-row table in the wide format.
        """
        (status, timestamp, _) = result = super().run()
        if not status.is_ready():
            return result

        assert self._temp_dir is not None

        stdout_data: Dict[str, TunableValue] = {}
        if self._script_run:
            (return_code, output) = self._local_exec(self._script_run, self._temp_dir)
            if return_code != 0:
                return (Status.FAILED, timestamp, None)
            stdout_data = self._extract_stdout_results(output.get("stdout", ""))

        # FIXME: We should not be assuming that the only output file type is a CSV.
        if not self._read_results_file:
            _LOG.debug("Not reading the data at: %s", self)
            return (Status.SUCCEEDED, timestamp, stdout_data)

        data = self._normalize_columns(
            pandas.read_csv(
                self._config_loader_service.resolve_path(
                    self._read_results_file,
                    extra_paths=[self._temp_dir],
                ),
                index_col=False,
            )
        )

        _LOG.debug("Read data:\n%s", data)
        if list(data.columns) == ["metric", "value"]:
            _LOG.info(
                "Local results have (metric,value) header and %d rows: assume long format",
                len(data),
            )
            # Pivot the long format into a single wide row: one column per metric.
            data = pandas.DataFrame([data.value.to_list()], columns=data.metric.to_list())
            # Try to convert string metrics to numbers;
            # keep the original value wherever the conversion fails.
            data = data.apply(
                pandas.to_numeric,
                errors="coerce",
            ).fillna(data)
        elif len(data) == 1:
            _LOG.info("Local results have 1 row: assume wide format")
        else:
            raise ValueError(f"Invalid data format: {data}")

        stdout_data.update(data.iloc[-1].to_dict())
        _LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
        return (Status.SUCCEEDED, timestamp, stdout_data)

    @staticmethod
    def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
        """
        Strip trailing spaces from column names (Windows only).

        The data frame is modified in place and returned for convenience.
        """
        # Windows cmd interpretation of > redirect symbols can leave trailing spaces in
        # the final column, which leads to misnamed columns.
        # For now, we simply strip trailing spaces from column names to account for that.
        if sys.platform == "win32":
            data.rename(str.rstrip, axis="columns", inplace=True)
        return data

    def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
        """
        Check the status of the environment and read the telemetry, if available.

        The telemetry is read from the CSV file configured as "read_telemetry_file".
        The file must have exactly three columns (timestamp, metric, value);
        the header row is optional.

        Returns
        -------
        (status, timestamp, telemetry) : (Status, datetime.datetime, list)
            3-tuple of (status, timestamp, telemetry) values, where `telemetry` is
            a list of (timestamp, metric, value) triplets. The list is empty if the
            environment is not ready, no telemetry file is configured, or the
            telemetry file does not exist.

        Raises
        ------
        ValueError
            If the telemetry file does not have exactly three columns.
        """
        (status, timestamp, _) = super().status()
        if not (self._is_ready and self._read_telemetry_file):
            return (status, timestamp, [])

        assert self._temp_dir is not None
        expected_col_names = ["timestamp", "metric", "value"]
        try:
            fname = self._config_loader_service.resolve_path(
                self._read_telemetry_file,
                extra_paths=[self._temp_dir],
            )

            # TODO: Use the timestamp of the CSV file as our status timestamp?

            # FIXME: We should not be assuming that the only output file type is a CSV.

            data = self._normalize_columns(pandas.read_csv(fname, index_col=False))

            # Validate the shape before touching the data, so that a malformed
            # file is always reported with the same, meaningful error.
            if len(data.columns) != len(expected_col_names):
                raise ValueError(f"Telemetry data must have columns {expected_col_names}")

            if list(data.columns) != expected_col_names:
                # Assume no header - this is ok for telemetry data.
                data = pandas.read_csv(fname, index_col=False, names=expected_col_names)

            # Parse the timestamps once, on the final version of the data.
            data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

        except FileNotFoundError as ex:
            _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
            return (status, timestamp, [])

        _LOG.debug("Read telemetry data:\n%s", data)
        col_dtypes: Mapping[int, Type] = {0: datetime}
        return (
            status,
            timestamp,
            [
                (pandas.Timestamp(ts).to_pydatetime(), metric, value)
                for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
            ],
        )

    def teardown(self) -> None:
        """Clean up the local environment: run the teardown script, if there is one."""
        if self._script_teardown:
            _LOG.info("Local teardown: %s", self)
            # NOTE(review): unlike `setup` and `run`, the teardown script is not run
            # in the temp directory (no `cwd` is passed) - confirm this is intended.
            (return_code, _output) = self._local_exec(self._script_teardown)
            _LOG.info("Local teardown complete: %s :: %s", self, return_code)
        super().teardown()

    def _local_exec(self, script: Iterable[str], cwd: Optional[str] = None) -> Tuple[int, dict]:
        """
        Execute a script locally in the scheduler environment.

        Parameters
        ----------
        script : Iterable[str]
            Lines of the script to run locally.
            Treat every line as a separate command to run.
        cwd : Optional[str]
            Work directory to run the script at.

        Returns
        -------
        (return_code, output) : (int, dict)
            Return code of the script and a dict with stdout/stderr. Return code = 0 if successful.
        """
        env_params = self._get_env_params()
        # NOTE(review): this logs the whole script environment at INFO level; if it can
        # contain credentials mixed in from the global config, consider masking them.
        _LOG.info("Run script locally on: %s at %s with env %s", self, cwd, env_params)
        (return_code, stdout, stderr) = self._local_exec_service.local_exec(
            script,
            env=env_params,
            cwd=cwd,
        )
        if return_code != 0:
            # A failing script is only logged here; callers decide based on the return code.
            _LOG.warning("ERROR: Local script returns code %d stderr:\n%s", return_code, stderr)
        return (return_code, {"stdout": stdout, "stderr": stderr})