Module tinytroupe.utils.concurrency

Expand source code
import sys
import threading
import time
from tinytroupe.utils import logger
    

def check_threads_for_lock(blocked_keywords=None):
    """
    Inspects all active threads and reports if any thread appears to be blocked
    on a lock acquisition or waiting condition.

    :param blocked_keywords: List of function names or keywords to check in stack frames.
    """
    if blocked_keywords is None:
        blocked_keywords = ['acquire', 'wait', 'Condition.wait']
    
    frames = sys._current_frames()
    print("\n--- Thread Lock Check ---")
    for thread in threading.enumerate():
        frame = frames.get(thread.ident)
        if not frame:
            continue
        
        stack_trace = []
        suspected_block = False
        while frame:
            func_name = frame.f_code.co_name
            file_name = frame.f_code.co_filename
            line_no = frame.f_lineno
            stack_trace.append(f"{func_name} at {file_name}:{line_no}")
            
            if any(keyword in func_name for keyword in blocked_keywords):
                suspected_block = True
            frame = frame.f_back
        
        if suspected_block:
            logger.info(f"[WARNING] Thread '{thread.name}' (ID: {thread.ident}) may be blocked.")
            logger.info("Stack trace:")
            for trace in stack_trace:
                logger.info(f"  {trace}")
        else:
            logger.info(f"[OK] Thread '{thread.name}' is running normally.")

def monitor_threads(interval=5, stop_event=None):
    """
    Periodically checks all threads for potential lock blocking.

    :param interval: Time in seconds between checks.
    :param stop_event: Optional threading.Event that signals monitoring should stop.
    """
    try:
        while True:
            check_threads_for_lock()
            if stop_event is None:
                time.sleep(interval)
            else:
                if stop_event.wait(interval):
                    break
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user.")

# Example usage:
# monitor_threads(interval=5)

Functions

def check_threads_for_lock(blocked_keywords=None)

Inspects all active threads and reports if any thread appears to be blocked on a lock acquisition or waiting condition.

:param blocked_keywords: List of function names or keywords to check in stack frames.

Expand source code
def check_threads_for_lock(blocked_keywords=None):
    """
    Inspects all active threads and reports if any thread appears to be blocked
    on a lock acquisition or waiting condition.

    :param blocked_keywords: List of function names or keywords to check in stack frames.
    """
    if blocked_keywords is None:
        blocked_keywords = ['acquire', 'wait', 'Condition.wait']
    
    frames = sys._current_frames()
    print("\n--- Thread Lock Check ---")
    for thread in threading.enumerate():
        frame = frames.get(thread.ident)
        if not frame:
            continue
        
        stack_trace = []
        suspected_block = False
        while frame:
            func_name = frame.f_code.co_name
            file_name = frame.f_code.co_filename
            line_no = frame.f_lineno
            stack_trace.append(f"{func_name} at {file_name}:{line_no}")
            
            if any(keyword in func_name for keyword in blocked_keywords):
                suspected_block = True
            frame = frame.f_back
        
        if suspected_block:
            logger.info(f"[WARNING] Thread '{thread.name}' (ID: {thread.ident}) may be blocked.")
            logger.info("Stack trace:")
            for trace in stack_trace:
                logger.info(f"  {trace}")
        else:
            logger.info(f"[OK] Thread '{thread.name}' is running normally.")
def monitor_threads(interval=5, stop_event=None)

Periodically checks all threads for potential lock blocking.

:param interval: Time in seconds between checks. :param stop_event: Optional threading.Event that signals monitoring should stop.

Expand source code
def monitor_threads(interval=5, stop_event=None):
    """
    Periodically checks all threads for potential lock blocking.

    :param interval: Time in seconds between checks.
    :param stop_event: Optional threading.Event that signals monitoring should stop.
    """
    try:
        while True:
            check_threads_for_lock()
            if stop_event is None:
                time.sleep(interval)
            else:
                if stop_event.wait(interval):
                    break
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user.")