Coverage for mlos_bench/mlos_bench/services/remote/azure/azure_vm_services.py: 98%
102 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6A collection of Service functions for managing VMs on Azure.
7"""
9import json
10import logging
12from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
14import requests
16from mlos_bench.environments.status import Status
17from mlos_bench.services.base_service import Service
18from mlos_bench.services.remote.azure.azure_deployment_services import AzureDeploymentService
19from mlos_bench.services.types.remote_exec_type import SupportsRemoteExec
20from mlos_bench.services.types.host_provisioner_type import SupportsHostProvisioning
21from mlos_bench.services.types.host_ops_type import SupportsHostOps
22from mlos_bench.services.types.os_ops_type import SupportsOSOps
23from mlos_bench.util import merge_parameters
25_LOG = logging.getLogger(__name__)
class AzureVMService(AzureDeploymentService, SupportsHostProvisioning, SupportsHostOps, SupportsOSOps, SupportsRemoteExec):
    """
    Helper methods to manage VMs on Azure.

    Each host/OS operation formats one of the `_URL_*` REST endpoint
    templates below with the subscription, resource group and VM name,
    and submits a POST request to it.
    """

    # pylint: disable=too-many-ancestors

    # Azure Compute REST API calls as described in
    # https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/start
    _URL_START = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}" +
        "/start" +
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/power-off
    _URL_STOP = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}" +
        "/powerOff" +
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/deallocate
    _URL_DEALLOCATE = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}" +
        "/deallocate" +
        "?api-version=2022-03-01"
    )

    # TODO: This is probably the more correct URL to use for the deprovision operation.
    # However, previous code used the deallocate URL above, so for now, we keep
    # that and handle that change later.
    # See Also: #498
    _URL_DEPROVISION = _URL_DEALLOCATE

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/delete
    # _URL_DEPROVISION = (
    #    "https://management.azure.com" +
    #    "/subscriptions/{subscription}" +
    #    "/resourceGroups/{resource_group}" +
    #    "/providers/Microsoft.Compute" +
    #    "/virtualMachines/{vm_name}" +
    #    "/delete" +
    #    "?api-version=2022-03-01"
    # )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/restart
    _URL_REBOOT = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}" +
        "/restart" +
        "?api-version=2022-03-01"
    )

    # From: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/run-command
    _URL_REXEC_RUN = (
        "https://management.azure.com" +
        "/subscriptions/{subscription}" +
        "/resourceGroups/{resource_group}" +
        "/providers/Microsoft.Compute" +
        "/virtualMachines/{vm_name}" +
        "/runCommand" +
        "?api-version=2022-03-01"
    )

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 global_config: Optional[Dict[str, Any]] = None,
                 parent: Optional[Service] = None,
                 methods: Union[Dict[str, Callable], List[Callable], None] = None):
        """
        Create a new instance of Azure VM services proxy.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains the benchmark environment
            configuration. May contain a "customDataFile" entry (see Raises).
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            Parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.

        Raises
        ------
        ValueError
            If "customDataFile" is configured and the deployment parameters
            already contain a non-empty "customData" value.
        """
        super().__init__(
            config, global_config, parent,
            self.merge_methods(methods, [
                # SupportsHostProvisioning
                self.provision_host,
                self.deprovision_host,
                self.deallocate_host,
                self.wait_host_deployment,
                # SupportsHostOps
                self.start_host,
                self.stop_host,
                self.restart_host,
                self.wait_host_operation,
                # SupportsOSOps
                self.shutdown,
                self.reboot,
                self.wait_os_operation,
                # SupportsRemoteExec
                self.remote_exec,
                self.get_remote_exec_results,
            ])
        )

        # As a convenience, allow reading customData out of a file, rather than
        # embedding it in a json config file.
        # Note: ARM templates expect this data to be base64 encoded, but that
        # can be done using the `base64()` string function inside the ARM template.
        self._custom_data_file = self.config.get("customDataFile", None)
        if self._custom_data_file:
            if self._deploy_params.get('customData', None):
                raise ValueError("Both customDataFile and customData are specified.")
            self._custom_data_file = self.config_loader_service.resolve_path(self._custom_data_file)
            with open(self._custom_data_file, 'r', encoding='utf-8') as custom_data_fh:
                self._deploy_params["customData"] = custom_data_fh.read()

    def _set_default_params(self, params: dict) -> dict:   # pylint: disable=no-self-use
        """
        Fill in a default "deploymentName" derived from "vmName", if missing.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs. Updated in place.

        Returns
        -------
        params : dict
            The same dictionary object that was passed in.
        """
        # Try and provide a semi sane default for the deploymentName if not provided
        # since this is a common way to set the deploymentName and can save some
        # config work for the caller.
        if "vmName" in params and "deploymentName" not in params:
            params["deploymentName"] = f"{params['vmName']}-deployment"
            _LOG.info("deploymentName missing from params. Defaulting to '%s'.", params["deploymentName"])
        return params

    def wait_host_deployment(self, params: dict, *, is_setup: bool) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        is_setup : bool
            If True, wait for VM being deployed; otherwise, wait for successful deprovisioning.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        return self._wait_deployment(params, is_setup=is_setup)

    def wait_host_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending operation on an Azure VM to resolve to SUCCEEDED or FAILED.
        Return TIMED_OUT when timing out.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "vmName" key.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            Result is info on the operation runtime if SUCCEEDED, otherwise {}.
        """
        _LOG.info("Wait for operation on VM %s", params["vmName"])
        # Try and provide a semi sane default for the deploymentName.
        # Note: a single-argument `params.setdefault(value)` would instead add a
        # bogus "<vmName>-deployment" key mapped to None, so use the shared helper.
        params = self._set_default_params(params)
        return self._wait_while(self._check_operation_status, Status.RUNNING, params)

    def wait_os_operation(self, params: dict) -> Tuple[Status, dict]:
        """
        Waits for a pending OS operation; delegates to `wait_host_operation`.

        Parameters
        ----------
        params: dict
            Flat dictionary of (key, value) pairs of tunable parameters.

        Returns
        -------
        result : (Status, dict)
            Whatever `wait_host_operation` returns for the same `params`.
        """
        return self.wait_host_operation(params)

    def provision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Check if Azure VM is ready. Deploy a new VM, if necessary.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            HostEnv tunables are variable parameters that, together with the
            HostEnv configuration, are sufficient to provision a VM.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result. The result is the input `params` plus the
            parameters extracted from the response JSON, or {} if the status is FAILED.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        return self._provision_resource(params)

    def deprovision_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deprovisions the VM on Azure.

        Note: `_URL_DEPROVISION` is currently an alias of `_URL_DEALLOCATE`,
        so this issues a deallocate request rather than deleting the VM.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Together with the service config it must provide "subscription",
            "resourceGroup", "deploymentName" and "vmName".

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as produced by
            `_azure_rest_api_post_helper`.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "deploymentName",
                "vmName",
            ]
        )
        _LOG.info("Deprovision VM: %s", config["vmName"])
        _LOG.info("Deprovision deployment: %s", config["deploymentName"])
        # TODO: Properly deprovision *all* resources specified in the ARM template.
        return self._azure_rest_api_post_helper(config, self._URL_DEPROVISION.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        ))

    def deallocate_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Deallocates the VM on Azure by shutting it down then releasing the compute resources.

        Note: This can cause the VM to arrive on a new host node when its
        restarted, which may have different performance characteristics.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Together with the service config it must provide "subscription",
            "resourceGroup" and "vmName".

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as produced by
            `_azure_rest_api_post_helper`.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ]
        )
        _LOG.info("Deallocate VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(config, self._URL_DEALLOCATE.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        ))

    def start_host(self, params: dict) -> Tuple[Status, dict]:
        """
        Start the VM on Azure.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Together with the service config it must provide "subscription",
            "resourceGroup" and "vmName".

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as produced by
            `_azure_rest_api_post_helper`.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ]
        )
        _LOG.info("Start VM: %s :: %s", config["vmName"], params)
        return self._azure_rest_api_post_helper(config, self._URL_START.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        ))

    def stop_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Stops the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Together with the service config it must provide "subscription",
            "resourceGroup" and "vmName".
        force : bool
            If True, force stop the Host/VM.
            Note: accepted for interface compatibility; this implementation
            does not currently use it.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as produced by
            `_azure_rest_api_post_helper`.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ]
        )
        _LOG.info("Stop VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(config, self._URL_STOP.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        ))

    def shutdown(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Shut down the OS; delegates to `stop_host` with the same arguments.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to `stop_host`.

        Returns
        -------
        result : (Status, dict={})
            Whatever `stop_host` returns.
        """
        return self.stop_host(params, force)

    def restart_host(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the VM on Azure by initiating a graceful shutdown.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Together with the service config it must provide "subscription",
            "resourceGroup" and "vmName".
        force : bool
            If True, force restart the Host/VM.
            Note: accepted for interface compatibility; this implementation
            does not currently use it.

        Returns
        -------
        result : (Status, dict={})
            A pair of Status and result, as produced by
            `_azure_rest_api_post_helper`.
            Status is one of {PENDING, SUCCEEDED, FAILED}
        """
        params = self._set_default_params(params)
        config = merge_parameters(
            dest=self.config.copy(),
            source=params,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ]
        )
        _LOG.info("Reboot VM: %s", config["vmName"])
        return self._azure_rest_api_post_helper(config, self._URL_REBOOT.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        ))

    def reboot(self, params: dict, force: bool = False) -> Tuple[Status, dict]:
        """
        Reboot the OS; delegates to `restart_host` with the same arguments.

        Parameters
        ----------
        params : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
        force : bool
            Passed through to `restart_host`.

        Returns
        -------
        result : (Status, dict={})
            Whatever `restart_host` returns.
        """
        return self.restart_host(params, force)

    def remote_exec(self, script: Iterable[str], config: dict,
                    env_params: dict) -> Tuple[Status, dict]:
        """
        Run a command on Azure VM.

        Parameters
        ----------
        script : Iterable[str]
            A list of lines to execute as a script on a remote VM.
            Any iterable is accepted, including one-shot iterators.
        config : dict
            Flat dictionary of (key, value) pairs of the Environment parameters.
            They usually come from `const_args` and `tunable_params`
            properties of the Environment.
        env_params : dict
            Parameters to pass as *shell* environment variables into the script.
            This is usually a subset of `config` with some possible conversions.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED}
            On HTTP 200 the result is the merged config; on HTTP 202 it is the
            merged config plus "asyncResultsUrl"; otherwise it is {}.
        """
        config = self._set_default_params(config)
        config = merge_parameters(
            dest=self.config.copy(),
            source=config,
            required_keys=[
                "subscription",
                "resourceGroup",
                "vmName",
            ]
        )

        # Materialize the script exactly once: `script` may be a one-shot
        # iterator, and consuming it for logging would otherwise leave nothing
        # to put into the request body.
        script_lines = list(script)

        if _LOG.isEnabledFor(logging.INFO):
            _LOG.info("Run a script on VM: %s\n %s", config["vmName"], "\n ".join(script_lines))

        json_req = {
            "commandId": "RunShellScript",
            "script": script_lines,
            "parameters": [{"name": key, "value": val} for (key, val) in env_params.items()]
        }

        url = self._URL_REXEC_RUN.format(
            subscription=config["subscription"],
            resource_group=config["resourceGroup"],
            vm_name=config["vmName"],
        )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Request: POST %s\n%s", url, json.dumps(json_req, indent=2))

        response = requests.post(
            url, json=json_req, headers=self._get_headers(), timeout=self._request_timeout)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Response: %s\n%s", response,
                       json.dumps(response.json(), indent=2)
                       if response.content else "")
        else:
            _LOG.info("Response: %s", response)

        if response.status_code == 200:
            # TODO: extract the results from JSON response
            return (Status.SUCCEEDED, config)

        if response.status_code == 202:
            # Accepted: the command runs asynchronously; hand back the URL to poll.
            return (Status.PENDING, {
                **config,
                "asyncResultsUrl": response.headers.get("Azure-AsyncOperation")
            })

        _LOG.error("Response: %s :: %s", response, response.text)
        # _LOG.error("Bad Request:\n%s", response.request.body)
        return (Status.FAILED, {})

    def get_remote_exec_results(self, config: dict) -> Tuple[Status, dict]:
        """
        Get the results of the asynchronously running command.

        Parameters
        ----------
        config : dict
            Flat dictionary of (key, value) pairs of tunable parameters.
            Must have the "asyncResultsUrl" key to get the results.
            If the key is not present, return Status.PENDING.

        Returns
        -------
        result : (Status, dict)
            A pair of Status and result.
            Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
            A dict can have an "stdout" key with the remote output.
        """
        _LOG.info("Check the results on VM: %s", config.get("vmName"))
        (status, result) = self.wait_host_operation(config)
        _LOG.debug("Result: %s :: %s", status, result)
        if not status.is_succeeded():
            # TODO: Extract the telemetry and status from stdout, if available
            return (status, result)
        # The remote output is the "message" of the first entry under
        # properties.output.value; absent/empty output yields an empty result.
        val = result.get("properties", {}).get("output", {}).get("value", [])
        return (status, {"stdout": val[0].get("message", "")} if val else {})