Coverage for mlos_bench/mlos_bench/event_loop_context.py: 92%

51 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-05 00:36 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5""" 

6EventLoopContext class definition. 

7""" 

8 

9from asyncio import AbstractEventLoop 

10from concurrent.futures import Future 

11from typing import Any, Coroutine, Optional, TypeVar 

12from threading import Lock as ThreadLock, Thread 

13 

14import asyncio 

15import logging 

16import sys 

17 

18if sys.version_info >= (3, 10): 

19 from typing import TypeAlias 

20else: 

21 from typing_extensions import TypeAlias 

22 

23CoroReturnType = TypeVar('CoroReturnType') # pylint: disable=invalid-name 

24if sys.version_info >= (3, 9): 

25 FutureReturnType: TypeAlias = Future[CoroReturnType] 

26else: 

27 FutureReturnType: TypeAlias = Future 

28 

29_LOG = logging.getLogger(__name__) 

30 

31 

32class EventLoopContext: 

33 """ 

34 EventLoopContext encapsulates a background thread for asyncio event 

35 loop processing as an aid for context managers. 

36 

37 There is generally only expected to be one of these, either as a base 

38 class instance if it's specific to that functionality or for the full 

39 mlos_bench process to support parallel trial runners, for instance. 

40 

41 It's enter() and exit() routines are expected to be called from the 

42 caller's context manager routines (e.g., __enter__ and __exit__). 

43 """ 

44 

45 def __init__(self) -> None: 

46 self._event_loop: Optional[AbstractEventLoop] = None 

47 self._event_loop_thread: Optional[Thread] = None 

48 self._event_loop_thread_lock = ThreadLock() 

49 self._event_loop_thread_refcnt: int = 0 

50 

51 def _run_event_loop(self) -> None: 

52 """ 

53 Runs the asyncio event loop in a background thread. 

54 """ 

55 assert self._event_loop is not None 

56 asyncio.set_event_loop(self._event_loop) 

57 self._event_loop.run_forever() 

58 

59 def enter(self) -> None: 

60 """ 

61 Manages starting the background thread for event loop processing. 

62 """ 

63 # Start the background thread if it's not already running. 

64 with self._event_loop_thread_lock: 

65 if not self._event_loop_thread: 

66 assert self._event_loop_thread_refcnt == 0 

67 if self._event_loop is None: 

68 if sys.platform == "win32": 

69 asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) 

70 self._event_loop = asyncio.new_event_loop() 

71 assert not self._event_loop.is_running() 

72 self._event_loop_thread = Thread(target=self._run_event_loop, daemon=True) 

73 self._event_loop_thread.start() 

74 self._event_loop_thread_refcnt += 1 

75 

76 def exit(self) -> None: 

77 """ 

78 Manages cleaning up the background thread for event loop processing. 

79 """ 

80 with self._event_loop_thread_lock: 

81 self._event_loop_thread_refcnt -= 1 

82 assert self._event_loop_thread_refcnt >= 0 

83 if self._event_loop_thread_refcnt == 0: 

84 assert self._event_loop is not None 

85 self._event_loop.call_soon_threadsafe(self._event_loop.stop) 

86 _LOG.info("Waiting for event loop thread to stop...") 

87 assert self._event_loop_thread is not None 

88 self._event_loop_thread.join(timeout=3) 

89 if self._event_loop_thread.is_alive(): 

90 raise RuntimeError("Failed to stop event loop thread.") 

91 self._event_loop_thread = None 

92 

93 def run_coroutine(self, coro: Coroutine[Any, Any, CoroReturnType]) -> FutureReturnType: 

94 """ 

95 Runs the given coroutine in the background event loop thread and 

96 returns a Future that can be used to wait for the result. 

97 

98 Parameters 

99 ---------- 

100 coro : Coroutine[Any, Any, CoroReturnType] 

101 The coroutine to run. 

102 

103 Returns 

104 ------- 

105 Future[CoroReturnType] 

106 A future that will be completed when the coroutine completes. 

107 """ 

108 assert self._event_loop_thread_refcnt > 0 

109 assert self._event_loop is not None 

110 return asyncio.run_coroutine_threadsafe(coro, self._event_loop)