Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%

78 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 00:35 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6Helper functions to run scripts and commands locally on the scheduler side. 

7""" 

8 

9import errno 

10import logging 

11import os 

12import shlex 

13import subprocess 

14import sys 

15 

16from string import Template 

17from typing import ( 

18 Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, TYPE_CHECKING, Union 

19) 

20 

21from mlos_bench.os_environ import environ 

22from mlos_bench.services.base_service import Service 

23from mlos_bench.services.local.temp_dir_context import TempDirContextService 

24from mlos_bench.services.types.local_exec_type import SupportsLocalExec 

25 

26if TYPE_CHECKING: 

27 from mlos_bench.tunables.tunable import TunableValue 

28 

29_LOG = logging.getLogger(__name__) 

30 

31 

def split_cmdline(cmdline: str) -> Iterable[List[str]]:
    """
    Split a command line into subcommands and the separators between them.

    A single command line may chain several commands with shell punctuation
    (e.g., `&&`, `||`, `|`, `;`). The line is tokenized with `shlex` in POSIX
    mode and the tokens are grouped so that each subcommand and each
    separator is produced as its own list of tokens.

    Parameters
    ----------
    cmdline: str
        The commandline to split.

    Yields
    ------
    List[str]
        Either the tokens of one subcommand, or a single-element list that
        holds one separator token. Flattening the yielded lists in order
        gives back the full token sequence.
    """
    cmdline_tokens = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    cmdline_tokens.whitespace_split = True
    subcmd: List[str] = []
    for token in cmdline_tokens:
        # In POSIX mode an empty quoted argument ('' or "") is returned as an
        # empty token, so make sure there is a first character before
        # inspecting it. Empty tokens are ordinary arguments, not separators.
        if token and token[0] in cmdline_tokens.punctuation_chars:
            # Separator encountered: flush the subcommand collected so far
            # (if any), then emit the separator as its own group.
            if subcmd:
                yield subcmd
            yield [token]
            subcmd = []
        else:
            subcmd.append(token)
    # Emit the trailing subcommand, if there is one.
    if subcmd:
        yield subcmd

65 

66 

class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service for launching scripts and commands as external processes on the
    node that acts as the scheduler. Handy for data processing, since it
    avoids some of the dependency management complications of the target
    environment.
    """

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 global_config: Optional[Dict[str, Any]] = None,
                 parent: Optional[Service] = None,
                 methods: Union[Dict[str, Callable], List[Callable], None] = None):
        """
        Set up a service that runs scripts locally.

        Parameters
        ----------
        config : dict
            Free-format dictionary with the parameters of the service.
            The optional "abort_on_error" key (default: True) is read here.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        # Register `local_exec` together with any caller-supplied methods.
        registered_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, registered_methods)
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(self, script_lines: Iterable[str],
                   env: Optional[Mapping[str, "TunableValue"]] = None,
                   cwd: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Run every line of `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Lines of the script to run locally, one command per line.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the script at.
            It is handed to `temp_dir_context` to obtain the actual directory.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            Return code of the last command that ran, plus the concatenated
            stdout and stderr of all the commands that ran.
        """
        exit_code = 0
        out_chunks: List[str] = []
        err_chunks: List[str] = []

        with self.temp_dir_context(cwd) as work_dir:
            _LOG.debug("Run in directory: %s", work_dir)
            for script_line in script_lines:
                (exit_code, out, err) = self._local_exec_script(script_line, env, work_dir)
                out_chunks.append(out)
                err_chunks.append(err)
                # Stop at the first failing command when configured to do so.
                if exit_code != 0 and self.abort_on_error:
                    break

        all_stdout = "".join(out_chunks)
        all_stderr = "".join(err_chunks)

        _LOG.debug("Run: stdout:\n%s", all_stdout)
        _LOG.debug("Run: stderr:\n%s", all_stderr)

        return (exit_code, all_stdout, all_stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: List[str]) -> List[str]:
        """
        Replace the first token of a (sub)command with the full path of the
        local script it refers to, when such a file exists.

        Parameters
        ----------
        subcmd_tokens : List[str]
            The previously split tokens of the subcmd. Modified in place.

        Returns
        -------
        List[str]
            The same list, with the script path resolved where applicable.
        """
        resolved = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Requiring a regular file also covers a lone `.`, which in posix
        # shells means both `source` and the (non-executable) current directory.
        if not (os.path.exists(resolved) and os.path.isfile(resolved)):
            return subcmd_tokens

        # The script exists: point the command at its absolute path.
        subcmd_tokens[0] = os.path.abspath(resolved)
        # Run python scripts through the current interpreter, so that neither
        # executable mode bits nor a shebang are required.
        if resolved.strip().lower().endswith(".py"):
            subcmd_tokens.insert(0, sys.executable)
        return subcmd_tokens

    def _local_exec_script(self, script_line: str,
                           env_params: Optional[Mapping[str, "TunableValue"]],
                           cwd: str) -> Tuple[int, str, str]:
        """
        Run a single script line in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If a FileNotFoundError is raised, (errno.ENOENT, "", "File not found").
        """
        # Tokenize the line into subcommands, fix up the script path of each
        # one, and flatten everything back into a single token list.
        cmd: List[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        # The process environment must map strings to strings.
        env: Dict[str, str] = (
            {key: str(val) for (key, val) in env_params.items()} if env_params else {}
        )

        if sys.platform == 'win32':
            # A hack to run Python on Windows with env variables set:
            merged_env = environ.copy()
            merged_env["PYTHONPATH"] = ""
            merged_env.update(env)
            env = merged_env
        else:
            # Everywhere else, hand the shell a single command string.
            cmd = [" ".join(cmd)]

        try:
            _LOG.info("Run: %s", cmd)
            if _LOG.isEnabledFor(logging.DEBUG):
                expanded = Template(" ".join(cmd)).safe_substitute(env)
                _LOG.debug("Expands to: %s", expanded)
                _LOG.debug("Current working dir: %s", cwd)

            # An empty `env` is passed as None (see `env or None`).
            proc = subprocess.run(
                cmd,
                env=env or None,
                cwd=cwd,
                shell=True,
                text=True,
                check=False,
                capture_output=True,
            )

            _LOG.debug("Run: return code = %d", proc.returncode)
            return (proc.returncode, proc.stdout, proc.stderr)

        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")

225 

226 return (errno.ENOENT, "", "File not found")