Coverage for mlos_bench/mlos_bench/event_loop_context.py: 92%

52 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-22 01:18 +0000

1# 

2# Copyright (c) Microsoft Corporation. 

3# Licensed under the MIT License. 

4# 

5"""EventLoopContext class definition.""" 

6 

7import asyncio 

8import logging 

9import sys 

10from asyncio import AbstractEventLoop 

11from concurrent.futures import Future 

12from threading import Lock as ThreadLock 

13from threading import Thread 

14from typing import Any, Coroutine, Optional, TypeVar 

15 

16if sys.version_info >= (3, 10): 

17 from typing import TypeAlias 

18else: 

19 from typing_extensions import TypeAlias 

20 

# Generic placeholder for whatever value a submitted coroutine eventually produces.
CoroReturnType = TypeVar("CoroReturnType")  # pylint: disable=invalid-name
"""Type variable for the return type of an :external:py:mod:`asyncio` coroutine."""

if sys.version_info >= (3, 9):
    # On 3.9+ the alias is the Future class parameterized by the coroutine's
    # result type (presumably because older interpreters cannot subscript
    # concurrent.futures.Future at runtime - confirm before dropping the guard).
    FutureReturnType: TypeAlias = Future[CoroReturnType]
    """Type variable for the return type of a :py:class:`~concurrent.futures.Future`."""
else:
    # Older interpreters: fall back to the bare, unparameterized Future class.
    FutureReturnType: TypeAlias = Future

# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

31 

32 

class EventLoopContext:
    """
    Owns a background thread that drives an :external:py:mod:`asyncio` event loop,
    as an aid for context managers.

    Normally a single instance is expected to exist: either embedded in the one
    component that needs it, or shared by the whole mlos_bench process (e.g., to
    support parallel trial runners).

    Its :py:meth:`.enter` and :py:meth:`.exit` routines are expected to be called
    from the caller's context manager routines (e.g., __enter__ and __exit__).
    Calls are reference counted: the thread is started when no thread is active
    and stopped once the count drops back to zero.
    """

    def __init__(self) -> None:
        # The lock guards the thread handle and the reference count.
        self._event_loop_thread_lock = ThreadLock()
        self._event_loop_thread_refcnt: int = 0
        self._event_loop_thread: Optional[Thread] = None
        # Created lazily by enter() and kept around for reuse after exit().
        self._event_loop: Optional[AbstractEventLoop] = None

    def _run_event_loop(self) -> None:
        """Thread target: bind the event loop to this thread and run it until stopped."""
        loop = self._event_loop
        assert loop is not None
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def enter(self) -> None:
        """Take a reference on the background event loop thread, starting it if needed."""
        with self._event_loop_thread_lock:
            needs_thread = not self._event_loop_thread
            if needs_thread:
                # No thread means nobody may be holding a reference.
                assert self._event_loop_thread_refcnt == 0
                loop = self._event_loop
                if loop is None:
                    # First use: create the loop (with the selector policy on Windows).
                    if sys.platform == "win32":
                        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
                    loop = self._event_loop = asyncio.new_event_loop()
                assert not loop.is_running()
                worker = Thread(target=self._run_event_loop, daemon=True)
                self._event_loop_thread = worker
                worker.start()
            self._event_loop_thread_refcnt += 1

    def exit(self) -> None:
        """
        Drop a reference on the background event loop thread, stopping it when the
        last reference is released.

        Raises
        ------
        RuntimeError
            If the thread is still alive after waiting 3 seconds for it to stop.
        """
        with self._event_loop_thread_lock:
            self._event_loop_thread_refcnt -= 1
            assert self._event_loop_thread_refcnt >= 0
            if self._event_loop_thread_refcnt != 0:
                # Other users remain (or the count is inconsistent): keep running.
                return

            loop = self._event_loop
            assert loop is not None
            # stop() must be scheduled onto the loop's own thread.
            loop.call_soon_threadsafe(loop.stop)
            _LOG.info("Waiting for event loop thread to stop...")

            worker = self._event_loop_thread
            assert worker is not None
            worker.join(timeout=3)
            if worker.is_alive():
                raise RuntimeError("Failed to stop event loop thread.")
            # Only the thread handle is cleared; the loop object is retained.
            self._event_loop_thread = None

    def run_coroutine(self, coro: Coroutine[Any, Any, CoroReturnType]) -> FutureReturnType:
        """
        Schedule a coroutine on the background event loop thread and hand back a
        Future for its result.

        Parameters
        ----------
        coro : Coroutine[Any, Any, CoroReturnType]
            The coroutine to schedule.

        Returns
        -------
        concurrent.futures.Future[CoroReturnType]
            A future that will be completed when the coroutine completes.
        """
        # At least one enter() must be outstanding for the loop to be usable.
        assert self._event_loop_thread_refcnt > 0
        loop = self._event_loop
        assert loop is not None
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        return future