Coverage for mlos_bench/mlos_bench/environments/local/local_env.py: 88%

127 statements  

#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""
Scheduler-side benchmark environment to run scripts locally.
"""

import json
import logging
import sys

from datetime import datetime
from tempfile import TemporaryDirectory
from contextlib import nullcontext

from types import TracebackType
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
from typing_extensions import Literal

import pandas

from mlos_bench.environments.status import Status
from mlos_bench.environments.base_environment import Environment
from mlos_bench.environments.script_env import ScriptEnv
from mlos_bench.services.base_service import Service
from mlos_bench.services.types.local_exec_type import SupportsLocalExec
from mlos_bench.tunables.tunable import TunableValue
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.util import datetime_parser, path_join

_LOG = logging.getLogger(__name__)


class LocalEnv(ScriptEnv):
    # pylint: disable=too-many-instance-attributes
    """
    Scheduler-side Environment that runs scripts locally.
    """

    def __init__(self,
                 *,
                 name: str,
                 config: dict,
                 global_config: Optional[dict] = None,
                 tunables: Optional[TunableGroups] = None,
                 service: Optional[Service] = None):
        """
        Create a new environment for local execution.

        Parameters
        ----------
        name : str
            Human-readable name of the environment.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `LocalEnv` must also have at least some of the following parameters:
            {setup, run, teardown, dump_params_file, read_results_file}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service : Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).
        """
        super().__init__(name=name, config=config, global_config=global_config,
                         tunables=tunables, service=service)

        assert self._service is not None and isinstance(self._service, SupportsLocalExec), \
            "LocalEnv requires a service that supports local execution"
        self._local_exec_service: SupportsLocalExec = self._service

        self._temp_dir: Optional[str] = None
        self._temp_dir_context: Union[TemporaryDirectory, nullcontext, None] = None

        self._dump_params_file: Optional[str] = self.config.get("dump_params_file")
        self._dump_meta_file: Optional[str] = self.config.get("dump_meta_file")

        self._read_results_file: Optional[str] = self.config.get("read_results_file")
        self._read_telemetry_file: Optional[str] = self.config.get("read_telemetry_file")
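        # Illustrative config fragment (hypothetical file names and script lines,
        # not taken from a real config) showing the sections and parameters
        # documented in the docstring above:
        #
        #   {
        #     "const_args": {"trial_id": 1},
        #     "tunable_params": ["my-tunable-group"],
        #     "setup": ["echo Prepare the benchmark"],
        #     "run": ["python3 benchmark.py --output results.csv"],
        #     "read_results_file": "results.csv",
        #     "teardown": ["echo Done"]
        #   }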

    def __enter__(self) -> Environment:
        assert self._temp_dir is None and self._temp_dir_context is None
        self._temp_dir_context = self._local_exec_service.temp_dir_context(self.config.get("temp_dir"))
        self._temp_dir = self._temp_dir_context.__enter__()
        return super().__enter__()

    def __exit__(self, ex_type: Optional[Type[BaseException]],
                 ex_val: Optional[BaseException],
                 ex_tb: Optional[TracebackType]) -> Literal[False]:
        """
        Exit the context of the benchmarking environment.
        """
        assert not (self._temp_dir is None or self._temp_dir_context is None)
        self._temp_dir_context.__exit__(ex_type, ex_val, ex_tb)
        self._temp_dir = None
        self._temp_dir_context = None
        return super().__exit__(ex_type, ex_val, ex_tb)

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Check if the environment is ready and set up the application
        and benchmarks, if necessary.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. In a local environment these could be used to prepare a config
            file on the scheduler prior to transferring it to the remote environment,
            for instance.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if the operation is successful, false otherwise.
        """
        if not super().setup(tunables, global_config):
            return False

        _LOG.info("Set up the environment locally: '%s' at %s", self, self._temp_dir)
        assert self._temp_dir is not None

        if self._dump_params_file:
            fname = path_join(self._temp_dir, self._dump_params_file)
            _LOG.debug("Dump tunables to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_tunables:
                # json.dump(self._params, fh_tunables)  # Tunables *and* const_args
                json.dump(self._tunable_params.get_param_values(), fh_tunables)
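                # The dumped file is a flat JSON dict of tunable values, e.g.
                # (hypothetical names and values): {"cpu_count": 4, "scheduler": "cfq"}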

        if self._dump_meta_file:
            fname = path_join(self._temp_dir, self._dump_meta_file)
            _LOG.debug("Dump tunables metadata to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_meta:
                json.dump({
                    tunable.name: tunable.meta
                    for (tunable, _group) in self._tunable_params if tunable.meta
                }, fh_meta)

        if self._script_setup:
            (return_code, _output) = self._local_exec(self._script_setup, self._temp_dir)
            self._is_ready = bool(return_code == 0)
        else:
            self._is_ready = True

        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Run a script in the local scheduler environment.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not COMPLETED.
            If the run script is a benchmark, then the score is usually expected to
            be in the `score` field.
        """
        (status, timestamp, _) = result = super().run()
        if not status.is_ready():
            return result

        assert self._temp_dir is not None

        stdout_data: Dict[str, TunableValue] = {}
        if self._script_run:
            (return_code, output) = self._local_exec(self._script_run, self._temp_dir)
            if return_code != 0:
                return (Status.FAILED, timestamp, None)
            stdout_data = self._extract_stdout_results(output.get("stdout", ""))

        # FIXME: We should not be assuming that the only output file type is a CSV.
        if not self._read_results_file:
            _LOG.debug("Not reading the data at: %s", self)
            return (Status.SUCCEEDED, timestamp, stdout_data)

        data = self._normalize_columns(pandas.read_csv(
            self._config_loader_service.resolve_path(
                self._read_results_file, extra_paths=[self._temp_dir]),
            index_col=False,
        ))

        _LOG.debug("Read data:\n%s", data)
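        # The results file can be in either of two layouts (illustrative example,
        # metric names and values are hypothetical):
        #
        #   long format:            wide format:
        #     metric,value            score,latency
        #     score,0.97              0.97,12.3
        #     latency,12.3
        #
        # A long-format frame is pivoted into a single wide row below.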

        if list(data.columns) == ["metric", "value"]:
            _LOG.info("Local results have (metric,value) header and %d rows: assume long format", len(data))
            data = pandas.DataFrame([data.value.to_list()], columns=data.metric.to_list())
            # Try to convert string metrics to numbers.
            data = data.apply(pandas.to_numeric, errors='coerce').fillna(data)  # type: ignore[assignment]  # (false positive)
        elif len(data) == 1:
            _LOG.info("Local results have 1 row: assume wide format")
        else:
            raise ValueError(f"Invalid data format: {data}")

        stdout_data.update(data.iloc[-1].to_dict())
        _LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
        return (Status.SUCCEEDED, timestamp, stdout_data)

    @staticmethod
    def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
        """
        Strip trailing spaces from column names (Windows only).
        """
        # Windows cmd interpretation of > redirect symbols can leave trailing spaces in
        # the final column, which leads to misnamed columns.
        # For now, we simply strip trailing spaces from column names to account for that.
        if sys.platform == 'win32':
            data.rename(str.rstrip, axis='columns', inplace=True)
        return data

    def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
        """
        Check the status of the environment and return the telemetry data,
        if available, read from the `read_telemetry_file` CSV.
        """
        (status, timestamp, _) = super().status()
        if not (self._is_ready and self._read_telemetry_file):
            return (status, timestamp, [])

        assert self._temp_dir is not None
        try:
            fname = self._config_loader_service.resolve_path(
                self._read_telemetry_file, extra_paths=[self._temp_dir])

            # TODO: Use the timestamp of the CSV file as our status timestamp?

            # FIXME: We should not be assuming that the only output file type is a CSV.
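            # The telemetry CSV is expected to have (timestamp, metric, value) columns,
            # with or without a header row, e.g. (hypothetical values):
            #
            #   2024-05-06 00:35:00,cpu_util,0.65
            #   2024-05-06 00:35:10,cpu_util,0.71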

            data = self._normalize_columns(
                pandas.read_csv(fname, index_col=False))
            data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

            expected_col_names = ["timestamp", "metric", "value"]
            if len(data.columns) != len(expected_col_names):
                raise ValueError(f'Telemetry data must have columns {expected_col_names}')

            if list(data.columns) != expected_col_names:
                # Assume no header - this is ok for telemetry data.
                data = pandas.read_csv(
                    fname, index_col=False, names=expected_col_names)
                data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

        except FileNotFoundError as ex:
            _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
            return (status, timestamp, [])

        _LOG.debug("Read telemetry data:\n%s", data)
        col_dtypes: Mapping[int, Type] = {0: datetime}
        return (status, timestamp, [
            (pandas.Timestamp(ts).to_pydatetime(), metric, value)
            for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
        ])

    def teardown(self) -> None:
        """
        Clean up the local environment.
        """
        if self._script_teardown:
            _LOG.info("Local teardown: %s", self)
            (return_code, _output) = self._local_exec(self._script_teardown)
            _LOG.info("Local teardown complete: %s :: %s", self, return_code)
        super().teardown()

    def _local_exec(self, script: Iterable[str], cwd: Optional[str] = None) -> Tuple[int, dict]:
        """
        Execute a script locally in the scheduler environment.

        Parameters
        ----------
        script : Iterable[str]
            Lines of the script to run locally.
            Treat every line as a separate command to run.
        cwd : Optional[str]
            Working directory to run the script in.

        Returns
        -------
        (return_code, output) : (int, dict)
            Return code of the script and a dict with stdout/stderr.
            Return code = 0 if successful.
        """

        env_params = self._get_env_params()
        _LOG.info("Run script locally on: %s at %s with env %s", self, cwd, env_params)
        (return_code, stdout, stderr) = self._local_exec_service.local_exec(
            script, env=env_params, cwd=cwd)
        if return_code != 0:
            _LOG.warning("ERROR: Local script returns code %d stderr:\n%s", return_code, stderr)
        return (return_code, {"stdout": stdout, "stderr": stderr})
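

# Illustrative usage sketch (hypothetical driver code), assuming a LocalExec-capable
# `service` and a `tunables` collection are constructed elsewhere -- both names are
# placeholders, not values defined in this module:
#
#   with LocalEnv(name="local_bench", config=config, service=service) as env:
#       assert env.setup(tunables)
#       (status, timestamp, results) = env.run()
#       (status, timestamp, telemetry) = env.status()
#       env.teardown()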