Coverage for mlos_bench/mlos_bench/tests/services/remote/ssh/test_ssh_host_service.py: 98%
93 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Tests for mlos_bench.services.remote.ssh.ssh_host_service
7"""
9from subprocess import CalledProcessError, run
11import logging
12import time
14from pytest_docker.plugin import Services as DockerServices
16from mlos_bench.services.remote.ssh.ssh_host_service import SshHostService
17from mlos_bench.services.remote.ssh.ssh_service import SshClient
19from mlos_bench.tests import requires_docker
20from mlos_bench.tests.services.remote.ssh import (SshTestServerInfo,
21 ALT_TEST_SERVER_NAME,
22 REBOOT_TEST_SERVER_NAME,
23 SSH_TEST_SERVER_NAME,
24 wait_docker_service_socket)
_LOG = logging.getLogger(__name__)


@requires_docker
def test_ssh_service_remote_exec(ssh_test_server: SshTestServerInfo,
                                 alt_test_server: SshTestServerInfo,
                                 ssh_host_service: SshHostService) -> None:
    """
    Exercise SshHostService.remote_exec across several consecutive invocations.

    The stages are: a first command, a command on a second host, reuse of the
    cached connection, and reconnection after the cached connection is closed.
    Between stages the service's SSH client cache is inspected so the internal
    caching logic is covered too.
    """
    # pylint: disable=protected-access
    with ssh_host_service:
        primary_config = ssh_test_server.to_ssh_service_config()

        # Nothing should be cached for this server before the first command.
        conn_id = SshClient.id_from_params(ssh_test_server.to_connect_params())
        assert ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE is not None
        assert ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE._cache.get(conn_id) is None

        # Stage 1: the first command runs asynchronously and should succeed.
        (exec_status, exec_handle) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=primary_config,
            env_params={},
        )
        assert exec_status.is_pending()
        assert "asyncRemoteExecResultsFuture" in exec_handle
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_handle)
        assert exec_status.is_succeeded()
        assert exec_output["stdout"].strip() == SSH_TEST_SERVER_NAME

        # The cache entry should now describe the connection that was just used.
        cached_conn, cached_client = ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE._cache[conn_id]
        assert cached_conn is not None
        assert cached_conn._username == ssh_test_server.username
        assert cached_conn._host == ssh_test_server.hostname
        assert cached_conn._port == ssh_test_server.get_port()
        first_local_port = cached_conn._local_port
        assert first_local_port
        assert cached_client is not None
        assert cached_client._conn_event.is_set()

        # Stage 2: talk to a different server.  The extra env var must not
        # carry over into later commands that reuse cached connections.
        (exec_status, exec_handle) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=alt_test_server.to_ssh_service_config(),
            env_params={"UNUSED": "unused"},
        )
        assert exec_status.is_pending()
        assert "asyncRemoteExecResultsFuture" in exec_handle
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_handle)
        assert exec_status.is_succeeded()
        assert exec_output["stdout"].strip() == ALT_TEST_SERVER_NAME

        # Stage 3: reuse the primary connection with env vars.  The trailing
        # "false" must make the overall status a failure.
        (exec_status, exec_handle) = ssh_host_service.remote_exec(
            script=["echo BAR=$BAR && echo UNUSED=$UNUSED && false"],
            config=primary_config,
            env_params={"BAR": "bar"},
        )
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_handle)
        assert exec_status.is_failed()
        assert str(exec_output["stdout"]).splitlines() == ["BAR=bar", "UNUSED="]
        # Same local port means the cached connection was reused.
        cached_conn, _ = ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE._cache[conn_id]
        assert cached_conn._local_port == first_local_port

        # Stage 4: close the cached connection gracefully.  The next command
        # must detect this and reconnect.
        cached_conn.close()
        (exec_status, exec_handle) = ssh_host_service.remote_exec(
            script=[
                "echo FOO=$FOO\n",               # several script strings ...
                "echo BAR=$BAR\necho BAZ=$BAZ",  # ... one of them multi-line
            ],
            config=primary_config,
            env_params={"FOO": "foo"},
        )
        exec_status, exec_output = ssh_host_service.get_remote_exec_results(exec_handle)
        assert exec_status.is_succeeded()
        assert str(exec_output["stdout"]).splitlines() == ["FOO=foo", "BAR=", "BAZ="]
        # A different local port means a fresh connection was opened.
        cached_conn, _ = ssh_host_service._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE._cache[conn_id]
        assert cached_conn._local_port != first_local_port

    # Leaving the context must have emptied the shared client cache.
    assert len(SshHostService._EVENT_LOOP_THREAD_SSH_CLIENT_CACHE) == 0
def check_ssh_service_reboot(docker_services: DockerServices,
                             reboot_test_server: SshTestServerInfo,
                             ssh_host_service: SshHostService,
                             graceful: bool) -> None:
    """
    Check the SshHostService reboot operation.

    Steps:
    1. Start a long-running command on the reboot test server.
    2. Reboot the server and confirm the in-flight command was cut short.
    3. Wait for the server to come back on a new port.
    4. Confirm a fresh command succeeds against the new port.

    Parameters
    ----------
    docker_services : DockerServices
        pytest-docker services handle, used to wait for the SSH socket to reopen.
    reboot_test_server : SshTestServerInfo
        Connection info for the server that gets rebooted.
    ssh_host_service : SshHostService
        The service under test.
    graceful : bool
        Whether to request a graceful reboot (``force=False``) or a forced one.
    """
    # Note: rebooting changes the port number unfortunately, but makes it
    # easier to check for success.
    # Also, it may cause issues with other parallel unit tests, so we run it as
    # a part of the same unit test for now.
    with ssh_host_service:
        reboot_test_srv_ssh_svc_conf = reboot_test_server.to_ssh_service_config(uncached=True)
        (status, results_info) = ssh_host_service.remote_exec(
            script=[
                'echo "sleeping..."',
                'sleep 30',
                'echo "should not reach this point"'
            ],
            config=reboot_test_srv_ssh_svc_conf,
            env_params={},
        )
        assert status.is_pending()
        # Wait a moment for that to start in the background thread.
        time.sleep(1)

        # Now try to restart the server.
        (status, reboot_results_info) = ssh_host_service.reboot(params=reboot_test_srv_ssh_svc_conf,
                                                                force=not graceful)
        assert status.is_pending()

        (status, reboot_results_info) = ssh_host_service.wait_os_operation(reboot_results_info)
        # NOTE: reboot/shutdown ops mostly return FAILED, even though the reboot succeeds.
        _LOG.debug("reboot status: %s: %s", status, reboot_results_info)

        # Check for decent error handling on disconnects.
        status, results = ssh_host_service.get_remote_exec_results(results_info)
        assert status.is_failed()
        stdout = str(results["stdout"])
        assert "sleeping" in stdout
        assert "should not reach this point" not in stdout

        old_port = reboot_test_srv_ssh_svc_conf["ssh_port"]
        reboot_test_srv_ssh_svc_conf_new: dict = {}
        for _ in range(3):
            # Give docker some time to restart the service after the "reboot".
            # Note: this relies on having a `restart_policy` in the docker-compose.yml file.
            time.sleep(1)
            # Diagnostic output only.  With check=False this call never raises
            # CalledProcessError, so it stays outside the try below.
            run_res = run("docker ps | grep mlos_bench-test- | grep reboot",
                          shell=True, capture_output=True, check=False)
            print(run_res.stdout.decode())
            print(run_res.stderr.decode())
            # Try to reconnect and see if the port changed.
            try:
                reboot_test_srv_ssh_svc_conf_new = reboot_test_server.to_ssh_service_config(uncached=True)
            except CalledProcessError as ex:
                _LOG.info("Failed to check port for reboot test server: %s", ex)
                continue
            if reboot_test_srv_ssh_svc_conf_new["ssh_port"] != old_port:
                break

        # Use .get() so that exhausting every retry on errors (leaving the
        # dict empty) gives a clear assertion failure instead of a KeyError.
        new_port = reboot_test_srv_ssh_svc_conf_new.get("ssh_port")
        assert new_port is not None and new_port != old_port, (
            f"reboot test server port did not change from {old_port} (last seen: {new_port})")

        wait_docker_service_socket(docker_services,
                                   reboot_test_server.hostname,
                                   new_port)

        (status, results_info) = ssh_host_service.remote_exec(
            script=["hostname"],
            config=reboot_test_srv_ssh_svc_conf_new,
            env_params={},
        )
        status, results = ssh_host_service.get_remote_exec_results(results_info)
        assert status.is_succeeded()
        assert results["stdout"].strip() == REBOOT_TEST_SERVER_NAME
@requires_docker
def test_ssh_service_reboot(locked_docker_services: DockerServices,
                            reboot_test_server: SshTestServerInfo,
                            ssh_host_service: SshHostService) -> None:
    """
    Run the SshHostService reboot check twice: first graceful, then forced.
    """
    # Both variants live in one test to avoid parallel runner interactions.
    for graceful in (True, False):
        check_ssh_service_reboot(locked_docker_services, reboot_test_server, ssh_host_service,
                                 graceful=graceful)