Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 78%
136 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""A collection of Service functions for managing VMs on Azure."""
7import json
8import logging
9from datetime import datetime
10from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
12import requests
14from mlos_bench.environments.status import Status
15from mlos_bench.services.base_service import Service
16from mlos_bench.services.remote.azure.azure_deployment_services import (
17 AzureDeploymentService,
18)
19from mlos_bench.services.types.host_ops_type import SupportsHostOps
20from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning
21from mlos_bench.services.types.os_ops_type import SupportsOSOps
22from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec
23from mlos_bench.util import merge_parameters
# Module-level logger; its name is this module's import path (``__name__``).
_LOG: logging.Logger = logging.getLogger(name=__name__)
class AzureVMService(
    AzureDeploymentService,
    SupportsHostProvisioning,
    SupportsHostOps,
    SupportsOSOps,
    SupportsRemoteExec,
):
    """
    Helper methods to manage VMs on Azure.

    Implements the host provisioning, host operations, OS operations, and
    remote execution service interfaces on top of the Azure Compute REST API.
    Operations may return ``Status.PENDING``; callers then poll for completion
    with the corresponding ``wait_*`` / ``get_remote_exec_results`` method.
    """

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/start"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    _URL_STOP = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/powerOff"
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/deallocate"
        "?api-version=2022-03-01"
    )

    # TODO: This is probably the more correct URL to use for the deprovision operation.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete
    # _URL_DEPROVISION = (
    #     "https://management.azure.com"
    #     "/subscriptions/{subscription}"
    #     "/resourceGroups/{resource_group}"
    #     "/providers/Microsoft.Compute"
    #     "/virtualMachines/{vm_name}"
    #     "/delete"
    #     "?api-version=2022-03-01"
    # )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/restart"
        "?api-version=2022-03-01"
    )

    # From:
    # https://learn.microsoft.com/en-us/rest/api/compute/virtual-machine-run-commands/create-or-update
    _URL_REXEC_RUN = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runcommands/{command_name}"
        "?api-version=2024-07-01"
    )
    _URL_REXEC_RESULT = (
        "https://management.azure.com"
        "/subscriptions/{subscription}"
        "/resourceGroups/{resource_group}"
        "/providers/Microsoft.Compute"
        "/virtualMachines/{vm_name}"
        "/runcommands/{command_name}"
        "?$expand=instanceView&api-version=2024-07-01"
    )

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If ``customDataFile`` is set in the config while the deployment
            parameters already contain a non-empty ``customData``.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    # SupportsHostProvisioning
                    self.provision_host,
                    self.deprovision_host,
                    self.deallocate_host,
                    self.wait_host_deployment,
                    # SupportsHostOps
                    self.start_host,
                    self.stop_host,
                    self.restart_host,
                    self.wait_host_operation,
                    # SupportsOSOps
                    self.shutdown,
                    self.reboot,
                    self.wait_os_operation,
                    # SupportsRemoteExec
                    self.remote_exec,
                    self.get_remote_exec_results,
                ],
            ),
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get("customData", None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(
                self._custom_data_file
            )
            with open(self._custom_data_file, "r", encoding="utf-8") as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:  # pylint: disable=no-self-use
        """
        Fill in a default ``deploymentName`` derived from ``vmName``.

        If ``params`` has a ``vmName`` but no ``deploymentName``, this sets
        ``deploymentName`` to ``"<vmName>-deployment"``. The dict is modified
        in place and also returned for convenience.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of parameters.

        Returns
        -------
        params : dict
            The same (possibly updated) ``params`` object.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info(
                "deploymentName missing from params. Defaulting to '%s'.",
                params["deploymentName"],
            )
        return params

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" key.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params["vmName"])
        # Try and provide a semi sane default for the deploymentName.
        # Reuse the shared helper so the default is stored under the
        # "deploymentName" key (not under a key named after the VM).
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_remote_exec_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending remote execution on an Azure VM to resolve to SUCCEEDED or
        FAILED. Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "commandName" and "vmName" keys.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for run command %s on VM %s", params["commandName"], params["vmName"])
        return self._wait_while(self._check_remote_exec_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Wait for a pending OS-level operation (``shutdown`` / ``reboot``).

        Those operations are implemented by ``stop_host`` / ``restart_host``,
        so this simply delegates to ``wait_host_operation``.

        Parameters
        ----------
        params : dict
            Same as for ``wait_host_operation``.

        Returns
        -------
        result : (Status, dict)
            Same as for ``wait_host_operation``.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovision the VM on Azure.

        Note: ``_URL_DEPROVISION`` currently points at the *deallocate* API,
        so this releases the VM's compute resources rather than deleting the
        VM, and it does not remove other resources created by the ARM template.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "deploymentName",
                "vmName",
            ],
        )
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEPROVISION.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute
        resources.

        Note: This can cause the VM to arrive on a new host node when it is
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_DEALLOCATE.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(
            config,
            self._URL_START.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops (powers off) the VM on Azure.

        By default Azure shuts the guest OS down gracefully first.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            If True, force stop the Host/VM: the powerOff request is sent with
            ``skipShutdown=true`` so Azure skips the graceful OS shutdown.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        _LOG.info("Stop VM: %s", config["vmName"])
        url = self._URL_STOP.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )
        if force:
            # _URL_STOP already carries a query string ("?api-version=..."),
            # so the extra flag is appended with "&".
            url += "&skipShutdown=true"
        return self._azure_rest_api_post_helper(config, url)

    def shutdown(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Shut down the VM's OS; delegates to ``stop_host``.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to ``stop_host``.

        Returns
        -------
        result : (Status, dict)
            Same as for ``stop_host``.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure using the Compute ``restart`` API.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Requested forced restart. The ``restart`` URL used here has no
            forced mode, so a regular restart is issued and a warning is logged.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result. The result is always {}.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ],
        )
        if force:
            _LOG.warning(
                "Forced restart is not supported by the Azure restart API; "
                "performing a regular restart of VM: %s",
                config["vmName"],
            )
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(
            config,
            self._URL_REBOOT.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
            ),
        )

    def reboot(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM's OS; delegates to ``restart_host``.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to ``restart_host``.

        Returns
        -------
        result : (Status, dict)
            Same as for ``restart_host``.
        """
        return self.restart_host(params, force)

    def remote_exec(
        self,
        script: Iterable[str],
        config: dict,
        env_params: dict,
    ) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
            Any iterable is accepted; it is consumed exactly once.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
            On PENDING, the result is the merged config plus an
            "asyncResultsUrl" key to poll with `get_remote_exec_results`.
            On FAILED, the result is {}.
        """
        # Materialize once: the script is joined twice below (for logging and
        # for the request), which would silently drop all lines of a one-shot
        # iterator such as a generator on the second pass.
        script = list(script)

        config = self._set_default_params(config)
        config = merge_parameters(
            dest=self.config.copy(),
            source=config,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
                "commandName",
                "location",
            ],
        )

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info("Run a script on VM: %s\n  %s", config["vmName"], "\n  ".join(script))

        # NOTE(review): lines are joined with "; ", so a line that is a shell
        # comment ("# ...") comments out every command after it — confirm
        # callers never pass comment lines.
        json_req = {
            "location": config["location"],
            "properties": {
                "source": {"script": "; ".join(script)},
                "protectedParameters": [
                    {"name": key, "value": val} for (key, val) in env_params.items()
                ],
                "timeoutInSeconds": int(self._poll_timeout),
                "asyncExecution": True,
            },
        }

        url = self._URL_REXEC_RUN.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
            command_name=config["commandName"],
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: PUT %s\n%s", url, json.dumps(json_req, indent=2))

        response = requests.put(
            url,
            json=json_req,
            headers=self._get_headers(),
            timeout=self._request_timeout,
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )
        else:
            _LOG.info("Response: %s", response)

        if response.status_code in {200, 201}:
            results_url = self._URL_REXEC_RESULT.format(
                subscription=config["subscription"],
                resource_group=config["resourceGroup"],
                vm_name=config["vmName"],
                command_name=config["commandName"],
            )
            return (
                Status.PENDING,
                {**config, "asyncResultsUrl": results_url},
            )
        else:
            _LOG.error("Response: %s :: %s", response, response.text)
            return (Status.FAILED, {})

    def _check_remote_exec_status(self, params: dict) -> Tuple[Status, dict]:
        """
        Checks the status of a pending remote execution on an Azure VM.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, RUNNING, SUCCEEDED, FAILED}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        url = params.get("asyncResultsUrl")
        if url is None:
            return Status.PENDING, {}

        session = self._get_session(params)
        try:
            response = session.get(url, timeout=self._request_timeout)
        except requests.exceptions.ReadTimeout:
            # A slow status poll is not a failure; report RUNNING so the
            # caller keeps polling.
            _LOG.warning("Request timed out after %.2f s: %s", self._request_timeout, url)
            return Status.RUNNING, {}
        except requests.exceptions.RequestException as ex:
            _LOG.exception("Error in request checking operation status", exc_info=ex)
            return (Status.FAILED, {})

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Response: %s\n%s",
                response,
                json.dumps(response.json(), indent=2) if response.content else "",
            )

        if response.status_code == 200:
            output = response.json()
            execution_state = (
                output.get("properties", {}).get("instanceView", {}).get("executionState")
            )
            if execution_state in {"Running", "Pending"}:
                return Status.RUNNING, {}
            elif execution_state == "Succeeded":
                return Status.SUCCEEDED, output

        # Non-200 response, or an execution state other than the ones above.
        _LOG.error("Response: %s :: %s", response, response.text)
        return Status.FAILED, {}

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            A dict can have an "stdout" key with the remote output
            and an "stderr" key for errors / warnings.
            On success it also has "exitCode", "startTimestamp" and
            "endTimestamp" (parsed from the run command's instanceView).
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_remote_exec_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)

        output = result.get("properties", {}).get("instanceView", {})
        exit_code = output.get("exitCode")
        execution_state = output.get("executionState")
        # `or ""` also covers keys that are present but null in the JSON.
        outputs = (output.get("output") or "").strip().split("\n")
        errors = (output.get("error") or "").strip().split("\n")

        # The command only counts as successful if it ran to completion AND
        # the script itself exited cleanly.
        if execution_state == "Succeeded" and exit_code == 0:
            status = Status.SUCCEEDED
        else:
            status = Status.FAILED

        return (
            status,
            {
                "stdout": outputs,
                "stderr": errors,
                "exitCode": exit_code,
                "startTimestamp": datetime.fromisoformat(output["startTime"]),
                "endTimestamp": datetime.fromisoformat(output["endTime"]),
            },
        )