aml.py
This script contains methods to handle connections to AzureML, such as registering Datasets or obtaining a Dataset handle from a given workspace.
dataset_from_dstore_path(workspace, datastore, datastore_path, validate=True)
Obtains a local reference for a given datastore and path
Parameters:
Name | Type | Description | Default |
---|---|---|---|
datastore |
str
|
name of the AzureML datastore |
required |
datastore_path |
str
|
path in datastore to register as Dataset |
required |
validate |
bool
|
whether to validate that the files exist |
True
|
Returns:
Type | Description |
---|---|
azureml.core.Dataset: registered Dataset object |
Source code in src/common/aml.py
def dataset_from_dstore_path(workspace, datastore, datastore_path, validate=True):
    """Build a file Dataset reference from a datastore name and a path inside it.

    Args:
        workspace: workspace object handed to ``Datastore.get``
            (presumably an azureml.core.Workspace -- confirm against callers)
        datastore (str): name of the AzureML datastore
        datastore_path (str): path in the datastore to expose as a Dataset
        validate (bool): forwarded as ``validate`` to ``Dataset.File.from_files``

    Returns:
        The object returned by ``Dataset.File.from_files`` for that location.
    """
    log = logging.getLogger(__name__)

    # Resolve the datastore name into a handle; the name itself is only logged.
    log.info(f"Connecting to Datastore {datastore}...")
    datastore_handle = Datastore.get(workspace, datastore)

    log.info(f"Reading path {datastore_path}...")
    locations = [(datastore_handle, datastore_path)]

    log.info("Registering as dataset...")
    return Dataset.File.from_files(path=locations, validate=validate)
load_dataset_from_data_input_spec(workspace, data_input_spec)
Loads a dataset based on config object data_input_spec (see tasks.py data_input_spec)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workspace |
azureml.core.Workspace
|
connector to an AzureML workspace |
required |
data_input_spec |
OmegaConf.DictConfig
|
config Hydra dataclass data_input_spec (see tasks.py) |
required |
Returns:
Type | Description |
---|---|
azureml.core.Dataset: registered Dataset object |
Source code in src/common/aml.py
def load_dataset_from_data_input_spec(workspace, data_input_spec):
    """Loads a dataset based on config object data_input_spec (see tasks.py data_input_spec).

    The lookup strategy is picked from the first populated field(s), in this
    order: ``name`` (with ``version``), then ``uuid``, then ``datastore`` + ``path``.

    Args:
        workspace (azureml.core.Workspace): connector to an AzureML workspace
        data_input_spec (OmegaConf.DictConfig): config Hydra dataclass data_input_spec (see tasks.py)

    Returns:
        azureml.core.Dataset: the loaded Dataset object

    Raises:
        ValueError: if data_input_spec provides neither a name, a uuid,
            nor both a datastore and a path.
    """
    logger = logging.getLogger(__name__)

    if data_input_spec.name:
        logger.info(f"Reading dataset from name={data_input_spec.name} version={data_input_spec.version}")
        loaded_dataset = Dataset.get_by_name(
            workspace,
            name=data_input_spec.name,
            version=data_input_spec.version,
        )
    elif data_input_spec.uuid:
        logger.info("Reading dataset from uuid")
        loaded_dataset = Dataset.get_by_id(workspace, id=data_input_spec.uuid)
    elif data_input_spec.datastore and data_input_spec.path:
        logger.info(f"Connecting to Datastore {data_input_spec.datastore}...")
        datastore = Datastore.get(workspace, data_input_spec.datastore)

        logger.info(f"Reading path {data_input_spec.path}...")
        remote_ds_path = [(datastore, data_input_spec.path)]

        logger.info("Registering as dataset...")
        loaded_dataset = Dataset.File.from_files(
            path=remote_ds_path,
            validate=data_input_spec.validate,
        )
    else:
        # f-string so the offending config is actually shown in the message
        # (the placeholder was previously emitted verbatim).
        raise ValueError(
            f"To load a dataset using data_input_spec, you need to provide either a name, a uuid or a datastore+path (provided config = {data_input_spec})"
        )

    return loaded_dataset
apply_sweep_settings(step, sweep_settings_config)
Applies the settings to a sweep step based on a config dataclass.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
PipelineStep
|
the instance of the step |
required |
sweep_settings_config |
OmegaConf.DictConfig
|
schema specified in src.common.tasks.sweep_runsettings |
required |
Source code in src/common/aml.py
def apply_sweep_settings(step, sweep_settings_config):
    """Applies the settings to a sweep step based on a config dataclass.

    Settings are applied in order: objective, algorithm, limits (optional),
    early termination (optional). A validation error raised part-way leaves
    the earlier settings already applied on ``step``.

    Args:
        step (PipelineStep): the instance of the step
        sweep_settings_config (OmegaConf.DictConfig): schema specified in src.common.tasks.sweep_runsettings

    Raises:
        ValueError: if primary_metric, goal or algorithm is missing.
        NotImplementedError: if early_termination.policy_type is not one of
            "median_stopping", "bandit", "truncation_selection", "default" or None.
    """
    sweep = step.runsettings.sweep

    if (not sweep_settings_config.primary_metric) or (not sweep_settings_config.goal):
        raise ValueError("in sweep settings, you need to provide a primary_metric and a goal settings.")
    sweep.objective.configure(
        primary_metric=sweep_settings_config.primary_metric,
        goal=sweep_settings_config.goal,
    )

    if not sweep_settings_config.algorithm:
        raise ValueError("in sweep settings, you need to provide an algorithm setting.")
    sweep.algorithm = sweep_settings_config.algorithm

    limits = sweep_settings_config.limits
    if limits:
        sweep.limits.configure(
            max_total_trials=limits.max_total_trials,
            max_concurrent_trials=limits.max_concurrent_trials,
            timeout_minutes=limits.timeout_minutes,
        )

    early_termination = sweep_settings_config.early_termination
    if not early_termination:
        return

    policy_type = early_termination.policy_type
    # "default" and an unset policy both mean: leave the step's early
    # termination settings untouched.
    if policy_type is None or policy_type == "default":
        return

    # Each supported policy takes the two shared evaluation settings plus,
    # for some, one policy-specific setting.
    if policy_type == "median_stopping":
        policy_specific = {}
    elif policy_type == "bandit":
        policy_specific = {"slack_factor": early_termination.slack_factor}
    elif policy_type == "truncation_selection":
        policy_specific = {"truncation_percentage": early_termination.truncation_percentage}
    else:
        raise NotImplementedError(f"sweep settings early_termination policy_type={sweep_settings_config.early_termination.policy_type} is not implemented.")

    sweep.early_termination.configure(
        policy_type=policy_type,
        evaluation_interval=early_termination.evaluation_interval,
        delay_evaluation=early_termination.delay_evaluation,
        **policy_specific,
    )
format_run_name(run_name)
Formats a run name to fit with AzureML constraints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
str
|
string to be formatted. |
required |
Returns:
Type | Description |
---|---|
formatted_run_name (str) |
Notes
Node name must start with a letter, and can only contain letters, numbers, underscores, within 1-255 characters.
Source code in src/common/aml.py
def format_run_name(run_name: str) -> str:
    """Formats a run name to fit with AzureML constraints.

    Args:
        run_name (str): string to be formatted.

    Returns:
        formatted_run_name (str)

    Notes:
        Node name must start with a letter,
        and can only contain letters, numbers,
        underscores, within 1-255 characters.

        Names that already satisfy these rules are returned unchanged.
        Names that would not start with a letter after sanitizing
        (including the empty string) are prefixed with "run_".
    """
    # removing all chars not allowed, use underscore instead
    formatted_run_name = re.sub(r'[^a-zA-Z0-9_]', '_', run_name)

    # enforce the "must start with a letter" rule; this also guarantees a
    # non-empty result for empty input
    if not re.match(r'[a-zA-Z]', formatted_run_name):
        formatted_run_name = "run_" + formatted_run_name

    # cutting to first 255 chars (after prefixing, so the limit always holds)
    return formatted_run_name[:255]