Coverage for mlos_bench/mlos_bench/services/local/local_exec.py: 91%

77 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Helper functions to run scripts and commands locally on the scheduler side.""" 

6 

7import errno 

8import logging 

9import os 

10import shlex 

11import subprocess 

12import sys 

13from string import Template 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Callable, 

18 Dict, 

19 Iterable, 

20 List, 

21 Mapping, 

22 Optional, 

23 Tuple, 

24 Union, 

25) 

26 

27from mlos_bench.os_environ import environ 

28from mlos_bench.services.base_service import Service 

29from mlos_bench.services.local.temp_dir_context import TempDirContextService 

30from mlos_bench.services.types.local_exec_type import SupportsLocalExec 

31 

32if TYPE_CHECKING: 

33 from mlos_bench.tunables.tunable import TunableValue 

34 

35_LOG = logging.getLogger(__name__) 

36 

37 

def split_cmdline(cmdline: str) -> Iterable[List[str]]:
    """
    Split a command line into subcommands and the separators between them.

    A single command line may contain multiple commands separated by special characters
    (e.g., &&, ||, etc.) so further split the commandline into an array of subcommand
    arrays.

    Parameters
    ----------
    cmdline: str
        The commandline to split.

    Yields
    ------
    Iterable[List[str]]
        A list of subcommands or separators, each one a list of tokens.
        Can be rejoined as a flattened array.

    Raises
    ------
    ValueError
        Propagated from `shlex` when the command line is malformed
        (e.g., it contains an unterminated quote).
    """
    cmdline_tokens = shlex.shlex(cmdline, posix=True, punctuation_chars=True)
    # Only split on whitespace (and punctuation runs), so that things like
    # paths and `--key=value` arguments stay in one token.
    cmdline_tokens.whitespace_split = True
    subcmd: List[str] = []
    for token in cmdline_tokens:
        # In POSIX mode an empty quoted string ('' or "") is returned as an
        # empty token. It is a regular (empty) argument, never a separator,
        # and must not be indexed with `token[0]`.
        if not token or token[0] not in cmdline_tokens.punctuation_chars:
            subcmd.append(token)
            continue
        # Separator encountered. Yield any non-empty previous subcmd we accumulated.
        if subcmd:
            yield subcmd
        # Also return the separators.
        yield [token]
        subcmd = []
    # Return the trailing subcommand.
    if subcmd:
        yield subcmd

71 

72 

class LocalExecService(TempDirContextService, SupportsLocalExec):
    """
    Service that runs scripts and commands as external processes on the scheduler
    node.

    Running things locally can be handy for data processing, since it avoids the
    dependency management complications of the target environment.
    """

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        global_config: Optional[Dict[str, Any]] = None,
        parent: Optional[Service] = None,
        methods: Union[Dict[str, Callable], List[Callable], None] = None,
    ):
        """
        Set up a service for running scripts locally.

        Parameters
        ----------
        config : dict
            Free-format dictionary that contains parameters for the service.
            (E.g., root path for config files, etc.)
            The optional "abort_on_error" entry (default: True) controls
            whether `local_exec` stops at the first failing script line.
        global_config : dict
            Free-format dictionary of global parameters.
        parent : Service
            An optional parent service that can provide mixin functions.
        methods : Union[Dict[str, Callable], List[Callable], None]
            New methods to register with the service.
        """
        # Register `local_exec` alongside any caller-supplied methods.
        registered_methods = self.merge_methods(methods, [self.local_exec])
        super().__init__(config, global_config, parent, registered_methods)
        self.abort_on_error = self.config.get("abort_on_error", True)

    def local_exec(
        self,
        script_lines: Iterable[str],
        env: Optional[Mapping[str, "TunableValue"]] = None,
        cwd: Optional[str] = None,
    ) -> Tuple[int, str, str]:
        """
        Run each of the `script_lines` as a separate command in a local process.

        Parameters
        ----------
        script_lines : Iterable[str]
            Lines of the script to run locally.
            Every line is executed as its own command.
        env : Mapping[str, Union[int, float, str]]
            Environment variables (optional).
        cwd : str
            Work directory to run the script at.
            If omitted, use `temp_dir` or create a temporary dir.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            Return code of the last command that was run (0 if there were no
            lines), plus the concatenated stdout and stderr of all commands run.
        """
        exit_code = 0
        out_chunks: List[str] = []
        err_chunks: List[str] = []

        with self.temp_dir_context(cwd) as work_dir:
            _LOG.debug("Run in directory: %s", work_dir)
            for script_line in script_lines:
                (exit_code, line_out, line_err) = self._local_exec_script(
                    script_line,
                    env,
                    work_dir,
                )
                out_chunks.append(line_out)
                err_chunks.append(line_err)
                # Stop at the first failure unless configured to carry on.
                if self.abort_on_error and exit_code != 0:
                    break

        all_stdout = "".join(out_chunks)
        all_stderr = "".join(err_chunks)

        _LOG.debug("Run: stdout:\n%s", all_stdout)
        _LOG.debug("Run: stderr:\n%s", all_stderr)

        return (exit_code, all_stdout, all_stderr)

    def _resolve_cmdline_script_path(self, subcmd_tokens: List[str]) -> List[str]:
        """
        Replace the first token of a (sub)command with its full path when it names
        an existing local script file.

        Parameters
        ----------
        subcmd_tokens : List[str]
            The previously split tokens of the subcmd. Modified in place.

        Returns
        -------
        List[str]
            The same list, with the script path resolved where applicable.
        """
        resolved = self.config_loader_service.resolve_path(subcmd_tokens[0])
        # Requiring a regular file also takes care of a lone `.`, which means
        # both `source` and "current directory" (not executable) in posix shells.
        if not os.path.exists(resolved) or not os.path.isfile(resolved):
            return subcmd_tokens

        # The script exists, so point the command at it.
        subcmd_tokens[0] = os.path.abspath(resolved)
        is_python_script = resolved.strip().lower().endswith(".py")
        if is_python_script:
            # Run python scripts with the current interpreter so that neither
            # executable mode bits nor a shebang are required.
            subcmd_tokens.insert(0, sys.executable)
        return subcmd_tokens

    def _local_exec_script(
        self,
        script_line: str,
        env_params: Optional[Mapping[str, "TunableValue"]],
        cwd: str,
    ) -> Tuple[int, str, str]:
        """
        Run a single script line in a local process.

        Parameters
        ----------
        script_line : str
            Line of the script to run in the local process.
        env_params : Mapping[str, Union[int, float, str]]
            Environment variables. Values are converted to strings.
        cwd : str
            Work directory to run the script at.

        Returns
        -------
        (return_code, stdout, stderr) : (int, str, str)
            A 3-tuple of return code, stdout, and stderr of the script process.
            If a `FileNotFoundError` is raised while launching the process,
            `(errno.ENOENT, "", "File not found")` is returned instead.
        """
        # Break the line into subcommands and separators, fix up the script path
        # at the head of each one, and flatten everything back into one token list.
        cmd: List[str] = []
        for subcmd in split_cmdline(script_line):
            cmd.extend(self._resolve_cmdline_script_path(subcmd))

        env: Dict[str, str] = (
            {name: str(value) for (name, value) in env_params.items()} if env_params else {}
        )

        if sys.platform == "win32":
            # A hack to run Python on Windows with env variables set:
            # start from the current environment, blank out PYTHONPATH,
            # then layer the requested variables on top.
            win_env = environ.copy()
            win_env["PYTHONPATH"] = ""
            win_env.update(env)
            env = win_env

        try:
            if sys.platform != "win32":
                # With shell=True the shell gets a single command string.
                # NOTE(review): the tokens are re-joined with plain spaces, so any
                # quoting removed during splitting is not restored - confirm that
                # callers do not rely on quoted arguments containing whitespace.
                cmd = [" ".join(cmd)]

            _LOG.info("Run: %s", cmd)
            if _LOG.isEnabledFor(logging.DEBUG):
                expanded = Template(" ".join(cmd)).safe_substitute(env)
                _LOG.debug("Expands to: %s", expanded)
                _LOG.debug("Current working dir: %s", cwd)

            # An empty env mapping is passed as None, i.e. no explicit environment.
            result = subprocess.run(
                cmd,
                env=env or None,
                cwd=cwd,
                shell=True,
                text=True,
                check=False,
                capture_output=True,
            )

            _LOG.debug("Run: return code = %d", result.returncode)
            return (result.returncode, result.stdout, result.stderr)

        except FileNotFoundError as ex:
            _LOG.warning("File not found: %s", cmd, exc_info=ex)
            return (errno.ENOENT, "", "File not found")