# Source code for archai.common.azureml_helper

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from typing import Dict, Union, Optional, Any
from pathlib import Path

from azure.ai.ml import MLClient
from azure.ai.ml.entities import AmlCompute, Environment
from azure.identity import DefaultAzureCredential

def get_aml_client_from_file(config_path: Union[str, Path]) -> MLClient:
    """
    Builds an MLClient from a workspace configuration file.

    Args:
        config_path (Union[str, Path]): Location of the workspace config file.

    Returns:
        MLClient: Client returned by ``MLClient.from_config``, authenticated
            with a ``DefaultAzureCredential``.
    """
    # The credential is created first, then the path is normalised to a `Path`
    # so that both `str` and `Path` inputs reach `from_config` in the same form.
    return MLClient.from_config(
        credential=DefaultAzureCredential(),
        path=Path(config_path),
    )
def create_compute_cluster(
    ml_client: MLClient,
    compute_name: str,
    type: Optional[str] = "amlcompute",
    size: Optional[str] = "Standard_D14_v2",
    min_instances: Optional[int] = 0,
    max_instances: Optional[int] = 4,
    idle_time_before_scale_down: Optional[int] = 180,
    tier: Optional[str] = "Dedicated",
    **kwargs):
    """
    Returns the compute cluster named `compute_name`, creating it if the lookup fails.

    The workspace is first asked for an existing compute target with the given
    name. If that lookup raises, a new `AmlCompute` is built from the remaining
    arguments and submitted with `begin_create_or_update`; the call blocks until
    the operation finishes.

    Args:
        ml_client (MLClient): MLClient object
        compute_name (str): Name of the (CPU/GPU) compute cluster
        type (str, optional): Type of the compute cluster. Defaults to "amlcompute".
        size (str, optional): VM Family of the compute cluster. Defaults to "Standard_D14_v2".
        min_instances (int, optional): Minimum running nodes when there is no job running. Defaults to 0.
        max_instances (int, optional): Maximum number of nodes in the cluster. Defaults to 4.
        idle_time_before_scale_down (int, optional): How many seconds will the node be allowed to run
            after the job termination. Defaults to 180.
        tier (str, optional): Dedicated or LowPriority. The latter is cheaper but there is a chance
            of job termination. Defaults to "Dedicated".
        **kwargs: Extra keyword arguments forwarded unchanged to `AmlCompute`.

    Returns:
        Compute: The existing compute object, or the newly created one.
    """
    try:
        compute_cluster = ml_client.compute.get(compute_name)
    except Exception:
        # Any failure of the lookup is treated as "cluster does not exist yet".
        # NOTE(review): this also hides unrelated errors (auth, network); consider
        # narrowing to the SDK's resource-not-found exception.
        cluster_spec = AmlCompute(
            name=compute_name,
            type=type,
            size=size,
            min_instances=min_instances,
            max_instances=max_instances,
            idle_time_before_scale_down=idle_time_before_scale_down,
            tier=tier,
            **kwargs
        )
        # `.result()` waits for the long-running create/update operation to complete.
        compute_cluster = ml_client.compute.begin_create_or_update(cluster_spec).result()
        print(f"AMLCompute with name {compute_cluster.name} is created, the compute size is {compute_cluster.size}")
    else:
        print(f"You already have a cluster named {compute_name}, we'll reuse it as is.")

    return compute_cluster
def create_environment_from_file(
    ml_client: MLClient,
    custom_env_name: Optional[str] = "aml-archai",
    description: Optional[str] = "Custom environment for Archai",
    tags: Optional[Dict[str, Any]] = None,
    conda_file: Optional[str] = "conda.yaml",
    image: Optional[str] = None,
    version: Optional[str] = "0.1.0",
    **kwargs) -> Environment:
    """
    Creates an environment from a conda file and registers it in the workspace.

    Args:
        ml_client (MLClient): MLClient object
        custom_env_name (str, optional): Name of the environment. Defaults to "aml-archai".
        description (str, optional): Description of the environment.
            Defaults to "Custom environment for Archai".
        tags (Dict[str, Any], optional): Tags for the environment, e.g. {"archai": "1.0.0"}.
            Defaults to None; a missing or empty value is replaced by {"archai": "1.0.0"}.
        conda_file (str, optional): Path to the conda file. Defaults to "conda.yaml".
        image (str, optional): Docker image for the environment. Defaults to None.
        version (str, optional): Version of the environment. Defaults to "0.1.0".
        **kwargs: Extra keyword arguments forwarded unchanged to `Environment`.

    Returns:
        Environment: The environment object returned by `create_or_update`.
    """
    # Falsy `tags` (None or an empty dict) fall back to the default tag set.
    tags = tags or {"archai": "1.0.0"}

    archai_job_env = Environment(
        name=custom_env_name,
        description=description,
        tags=tags,
        conda_file=conda_file,
        image=image,
        version=version,
        **kwargs
    )
    # Rebind to the object returned by the workspace so the name/version
    # reported below are the registered ones.
    archai_job_env = ml_client.environments.create_or_update(archai_job_env)

    print(
        f"Environment with name {archai_job_env.name} is registered to workspace, "
        f"the environment version is {archai_job_env.version}"
    )

    return archai_job_env
# TODO How can we return the path that the output was downloaded to?
def download_job_output(
    ml_client: MLClient,
    job_name: str,
    output_name: str,
    download_path: Optional[Union[str, Path]] = "output") -> None:
    """
    Downloads a named output of a completed job.

    The job is looked up by name. If the lookup fails, the error is printed and
    the function returns without raising. The output is downloaded only when the
    job status is "Completed"; otherwise a message is printed and nothing is
    downloaded.

    Args:
        ml_client (MLClient): MLClient object
        job_name (str): Name of the job
        output_name (str): Named output to download
        download_path (Union[str, Path], optional): Path to download the output to.
            Defaults to "output".

    Returns:
        None
    """
    try:
        target_job = ml_client.jobs.get(job_name)
    except Exception as e:
        # Not every exception carries an `.error` attribute; fall back to the
        # exception itself so the handler never raises AttributeError.
        error_detail = getattr(e, "error", None)
        print(f"{error_detail if error_detail is not None else e}")
        return None

    if target_job.status == "Completed":
        ml_client.jobs.download(
            name=target_job.name,
            download_path=Path(download_path),
            output_name=output_name,
        )
    else:
        print(f"Job {target_job.name} is not completed yet")