Coverage for mlos_bench/mlos_bench/services/config_persistence.py: 95%

157 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Helper functions to load, instantiate, and serialize Python objects that encapsulate 

6benchmark environments, tunable parameters, and service functions. 

7""" 

8 

9import json # For logging only 

10import logging 

11import os 

12import sys 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Callable, 

17 Dict, 

18 Iterable, 

19 List, 

20 Optional, 

21 Tuple, 

22 Union, 

23) 

24 

25import json5 # To read configs with comments and other JSON5 syntax features 

26from jsonschema import SchemaError, ValidationError 

27 

28from mlos_bench.config.schemas.config_schemas import ConfigSchema 

29from mlos_bench.environments.base_environment import Environment 

30from mlos_bench.optimizers.base_optimizer import Optimizer 

31from mlos_bench.services.base_service import Service 

32from mlos_bench.services.types.config_loader_type import SupportsConfigLoading 

33from mlos_bench.tunables.tunable import TunableValue 

34from mlos_bench.tunables.tunable_groups import TunableGroups 

35from mlos_bench.util import ( 

36 instantiate_from_config, 

37 merge_parameters, 

38 path_join, 

39 preprocess_dynamic_configs, 

40) 

41 

42if sys.version_info < (3, 10): 

43 from importlib_resources import files 

44else: 

45 from importlib.resources import files 

46 

47if TYPE_CHECKING: 

48 from mlos_bench.schedulers.base_scheduler import Scheduler 

49 from mlos_bench.storage.base_storage import Storage 

50 

51 

52_LOG = logging.getLogger(__name__) 

53 

54 

class ConfigPersistenceService(Service, SupportsConfigLoading):
    """Collection of methods to deserialize the Environment, Service, and TunableGroups
    objects.
    """

    # Location of the configs shipped with the `mlos_bench.config` package,
    # with backslashes replaced by forward slashes.
    BUILTIN_CONFIG_PATH = str(files("mlos_bench.config").joinpath("")).replace("\\", "/")

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Create a new instance of config persistence service.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        super().__init__(
            config,
            global_config,
            parent,
            self.merge_methods(
                methods,
                [
                    self.resolve_path,
                    self.load_config,
                    self.prepare_class_load,
                    self.build_service,
                    self.build_environment,
                    self.load_services,
                    self.load_environment,
                    self.load_environment_list,
                ],
            ),
        )
        self._config_loader_service = self

        # Normalize and deduplicate config paths, but maintain order.
        # NOTE: normalize *before* the membership test; comparing the raw path
        # against the already-normalized list would let duplicates through.
        self._config_path: List[str] = []
        for path in self.config.get("config_path", []):
            abs_path = path_join(path, abs_path=True)
            if abs_path not in self._config_path:
                self._config_path.append(abs_path)
        # Prepend the cwd if not already on the list.
        cwd = path_join(os.getcwd(), abs_path=True)
        if cwd not in self._config_path:
            self._config_path.insert(0, cwd)
        # Append the built-in config path if not already on the list.
        if self.BUILTIN_CONFIG_PATH not in self._config_path:
            self._config_path.append(self.BUILTIN_CONFIG_PATH)

    @property
    def config_paths(self) -> List[str]:
        """
        Gets the list of config paths this service will search for config files.

        Returns
        -------
        List[str]
            A copy of the internal search path list.
        """
        return list(self._config_path)  # make a copy to avoid modifications

    def resolve_path(self, file_path: str, extra_paths: Optional[Iterable[str]] = None) -> str:
        """
        Resolve `file_path` against the search paths if it is not absolute.

        The directories in `extra_paths` are tried first, followed by the entries of
        `_config_path`; the first combination that exists on the file system wins.
        If `file_path` is absolute, or if it cannot be found in any of the search
        directories, it is returned as is.

        Parameters
        ----------
        file_path : str
            Path to the input config file.
        extra_paths : Iterable[str]
            Additional directories to prepend to the list of search paths.

        Returns
        -------
        path : str
            An actual path to the config or script.
        """
        path_list = list(extra_paths or []) + self._config_path
        _LOG.debug("Resolve path: %s in: %s", file_path, path_list)
        if os.path.isabs(file_path):
            _LOG.debug("Path is absolute: %s", file_path)
            return file_path
        for path in path_list:
            full_path = path_join(path, file_path, abs_path=True)
            if os.path.exists(full_path):
                _LOG.debug("Path resolved: %s", full_path)
                return full_path
        _LOG.debug("Path not resolved: %s", file_path)
        return file_path

    def load_config(
        self,
        json_file_name: str,
        schema_type: Optional[ConfigSchema],
    ) -> Dict[str, Any]:
        """
        Load JSON config file. Search for a file relative to `_config_path` if the input
        path is not absolute. This method is exported to be used as a service.

        Parameters
        ----------
        json_file_name : str
            Path to the input config file.
        schema_type : Optional[ConfigSchema]
            The schema type to validate the config against.
            If None, the config is loaded without validation (a warning is logged).

        Returns
        -------
        config : Union[dict, List[dict]]
            Free-format dictionary that contains the configuration.

        Raises
        ------
        ValueError
            If the config fails validation against `schema_type`.
        """
        json_file_name = self.resolve_path(json_file_name)
        _LOG.info("Load config: %s", json_file_name)
        with open(json_file_name, mode="r", encoding="utf-8") as fh_json:
            config = json5.load(fh_json)
        if schema_type is not None:
            try:
                schema_type.validate(config)
            except (ValidationError, SchemaError) as ex:
                _LOG.error(
                    "Failed to validate config %s against schema type %s at %s",
                    json_file_name,
                    schema_type.name,
                    schema_type.value,
                )
                raise ValueError(
                    f"Failed to validate config {json_file_name} against "
                    f"schema type {schema_type.name} at {schema_type.value}"
                ) from ex
            if isinstance(config, dict) and config.get("$schema"):
                # Remove $schema attributes from the config after we've validated
                # them to avoid passing them on to other objects
                # (e.g. SqlAlchemy based storage initializers).
                # NOTE: we only do this for internal schemas.
                # Other configs that get loaded may need the schema field
                # (e.g. Azure ARM templates).
                del config["$schema"]
        else:
            _LOG.warning("Config %s is not validated against a schema.", json_file_name)
        return config  # type: ignore[no-any-return]

    def prepare_class_load(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Extract the class instantiation parameters from the configuration. Mix-in the
        global parameters and resolve the local file system paths, where it is required.

        Note that `config` is updated in place: a "config" section is added to it
        if missing, and that section is the dictionary that gets returned.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate. Must have a "class" entry;
            may have "config" and "resolve_config_property_paths" entries.
        global_config : dict
            Global configuration parameters (optional).
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.

        Returns
        -------
        (class_name, class_config) : (str, dict)
            Name of the class to instantiate and its configuration.

        Raises
        ------
        ValueError
            If a parameter listed in "resolve_config_property_paths" is neither
            a string nor a list/tuple.
        """
        class_name = config["class"]
        class_config = config.setdefault("config", {})

        # Replace any appearance of "$param_name" in the const_arg values with
        # the value from the parent CompositeEnv.
        # Note: we could consider expanding this feature to additional config
        # sections in the future, but for now only use it in const_args.
        if class_name.startswith("mlos_bench.environments."):
            const_args = class_config.get("const_args", {})
            preprocess_dynamic_configs(dest=const_args, source=parent_args)

        merge_parameters(dest=class_config, source=global_config)

        # Only resolve the parameters that are both present in the class config
        # and explicitly marked as file system paths.
        for key in set(class_config).intersection(config.get("resolve_config_property_paths", [])):
            if isinstance(class_config[key], str):
                class_config[key] = self.resolve_path(class_config[key])
            elif isinstance(class_config[key], (list, tuple)):
                class_config[key] = [self.resolve_path(path) for path in class_config[key]]
            else:
                raise ValueError(f"Parameter {key} must be a string or a list")

        # Guard the call to avoid serializing the config when DEBUG is off.
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "Instantiating: %s with config:\n%s",
                class_name,
                json.dumps(class_config, indent=2),
            )

        return (class_name, class_config)

    def build_optimizer(
        self,
        *,
        tunables: TunableGroups,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> Optimizer:
        """
        Instantiation of mlos_bench Optimizer that depend on Service and TunableGroups.

        A class *MUST* have a constructor that takes four named arguments:
        (tunables, config, global_config, service)

        Parameters
        ----------
        tunables : TunableGroups
            Tunable parameters of the environment. We need them to validate the
            configurations of merged-in experiments and restored/pending trials.
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Optimizer
            A new instance of the `Optimizer` class.
        """
        # Merge in any extra tunables the optimizer config asks for.
        tunables_path = config.get("include_tunables")
        if tunables_path is not None:
            tunables = self._load_tunables(tunables_path, tunables)
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        inst = instantiate_from_config(
            Optimizer,  # type: ignore[type-abstract]
            class_name,
            tunables=tunables,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Optimizer %s", inst)
        return inst

    def build_storage(
        self,
        *,
        service: Service,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
    ) -> "Storage":
        """
        Instantiation of mlos_bench Storage objects.

        Parameters
        ----------
        service: Service
            An optional service object (e.g., providing methods to load config files, etc.)
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters (optional).

        Returns
        -------
        inst : Storage
            A new instance of the Storage class.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # Storage is imported at module level only under TYPE_CHECKING,
        # so import it here for use at runtime.
        # pylint: disable=import-outside-toplevel
        from mlos_bench.storage.base_storage import Storage

        inst = instantiate_from_config(
            Storage,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            service=service,
        )
        _LOG.info("Created: Storage %s", inst)
        return inst

    def build_scheduler(  # pylint: disable=too-many-arguments
        self,
        *,
        config: Dict[str, Any],
        global_config: Dict[str, Any],
        environment: Environment,
        optimizer: Optimizer,
        storage: "Storage",
        root_env_config: str,
    ) -> "Scheduler":
        """
        Instantiation of mlos_bench Scheduler.

        Parameters
        ----------
        config : dict
            Configuration of the class to instantiate, as loaded from JSON.
        global_config : dict
            Global configuration parameters.
        environment : Environment
            The environment to benchmark/optimize.
        optimizer : Optimizer
            The optimizer to use.
        storage : Storage
            The storage to use.
        root_env_config : str
            Path to the root environment configuration.

        Returns
        -------
        inst : Scheduler
            A new instance of the Scheduler.
        """
        (class_name, class_config) = self.prepare_class_load(config, global_config)
        # Scheduler is imported at module level only under TYPE_CHECKING,
        # so import it here for use at runtime.
        # pylint: disable=import-outside-toplevel
        from mlos_bench.schedulers.base_scheduler import Scheduler

        inst = instantiate_from_config(
            Scheduler,  # type: ignore[type-abstract]
            class_name,
            config=class_config,
            global_config=global_config,
            environment=environment,
            optimizer=optimizer,
            storage=storage,
            root_env_config=root_env_config,
        )
        _LOG.info("Created: Scheduler %s", inst)
        return inst

    def build_environment(
        self,
        config: Dict[str, Any],
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Factory method for a new environment with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with three mandatory fields:
                "name": Human-readable string describing the environment;
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
            Optional "include_services" and "include_tunables" entries list
            extra config files to load for this environment.
        tunables : TunableGroups
            A (possibly empty) collection of groups of tunable parameters for
            all environments.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service: Service
            An optional service object (e.g., providing methods to
            deploy or reboot a VM, etc.).

        Returns
        -------
        env : Environment
            An instance of the `Environment` class initialized with `config`.
        """
        env_name = config["name"]
        (env_class, env_config) = self.prepare_class_load(config, global_config, parent_args)

        # Environment-specific services are layered on top of the given service.
        env_services_path = config.get("include_services")
        if env_services_path is not None:
            service = self.load_services(env_services_path, global_config, service)

        # Environment-specific tunables are merged into a copy of the given ones.
        env_tunables_path = config.get("include_tunables")
        if env_tunables_path is not None:
            tunables = self._load_tunables(env_tunables_path, tunables)

        _LOG.debug("Creating env: %s :: %s", env_name, env_class)
        env = Environment.new(
            env_name=env_name,
            class_name=env_class,
            config=env_config,
            global_config=global_config,
            tunables=tunables,
            service=service,
        )

        _LOG.info("Created env: %s :: %s", env_name, env)
        return env

    def _build_standalone_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            A dictionary with two mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class initialized with `config`.
        """
        (svc_class, svc_config) = self.prepare_class_load(config, global_config)
        service = Service.new(svc_class, svc_config, global_config, parent)
        _LOG.info("Created service: %s", service)
        return service

    def _build_composite_service(
        self,
        config_list: Iterable[Dict[str, Any]],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new mix-in service built from a list of configs.

        Parameters
        ----------
        config_list : a list of dict
            A list where each element is a dictionary with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
        """
        service = Service()
        if parent:
            service.register(parent.export())

        # Each new service gets the composite built so far as its parent,
        # and its exported methods are then registered on the composite.
        for config in config_list:
            service.register(
                self._build_standalone_service(config, global_config, service).export()
            )

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Created mix-in service: %s", service)

        return service

    def build_service(
        self,
        config: Dict[str, Any],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Factory method for a new service with a given config.

        Parameters
        ----------
        config : dict
            Either a single service config with 2 mandatory fields:
                "class": FQN of a Python class to instantiate;
                "config": Free-format dictionary to pass to the constructor;
            or (when there is no "class" entry) a dictionary with a "services"
            entry that holds a list of such service configs.
        global_config : dict
            Global parameters to add to the service config.
        parent: Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        svc : Service
            An instance of the `Service` class that is a combination of all
            services from the list plus the parent mix-in.
        """
        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("Build service from config:\n%s", json.dumps(config, indent=2))

        assert isinstance(config, dict)
        config_list: List[Dict[str, Any]]
        if "class" not in config:
            # Top level config is a simple object with a list of services
            config_list = config["services"]
        else:
            # Top level config is a single service
            if parent is None:
                return self._build_standalone_service(config, global_config)
            config_list = [config]

        return self._build_composite_service(config_list, global_config, parent)

    def load_environment(
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> Environment:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Load and build new environment from the config file.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : Environment
            A new benchmarking environment.
        """
        config = self.load_config(json_file_name, ConfigSchema.ENVIRONMENT)
        assert isinstance(config, dict)
        return self.build_environment(config, tunables, global_config, parent_args, service)

    def load_environment_list(
        self,
        json_file_name: str,
        tunables: TunableGroups,
        global_config: Optional[Dict[str, Any]] = None,
        parent_args: Optional[Dict[str, TunableValue]] = None,
        service: Optional[Service] = None,
    ) -> List[Environment]:
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        """
        Load and build an environment from the config file and return it as a list.

        Parameters
        ----------
        json_file_name : str
            The environment JSON configuration file.
            Must describe a single environment.
        tunables : TunableGroups
            A (possibly empty) collection of tunables to add to the environment.
        global_config : dict
            Global parameters to add to the environment config.
        parent_args : Dict[str, TunableValue]
            An optional reference of the parent CompositeEnv's const_args used to
            expand dynamic config parameters from.
        service : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        env : List[Environment]
            A one-element list with the new benchmarking environment.
        """
        # Delegate to `load_environment` so that both methods load and check
        # the config in exactly the same way.
        return [
            self.load_environment(json_file_name, tunables, global_config, parent_args, service)
        ]

    def load_services(
        self,
        json_file_names: Iterable[str],
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
    ) -> Service:
        """
        Read the configuration files and bundle all service methods from those configs
        into a single Service object.

        Parameters
        ----------
        json_file_names : list of str
            A list of service JSON configuration files.
        global_config : dict
            Global parameters to add to the service config.
        parent : Service
            An optional reference of the parent service to mix in.

        Returns
        -------
        service : Service
            A collection of service methods.
        """
        _LOG.info("Load services: %s parent: %s", json_file_names, parent.__class__.__name__)
        service = Service({}, global_config, parent)
        # Each service is built with the bundle so far as its parent, and its
        # exported methods are then registered on the bundle.
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.SERVICE)
            service.register(self.build_service(config, global_config, service).export())
        return service

    def _load_tunables(
        self,
        json_file_names: Iterable[str],
        parent: TunableGroups,
    ) -> TunableGroups:
        """
        Load a collection of tunable parameters from JSON files and merge them into
        a copy of the parent TunableGroups.

        This helps allow standalone environment configs to reference
        overlapping tunable groups configs but still allow combining them into
        a single instance that each environment can reference.

        Parameters
        ----------
        json_file_names : list of str
            A list of JSON files to load.
        parent : TunableGroups
            A (possibly empty) collection of tunables to add to the new collection.

        Returns
        -------
        tunables : TunableGroups
            The larger collection of tunable parameters.
        """
        _LOG.info("Load tunables: '%s'", json_file_names)
        # Work on a copy; `parent` itself is not merged into.
        tunables = parent.copy()
        for fname in json_file_names:
            config = self.load_config(fname, ConfigSchema.TUNABLE_PARAMS)
            assert isinstance(config, dict)
            tunables.merge(TunableGroups(config))
        return tunables