Coverage for mlos_bench/mlos_bench/environments/remote/remote_env.py: 88%

68 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6Remotely executed benchmark/script environment. 

7 

8e.g. Application Environment 

9 

10TODO: Document how variable propagation works in the remote environments. 

11""" 

12 

13import logging 

14import re 

15from datetime import datetime 

16from typing import Dict, Iterable, Optional, Tuple 

17 

18from pytz import UTC 

19 

20from mlos_bench.environments.script_env import ScriptEnv 

21from mlos_bench.environments.status import Status 

22from mlos_bench.services.base_service import Service 

23from mlos_bench.services.types.host_ops_type import SupportsHostOps 

24from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec 

25from mlos_bench.tunables.tunable import TunableValue 

26from mlos_bench.tunables.tunable_groups import TunableGroups 

27 

28_LOG = logging.getLogger(__name__) 

29 

30 

class RemoteEnv(ScriptEnv):
    """
    Script environment whose scripts are executed on a remote host.

    The setup, run, and teardown scripts are submitted through a service that
    supports remote execution. Optionally (``wait_boot``), the host is started
    through a host-operations service before the setup script runs.

    e.g. Application Environment
    """

    # Matches runs of non-word characters; used to sanitize the environment
    # name when building the prefix of remote command names.
    _RE_SPECIAL = re.compile(r"\W+")

    def __init__(  # pylint: disable=too-many-arguments
        self,
        *,
        name: str,
        config: dict,
        global_config: Optional[dict] = None,
        tunables: Optional[TunableGroups] = None,
        service: Optional[Service] = None,
    ):
        """
        Create a new environment for remote execution.

        Parameters
        ----------
        name : str
            Human-readable name of the environment. It is also used (lowercased,
            with runs of non-word characters replaced by "-") to build the prefix
            of the remote command names.
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. Each config must have at least the "tunable_params"
            and the "const_args" sections.
            `RemoteEnv` must also have at least some of the following parameters:
            {setup, run, teardown, wait_boot}
        global_config : dict
            Free-format dictionary of global parameters (e.g., security credentials)
            to be mixed in into the "const_args" section of the local config.
        tunables : TunableGroups
            A collection of tunable parameters for *all* environments.
        service : Service
            A service object that must support remote execution, and must also
            support host operations when "wait_boot" is enabled in the config.
        """
        super().__init__(
            name=name,
            config=config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        self._wait_boot = self.config.get("wait_boot", False)

        # e.g. "My Env #1" -> "mlos-my-env-1-"
        sanitized_name = self._RE_SPECIAL.sub("-", self.name).lower()
        self._command_prefix = "mlos-" + sanitized_name + "-"

        svc = self._service
        assert svc is not None and isinstance(
            svc, SupportsRemoteExec
        ), "RemoteEnv requires a service that supports remote execution operations"
        self._remote_exec_service: SupportsRemoteExec = svc

        # The host service is only required (and only bound) when we are asked
        # to wait for the host to boot.
        if self._wait_boot:
            assert isinstance(
                svc, SupportsHostOps
            ), "RemoteEnv requires a service that supports host operations"
            self._host_service: SupportsHostOps = svc

    def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -> bool:
        """
        Prepare the remote host: optionally start it, then run the setup script.

        Parameters
        ----------
        tunables : TunableGroups
            A collection of tunable OS and application parameters along with their
            values. Setting these parameters should not require an OS reboot.
        global_config : dict
            Free-format dictionary of global parameters of the environment
            that are not used in the optimization process.

        Returns
        -------
        is_success : bool
            True if operation is successful, False otherwise.
        """
        if not super().setup(tunables, global_config):
            return False

        if self._wait_boot:
            _LOG.info("Wait for the remote environment to start: %s", self)
            boot_status, boot_params = self._host_service.start_host(self._params)
            if boot_status.is_pending():
                boot_status, _ = self._host_service.wait_host_operation(boot_params)
            if not boot_status.is_succeeded():
                return False

        # Without a setup script there is nothing more to do.
        if not self._script_setup:
            self._is_ready = True
            return self._is_ready

        _LOG.info("Set up the remote environment: %s", self)
        setup_status, _, _ = self._remote_exec("setup", self._script_setup)
        _LOG.info("Remote set up complete: %s :: %s", self, setup_status)
        self._is_ready = setup_status.is_succeeded()
        return self._is_ready

    def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
        """
        Execute the run script on the remote host and collect its results.

        This can be used to, for instance, submit a new experiment to the
        remote application environment by (re)configuring an application and
        launching the benchmark, or run a script that collects the results.

        If the parent's `run()` does not report a ready status, or if there is
        no run script, the parent's result is returned unchanged.

        Returns
        -------
        (status, timestamp, output) : (Status, datetime.datetime, dict)
            3-tuple of (Status, timestamp, output) values, where `output` is a dict
            with the results or None if the status is not COMPLETED.
            If run script is a benchmark, then the score is usually expected to
            be in the `score` field.
        """
        _LOG.info("Run script remotely on: %s", self)
        parent_result = super().run()
        parent_status, _parent_timestamp, _parent_output = parent_result
        if not parent_status.is_ready() or not self._script_run:
            return parent_result

        status, timestamp, raw_output = self._remote_exec("run", self._script_run)
        results = raw_output
        if status.is_succeeded() and raw_output is not None:
            # The results are parsed out of the script's standard output.
            results = self._extract_stdout_results(raw_output.get("stdout", ""))
        _LOG.info("Remote run complete: %s :: %s = %s", self, status, results)
        return (status, timestamp, results)

    def teardown(self) -> None:
        """
        Run the teardown script (if any) remotely, then tear down the parent.

        The status of the remote teardown script is only logged.
        """
        if self._script_teardown:
            _LOG.info("Remote teardown: %s", self)
            teardown_status, _, _ = self._remote_exec("teardown", self._script_teardown)
            _LOG.info("Remote teardown complete: %s :: %s", self, teardown_status)
        super().teardown()

    def _remote_exec(
        self,
        command_name: str,
        script: Iterable[str],
    ) -> Tuple[Status, datetime, Optional[dict]]:
        """
        Submit a script to the remote host and fetch its results.

        Parameters
        ----------
        command_name : str
            Short name of the command; the environment-specific prefix is
            prepended to it before submission.
        script : [str]
            List of commands to be executed on the remote host.

        Returns
        -------
        result : (Status, datetime.datetime, dict)
            3-tuple of Status, timestamp, and dict with the benchmark/script results.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
        """
        env_params = self._get_env_params()
        full_command_name = self._command_prefix + command_name
        _LOG.debug("Submit command: %s with %s", full_command_name, env_params)

        exec_config = dict(self._params)
        exec_config["commandName"] = full_command_name
        status, output = self._remote_exec_service.remote_exec(
            script,
            config=exec_config,
            env_params=env_params,
        )
        _LOG.debug("Script submitted: %s %s :: %s", self, status, output)

        # Only fetch the results if the submission did not fail outright.
        if status in (Status.PENDING, Status.SUCCEEDED):
            status, output = self._remote_exec_service.get_remote_exec_results(output)
        _LOG.debug("Status: %s :: %s", status, output)

        # FIXME: get the timestamp from the remote environment!
        timestamp = datetime.now(tz=UTC)
        return (status, timestamp, output)