Coverage for mlos_bench/mlos_bench/services/config_persistence.py: 95%
158 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Helper functions to load, instantiate, and serialize Python objects
7that encapsulate benchmark environments, tunable parameters, and
8service functions.
9"""
11import os
12import sys
14import json # For logging only
15import logging
17from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, TYPE_CHECKING
19import json5 # To read configs with comments and other JSON5 syntax features
20from jsonschema import ValidationError, SchemaError
22from mlos_bench.config.schemas import ConfigSchema
23from mlos_bench.environments.base_environment import Environment
24from mlos_bench.optimizers.base_optimizer import Optimizer
25from mlos_bench.services.base_service import Service
26from mlos_bench.services.types.config_loader_type import SupportsConfigLoading
27from mlos_bench.tunables.tunable import TunableValue
28from mlos_bench.tunables.tunable_groups import TunableGroups
29from mlos_bench.util import instantiate_from_config, merge_parameters, path_join, preprocess_dynamic_configs
31if sys.version_info < (3, 10):
32 from importlib_resources import files
33else:
34 from importlib.resources import files
36if TYPE_CHECKING:
37 from mlos_bench.storage.base_storage import Storage
38 from mlos_bench.schedulers.base_scheduler import Scheduler
41_LOG = logging.getLogger(__name__)
class ConfigPersistenceService(Service, SupportsConfigLoading):
    """
    Collection of methods to deserialize the Environment, Service, and TunableGroups objects.
    """

    # Location of the config files shipped inside the `mlos_bench.config` package,
    # with backslashes replaced by forward slashes.
    BUILTIN_CONFIG_PATH = str(files("mlos_bench.config").joinpath("")).replace("\\", "/")

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 global_config: Optional[Dict[str, Any]] = None,
                 parent: Optional[Service] = None,
                 methods: Union[Dict[str, Callable], List[Callable], None] = None):
        """
        Create a new instance of config persistence service.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        super().__init__(
            config, global_config, parent,
            self.merge_methods(methods, [
                self.resolve_path,
                self.load_config,
                self.prepare_class_load,
                self.build_service,
                self.build_environment,
                self.load_services,
                self.load_environment,
                self.load_environment_list,
            ])
        )
        self._config_loader_service = self

        # Normalize and deduplicate config paths, but maintain order.
        self._config_path: List[str] = []
        for path in self.config.get("config_path", []):
            # Normalize *before* the membership test: the list only ever holds
            # normalized paths, so comparing the raw value would let duplicates
            # (e.g., the same relative path listed twice) slip through.
            norm_path = path_join(path, abs_path=True)
            if norm_path not in self._config_path:
                self._config_path.append(norm_path)

        # Prepend the cwd if not already on the list.
        cwd = path_join(os.getcwd(), abs_path=True)
        if cwd not in self._config_path:
            self._config_path.insert(0, cwd)

        # Append the built-in config path if not already on the list.
        if self.BUILTIN_CONFIG_PATH not in self._config_path:
            self._config_path.append(self.BUILTIN_CONFIG_PATH)

    @property
    def config_paths(self) -> List[str]:
        """
        Gets the list of config paths this service will search for config files.

        Returns
        -------
        List[str]
        """
        return list(self._config_path)  # make a copy to avoid modifications

    def resolve_path(self, file_path: str,
                     extra_paths: Optional[Iterable[str]] = None) -> str:
        """
        Resolve `file_path` against the list of search paths.

        If `file_path` is absolute, return it as is. Otherwise, try each
        directory from `extra_paths` (first) and `_config_path` (second), in order,
        and return the first combined path that exists on the file system.
        If no candidate exists, return `file_path` unchanged.

        Parameters
        ----------
        file_path : str
            Path to the input config file.
        extra_paths : Iterable[str]
            Additional directories to prepend to the list of search paths.

        Returns
        -------
        path : str
            An actual path to the config or script.
        """
        path_list = list(extra_paths or []) + self._config_path
        _LOG.debug("Resolve path: %s in: %s", file_path, path_list)
        if os.path.isabs(file_path):
            _LOG.debug("Path is absolute: %s", file_path)
            return file_path
        for path in path_list:
            full_path = path_join(path, file_path, abs_path=True)
            if os.path.exists(full_path):
                _LOG.debug("Path resolved: %s", full_path)
                return full_path
        _LOG.debug("Path not resolved: %s", file_path)
        return file_path

    def load_config(self,
                    json_file_name: str,
                    schema_type: Optional[ConfigSchema],
                    ) -> Dict[str, Any]:
        """
        Load JSON config file. Search for a file relative to `_config_path`
        if the input path is not absolute.
        This method is exported to be used as a service.

        Parameters
        ----------
        json_file_name : str
            Path to the input config file.
        schema_type : Optional[ConfigSchema]
            The schema type to validate the config against.
            If None, the config is not validated and a warning is logged.

        Returns
        -------
        config : dict
            Free-format dictionary that contains the configuration.
            (The parsed JSON5 content is returned as loaded, without a type check.)

        Raises
        ------
        ValueError
            If the config fails validation against `schema_type`.
        """
        json_file_name = self.resolve_path(json_file_name)
        _LOG.info("Load config: %s", json_file_name)
        with open(json_file_name, mode='r', encoding='utf-8') as fh_json:
            config = json5.load(fh_json)
        if schema_type is not None:
            try:
                schema_type.validate(config)
            except (ValidationError, SchemaError) as ex:
                _LOG.error("Failed to validate config %s against schema type %s at %s",
                           json_file_name, schema_type.name, schema_type.value)
                raise ValueError(f"Failed to validate config {json_file_name} against " +
                                 f"schema type {schema_type.name} at {schema_type.value}") from ex
            if isinstance(config, dict) and config.get("$schema"):
                # Remove $schema attributes from the config after we've validated
                # them to avoid passing them on to other objects
                # (e.g. SqlAlchemy based storage initializers).
                # NOTE: we only do this for internal schemas.
                # Other configs that get loaded may need the schema field
                # (e.g. Azure ARM templates).
                del config["$schema"]
        else:
            _LOG.warning("Config %s is not validated against a schema.", json_file_name)
        return config   # type: ignore[no-any-return]

    def prepare_class_load(self, config: Dict[str, Any],
                           global_config: Optional[Dict[str, Any]] = None,
                           parent_args: Optional[Dict[str, TunableValue]] = None) -> Tuple[str, Dict[str, Any]]:
        """
        Extract the class instantiation parameters from the configuration.
        Mix-in the global parameters and resolve the local file system paths,
        where it is required.

        Note that `config` is updated in place: an empty "config" section is
        added to it if it is missing, and that section receives the merged
        parameters and resolved paths.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate. Must have a "class" key.
        global_config : dict
            Global configuration parameters (optional).
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.

        Returns
        -------
        (class_name, class_config) : (str, dict)
            Name of the class to instantiate and its configuration.

        Raises
        ------
        ValueError
            If a property listed in "resolve_config_property_paths" is neither
            a string nor a list/tuple.
        """
        class_name = config["class"]
        class_config = config.setdefault("config", {})

        # Replace any appearance of "$param_name" in the const_arg values with
        # the value from the parent CompositeEnv.
        # Note: we could consider expanding this feature to additional config
        # sections in the future, but for now only use it in const_args.
        if class_name.startswith("mlos_bench.environments."):
            const_args = class_config.get("const_args", {})
            preprocess_dynamic_configs(dest=const_args, source=parent_args)

        merge_parameters(dest=class_config, source=global_config)

        # Only resolve the properties that are both present in the class config
        # and explicitly listed as file system paths.
        for key in set(class_config).intersection(config.get("resolve_config_property_paths", [])):
            if isinstance(class_config[key], str):
                class_config[key] = self.resolve_path(class_config[key])
            elif isinstance(class_config[key], (list, tuple)):
                class_config[key] = [self.resolve_path(path) for path in class_config[key]]
            else:
                raise ValueError(f"Parameter {key} must be a string or a list")

        # Guard the call so that the JSON dump is only computed when needed.
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Instantiating: %s with config:\n%s",
                       class_name, json.dumps(class_config, indent=2))

        return (class_name, class_config)

    def build_optimizer(self, *,
                        tunables: TunableGroups,
                        service: Service,
                        config: Dict[str, Any],
                        global_config: Optional[Dict[str, Any]] = None) -> Optimizer:
        """
        Instantiation of mlos_bench Optimizer
        that depend on Service and TunableGroups.

        A class *MUST* have a constructor that takes four named arguments:
        (tunables, config, global_config, service)

        Parameters
        ----------
        tunables : TunableGroups
            Tunable parameters of the environment. We need them to validate the
            configurations of merged-in experiments and restored/pending trials.
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Optimizer
            A new instance of the `Optimizer` class.
        """
        tunables_path = config.get("include_tunables")
        if tunables_path is not None:
            tunables = self._load_tunables(tunables_path, tunables)
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        inst = instantiate_from_config(Optimizer, class_name,  # type: ignore[type-abstract]
                                       tunables=tunables,
                                       config=class_config,
                                       global_config=global_config,
                                       service=service)
        _LOG.info("Created: Optimizer %s", inst)
        return inst

    def build_storage(self, *,
                      service: Service,
                      config: Dict[str, Any],
                      global_config: Optional[Dict[str, Any]] = None) -> "Storage":
        """
        Instantiation of mlos_bench Storage objects.

        Parameters
        ----------
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Storage
            A new instance of the Storage class.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # At module level, Storage is imported under TYPE_CHECKING only,
        # so it has to be imported here at run time.
        from mlos_bench.storage.base_storage import Storage  # pylint: disable=import-outside-toplevel
        inst = instantiate_from_config(Storage, class_name,  # type: ignore[type-abstract]
                                       config=class_config,
                                       global_config=global_config,
                                       service=service)
        _LOG.info("Created: Storage %s", inst)
        return inst

    def build_scheduler(self, *,
                        config: Dict[str, Any],
                        global_config: Dict[str, Any],
                        environment: Environment,
                        optimizer: Optimizer,
                        storage: "Storage",
                        root_env_config: str) -> "Scheduler":
        """
        Instantiation of mlos_bench Scheduler.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters.
        environment : Environment
            The environment to benchmark/optimize.
        optimizer : Optimizer
            The optimizer to use.
        storage : Storage
            The storage to use.
        root_env_config : str
            Path to the root environment configuration.

        Returns
        -------
        inst : Scheduler
            A new instance of the Scheduler.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # At module level, Scheduler is imported under TYPE_CHECKING only,
        # so it has to be imported here at run time.
        from mlos_bench.schedulers.base_scheduler import Scheduler  # pylint: disable=import-outside-toplevel
        inst = instantiate_from_config(Scheduler, class_name,  # type: ignore[type-abstract]
                                       config=class_config,
                                       global_config=global_config,
                                       environment=environment,
                                       optimizer=optimizer,
                                       storage=storage,
                                       root_env_config=root_env_config)
        _LOG.info("Created: Scheduler %s", inst)
        return inst

    def build_environment(self,     # pylint: disable=too-many-arguments
                          config: Dict[str, Any],
                          tunables: TunableGroups,
                          global_config: Optional[Dict[str, Any]] = None,
                          parent_args: Optional[Dict[str, TunableValue]] = None,
                          service: Optional[Service] = None) -> Environment:
        """
        Factory method for a new environment with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with three mandatory fields:
                "name": Human-readable string describing the environment;
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        tunables : TunableGroups
            A (possibly empty) collection of groups of tunable parameters for
            all environments.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).

        Returns
        -------
        env : Environment
            An instance of the `Environment` class initialized with `config`.
        """
        env_name = config["name"]
        (env_class, env_config) = self.prepare_class_load(config, global_config, parent_args)

        # Services listed in the environment config are loaded with
        # the given service as their parent.
        env_services_path = config.get("include_services")
        if env_services_path is not None:
            service = self.load_services(env_services_path, global_config, service)

        env_tunables_path = config.get("include_tunables")
        if env_tunables_path is not None:
            tunables = self._load_tunables(env_tunables_path, tunables)

        _LOG.debug("Creating env: %s :: %s", env_name, env_class)
        env = Environment.new(env_name=env_name, class_name=env_class,
                              config=env_config, global_config=global_config,
                              tunables=tunables, service=service)

        _LOG.info("Created env: %s :: %s", env_name, env)
        return env

    def _build_standalone_service(self, config: Dict[str, Any],
                                  global_config: Optional[Dict[str, Any]] = None,
                                  parent: Optional[Service] = None) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with two mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class initialized with `config`.
        """
        (svc_class, svc_config) = self.prepare_class_load(config, global_config)
        service = Service.new(svc_class, svc_config, global_config, parent)
        _LOG.info("Created service: %s", service)
        return service

    def _build_composite_service(self, config_list: Iterable[Dict[str, Any]],
                                 global_config: Optional[Dict[str, Any]] = None,
                                 parent: Optional[Service] = None) -> Service:
        """
        Factory method for a new service that combines several service configs.

        Parameters
        ----------
        config_list : a list of dict
            A list where each element is a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
        """
        service = Service()
        if parent:
            service.register(parent.export())

        # Each new service gets the composite built so far as its parent.
        for config in config_list:
            service.register(self._build_standalone_service(
                config, global_config, service).export())

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Created mix-in service: %s", service)

        return service

    def build_service(self,
                      config: Dict[str, Any],
                      global_config: Optional[Dict[str, Any]] = None,
                      parent: Optional[Service] = None) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            Either a single service config with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
            or (when it has no "class" field) an object with a "services"
            field that holds a list of such service configs.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            A standalone service if `config` describes a single service and
            there is no parent; otherwise an instance of the `Service` class
            that is a combination of all services from the list plus the
            parent mix-in.
        """
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Build service from config:\n%s",
                       json.dumps(config, indent=2))

        assert isinstance(config, dict)
        config_list: List[Dict[str, Any]]
        if "class" not in config:
            # Top level config is a simple object with a list of services
            config_list = config["services"]
        else:
            # Top level config is a single service
            if parent is None:
                return self._build_standalone_service(config, global_config)
            config_list = [config]

        return self._build_composite_service(config_list, global_config, parent)

    def load_environment(self,  # pylint: disable=too-many-arguments
                         json_file_name: str,
                         tunables: TunableGroups,
                         global_config: Optional[Dict[str, Any]] = None,
                         parent_args: Optional[Dict[str, TunableValue]] = None,
                         service: Optional[Service] = None) -> Environment:
        """
        Load and build new environment from the config file.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : Environment
            A new benchmarking environment.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        assert isinstance(config, dict)
        return self.build_environment(config, tunables, global_config, parent_args, service)

    def load_environment_list(self,     # pylint: disable=too-many-arguments
                              json_file_name: str,
                              tunables: TunableGroups,
                              global_config: Optional[Dict[str, Any]] = None,
                              parent_args: Optional[Dict[str, TunableValue]] = None,
                              service: Optional[Service] = None) -> List[Environment]:
        """
        Load and build a list of environments from the config file.

        The file is loaded as a single environment config, so the resulting
        list always has exactly one element.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : List[Environment]
            A list of new benchmarking environments.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        return [
            self.build_environment(config, tunables, global_config, parent_args, service)
        ]

    def load_services(self, json_file_names: Iterable[str],
                      global_config: Optional[Dict[str, Any]] = None,
                      parent: Optional[Service] = None) -> Service:
        """
        Read the configuration files and bundle all service methods
        from those configs into a single Service object.

        Parameters
        ----------
        json_file_names : list of str
            A list of service JSON configuration files.
            A single file name given as a plain string is treated
            as a one-element list.
        global_config : dict
            Global parameters to add to the service config.
        parent : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        service : Service
            A collection of service methods.
        """
        if isinstance(json_file_names, str):
            # A bare string is itself an iterable of str; without this guard
            # we would try to load one config file per character.
            json_file_names = [json_file_names]
        _LOG.info("Load services: %s parent: %s",
                  json_file_names, parent.__class__.__name__)
        service = Service({}, global_config, parent)
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.SERVICE)
            service.register(self.build_service(config, global_config, service).export())
        return service

    def _load_tunables(self, json_file_names: Iterable[str],
                       parent: TunableGroups) -> TunableGroups:
        """
        Load a collection of tunable parameters from JSON files and merge them
        into a copy of the parent TunableGroups.

        This helps allow standalone environment configs to reference
        overlapping tunable groups configs but still allow combining them into
        a single instance that each environment can reference.

        Parameters
        ----------
        json_file_names : list of str
            A list of JSON files to load.
            A single file name given as a plain string is treated
            as a one-element list.
        parent : TunableGroups
            A (possibly empty) collection of tunables to add to the new collection.

        Returns
        -------
        tunables : TunableGroups
            The larger collection of tunable parameters.
        """
        if isinstance(json_file_names, str):
            # A bare string is itself an iterable of str; without this guard
            # we would try to load one config file per character.
            json_file_names = [json_file_names]
        _LOG.info("Load tunables: '%s'", json_file_names)
        tunables = parent.copy()
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.TUNABLE_PARAMS)
            assert isinstance(config, dict)
            tunables.merge(TunableGroups(config))
        return tunables