Coverage for mlos_bench/mlos_bench/services/config_persistence.py: 95%
157 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-22 01:18 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Helper functions to load, instantiate, and serialize Python objects that encapsulate
6benchmark environments, tunable parameters, and service functions.
7"""
9import json # For logging only
10import logging
11import os
12import sys
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Callable,
17 Dict,
18 Iterable,
19 List,
20 Optional,
21 Tuple,
22 Union,
23)
25import json5 # To read configs with comments and other JSON5 syntax features
26from jsonschema import SchemaError, ValidationError
28from mlos_bench.config.schemas.config_schemas import ConfigSchema
29from mlos_bench.environments.base_environment import Environment
30from mlos_bench.optimizers.base_optimizer import Optimizer
31from mlos_bench.services.base_service import Service
32from mlos_bench.services.types.config_loader_type import SupportsConfigLoading
33from mlos_bench.tunables.tunable import TunableValue
34from mlos_bench.tunables.tunable_groups import TunableGroups
35from mlos_bench.util import (
36 instantiate_from_config,
37 merge_parameters,
38 path_join,
39 preprocess_dynamic_configs,
40)
42if sys.version_info < (3, 10):
43 from importlib_resources import files
44else:
45 from importlib.resources import files
47if TYPE_CHECKING:
48 from mlos_bench.schedulers.base_scheduler import Scheduler
49 from mlos_bench.storage.base_storage import Storage
52_LOG = logging.getLogger(__name__)
class ConfigPersistenceService(Service, SupportsConfigLoading):
    """Collection of methods to deserialize the Environment, Service, and TunableGroups
    objects.

    The service keeps an ordered list of directories (see `config_paths`) that is
    searched when a relative config or script path has to be resolved.
    """

    # Location of the configs shipped inside the `mlos_bench.config` package,
    # with backslashes replaced by forward slashes.
    BUILTIN_CONFIG_PATH = str(files("mlos_bench.config").joinpath("")).replace("\\", "/")

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of config persistence service.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
            The optional "config_path" entry lists extra directories to search.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    self.resolve_path,
                    self.load_config,
                    self.prepare_class_load,
                    self.build_service,
                    self.build_environment,
                    self.load_services,
                    self.load_environment,
                    self.load_environment_list,
                ],
            ),
        )
        self._config_loader_service = self

        # Normalize and deduplicate config paths, but maintain order.
        # The membership test must use the *normalized* path: the list only ever
        # holds normalized entries, so checking the raw string would let two
        # spellings of the same directory through.
        self._config_path: List[str] = []
        for path in self.config.get("config_path", []):
            norm_path = path_join(path, abs_path=True)
            if norm_path not in self._config_path:
                self._config_path.append(norm_path)
        # Prepend the cwd if not already on the list.
        cwd = path_join(os.getcwd(), abs_path=True)
        if cwd not in self._config_path:
            self._config_path.insert(0, cwd)
        # Append the built-in config path if not already on the list.
        if self.BUILTIN_CONFIG_PATH not in self._config_path:
            self._config_path.append(self.BUILTIN_CONFIG_PATH)

    @property
    def config_paths(self) -> List[str]:
        """
        Gets the list of config paths this service will search for config files.

        Returns
        -------
        List[str]
            A copy of the internal search path list, in search order.
        """
        return list(self._config_path)  # make a copy to avoid modifications

    def resolve_path(self, file_path: str, extra_paths: Optional[Iterable[str]] = None) -> str:
        """
        Resolve `file_path` against the list of search paths.

        If `file_path` is absolute, return it as is. Otherwise try each directory
        from `extra_paths` (if any) followed by the service's own config paths,
        in order, and return the first combination that exists on the file system.
        If none of them exists, return `file_path` unchanged.

        Parameters
        ----------
        file_path : str
            Path to the input config file.
        extra_paths : Iterable[str]
            Additional directories to prepend to the list of search paths.

        Returns
        -------
        path : str
            An actual path to the config or script.
        """
        path_list = list(extra_paths or []) + self._config_path
        _LOG.debug("Resolve path: %s in: %s", file_path, path_list)
        if os.path.isabs(file_path):
            _LOG.debug("Path is absolute: %s", file_path)
            return file_path
        for path in path_list:
            full_path = path_join(path, file_path, abs_path=True)
            if os.path.exists(full_path):
                _LOG.debug("Path resolved: %s", full_path)
                return full_path
        _LOG.debug("Path not resolved: %s", file_path)
        return file_path

    def load_config(
        self,
        json_file_name: str,
        schema_type: Optional[ConfigSchema],
    ) -> Dict[str, Any]:
        """
        Load JSON config file. Search for a file relative to the config search paths
        if the input path is not absolute. This method is exported to be used as a
        service.

        Parameters
        ----------
        json_file_name : str
            Path to the input config file.
        schema_type : Optional[ConfigSchema]
            The schema type to validate the config against.
            If `None`, the config is not validated and a warning is logged.

        Returns
        -------
        config : Union[dict, List[dict]]
            Free-format dictionary that contains the configuration.
            For validated dict configs, a non-empty "$schema" entry is removed.

        Raises
        ------
        ValueError
            If `schema_type` is given and the config fails validation against it.
        """
        json_file_name = self.resolve_path(json_file_name)
        _LOG.info("Load config: %s", json_file_name)
        with open(json_file_name, mode="r", encoding="utf-8") as fh_json:
            config = json5.load(fh_json)
        if schema_type is not None:
            try:
                schema_type.validate(config)
            except (ValidationError, SchemaError) as ex:
                _LOG.error(
                    "Failed to validate config %s against schema type %s at %s",
                    json_file_name,
                    schema_type.name,
                    schema_type.value,
                )
                raise ValueError(
                    f"Failed to validate config {json_file_name} against "
                    f"schema type {schema_type.name} at {schema_type.value}"
                ) from ex
            if isinstance(config, dict) and config.get("$schema"):
                # Remove $schema attributes from the config after we've validated
                # them to avoid passing them on to other objects
                # (e.g. SqlAlchemy based storage initializers).
                # NOTE: we only do this for internal schemas.
                # Other configs that get loaded may need the schema field
                # (e.g. Azure ARM templates).
                del config["$schema"]
        else:
            _LOG.warning("Config %s is not validated against a schema.", json_file_name)
        return config  # type: ignore[no-any-return]

    def prepare_class_load(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Extract the class instantiation parameters from the configuration. Mix-in the
        global parameters and resolve the local file system paths, where it is required.

        Note that `config` is updated in place: a missing "config" section is
        created as an empty dict, and the returned `class_config` is that section.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate. Must have a "class" entry;
            "config" and "resolve_config_property_paths" entries are optional.
        global_config : dict
            Global configuration parameters (optional).
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.

        Returns
        -------
        (class_name, class_config) : (str, dict)
            Name of the class to instantiate and its configuration.

        Raises
        ------
        ValueError
            If a parameter listed in "resolve_config_property_paths" is neither
            a string nor a list/tuple.
        """
        class_name = config["class"]
        class_config = config.setdefault("config", {})

        # Replace any appearance of "$param_name" in the const_arg values with
        # the value from the parent CompositeEnv.
        # Note: we could consider expanding this feature to additional config
        # sections in the future, but for now only use it in const_args.
        if class_name.startswith("mlos_bench.environments."):
            const_args = class_config.get("const_args", {})
            preprocess_dynamic_configs(dest=const_args, source=parent_args)

        merge_parameters(dest=class_config, source=global_config)

        # Only resolve the properties that are both requested and actually present.
        for key in set(class_config).intersection(config.get("resolve_config_property_paths", [])):
            if isinstance(class_config[key], str):
                class_config[key] = self.resolve_path(class_config[key])
            elif isinstance(class_config[key], (list, tuple)):
                class_config[key] = [self.resolve_path(path) for path in class_config[key]]
            else:
                raise ValueError(f"Parameter {key} must be a string or a list")

        # Guard the call so that the JSON dump is only produced when it is logged.
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Instantiating: %s with config:\n%s",
                class_name,
                json.dumps(class_config, indent=2),
            )

        return (class_name, class_config)

    def build_optimizer(
        self,
        *,
        tunables: TunableGroups,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> Optimizer:
        """
        Instantiation of mlos_bench Optimizer that depend on Service and TunableGroups.

        A class *MUST* have a constructor that takes four named arguments:
        (tunables, config, global_config, service)

        Parameters
        ----------
        tunables : TunableGroups
            Tunable parameters of the environment. We need them to validate the
            configurations of merged-in experiments and restored/pending trials.
            If `config` has an "include_tunables" entry, the tunables from those
            files are merged into a copy of this collection first.
        service: Service
            A service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Optimizer
            A new instance of the `Optimizer` class.
        """
        tunables_path = config.get("include_tunables")
        if tunables_path is not None:
            tunables = self._load_tunables(tunables_path, tunables)
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        inst = instantiate_from_config(
            Optimizer,  # type: ignore[type-abstract]
            class_name,
            tunables=tunables,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Optimizer %s", inst)
        return inst

    def build_storage(
        self,
        *,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> "Storage":
        """
        Instantiation of mlos_bench Storage objects.

        Parameters
        ----------
        service: Service
            A service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Storage
            A new instance of the Storage class.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # The base class is imported here rather than at module level
        # (at module level it is only imported under TYPE_CHECKING).
        # pylint: disable=import-outside-toplevel
        from mlos_bench.storage.base_storage import Storage

        inst = instantiate_from_config(
            Storage,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Storage %s", inst)
        return inst

    def build_scheduler(  # pylint: disable=too-many-arguments
        self,
        *,
        config: Dict[str, Any],
        global_config: Dict[str, Any],
        environment: Environment,
        optimizer: Optimizer,
        storage: "Storage",
        root_env_config: str,
    ) -> "Scheduler":
        """
        Instantiation of mlos_bench Scheduler.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters.
        environment : Environment
            The environment to benchmark/optimize.
        optimizer : Optimizer
            The optimizer to use.
        storage : Storage
            The storage to use.
        root_env_config : str
            Path to the root environment configuration.

        Returns
        -------
        inst : Scheduler
            A new instance of the Scheduler.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # The base class is imported here rather than at module level
        # (at module level it is only imported under TYPE_CHECKING).
        # pylint: disable=import-outside-toplevel
        from mlos_bench.schedulers.base_scheduler import Scheduler

        inst = instantiate_from_config(
            Scheduler,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            environment=environment,
            optimizer=optimizer,
            storage=storage,
            root_env_config=root_env_config,
        )
        _LOG.info("Created: Scheduler %s", inst)
        return inst

    def build_environment(
        self,
        config: Dict[str, Any],
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Factory method for a new environment with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with three mandatory fields:
                "name": Human-readable string describing the environment;
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
            The optional "include_services" and "include_tunables" entries list
            extra service and tunable config files to load for this environment.
        tunables : TunableGroups
            A (possibly empty) collection of groups of tunable parameters for
            all environments.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).

        Returns
        -------
        env : Environment
            An instance of the `Environment` class initialized with `config`.
        """
        env_name = config["name"]
        (env_class, env_config) = self.prepare_class_load(config, global_config, parent_args)

        # Services listed in the config are layered on top of the given service.
        env_services_path = config.get("include_services")
        if env_services_path is not None:
            service = self.load_services(env_services_path, global_config, service)

        # Tunables listed in the config are merged into a copy of the given ones.
        env_tunables_path = config.get("include_tunables")
        if env_tunables_path is not None:
            tunables = self._load_tunables(env_tunables_path, tunables)

        _LOG.debug("Creating env: %s :: %s", env_name, env_class)
        env = Environment.new(
            env_name=env_name,
            class_name=env_class,
            config=env_config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        _LOG.info("Created env: %s :: %s", env_name, env)
        return env

    def _build_standalone_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with two mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class initialized with `config`.
        """
        (svc_class, svc_config) = self.prepare_class_load(config, global_config)
        service = Service.new(svc_class, svc_config, global_config, parent)
        _LOG.info("Created service: %s", service)
        return service

    def _build_composite_service(
        self,
        config_list: Iterable[Dict[str, Any]],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new mix-in service built from a list of service configs.

        Parameters
        ----------
        config_list : a list of dict
            A list where each element is a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
        """
        service = Service()
        if parent:
            service.register(parent.export())

        # Each new service gets the composite built so far as its parent,
        # and its exported methods are then registered with the composite.
        for config in config_list:
            service.register(
                self._build_standalone_service(config, global_config, service).export()
            )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Created mix-in service: %s", service)

        return service

    def build_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            Either a single service config, i.e., a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor;
            or a dictionary without "class" that has a "services" list of such configs.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            A standalone service if `config` is a single service config and there
            is no `parent`; otherwise, an instance of the `Service` class that is
            a combination of all services from the list plus the parent mix-in.
        """
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Build service from config:\n%s", json.dumps(config, indent=2))

        assert isinstance(config, dict)
        config_list: List[Dict[str, Any]]
        if "class" not in config:
            # Top level config is a simple object with a list of services
            config_list = config["services"]
        else:
            # Top level config is a single service
            if parent is None:
                return self._build_standalone_service(config, global_config)
            config_list = [config]

        return self._build_composite_service(config_list, global_config, parent)

    def load_environment(
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Load and build new environment from the config file.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : Environment
            A new benchmarking environment.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        assert isinstance(config, dict)
        return self.build_environment(config, tunables, global_config, parent_args, service)

    def load_environment_list(
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> List[Environment]:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Load an environment from the config file and return it as a list.

        The loaded config is treated as a single environment config, so the
        returned list always has exactly one element.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : List[Environment]
            A list of new benchmarking environments.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        return [self.build_environment(config, tunables, global_config, parent_args, service)]

    def load_services(
        self,
        json_file_names: Iterable[str],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Read the configuration files and bundle all service methods from those configs
        into a single Service object.

        Parameters
        ----------
        json_file_names : list of str
            A list of service JSON configuration files.
        global_config : dict
            Global parameters to add to the service config.
        parent : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        service : Service
            A collection of service methods.
        """
        _LOG.info("Load services: %s parent: %s", json_file_names, parent.__class__.__name__)
        service = Service({}, global_config, parent)
        # Files are processed in order; each one is built with the bundle
        # accumulated so far as its parent.
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.SERVICE)
            service.register(self.build_service(config, global_config, service).export())
        return service

    def _load_tunables(
        self,
        json_file_names: Iterable[str],
        parent: TunableGroups,
    ) -> TunableGroups:
        """
        Load a collection of tunable parameters from JSON files and merge them
        into a copy of the parent TunableGroups.

        This helps allow standalone environment configs to reference
        overlapping tunable groups configs but still allow combining them into
        a single instance that each environment can reference.

        Parameters
        ----------
        json_file_names : list of str
            A list of JSON files to load.
        parent : TunableGroups
            A (possibly empty) collection of tunables to add to the new collection.

        Returns
        -------
        tunables : TunableGroups
            The larger collection of tunable parameters.
        """
        _LOG.info("Load tunables: '%s'", json_file_names)
        tunables = parent.copy()
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.TUNABLE_PARAMS)
            assert isinstance(config, dict)
            tunables.merge(TunableGroups(config))
        return tunables