Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 78%

136 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

"""A collection of Service functions for managing VMs on Azure."""

6 

7import json 

8import logging 

9from datetime import datetime 

10from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union 

11 

12import requests 

13 

14from mlos_bench.environments.status import Status 

15from mlos_bench.services.base_service import Service 

16from mlos_bench.services.remote.azure.azure_deployment_services import ( 

17 AzureDeploymentService, 

18) 

19from mlos_bench.services.types.host_ops_type import SupportsHostOps 

20from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning 

21from mlos_bench.services.types.os_ops_type import SupportsOSOps 

22from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec 

23from mlos_bench.util import merge_parameters 

24 

# Module-level logger used by the AzureVMService methods in this module.
_LOG = logging.getLogger(__name__)

26 

27 

class AzureVMService(
    AzureDeploymentService,
    SupportsHostProvisioning,
    SupportsHostOps,
    SupportsOSOps,
    SupportsRemoteExec,
):
    """
    Helper methods to manage VMs on Azure.

    Provisioning goes through the ARM deployment machinery of
    `AzureDeploymentService`; power operations and remote execution call the
    Azure Compute REST API directly (URL templates below).
    """

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/start"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    # `stop_host(force=True)` appends the optional `skipShutdown=true` query
    # parameter to request a non-graceful power off.
    _URL_STOP = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/powerOff"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/deallocate"
        "?api-version=2022-03-01"
    )

    # TODO: This is probably the more correct URL to use for the deprovision operation.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete
    # _URL_DEPROVISION = (
    #     "https://management.azure.com"
    #     "/subscriptions/{subscription}"
    #     "/resourceGroups/{resource_group}"
    #     "/providers/Microsoft.Compute"
    #     "/virtualMachines/{vm_name}"
    #     "/delete"
    #     "?api-version=2022-03-01"
    # )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/restart"
        "?api-version=2022-03-01"
    )

    # From:
    # https://learn.microsoft.com/en-us/rest/api/compute/virtual-machine-run-commands/create-or-update
    _URL_REXEC_RUN = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runcommands/{command_name}"
        "?api-version=2024-07-01"
    )
    # Same resource as above, but expanded with the instanceView so the
    # execution state, exit code, and stdout/stderr are included.
    _URL_REXEC_RESULT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runcommands/{command_name}"
        "?$expand=instanceView&api-version=2024-07-01"
    )

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If both "customDataFile" (in the config) and "customData"
            (in the deployment parameters) are specified.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    # SupportsHostProvisioning
                    self.provision_host,
                    self.deprovision_host,
                    self.deallocate_host,
                    self.wait_host_deployment,
                    # SupportsHostOps
                    self.start_host,
                    self.stop_host,
                    self.restart_host,
                    self.wait_host_operation,
                    # SupportsOSOps
                    self.shutdown,
                    self.reboot,
                    self.wait_os_operation,
                    # SupportsRemoteExec
                    self.remote_exec,
                    self.get_remote_exec_results,
                ],
            ),
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get("customData", None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(
                self._custom_data_file
            )
            with open(self._custom_data_file, "r", encoding="utf-8") as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:  # pylint: disable=no-self-use
        """
        Fill in "deploymentName" as "<vmName>-deployment" when it is missing.

        Note: `params` is modified in place and also returned.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info(
                "deploymentName missing from params. Defaulting to '%s'.",
                params["deploymentName"],
            )
        return params

    @staticmethod
    def _parse_timestamp(timestamp: Optional[str]) -> Optional[datetime]:
        """
        Parse an ISO 8601 timestamp as returned by the Azure REST API.

        Tolerates a trailing "Z" and more than 6 fractional-second digits
        (e.g. "2024-01-01T00:00:00.1234567+00:00"), neither of which
        `datetime.fromisoformat` accepts before Python 3.11.

        Parameters
        ----------
        timestamp : Optional[str]
            The timestamp string, or None.

        Returns
        -------
        Optional[datetime]
            The parsed timestamp, or None if it is missing or unparsable.
        """
        if not timestamp:
            return None
        text = timestamp.strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        (head, dot, tail) = text.partition(".")
        if dot:
            # Split the fractional digits from any trailing UTC offset and
            # normalize them to exactly 6 digits (microsecond precision).
            n_digits = len(tail) - len(tail.lstrip("0123456789"))
            (fraction, offset) = (tail[:n_digits], tail[n_digits:])
            text = f"{head}.{fraction[:6].ljust(6, '0')}{offset}"
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            _LOG.warning("Cannot parse timestamp: %s", timestamp)
            return None

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" key.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params["vmName"])
        # Try and provide a semi sane default for the deploymentName, the same
        # way every other operation of this service does.
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_remote_exec_operation(self, params: dict) -> Tuple["Status", dict]:
        """
        Waits for a pending remote execution on an Azure VM to resolve to SUCCEEDED or
        FAILED. Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" and "commandName" keys.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for run command %s on VM %s", params["commandName"], params["vmName"])
        return self._wait_while(self._check_remote_exec_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple["Status", dict]:
        """
        Waits for a pending OS-level operation (shutdown / reboot) to resolve.

        OS operations are implemented as VM power operations here, so this
        simply delegates to `wait_host_operation`.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovisions the VM on Azure.

        Note: for now this issues the *deallocate* call (`_URL_DEPROVISION` is
        an alias of `_URL_DEALLOCATE`, see #498), so the VM resource itself is
        not deleted.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "deploymentName",
                "vmName",
            ],
        )
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEPROVISION.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute
        resources.

        Note: This can cause the VM to arrive on a new host node when it's
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEALLOCATE.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(
            config,
            self._URL_START.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force stop the Host/VM: the power-off request is sent with
            `skipShutdown=true`, i.e. without a graceful guest OS shutdown.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Stop VM: %s", config["vmName"])
        url = self._URL_STOP.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )
        if force:
            # Non-graceful power off: skip the guest OS shutdown.
            url += "&skipShutdown=true"
        return self._azure_rest_api_post_helper(config, url)

    def shutdown(self, params: dict, force: bool = False) -> Tuple["Status", dict]:
        """
        Shut down the OS of the VM.

        Implemented as a VM power-off; delegates to `stop_host`.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force restart the Host/VM. The Azure restart API has no
            forced mode, so a warning is logged and a regular restart is issued.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        if force:
            _LOG.warning(
                "Forced restart is not supported by the Azure API; "
                "performing a regular restart of VM: %s",
                config["vmName"],
            )
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_REBOOT.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def reboot(self, params: dict, force: bool = False) -> Tuple["Status", dict]:
        """
        Reboot the OS of the VM.

        Implemented as a VM restart; delegates to `restart_host`.
        """
        return self.restart_host(params, force)

    def remote_exec(
        self,
        script: Iterable[str],
        config: dict,
        env_params: dict,
    ) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
            On PENDING, the result is the merged config plus an
            "asyncResultsUrl" key to poll for the command results.
        """
        config = self._set_default_params(config)
        config = merge_parameters(
            dest=self.config.copy(),
            source=config,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
                "commandName",
                "location",
            ],
        )

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info("Run a script on VM: %s\n  %s", config["vmName"], "\n  ".join(script))

        # NOTE(review): lines are joined with "; ", so each line must be a
        # complete command; a line ending in a shell keyword such as `then` or
        # `do` produces invalid shell syntax (e.g. "then; ...").
        json_req = {
            "location": config["location"],
            "properties": {
                "source": {"script": "; ".join(script)},
                "protectedParameters": [
                    {"name": key, "value": val} for (key, val) in env_params.items()
                ],
                "timeoutInSeconds": int(self._poll_timeout),
                "asyncExecution": True,
            },
        }

        url = self._URL_REXEC_RUN.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
            command_name=config["commandName"],
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: PUT %s\n%s", url, json.dumps(json_req, indent=2))

        try:
            response = requests.put(
                url,
                json=json_req,
                headers=self._get_headers(),
                timeout=self._request_timeout,
            )
        except requests.exceptions.RequestException as ex:
            # Same policy as `_check_remote_exec_status`: report a failure
            # instead of letting network errors escape to the caller.
            _LOG.exception("Error in request running command on VM", exc_info=ex)
            return (Status.FAILED, {})

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )
        else:
            _LOG.info("Response: %s", response)

        if response.status_code not in {200, 201}:
            _LOG.error("Response: %s :: %s", response, response.text)
            return (Status.FAILED, {})

        results_url = self._URL_REXEC_RESULT.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
            command_name=config["commandName"],
        )
        return (
            Status.PENDING,
            {**config, "asyncResultsUrl": results_url},
        )

    def _check_remote_exec_status(self, params: dict) -> Tuple[Status, dict]:
        """
        Checks the status of a pending remote execution on an Azure VM.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, RUNNING, SUCCEEDED, FAILED}
            Result is the full response JSON if SUCCEEDED, otherwise {}.
        """
        url = params.get("asyncResultsUrl")
        if url is None:
            return Status.PENDING, {}

        session = self._get_session(params)
        try:
            response = session.get(url, timeout=self._request_timeout)
        except requests.exceptions.ReadTimeout:
            # A slow status request is not a failure of the command itself:
            # keep polling.
            _LOG.warning("Request timed out after %.2f s: %s", self._request_timeout, url)
            return Status.RUNNING, {}
        except requests.exceptions.RequestException as ex:
            _LOG.exception("Error in request checking operation status", exc_info=ex)
            return (Status.FAILED, {})

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )

        if response.status_code == 200:
            output = response.json()
            execution_state = (
                output.get("properties", {}).get("instanceView", {}).get("executionState")
            )
            if execution_state in {"Running", "Pending"}:
                return Status.RUNNING, {}
            elif execution_state == "Succeeded":
                return Status.SUCCEEDED, output

        # Any other HTTP status or execution state is treated as a failure.
        _LOG.error("Response: %s :: %s", response, response.text)
        return Status.FAILED, {}

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            A dict can have an "stdout" key with the remote output
            and an "stderr" key for errors / warnings, plus "exitCode",
            "startTimestamp" and "endTimestamp" (None when not reported).
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_remote_exec_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)

        instance_view = result.get("properties", {}).get("instanceView", {})
        exit_code = instance_view.get("exitCode")
        execution_state = instance_view.get("executionState")
        # `or ""` also covers keys that are present with a null value.
        outputs = (instance_view.get("output") or "").strip().split("\n")
        errors = (instance_view.get("error") or "").strip().split("\n")

        if execution_state == "Succeeded" and exit_code == 0:
            status = Status.SUCCEEDED
        else:
            status = Status.FAILED

        return (
            status,
            {
                "stdout": outputs,
                "stderr": errors,
                "exitCode": exit_code,
                "startTimestamp": self._parse_timestamp(instance_view.get("startTime")),
                "endTimestamp": self._parse_timestamp(instance_view.get("endTime")),
            },
        )