Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%

84 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 00:35 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6Tests for mlos_bench.event_loop_context background thread logic. 

7""" 

8 

9import asyncio 

10import sys 

11import time 

12 

13from asyncio import AbstractEventLoop 

14from threading import Thread 

15from types import TracebackType 

16from typing import Optional, Type 

17from typing_extensions import Literal 

18 

19import pytest 

20 

21from mlos_bench.event_loop_context import EventLoopContext 

22 

23 

24class EventLoopContextCaller: 

25 """ 

26 Simple class to test the EventLoopContext. 

27 See Also: SshService 

28 """ 

29 

30 EVENT_LOOP_CONTEXT = EventLoopContext() 

31 

32 def __init__(self, instance_id: int) -> None: 

33 self._id = instance_id 

34 self._in_context = False 

35 

36 def __repr__(self) -> str: 

37 return f"{self.__class__.__name__}(id={self._id})" 

38 

39 def __enter__(self) -> None: 

40 assert not self._in_context 

41 self.EVENT_LOOP_CONTEXT.enter() 

42 self._in_context = True 

43 

44 def __exit__(self, ex_type: Optional[Type[BaseException]], 

45 ex_val: Optional[BaseException], 

46 ex_tb: Optional[TracebackType]) -> Literal[False]: 

47 assert self._in_context 

48 self.EVENT_LOOP_CONTEXT.exit() 

49 self._in_context = False 

50 return False 

51 

52 

53@pytest.mark.filterwarnings("ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0") 

54def test_event_loop_context() -> None: 

55 """Test event loop context background thread setup/cleanup handling.""" 

56 # pylint: disable=protected-access,too-many-statements 

57 

58 # Should start with no event loop thread. 

59 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None 

60 

61 # The background thread should only be created upon context entry. 

62 event_loop_caller_instance_1 = EventLoopContextCaller(1) 

63 assert event_loop_caller_instance_1 

64 assert not event_loop_caller_instance_1._in_context 

65 assert event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread is None 

66 

67 event_loop: Optional[AbstractEventLoop] = None 

68 

69 # After we enter the instance context, we should have a background thread. 

70 with event_loop_caller_instance_1: 

71 assert event_loop_caller_instance_1._in_context 

72 assert isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread, Thread) # type: ignore[unreachable] 

73 # Give the thread a chance to start. 

74 # Mostly important on the underpowered Windows CI machines. 

75 time.sleep(0.25) 

76 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive() 

77 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1 

78 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None 

79 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running() 

80 event_loop = EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop 

81 

82 event_loop_caller_instance_2 = EventLoopContextCaller(instance_id=2) 

83 assert event_loop_caller_instance_2 

84 assert not event_loop_caller_instance_2._in_context 

85 

86 with event_loop_caller_instance_2: 

87 assert event_loop_caller_instance_2._in_context 

88 assert event_loop_caller_instance_1._in_context 

89 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 2 

90 # We should only get one thread for all instances. 

91 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread \ 

92 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop_thread \ 

93 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop_thread 

94 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop \ 

95 is event_loop_caller_instance_1.EVENT_LOOP_CONTEXT._event_loop \ 

96 is event_loop_caller_instance_2.EVENT_LOOP_CONTEXT._event_loop 

97 

98 assert not event_loop_caller_instance_2._in_context 

99 

100 # The background thread should remain running since we have another context still open. 

101 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 1 

102 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is not None 

103 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread.is_alive() 

104 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None 

105 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running() 

106 

107 start = time.time() 

108 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo')) 

109 assert 0.0 <= time.time() - start < 0.1 

110 assert future.result(timeout=0.2) == 'foo' 

111 assert 0.1 <= time.time() - start <= 0.2 

112 

113 # Once we exit the last context, the background thread should be stopped 

114 # and unusable for running co-routines. 

115 

116 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread is None # type: ignore[unreachable] # (false positives) 

117 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread_refcnt == 0 

118 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop is not None 

119 assert not EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running() 

120 # Check that the event loop has no more tasks. 

121 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, '_ready') 

122 # Windows ProactorEventLoopPolicy adds a dummy task. 

123 if sys.platform == 'win32' and isinstance(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, asyncio.ProactorEventLoop): 

124 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 1 

125 else: 

126 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._ready) == 0 

127 assert hasattr(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop, '_scheduled') 

128 assert len(EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop._scheduled) == 0 

129 

130 with pytest.raises(AssertionError): # , pytest.warns(RuntimeWarning, match=r".*coroutine 'sleep' was never awaited"): 

131 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo')) 

132 raise ValueError(f"Future should not have been available to wait on {future.result()}") 

133 

134 # Test that when re-entering the context we have the same event loop. 

135 with event_loop_caller_instance_1: 

136 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is not None 

137 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop.is_running() 

138 assert EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop is event_loop 

139 

140 # Test running again. 

141 start = time.time() 

142 future = event_loop_caller_instance_1.EVENT_LOOP_CONTEXT.run_coroutine(asyncio.sleep(0.1, result='foo')) 

143 assert 0.0 <= time.time() - start < 0.1 

144 assert future.result(timeout=0.2) == 'foo' 

145 assert 0.1 <= time.time() - start <= 0.2 

146 

147 

148if __name__ == '__main__': 

149 # For debugging in Windows which has issues with pytest detection in vscode. 

150 pytest.main(["-n1", "--dist=no", "-k", "test_event_loop_context"])