Coverage for mlos_bench/mlos_bench/environments/local/local_env.py: 96%

125 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-20 00:44 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6Scheduler-side benchmark environment to run scripts locally. 

7 

8TODO: Reference the script_env.py file for the base class. 

9""" 

10 

11import json 

12import logging 

13import sys 

14from contextlib import nullcontext 

15from datetime import datetime 

16from tempfile import TemporaryDirectory 

17from types import TracebackType 

18from typing import ( 

19 Any, 

20 Dict, 

21 Iterable, 

22 List, 

23 Literal, 

24 Mapping, 

25 Optional, 

26 Tuple, 

27 Type, 

28 Union, 

29) 

30 

31import pandas 

32 

33from mlos_bench.environments.base_environment import Environment 

34from mlos_bench.environments.script_env import ScriptEnv 

35from mlos_bench.environments.status import Status 

36from mlos_bench.services.base_service import Service 

37from mlos_bench.services.types.local_exec_type import SupportsLocalExec 

38from mlos_bench.tunables.tunable import TunableValue 

39from mlos_bench.tunables.tunable_groups import TunableGroups 

40from mlos_bench.util import datetime_parser, path_join 

41 

42_LOG = logging.getLogger(__name__) 

43 

44 

class LocalEnv(ScriptEnv):
    # pylint: disable=too-many-instance-attributes
    """Scheduler-side Environment that runs scripts locally."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        config: dict,
        global_config: Optional[dict] = None,
        tunables: Optional[TunableGroups] = None,
        service: Optional[Service] = None,
    ):
        """
        Create a new environment for local execution.

        Parameters
        ----------
        name: str
            Human-readable name of the environment.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `LocalEnv` must also have at least some of the following parameters:
            {setup, run, teardown, dump_params_file, read_results_file}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).
        """
        super().__init__(
            name=name,
            config=config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        assert self._service is not None and isinstance(
            self._service, SupportsLocalExec
        ), "LocalEnv requires a service that supports local execution"
        self._local_exec_service: SupportsLocalExec = self._service

        # Both are populated in __enter__ and reset to None in __exit__.
        self._temp_dir: Optional[str] = None
        self._temp_dir_context: Union[TemporaryDirectory, nullcontext, None] = None

        # Optional file names (from the config) to write before the setup script runs.
        self._dump_params_file: Optional[str] = self.config.get("dump_params_file")
        self._dump_meta_file: Optional[str] = self.config.get("dump_meta_file")

        # Optional file names (from the config) to read results/telemetry from.
        self._read_results_file: Optional[str] = self.config.get("read_results_file")
        self._read_telemetry_file: Optional[str] = self.config.get("read_telemetry_file")

    def __enter__(self) -> Environment:
        """Enter the environment's context and acquire the working directory."""
        assert self._temp_dir is None and self._temp_dir_context is None
        self._temp_dir_context = self._local_exec_service.temp_dir_context(
            self.config.get("temp_dir"),
        )
        self._temp_dir = self._temp_dir_context.__enter__()
        return super().__enter__()

    def __exit__(
        self,
        ex_type: Optional[Type[BaseException]],
        ex_val: Optional[BaseException],
        ex_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """
        Exit the context of the benchmarking environment.

        The temporary directory context is released first. Even if that release
        raises, the local state is reset and the parent's ``__exit__`` is still
        invoked, so that the environment is never left half-entered.
        """
        assert not (self._temp_dir is None or self._temp_dir_context is None)
        try:
            self._temp_dir_context.__exit__(ex_type, ex_val, ex_tb)
        finally:
            # Always reset the state; otherwise a failed cleanup would make
            # the next __enter__ trip over its "not yet entered" assertion.
            self._temp_dir = None
            self._temp_dir_context = None
            result = super().__exit__(ex_type, ex_val, ex_tb)
        return result

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Check if the environment is ready and set up the application and benchmarks, if
        necessary.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. In a local environment these could be used to prepare a config
            file on the scheduler prior to transferring it to the remote environment,
            for instance.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if operation is successful, false otherwise.
        """
        if not super().setup(tunables, global_config):
            return False

        _LOG.info("Set up the environment locally: '%s' at %s", self, self._temp_dir)
        assert self._temp_dir is not None

        if self._dump_params_file:
            fname = path_join(self._temp_dir, self._dump_params_file)
            _LOG.debug("Dump tunables to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_tunables:
                # Only the tunable values are dumped here (not the const_args).
                json.dump(self._tunable_params.get_param_values(), fh_tunables)

        if self._dump_meta_file:
            fname = path_join(self._temp_dir, self._dump_meta_file)
            _LOG.debug("Dump tunables metadata to file: %s", fname)
            with open(fname, "wt", encoding="utf-8") as fh_meta:
                # Tunables with empty/falsy metadata are omitted from the dump.
                json.dump(
                    {
                        tunable.name: tunable.meta
                        for (tunable, _group) in self._tunable_params
                        if tunable.meta
                    },
                    fh_meta,
                )

        if self._script_setup:
            (return_code, _output) = self._local_exec(self._script_setup, self._temp_dir)
            self._is_ready = bool(return_code == 0)
        else:
            self._is_ready = True

        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Run a script in the local scheduler environment.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime.datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not COMPLETED.
            If run script is a benchmark, then the score is usually expected to
            be in the `score` field.

        Raises
        ------
        ValueError
            If the results file is neither in the long (metric, value) format
            nor a single-row wide table.
        """
        (status, timestamp, _) = result = super().run()
        if not status.is_ready():
            return result

        assert self._temp_dir is not None

        stdout_data: Dict[str, TunableValue] = {}
        if self._script_run:
            (return_code, output) = self._local_exec(self._script_run, self._temp_dir)
            if return_code != 0:
                return (Status.FAILED, timestamp, None)
            stdout_data = self._extract_stdout_results(output.get("stdout", ""))

        # FIXME: We should not be assuming that the only output file type is a CSV.
        if not self._read_results_file:
            _LOG.debug("Not reading the data at: %s", self)
            return (Status.SUCCEEDED, timestamp, stdout_data)

        data = self._normalize_columns(
            pandas.read_csv(
                self._config_loader_service.resolve_path(
                    self._read_results_file,
                    extra_paths=[self._temp_dir],
                ),
                index_col=False,
            )
        )

        _LOG.debug("Read data:\n%s", data)
        if list(data.columns) == ["metric", "value"]:
            _LOG.info(
                "Local results have (metric,value) header and %d rows: assume long format",
                len(data),
            )
            # Pivot the long table into a single wide row: one column per metric.
            data = pandas.DataFrame([data.value.to_list()], columns=data.metric.to_list())
            # Try to convert string metrics to numbers.
            # Values that fail the conversion are restored from the original frame.
            data = data.apply(
                pandas.to_numeric,
                errors="coerce",
            ).fillna(data)
        elif len(data) == 1:
            _LOG.info("Local results have 1 row: assume wide format")
        else:
            raise ValueError(f"Invalid data format: {data}")

        # Values from the results file override same-named metrics parsed from stdout.
        stdout_data.update(data.iloc[-1].to_dict())
        _LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
        return (Status.SUCCEEDED, timestamp, stdout_data)

    @staticmethod
    def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
        """
        Strip trailing spaces from column names (Windows only).

        The data frame is modified in place and also returned for chaining.
        """
        # Windows cmd interpretation of > redirect symbols can leave trailing spaces in
        # the final column, which leads to misnamed columns.
        # For now, we simply strip trailing spaces from column names to account for that.
        if sys.platform == "win32":
            data.rename(str.rstrip, axis="columns", inplace=True)
        return data

    def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
        """
        Check the status of the environment and read the telemetry file, if any.

        Returns
        -------
        (status, timestamp, telemetry) : (Status, datetime.datetime, list)
            3-tuple of (status, timestamp, telemetry) values, where `telemetry`
            is a list of (timestamp, metric, value) triplets.
            The list is empty if the environment is not ready, if no telemetry
            file is configured, or if the telemetry file is not found.

        Raises
        ------
        ValueError
            If the telemetry file does not have exactly three columns.
        """
        (status, timestamp, _) = super().status()
        if not (self._is_ready and self._read_telemetry_file):
            return (status, timestamp, [])

        assert self._temp_dir is not None
        try:
            fname = self._config_loader_service.resolve_path(
                self._read_telemetry_file,
                extra_paths=[self._temp_dir],
            )

            # TODO: Use the timestamp of the CSV file as our status timestamp?

            # FIXME: We should not be assuming that the only output file type is a CSV.

            data = self._normalize_columns(pandas.read_csv(fname, index_col=False))

            # Validate the shape *before* touching the contents, so that a
            # malformed file is reported with a clear error message.
            expected_col_names = ["timestamp", "metric", "value"]
            if len(data.columns) != len(expected_col_names):
                raise ValueError(f"Telemetry data must have columns {expected_col_names}")

            if list(data.columns) != expected_col_names:
                # Assume no header - this is ok for telemetry data.
                data = pandas.read_csv(fname, index_col=False, names=expected_col_names)

            # Parse the timestamps once, on the final data frame.
            data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local")

        except FileNotFoundError as ex:
            _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
            return (status, timestamp, [])

        _LOG.debug("Read telemetry data:\n%s", data)
        col_dtypes: Mapping[int, Type] = {0: datetime}
        return (
            status,
            timestamp,
            [
                (pandas.Timestamp(ts).to_pydatetime(), metric, value)
                for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
            ],
        )

    def teardown(self) -> None:
        """Clean up the local environment."""
        if self._script_teardown:
            _LOG.info("Local teardown: %s", self)
            # NOTE(review): no `cwd` is passed here, unlike in setup() and run().
            (return_code, _output) = self._local_exec(self._script_teardown)
            _LOG.info("Local teardown complete: %s :: %s", self, return_code)
        super().teardown()

    def _local_exec(self, script: Iterable[str], cwd: Optional[str] = None) -> Tuple[int, dict]:
        """
        Execute a script locally in the scheduler environment.

        Parameters
        ----------
        script : Iterable[str]
            Lines of the script to run locally.
            Treat every line as a separate command to run.
        cwd : Optional[str]
            Work directory to run the script at.

        Returns
        -------
        (return_code, output) : (int, dict)
            Return code of the script and a dict with stdout/stderr. Return code = 0 if successful.
        """
        env_params = self._get_env_params()
        # NOTE(review): if env_params can carry credentials from the global config,
        # logging them at INFO level may leak secrets - confirm.
        _LOG.info("Run script locally on: %s at %s with env %s", self, cwd, env_params)
        (return_code, stdout, stderr) = self._local_exec_service.local_exec(
            script,
            env=env_params,
            cwd=cwd,
        )
        if return_code != 0:
            _LOG.warning("ERROR: Local script returns code %d stderr:\n%s", return_code, stderr)
        return (return_code, {"stdout": stdout, "stderr": stderr})