Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%
84 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 00:35 +0000
1#
2# Copyright (c) Microsoft Corporation.
3# Licensed under the MIT License.
4#
5"""
6Tests for mlos_bench.event_loop_context background thread logic.
7"""
9import asyncio
10import sys
11import time
13from asyncio import AbstractEventLoop
14from threading import Thread
15from types import TracebackType
16from typing import Optional, Type
17from typing_extensions import Literal
19import pytest
21from mlos_bench.event_loop_context import EventLoopContext
24class EventLoopContextCaller:
25 """
26 Simple class to test the EventLoopContext.
27 See Also: SshService
28 """
30 EVENT_LOOP_CONTEXT = EventLoopContext()
32 def __init__(self, instance_id: int) -> None:
33 self._id = instance_id
34 self._in_context = False
36 def __repr__(self) -> str:
37 return f"{self.__class__.__name__}(id={self._id})"
39 def __enter__(self) -> None:
40 assert not self._in_context
41 self.EVENT_LOOP_CONTEXT.enter()
42 self._in_context = True
44 def __exit__(self, ex_type: Optional[Type[BaseException]],
45 ex_val: Optional[BaseException],
46 ex_tb: Optional[TracebackType]) -> Literal[False]:
47 assert self._in_context
48 self.EVENT_LOOP_CONTEXT.exit()
49 self._in_context = False
50 return False
53@pytest.mark.filterwarnings("ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0")
54def test_event_loop_context() -> None:
55 """Test event loop context background thread setup/cleanup handling."""
56 # pylint: disable=protected-access,too-many-statements
58 # Should start with no event loop thread.
59 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None
61 # The background thread should only be created upon context entry.
62 event_loop_caller_instance_1 = EventLoopContextCaller(1)
63 assert event_loop_caller_instance_1
64 assert not event_loop_caller_instance_1._in_context
65 assert event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread is None
67 event_loop: Optional[AbstractEventLoop] = None
69 # After we enter the instance context, we should have a background thread.
70 with event_loop_caller_instance_1:
71 assert event_loop_caller_instance_1._in_context
72 assert isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread, Thread) # type: ignore[unreachable]
73 # Give the thread a chance to start.
74 # Mostly important on the underpowered Windows CI machines.
75 time.sleep(0.25)
76 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
77 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
78 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
79 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
80 event_loop = EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
82 event_loop_caller_instance_2 = EventLoopContextCaller(instance_id=2)
83 assert event_loop_caller_instance_2
84 assert not event_loop_caller_instance_2._in_context
86 with event_loop_caller_instance_2:
87 assert event_loop_caller_instance_2._in_context
88 assert event_loop_caller_instance_1._in_context
89 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 2
90 # We should only get one thread for all instances.
91 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread \
92 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread \
93 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop_thread
94 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop \
95 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop \
96 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop
98 assert not event_loop_caller_instance_2._in_context
100 # The background thread should remain running since we have another context still open.
101 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1
102 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is not None
103 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive()
104 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
105 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
107 start = time.time()
108 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo'))
109 assert 0.0 <= time.time() - start < 0.1
110 assert future.result(timeout=0.2) == 'foo'
111 assert 0.1 <= time.time() - start <= 0.2
113 # Once we exit the last context, the background thread should be stopped
114 # and unusable for running co-routines.
116 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None # type: ignore[unreachable] # (false positives)
117 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 0
118 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop is not None
119 assert not EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
120 # Check that the event loop has no more tasks.
121 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, '_ready')
122 # Windows ProactorEventLoopPolicy adds a dummy task.
123 if sys.platform == 'win32' and isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, asyncio.ProactorEventLoop):
124 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 1
125 else:
126 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 0
127 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, '_scheduled')
128 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._scheduled) == 0
130 with pytest.raises(AssertionError): # , pytest.warns(RuntimeWarning, match=r".*coroutine 'sleep' was never awaited"):
131 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo'))
132 raise ValueError(f"Future should not have been available to wait on {future.result()}")
134 # Test that when re-entering the context we have the same event loop.
135 with event_loop_caller_instance_1:
136 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None
137 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running()
138 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop
140 # Test running again.
141 start = time.time()
142 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo'))
143 assert 0.0 <= time.time() - start < 0.1
144 assert future.result(timeout=0.2) == 'foo'
145 assert 0.1 <= time.time() - start <= 0.2
148if __name__ == '__main__':
149 # For debugging in Windows which has issues with pytest detection in vscode.
150 pytest.main(["-n1", "--dist=no", "-k", "test_event_loop_context"])