Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 98%

102 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 00:35 +0000

#
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
#
"""
A collection of Service functions for managing VMs on Azure.
"""

import json
import logging

from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import requests

from mlos_bench.environments.status import Status
from mlos_bench.services.base_service import Service
from mlos_bench.services.remote.azure.azure_deployment_services import AzureDeploymentService
from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec
from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning
from mlos_bench.services.types.host_ops_type import SupportsHostOps
from mlos_bench.services.types.os_ops_type import SupportsOSOps
from mlos_bench.util import merge_parameters

# Module-level logger.
_LOG = logging.getLogger(__name__)


class AzureVMService(AzureDeploymentService,
                     SupportsHostProvisioning,
                     SupportsHostOps,
                     SupportsOSOps,
                     SupportsRemoteExec):
    """
    Helper methods to manage VMs on Azure.
    """

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # Common prefix of every per-VM REST API endpoint below.
    # The {subscription}, {resource_group}, and {vm_name} placeholders are
    # filled in by `_format_vm_url()`.
    _VM_URL_PREFIX = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}"
    )

    # API version query string shared by every per-VM REST API endpoint below.
    _VM_URL_API_VERSION = "?api-version=2022-03-01"

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = _VM_URL_PREFIX + "/start" + _VM_URL_API_VERSION

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    _URL_STOP = _VM_URL_PREFIX + "/powerOff" + _VM_URL_API_VERSION

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = _VM_URL_PREFIX + "/deallocate" + _VM_URL_API_VERSION

    # TODO: Deleting the VM is probably the more correct operation for deprovisioning.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # NOTE(review): per the Azure REST docs
    # (https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete),
    # delete is an HTTP DELETE on the VM resource URL itself
    # (`_VM_URL_PREFIX + _VM_URL_API_VERSION`), not a POST to a "/delete"
    # sub-path, so switching will also need a non-POST request helper.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = _VM_URL_PREFIX + "/restart" + _VM_URL_API_VERSION

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/run-command
    _URL_REXEC_RUN = _VM_URL_PREFIX + "/runCommand" + _VM_URL_API_VERSION

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 global_config: Optional[Dict[str, Any]] = None,
                 parent: Optional[Service] = None,
                 methods: Union[Dict[str, Callable], List[Callable], None] = None):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If both "customDataFile" (in config) and "customData" (in the
            deployment parameters) are specified.
        """
        super().__init__(
            config, global_config, parent,
            self.merge_methods(methods, [
                # SupportsHostProvisioning
                self.provision_host,
                self.deprovision_host,
                self.deallocate_host,
                self.wait_host_deployment,
                # SupportsHostOps
                self.start_host,
                self.stop_host,
                self.restart_host,
                self.wait_host_operation,
                # SupportsOSOps
                self.shutdown,
                self.reboot,
                self.wait_os_operation,
                # SupportsRemoteExec
                self.remote_exec,
                self.get_remote_exec_results,
            ])
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get('customData', None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(self._custom_data_file)
            with open(self._custom_data_file, 'r', encoding='utf-8') as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:  # pylint: disable=no-self-use
        """
        Fill in a default "deploymentName" of "<vmName>-deployment" when
        "vmName" is present but "deploymentName" is not.

        Note: `params` is modified in place and also returned.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info("deploymentName missing from params. Defaulting to '%s'.", params["deploymentName"])
        return params

    def _get_vm_config(self, params: dict,
                       required_keys: Iterable[str] = ("subscription", "resourceGroup", "vmName")) -> dict:
        """
        Apply defaults to `params` and merge them over a copy of the service config.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Modified in place by `_set_default_params()`.
        required_keys : Iterable[str]
            Keys that `merge_parameters()` must find in the merged result.

        Returns
        -------
        config : dict
            The merged configuration (the service config itself is not modified).
        """
        params = self._set_default_params(params)
        return merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=list(required_keys),
        )

    @staticmethod
    def _format_vm_url(url_template: str, config: dict) -> str:
        """
        Fill in the subscription, resource group, and VM name placeholders of
        one of the `_URL_*` templates from the (merged) `config`.
        """
        return url_template.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" key.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.
            A default "deploymentName" is added in place if missing.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params["vmName"])
        # Try and provide a semi sane default for the deploymentName
        # (sets params["deploymentName"] = "<vmName>-deployment" if missing).
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending OS-level operation (shutdown/reboot) on an Azure VM.
        Same as `wait_host_operation()`, since OS operations are implemented via
        the same VM REST API calls.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovisions the VM on Azure.

        Note: currently this *deallocates* the VM (`_URL_DEPROVISION` is the
        deallocate URL) rather than deleting it. See Also: #498

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._get_vm_config(params, required_keys=[
            "subscription",
            "resourceGroup",
            "deploymentName",
            "vmName",
        ])
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(
            config, self._format_vm_url(self._URL_DEPROVISION, config))

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute resources.

        Note: This can cause the VM to arrive on a new host node when its
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._get_vm_config(params)
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config, self._format_vm_url(self._URL_DEALLOCATE, config))

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._get_vm_config(params)
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(
            config, self._format_vm_url(self._URL_START, config))

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force stop the Host/VM.
            NOTE(review): currently not used; the stop request is the same
            either way.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._get_vm_config(params)
        _LOG.info("Stop VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config, self._format_vm_url(self._URL_STOP, config))

    def shutdown(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        OS-level shutdown of the VM; delegates to `stop_host()`.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure via the VM "restart" REST API call.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force restart the Host/VM.
            NOTE(review): currently not used; the restart request is the same
            either way.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        config = self._get_vm_config(params)
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config, self._format_vm_url(self._URL_REBOOT, config))

    def reboot(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        OS-level reboot of the VM; delegates to `restart_host()`.
        """
        return self.restart_host(params, force)

    def remote_exec(self, script: Iterable[str], config: dict,
                    env_params: dict) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
            Any iterable (including a one-shot generator) is accepted.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        # Materialize the script exactly once: it is consumed both by the log
        # message and by the request body, and a one-shot iterator would
        # otherwise be empty by the time the request is built.
        script_lines = list(script)
        config = self._get_vm_config(config)

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info("Run a script on VM: %s\n %s", config["vmName"], "\n ".join(script_lines))

        json_req = {
            "commandId": "RunShellScript",
            "script": script_lines,
            "parameters": [{"name": key, "value": val} for (key, val) in env_params.items()]
        }

        url = self._format_vm_url(self._URL_REXEC_RUN, config)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: POST %s\n%s", url, json.dumps(json_req, indent=2))

        response = requests.post(
            url, json=json_req, headers=self._get_headers(), timeout=self._request_timeout)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Response: %s\n%s", response,
                       json.dumps(response.json(), indent=2)
                       if response.content else "")
        else:
            _LOG.info("Response: %s", response)

        if response.status_code == 200:
            # TODO: extract the results from JSON response
            return (Status.SUCCEEDED, config)
        elif response.status_code == 202:
            # Accepted: the command runs asynchronously; remember where to poll for results.
            return (Status.PENDING, {
                **config,
                "asyncResultsUrl": response.headers.get("Azure-AsyncOperation")
            })
        else:
            _LOG.error("Response: %s :: %s", response, response.text)
            # _LOG.error("Bad Request:\n%s", response.request.body)
            return (Status.FAILED, {})

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            On success, the dict has a "stdout" key with the message of the
            first output entry, or is {} if the response has no output.
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_host_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)
        val = result.get("properties", {}).get("output", {}).get("value", [])
        return (status, {"stdout": val[0].get("message", "")} if val else {})