Coverage for mlos_bench/mlos_bench/tests/event_loop_context_test.py: 98%

82 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""Tests for mlos_bench.event_loop_context background thread logic.""" 

6 

7import asyncio 

8import sys 

9import time 

10from asyncio import AbstractEventLoop 

11from threading import Thread 

12from types import TracebackType 

13from typing import Literal, Optional, Type 

14 

15import pytest 

16 

17from mlos_bench.event_loop_context import EventLoopContext 

18 

19 

class EventLoopContextCaller:
    """
    Minimal context manager used to exercise the EventLoopContext.

    Every instance goes through the single class-level ``EVENT_LOOP_CONTEXT``
    object, so entering/exiting several instances drives the same context.

    See Also: SshService
    """

    # One context object shared by the class and all of its instances.
    EVENT_LOOP_CONTEXT = EventLoopContext()

    def __init__(self, instance_id: int) -> None:
        """Remember the caller's id and start outside of the context."""
        self._id = instance_id
        self._in_context = False

    def __repr__(self) -> str:
        """Return a short ``ClassName(id=N)`` description of this caller."""
        cls_name = self.__class__.__name__
        return f"{cls_name}(id={self._id})"

    def __enter__(self) -> None:
        """Enter the shared context; re-entering the same instance is an error."""
        assert not self._in_context
        self.EVENT_LOOP_CONTEXT.enter()
        # Only flag the instance as entered once enter() has returned.
        self._in_context = True

    def __exit__(
        self,
        ex_type: Optional[Type[BaseException]],
        ex_val: Optional[BaseException],
        ex_tb: Optional[TracebackType],
    ) -> Literal[False]:
        """
        Exit the shared context; the instance must currently be entered.

        Always returns False so that exceptions raised inside the ``with``
        body are propagated rather than suppressed.
        """
        assert self._in_context
        self.EVENT_LOOP_CONTEXT.exit()
        # Only clear the flag once exit() has returned.
        self._in_context = False
        return False

51 

52 

@pytest.mark.filterwarnings(
    "ignore:.*(coroutine 'sleep' was never awaited).*:RuntimeWarning:.*event_loop_context_test.*:0"
)
def test_event_loop_context() -> None:
    """
    Check the lifecycle of the EventLoopContext background thread.

    Covers: no thread before first entry, a single shared thread and loop for
    nested callers, reference counting on exit, refusal to run coroutines once
    the last caller has exited, and re-use of the same loop on re-entry.
    """
    # pylint: disable=protected-access,too-many-statements

    # Short alias for the context object shared by all caller instances.
    ctx = EventLoopContextCaller.EVENT_LOOP_CONTEXT

    def check_sleep_coroutine_round_trip() -> None:
        """Submit a 0.1s sleep coroutine and check it completes in the background."""
        started_at = time.time()
        pending = ctx.run_coroutine(asyncio.sleep(0.1, result="foo"))
        # Submitting must not block for the duration of the sleep ...
        assert 0.0 <= time.time() - started_at < 0.1
        # ... and the result should arrive after roughly the sleep time.
        assert pending.result(timeout=0.2) == "foo"
        assert 0.1 <= time.time() - started_at <= 0.2

    # Nothing has entered the context yet, so there is no event loop thread.
    assert ctx._event_loop_thread is None

    # Merely constructing a caller must not create the background thread.
    caller_1 = EventLoopContextCaller(1)
    assert caller_1
    assert not caller_1._in_context
    assert caller_1.EVENT_LOOP_CONTEXT._event_loop_thread is None

    event_loop: Optional[AbstractEventLoop] = None

    # Entering the first caller should bring up the background thread.
    with caller_1:
        assert caller_1._in_context
        assert (  # type: ignore[unreachable]
            isinstance(ctx._event_loop_thread, Thread)
        )
        # Give the thread a chance to start.
        # Mostly important on the underpowered Windows CI machines.
        time.sleep(0.25)
        assert ctx._event_loop_thread.is_alive()
        assert ctx._event_loop_thread_refcnt == 1
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()
        # Keep a handle on the loop to compare against after exit/re-entry.
        event_loop = ctx._event_loop

        caller_2 = EventLoopContextCaller(instance_id=2)
        assert caller_2
        assert not caller_2._in_context

        with caller_2:
            assert caller_2._in_context
            assert caller_1._in_context
            assert ctx._event_loop_thread_refcnt == 2
            # Class and both instances must all see the very same thread ...
            assert (
                EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop_thread
                is caller_1.EVENT_LOOP_CONTEXT._event_loop_thread
                is caller_2.EVENT_LOOP_CONTEXT._event_loop_thread
            )
            # ... and the very same event loop.
            assert (
                EventLoopContextCaller.EVENT_LOOP_CONTEXT._event_loop
                is caller_1.EVENT_LOOP_CONTEXT._event_loop
                is caller_2.EVENT_LOOP_CONTEXT._event_loop
            )

        assert not caller_2._in_context

        # One caller is still inside, so the thread and loop must keep running.
        assert ctx._event_loop_thread_refcnt == 1
        assert ctx._event_loop_thread is not None
        assert ctx._event_loop_thread.is_alive()
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()

        check_sleep_coroutine_round_trip()

    # With the last caller gone, the background thread should be torn down
    # and the context should refuse to run further coroutines.

    assert (  # type: ignore[unreachable] # (false positives)
        ctx._event_loop_thread is None
    )
    assert ctx._event_loop_thread_refcnt == 0
    # The loop object itself is retained, just no longer running.
    assert ctx._event_loop is event_loop is not None
    assert not ctx._event_loop.is_running()
    # The stopped loop should have nothing left queued.
    assert hasattr(ctx._event_loop, "_ready")
    # Windows ProactorEventLoopPolicy adds a dummy task.
    if sys.platform == "win32" and isinstance(ctx._event_loop, asyncio.ProactorEventLoop):
        assert len(ctx._event_loop._ready) == 1
    else:
        assert len(ctx._event_loop._ready) == 0
    assert hasattr(ctx._event_loop, "_scheduled")
    assert len(ctx._event_loop._scheduled) == 0

    # run_coroutine is expected to fail with an AssertionError here. The
    # coroutine handed to it is then never awaited, which is the
    # RuntimeWarning silenced by the filterwarnings marker on this test.
    with pytest.raises(AssertionError):
        unexpected = ctx.run_coroutine(asyncio.sleep(0.1, result="foo"))
        # Only reached if run_coroutine wrongly succeeded: fail the test loudly.
        raise ValueError(
            f"Future should not have been available to wait on {unexpected.result()}"
        )

    # Re-entering should restart the very same event loop object.
    with caller_1:
        assert ctx._event_loop is not None
        assert ctx._event_loop.is_running()
        assert ctx._event_loop is event_loop

        # Coroutines should be runnable again.
        check_sleep_coroutine_round_trip()

166 

167 

if __name__ == "__main__":
    # Debugging entry point: on Windows, vscode has issues with pytest
    # detection, so the test can be run by executing this file directly.
    # Runs a single worker (-n1) without distribution, selecting this test by name.
    _pytest_args = ["-n1", "--dist=no", "-k", "test_event_loop_context"]
    pytest.main(_pytest_args)