Coverage for mlos_bench/mlos_bench/tests/services/remote/ssh/test_ssh_host_service.py: 98%

93 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 00:35 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6Tests for mlos_bench.services.remote.ssh.ssh_host_service 

7""" 

8 

9from subprocess import CalledProcessError, run 

10 

11import logging 

12import time 

13 

14from pytest_docker.plugin import Services as DockerServices 

15 

16from mlos_bench.services.remote.ssh.ssh_host_service import SshHostService 

17from mlos_bench.services.remote.ssh.ssh_service import SshClient 

18 

19from mlos_bench.tests import requires_docker 

20from mlos_bench.tests.services.remote.ssh import (SshTestServerInfo, 

21 ALT_TEST_SERVER_NAME, 

22 REBOOT_TEST_SERVER_NAME, 

23 SSH_TEST_SERVER_NAME, 

24 wait_docker_service_socket) 

25 

26_LOG = logging.getLogger(__name__) 

27 

28 

@requires_docker
def test_ssh_service_remote_exec(ssh_test_server: SshTestServerInfo,
                                 alt_test_server: SshTestServerInfo,
                                 ssh_host_service: SshHostService) -> None:
    """
    Test SshHostService.remote_exec() over a sequence of invocations.

    The sequence is chosen to exercise the service's internal SSH client cache:
    - an initially empty cache;
    - a connection that gets cached by the first call;
    - a call against a second server;
    - reuse of the cached connection, with environment variables and a
      failing exit status;
    - reconnection after the cached connection was closed.

    The cache must be empty again once the service context has been exited.
    """
    # pylint: disable=protected-access
    with ssh_host_service:
        primary_config = ssh_test_server.to_ssh_service_config()

        conn_id = SshClient.id_from_params(ssh_test_server.to_connect_params())
        client_cache = ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE
        assert client_cache is not None
        # No connection to the primary server should be cached yet.
        assert client_cache._cache.get(conn_id) is None

        # 1. First command against the primary server.
        (exec_status, exec_info) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=primary_config,
            env_params={},
        )
        assert exec_status.is_pending()
        assert "asyncRemoteExecResultsFuture" in exec_info
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_info)
        assert exec_status.is_succeeded()
        assert exec_output["stdout"].strip() == SSH_TEST_SERVER_NAME

        # The connection used above should now be in the cache.
        conn, ssh_client = client_cache._cache[conn_id]
        assert conn is not None
        assert conn._username == ssh_test_server.username
        assert conn._host == ssh_test_server.hostname
        assert conn._port == ssh_test_server.get_port()
        first_local_port = conn._local_port
        assert first_local_port
        assert ssh_client is not None
        assert ssh_client._conn_event.is_set()

        # 2. A command against a different server. The extra env var must not
        #    leak into later commands that reuse cached connections.
        (exec_status, exec_info) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=alt_test_server.to_ssh_service_config(),
            env_params={
                "UNUSED": "unused",
            },
        )
        assert exec_status.is_pending()
        assert "asyncRemoteExecResultsFuture" in exec_info
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_info)
        assert exec_status.is_succeeded()
        assert exec_output["stdout"].strip() == ALT_TEST_SERVER_NAME

        # 3. Back to the primary server. This should:
        #    - reuse the cached connection;
        #    - see only its own env vars;
        #    - report the failing exit code of the trailing `false`.
        (exec_status, exec_info) = ssh_host_service.remote_exec(
            script=["echo BAR=$BAR && echo UNUSED=$UNUSED && false"],
            config=primary_config,
            env_params={
                "BAR": "bar",
            },
        )
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_info)
        assert exec_status.is_failed()
        assert str(exec_output["stdout"]).splitlines() == ["BAR=bar", "UNUSED="]
        conn, _ = client_cache._cache[conn_id]
        assert conn._local_port == first_local_port

        # Gracefully close the cached connection out from under the service.
        conn.close()

        # 4. The service should notice the closed connection and start over.
        #    Also covers multiple script entries and multi-line entries.
        (exec_status, exec_info) = ssh_host_service.remote_exec(
            script=[
                "echo FOO=$FOO\n",
                "echo BAR=$BAR\necho BAZ=$BAZ",
            ],
            config=primary_config,
            env_params={
                "FOO": "foo",
            },
        )
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_info)
        assert exec_status.is_succeeded()
        assert str(exec_output["stdout"]).splitlines() == ["FOO=foo", "BAR=", "BAZ="]
        # A different local port indicates that a fresh connection was made.
        conn, _ = client_cache._cache[conn_id]
        assert conn._local_port != first_local_port

    # Leaving the service context must clear the client cache.
    assert len(SshHostService._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE) == 0

135 

136 

def check_ssh_service_reboot(docker_services: DockerServices,
                             reboot_test_server: SshTestServerInfo,
                             ssh_host_service: SshHostService,
                             graceful: bool) -> None:
    """
    Check the SshHostService reboot operation.

    The check runs in these steps:
    1. Start a long-running script on the reboot test server.
    2. Reboot the server while that script is still running.
    3. Check that the interrupted script is reported as failed and never ran
       to completion.
    4. Wait for the server to come back up on a new port.
    5. Check that a new command runs successfully.

    Parameters
    ----------
    docker_services : DockerServices
        The pytest-docker services fixture, used to wait for the restarted
        server's SSH socket.
    reboot_test_server : SshTestServerInfo
        Connection info for the dedicated reboot test server.
    ssh_host_service : SshHostService
        The service under test.
    graceful : bool
        Passed to ``reboot()`` as ``force=not graceful``.
    """
    # Note: rebooting changes the port number unfortunately, but makes it
    # easier to check for success.
    # Also, it may cause issues with other parallel unit tests, so we run it as
    # a part of the same unit test for now.
    with ssh_host_service:
        reboot_test_srv_ssh_svc_conf = reboot_test_server.to_ssh_service_config(uncached=True)
        (status, results_info) = ssh_host_service.remote_exec(
            script=[
                'echo "sleeping..."',
                'sleep 30',
                'echo "should not reach this point"'
            ],
            config=reboot_test_srv_ssh_svc_conf,
            env_params={},
        )
        assert status.is_pending()
        # Wait a moment for that to start in the background thread.
        time.sleep(1)

        # Now try to restart the server.
        (status, reboot_results_info) = ssh_host_service.reboot(params=reboot_test_srv_ssh_svc_conf,
                                                                force=not graceful)
        assert status.is_pending()

        (status, reboot_results_info) = ssh_host_service.wait_os_operation(reboot_results_info)
        # NOTE: reboot/shutdown ops mostly return FAILED, even though the reboot succeeds.
        _LOG.debug("reboot status: %s: %s", status, reboot_results_info)

        # Check for decent error handling on disconnects.
        status, results = ssh_host_service.get_remote_exec_results(results_info)
        assert status.is_failed()
        stdout = str(results["stdout"])
        assert "sleeping" in stdout
        assert "should not reach this point" not in stdout

        old_port = reboot_test_srv_ssh_svc_conf["ssh_port"]
        reboot_test_srv_ssh_svc_conf_new: dict = {}
        for _ in range(0, 3):
            # Give docker some time to restart the service after the "reboot".
            # Note: this relies on having a `restart_policy` in the docker-compose.yml file.
            time.sleep(1)
            # Diagnostic output only: with check=False, run() does not raise
            # CalledProcessError, so it does not belong inside the try below.
            run_res = run("docker ps | grep mlos_bench-test- | grep reboot",
                          shell=True, capture_output=True, check=False)
            print(run_res.stdout.decode())
            print(run_res.stderr.decode())
            # Try to look up the port of the restarted server and see if it changed.
            try:
                reboot_test_srv_ssh_svc_conf_new = reboot_test_server.to_ssh_service_config(uncached=True)
            except CalledProcessError as ex:
                # The port lookup can fail (e.g. while the container is still
                # restarting); log it and retry on the next iteration.
                _LOG.info("Failed to check port for reboot test server: %s", ex)
                continue
            if reboot_test_srv_ssh_svc_conf_new["ssh_port"] != old_port:
                break
        # If every lookup failed, the dict is still empty. Report that clearly
        # instead of failing with an opaque KeyError below.
        assert "ssh_port" in reboot_test_srv_ssh_svc_conf_new, \
            "Unable to determine the reboot test server's port after reboot"
        assert reboot_test_srv_ssh_svc_conf_new["ssh_port"] != old_port, \
            f"Reboot test server port did not change after reboot: {old_port}"

        wait_docker_service_socket(docker_services,
                                   reboot_test_server.hostname,
                                   reboot_test_srv_ssh_svc_conf_new["ssh_port"])

        (status, results_info) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=reboot_test_srv_ssh_svc_conf_new,
            env_params={},
        )
        status, results = ssh_host_service.get_remote_exec_results(results_info)
        assert status.is_succeeded()
        assert results["stdout"].strip() == REBOOT_TEST_SERVER_NAME

208 

209 

@requires_docker
def test_ssh_service_reboot(locked_docker_services: DockerServices,
                            reboot_test_server: SshTestServerInfo,
                            ssh_host_service: SshHostService) -> None:
    """
    Test the SshHostService reboot operation, first gracefully and then forcefully.
    """
    # Both variants run sequentially inside this single test so the parallel
    # test runner does not interleave them with other uses of the reboot server.
    for graceful in (True, False):
        check_ssh_service_reboot(locked_docker_services, reboot_test_server, ssh_host_service,
                                 graceful=graceful)