Dealing with Preemption

If your command supports checkpointing, you can trigger a checkpoint when your job is about to be preempted. When using the ShellCommandProcessor, make sure your shell forwards the signal to your Python process.

# instead of:
python main.py

# run this (job control makes the shell forward the signal to the python process):
set -o monitor
python main.py

Your process can catch the signal and, for example, trigger saving a checkpoint.

There’s an important distinction here between Manifold and AzureML Compute:

  • Manifold sends us SIGTERM and then stops the container after a timeout.

  • On AzureML Compute, the Azure VM announces a preemption, but then Azure may reconsider this decision and not preempt the VM after all. If this is too disruptive, and you don’t have any checkpointing logic anyway, set JOBQ_DISABLE_SCHEDULED_EVENTS=1.

    The standard behavior is for jobq to send SIGTERM, wait for your command to finish, ignore its exit code, and reschedule the task.
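If you choose to disable scheduled events, the variable just needs to be present in the worker's environment. For example, mirroring the worker invocation used later in this document (queue and account names are placeholders):

```shell
JOBQ_DISABLE_SCHEDULED_EVENTS=1 ai4s-jobq <YOUR-STORAGE-ACCOUNT>/demo-preemption worker --num-workers 2
```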

If you’re not using ShellCommandProcessor, you likely don’t care as much about saving state when preempted. Note, however, that the ProcessPool installs a signal handler that forwards the signal to all subprocesses. Your subprocesses can then install their own signal handlers as needed.

Preemption walkthrough

If you want your code to be preemptible, you can start from the following sample script, dummy_task.py:

import signal
import time


def handle_event(
    signum: int,
    stack_frame,
):
    """Handle preemption signals."""
    print(f"CHECKPOINTING Received signal: {signal.Signals(signum).name}")


def main():
    print("Starting dummy task, will sleep 10 seconds.")
    signal.signal(signal.SIGTERM, handle_event)

    # Simulate a long-running task
    time.sleep(10)
    print("Finished dummy task.")


if __name__ == "__main__":
    main()

You can then queue this task:

import asyncio

from ai4s.jobq import JobQ, batch_enqueue
from azure.identity import DefaultAzureCredential

your_queue_name = "demo-preemption"
your_storage_account = "<YOUR-STORAGE-ACCOUNT>"
nb_tasks_to_queue = 40


all_commands_to_run = [
    "set -o monitor && python ./dummy_task.py"
] * nb_tasks_to_queue


async def enqueue_all():
    # Enqueue the commands to the JobQ queue.
    async with JobQ.from_storage_queue(
        your_queue_name,
        storage_account=your_storage_account,
        credential=DefaultAzureCredential(),
    ) as queue:
        await batch_enqueue(
            queue,
            all_commands_to_run,
        )


asyncio.run(enqueue_all())

You can then start a local ai4s-jobq worker to process the tasks. Note that this example uses --num-workers 2 to start two workers on the same node, so two tasks run in parallel.

ai4s-jobq <YOUR-STORAGE-ACCOUNT>/demo-preemption worker --num-workers 2 --heartbeat --max-consecutive-failures 5 --time-limit 1d

To simulate a preemption, press Ctrl-C in the terminal where the worker is running. In the output, you should see the CHECKPOINTING message printed when the SIGTERM signal is received.

Best practices: Your SIGTERM handler should be quick and should not raise exceptions. It should not call sys.exit() or similar. Ideally, it should just signal the main thread to checkpoint and exit.
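A common way to follow these practices is to have the handler do nothing but set a flag, which the main loop then checks between units of work. A minimal sketch of that pattern (all names here are illustrative, and the checkpointing call is hypothetical):

```python
import signal
import threading

# Event set by the signal handler; the main loop checks it between work units.
stop_requested = threading.Event()


def handle_sigterm(signum: int, frame) -> None:
    # Do as little as possible here: just record that we were asked to stop.
    stop_requested.set()


signal.signal(signal.SIGTERM, handle_sigterm)


def training_loop() -> None:
    for step in range(1_000_000):
        # ... do one unit of work ...
        if stop_requested.is_set():
            # save_checkpoint(step)  # hypothetical checkpointing call
            print(f"Checkpointing at step {step} and exiting.")
            break
```

This keeps the handler itself trivial and exception-free, while the checkpoint logic runs in the main thread at a point where the program's state is consistent.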

SIGKILL escalation for unresponsive processes

When a worker sends SIGTERM to a task subprocess (for example, during preemption or graceful shutdown), the subprocess is expected to clean up and exit. If the process ignores or mishandles SIGTERM and keeps running, the worker escalates to SIGKILL after a configurable timeout.

Each subprocess runs in its own process group (start_new_session=True), so the SIGKILL is sent to the entire process group. This ensures that any child processes spawned by the task are also terminated, preventing orphaned processes from consuming resources after the worker moves on.

Environment variable | Default          | Description
-------------------- | ---------------- | -----------
JOBQ_KILL_TIMEOUT    | 600 (10 minutes) | Seconds to wait after SIGTERM before sending SIGKILL to the process group. Set to 0 to disable SIGKILL escalation (not recommended).

For short-lived test scenarios, you can set a lower value:

JOBQ_KILL_TIMEOUT=30 ai4s-jobq myaccount/myqueue worker --num-workers 2