Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%
78 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Helper functions to run scripts and commands locally on the scheduler side.
7"""
9import errno
10import logging
11import os
12import shlex
13import subprocess
14import sys
16from string import Template
17from typing import (
18 Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, TYPE_CHECKING, Union
19)
21from mlos_bench.os_environ import environ
22from mlos_bench.services.base_service import Service
23from mlos_bench.services.local.temp_dir_context import TempDirContextService
24from mlos_bench.services.types.local_exec_type import SupportsLocalExec
26if TYPE_CHECKING:
27 from mlos_bench.tunables.tunable import TunableValue
29_LOG = logging.getLogger(__name__)
def split_cmdline(cmdline: str) -> Iterable[List[str]]:
    """
    Split a command line into subcommands and the separators between them.

    A single command line may contain multiple commands separated by
    special characters (e.g., &&, ||, etc.), so the command line is further
    split into an array of subcommand arrays.

    Parameters
    ----------
    cmdline: str
        The commandline to split.

    Yields
    ------
    List[str]
        Either the tokens of one subcommand, or a single-element list that
        holds one separator token. Concatenating the yielded lists in order
        gives back the flat token sequence.
    """
    cmdline_tokens = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    cmdline_tokens.whitespace_split = True
    subcmd: List[str] = []
    for token in cmdline_tokens:
        # In posix mode an empty quoted string (e.g., `''`) comes back as an
        # empty token. It is a regular argument, so check for emptiness
        # before looking at the first character.
        if token and token[0] in cmdline_tokens.punctuation_chars:
            # Separator encountered. Yield any non-empty previous subcmd we accumulated.
            if subcmd:
                yield subcmd
            # Also return the separator itself as its own entry.
            yield [token]
            subcmd = []
        else:
            subcmd.append(token)
    # Return the trailing subcommand.
    if subcmd:
        yield subcmd
class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service that runs scripts and commands in an external process on the
    node acting as the scheduler. Can be useful for data processing
    due to reduced dependency management complications vs the target environment.
    """

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 global_config: Optional[Dict[str, Any]] = None,
                 parent: Optional[Service] = None,
                 methods: Union[Dict[str, Callable], List[Callable], None] = None):
        """
        Create a new instance of a service to run scripts locally.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
            The optional "abort_on_error" key (default True) controls whether
            `local_exec` stops at the first script line that fails.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        # Register `local_exec` alongside any caller-supplied methods.
        all_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, all_methods)
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(self, script_lines: Iterable[str],
                   env: Optional[Mapping[str, "TunableValue"]] = None,
                   cwd: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Run every line of `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Lines of the script to run locally.
            Every line is treated as a separate command to run.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the script at.
            If omitted, use `temp_dir` or create a temporary dir.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of the return code of the last command that was run
            (0 if there were no lines), and the concatenated stdout and
            stderr of all commands that were run.
        """
        return_code = 0
        stdout_chunks: List[str] = []
        stderr_chunks: List[str] = []

        with self.temp_dir_context(cwd) as temp_dir:

            _LOG.debug("Run in directory: %s", temp_dir)

            for line in script_lines:
                (return_code, out, err) = self._local_exec_script(line, env, temp_dir)
                stdout_chunks.append(out)
                stderr_chunks.append(err)
                # Stop at the first failing command unless configured otherwise.
                if self.abort_on_error and return_code != 0:
                    break

        stdout = "".join(stdout_chunks)
        stderr = "".join(stderr_chunks)

        _LOG.debug("Run: stdout:\n%s", stdout)
        _LOG.debug("Run: stderr:\n%s", stderr)

        return (return_code, stdout, stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: List[str]) -> List[str]:
        """
        Replace the first token of a (sub)command with the full path of the
        local script it refers to, if such a script file exists.

        Parameters
        ----------
        subcmd_tokens : List[str]
            The previously split tokens of the subcmd. Modified in place.

        Returns
        -------
        List[str]
            The same list, with the script path resolved (and the current
            Python executable prepended for `.py` scripts).
        """
        script_path = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Requiring a regular file also covers the lone `.` special case,
        # which means both `source` and "current directory" (which isn't
        # executable) in posix shells.
        is_script_file = os.path.exists(script_path) and os.path.isfile(script_path)
        if not is_script_file:
            return subcmd_tokens

        # The script exists, so use its absolute path.
        subcmd_tokens[0] = os.path.abspath(script_path)
        # For python scripts, prepend the currently executing python
        # executable path to avoid requiring executable mode bits or a shebang.
        if script_path.strip().lower().endswith(".py"):
            subcmd_tokens.insert(0, sys.executable)
        return subcmd_tokens

    def _local_exec_script(self, script_line: str,
                           env_params: Optional[Mapping[str, "TunableValue"]],
                           cwd: str) -> Tuple[int, str, str]:
        """
        Execute a single script line in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables. Values are converted to strings.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If running the process raises FileNotFoundError, the tuple is
            (errno.ENOENT, "", "File not found").
        """
        # Split the command line into subcommands, fix up the script path of
        # each one, and flatten everything back into a single token list.
        cmd: List[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        env: Dict[str, str] = {}
        if env_params:
            env = {name: str(value) for (name, value) in env_params.items()}

        if sys.platform == 'win32':
            # A hack to run Python on Windows with env variables set:
            # start from the current environment, blank out PYTHONPATH,
            # then layer the requested variables on top.
            merged_env = environ.copy()
            merged_env["PYTHONPATH"] = ""
            merged_env.update(env)
            env = merged_env

        try:
            if sys.platform != 'win32':
                # Hand the shell one single command string.
                cmd = [" ".join(cmd)]

            _LOG.info("Run: %s", cmd)
            if _LOG.isEnabledFor(logging.DEBUG):
                expanded = Template(" ".join(cmd)).safe_substitute(env)
                _LOG.debug("Expands to: %s", expanded)
                _LOG.debug("Current working dir: %s", cwd)

            # An empty env dict means "inherit the parent environment".
            proc = subprocess.run(cmd, env=env or None, cwd=cwd, shell=True,
                                  text=True, check=False, capture_output=True)

            _LOG.debug("Run: return code = %d", proc.returncode)
            return (proc.returncode, proc.stdout, proc.stderr)

        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")