Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%
82 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-20 00:44 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-20 00:44 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""Tests for mlos_bench.event_loop_context background thread logic."""
7import asyncio
8import sys
9import time
10from asyncio import AbstractEventLoop
11from threading import Thread
12from types import TracebackType
13from typing import Literal, Optional, Type
15import pytest
17from mlos_bench.event_loop_context import EventLoopContext
class EventLoopContextCaller:
    """
    Small context-manager helper used to drive the EventLoopContext in tests.

    Every instance shares the single class-level ``EVENT_LOOP_CONTEXT``; entering
    or exiting an instance forwards to that shared context and records whether
    this particular instance is currently inside its ``with`` block.

    See Also: SshService
    """

    # One context object shared by all instances of this class.
    EVENT_LOOP_CONTEXT = EventLoopContext()

    def __init__(self, instance_id: int) -> None:
        """Remember the instance id (used by ``__repr__``) and start outside the context."""
        self._in_context = False
        self._id = instance_id

    def __repr__(self) -> str:
        """Return ``<ClassName>(id=<instance id>)``."""
        class_name = self.__class__.__name__
        return f"{class_name}(id={self._id})"

    def __enter__(self) -> None:
        """Enter the shared event loop context; an instance may not be entered twice."""
        assert not self._in_context
        self.EVENT_LOOP_CONTEXT.enter()
        self._in_context = True

    def __exit__(
        self,
        ex_type: Optional[Type[BaseException]],
        ex_val: Optional[BaseException],
        ex_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """
        Exit the shared event loop context.

        Always returns False so that any exception raised inside the ``with``
        body propagates to the caller.
        """
        assert self._in_context
        self.EVENT_LOOP_CONTEXT.exit()
        self._in_context = False
        return False
@pytest.mark.filterwarnings(
    "ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0"
)
def test_event_loop_context() -> None:
    """
    Exercise the background-thread lifecycle of the shared EventLoopContext.

    Checks, in order, that:

    - no thread exists before any context is entered;
    - entering a first caller starts the thread and a running event loop;
    - a nested second caller reuses the same thread and loop (refcount 2);
    - leaving the nested caller keeps the thread alive (refcount 1) and
      coroutines can still be submitted;
    - leaving the last caller drops the thread, stops the loop and makes
      ``run_coroutine`` fail with an AssertionError;
    - re-entering the context reuses the same event loop object.
    """
    # pylint: disable=protected-access,too-many-statements

    # Shorthand for the class-level context shared by every caller instance.
    ctx = EventLoopContextCaller.EVENT_LOOP_CONTEXT

    # Should start with no event loop thread.
    assert ctx._event_loop_thread is None

    # Merely constructing a caller must not create the background thread.
    caller_1 = EventLoopContextCaller(1)
    assert caller_1
    assert not caller_1._in_context
    assert caller_1.EVENT_LOOP_CONTEXT._event_loop_thread is None

    loop: Optional[AbstractEventLoop] = None

    # Entering the first caller should bring up the background thread.
    with caller_1:
        assert caller_1._in_context
        assert (  # type: ignore[unreachable]
            isinstance(ctx._event_loop_thread, Thread)
        )
        # Give the thread a chance to start.
        # Mostly important on the underpowered Windows CI machines.
        time.sleep(0.25)
        assert ctx._event_loop_thread.is_alive()
        assert ctx._event_loop_thread_refcnt == 1
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()
        # Keep a handle on the loop so later phases can check it is reused.
        loop = ctx._event_loop

        caller_2 = EventLoopContextCaller(instance_id=2)
        assert caller_2
        assert not caller_2._in_context

        with caller_2:
            assert caller_2._in_context
            assert caller_1._in_context
            assert ctx._event_loop_thread_refcnt == 2
            # We should only get one thread for all instances.
            assert (
                ctx._event_loop_thread
                is caller_1.EVENT_LOOP_CONTEXT._event_loop_thread
                is caller_2.EVENT_LOOP_CONTEXT._event_loop_thread
            )
            # ... and likewise only one event loop.
            assert (
                ctx._event_loop
                is caller_1.EVENT_LOOP_CONTEXT._event_loop
                is caller_2.EVENT_LOOP_CONTEXT._event_loop
            )

        assert not caller_2._in_context

        # The background thread should remain running since we have another context still open.
        assert ctx._event_loop_thread_refcnt == 1
        assert ctx._event_loop_thread is not None
        assert ctx._event_loop_thread.is_alive()
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()

        # Submitting a coroutine should return before it completes, and its
        # result should then arrive once the 0.1s sleep has elapsed.
        started_at = time.time()
        future = caller_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result="foo"))
        submit_elapsed = time.time() - started_at
        assert 0.0 <= submit_elapsed < 0.1
        assert future.result(timeout=0.2) == "foo"
        total_elapsed = time.time() - started_at
        assert 0.1 <= total_elapsed <= 0.2

    # Once we exit the last context, the background thread should be stopped
    # and unusable for running co-routines.

    assert (  # type: ignore[unreachable] # (false positives)
        ctx._event_loop_thread is None
    )
    assert ctx._event_loop_thread_refcnt == 0
    # The loop object itself is retained, just no longer running.
    assert ctx._event_loop is loop is not None
    assert not ctx._event_loop.is_running()
    # Check that the event loop has no more tasks.
    assert hasattr(ctx._event_loop, "_ready")
    # Windows ProactorEventLoopPolicy adds a dummy task.
    if sys.platform == "win32" and isinstance(ctx._event_loop, asyncio.ProactorEventLoop):
        expected_ready = 1
    else:
        expected_ready = 0
    assert len(ctx._event_loop._ready) == expected_ready
    assert hasattr(ctx._event_loop, "_scheduled")
    assert len(ctx._event_loop._scheduled) == 0

    # NOTE: the "coroutine 'sleep' was never awaited" RuntimeWarning produced here
    # is silenced by the filterwarnings mark on this test rather than being
    # asserted with pytest.warns.
    with pytest.raises(AssertionError):
        future = caller_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result="foo"))
        # Only reached if run_coroutine unexpectedly succeeded.
        raise ValueError(f"Future should not have been available to wait on {future.result()}")

    # Test that when re-entering the context we have the same event loop.
    with caller_1:
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()
        assert ctx._event_loop is loop

        # Test running again.
        started_at = time.time()
        future = caller_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result="foo"))
        submit_elapsed = time.time() - started_at
        assert 0.0 <= submit_elapsed < 0.1
        assert future.result(timeout=0.2) == "foo"
        total_elapsed = time.time() - started_at
        assert 0.1 <= total_elapsed <= 0.2
if __name__ == "__main__":
    # Direct-run entry point for debugging on Windows, where vscode has
    # trouble detecting pytest tests. Runs only this module's test.
    debug_args = ["-n1", "--dist=no", "-k", "test_event_loop_context"]
    pytest.main(debug_args)