Python API
The Python API is mainly interesting if you want to run small tasks where the shell has too much overhead. Examples are:
You want to interleave data loading/storing and processing
You’re I/O constrained and want to run many tasks in parallel
You get the most out of the Python API if you write async code (your performance will depend on it), but it is not strictly necessary.
Queueing and Running
Rather than submitting tasks one by one using the CLI as shown in the basic example, you can implement the ai4s.jobq.orchestration.WorkSpecification protocol in Python to list all your tasks quickly.
from azure.identity import AzureCliCredential

from ai4s.jobq import JobQ, WorkSpecification


class NumberPrinting(WorkSpecification):
    async def list_tasks(self, seed=None, force=False):
        # Here you define the tasks that you want to run.
        # Tasks are dictionaries that will be passed to the __call__ method below.
        # See the WorkSpecification protocol for more options.
        for i in range(10):
            yield dict(my_number=i)

    async def __call__(self, my_number):
        # Here you define the work that you want to do for each task.
        print(f"{my_number} squared is {my_number**2}.")


work_specification = NumberPrinting()
You then enqueue tasks like this:
from ai4s.jobq import batch_enqueue

async with JobQ.from_storage_queue("test-queue", storage_account="mystorageaccount", credential=AzureCliCredential()) as jobq:
    await batch_enqueue(jobq, work_specification)
    # or, as a shortcut:
    await batch_enqueue(jobq, [dict(my_number=i) for i in range(10)])
And running multiple workers (in parallel with asyncio) looks like this:
from ai4s.jobq import launch_workers

async with JobQ.from_storage_queue("test-queue", storage_account="mystorageaccount", credential=AzureCliCredential()) as jobq:
    await launch_workers(
        jobq,
        work_specification,
        num_workers=10,
    )
Multi-Worker Logging
You can contextualize your logs by including the worker and job IDs. To achieve this,
add the magic _job_id and _worker_id string parameters to your callback:
async def __call__(self, my_number: int, _job_id: str, _worker_id: str):
    logger = logging.getLogger(f"task.{_worker_id}.{_job_id}")
    logger.info("Working on %d", my_number)
    ...
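The logger name then carries both IDs, so any logging configuration that prints the logger name will show them. For example, a minimal setup with the standard library (the format string below is just one possible choice, not something jobq requires):

import logging

# Include the logger name, which embeds the worker and job IDs, in every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)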
CPU-intensive operations
You can use a ProcessPool to make sure computationally intensive work is parallelized and does not block the queue.
import os
import time
from functools import partial

from ai4s.jobq import WorkSpecification, ProcessPool


class NumberPrinting(WorkSpecification):
    def __init__(self):
        super().__init__()
        self.pool = ProcessPool(pool_size=os.cpu_count())
        self.register_context_manager(self.pool)

    async def list_tasks(self, seed=None, force=False):
        for i in range(10):
            await self.pool.submit(partial(time.sleep, 5))  # compute-intensive task
            yield dict(my_number=i)

    async def __call__(self, my_number):
        await self.pool.submit(partial(time.sleep, 5))  # compute-intensive task
        print(f"{my_number} squared is {my_number**2}.")
Writing an entry point
A common use case is to simply call a function for every item in the queue.
This can be achieved using the SequentialProcessor, which removes some of the
boilerplate of combining WorkSpecification and ProcessPool.
import asyncio

from ai4s.jobq import SequentialProcessor, launch_workers, JobQ, setup_logging


def my_cpu_intensive_work(**kwargs):
    ...


async def main():
    async with JobQ.from_environment() as jobq:
        setup_logging(jobq.full_name)
        await launch_workers(jobq, SequentialProcessor(my_cpu_intensive_work))


asyncio.run(main())
The kwargs correspond to the dict you queued. my_cpu_intensive_work is
automatically run in a process pool of size 1. The from_environment
constructor is a shortcut that relies on the environment variables set by
the ai4s-jobq QUEUE_SPEC amlt command: JOBQ_STORAGE and JOBQ_QUEUE.
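As a rough sketch only (the actual constructor may resolve credentials differently), from_environment behaves much like reading those variables yourself:

import os

from azure.identity import AzureCliCredential

from ai4s.jobq import JobQ

# Approximate equivalent of JobQ.from_environment(): the queue name and storage
# account come from the environment variables set by the CLI/amlt integration.
jobq = JobQ.from_storage_queue(
    os.environ["JOBQ_QUEUE"],
    storage_account=os.environ["JOBQ_STORAGE"],
    credential=AzureCliCredential(),
)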
Working with blob storage efficiently
import os
from tempfile import TemporaryDirectory

from ai4s.jobq import WorkSpecification
from ai4s.jobq.blob import BlobContainer


class BlobSizeCounting(WorkSpecification):
    def __init__(self):
        super().__init__()
        self.container = BlobContainer(storage_account="mystorageaccount", container="my-data")
        self.register_context_manager(self.container)

    async def task_seeds(self):
        # You can use top-level directories in the blob storage container to parallelize the listing of blobs.
        # `list_tasks` will be called in parallel for each of these 'seeds'.
        walk = self.container.client.walk_blobs(name_starts_with="")
        async for directory in walk:
            yield directory.name

    async def list_tasks(self, seed, force=False):
        async for blob in self.container.client.list_blobs(name_starts_with=seed):
            yield {"blob": blob.name}

    async def __call__(self, blob, **kwargs):
        # Download the blob and report its size (just as an example).
        with TemporaryDirectory(dir="/dev/shm") as tmpdir:
            filename = await self.container.download_file(blob, tmpdir)
            print(f"{blob} is {os.path.getsize(filename)} bytes.")


work_specification = BlobSizeCounting()
Similar to download_file, there’s also upload_file and upload_from_folder.
The latter uploads all files in the folder concurrently.
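For example, a hypothetical usage sketch; the argument names and order here are assumptions, so check the BlobContainer API for the real signatures:

# Inside an async method of a WorkSpecification that owns `self.container`:
await self.container.upload_file("/tmp/results/result.json", "results/result.json")
await self.container.upload_from_folder("/tmp/results", "results/")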
Checking if work has already been done before enqueueing
You can override the already_done method to prevent a listed item from being queued.
This is useful when checking an item takes time: a separate worker pool performs
these checks, which increases overall efficiency. The sizes of the worker pools for
list_tasks and already_done can be configured through parameters of the
batch_enqueue function.
class ZipAll(WorkSpecification):
    def __init__(self):
        super().__init__()
        self.container = BlobContainer(storage_account="mystorageaccount", container="my-data")
        self.register_context_manager(self.container)

    async def list_tasks(self, seed, force=False):
        async for blob in self.container.client.list_blobs(name_starts_with=""):
            if not blob.name.endswith(".zip"):
                yield {"blob": blob.name}

    async def already_done(self, blob: str):
        # A return value of True causes the item not to be queued.
        return await self.container.blob_exists(blob + ".zip")


work_specification = ZipAll()
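Enqueueing then works exactly as before; items for which already_done returns True are skipped. (The worker-pool sizes mentioned above are optional parameters of batch_enqueue and are left at their defaults in this sketch.)

from azure.identity import AzureCliCredential

from ai4s.jobq import JobQ, batch_enqueue

async with JobQ.from_storage_queue("test-queue", storage_account="mystorageaccount", credential=AzureCliCredential()) as jobq:
    # Blobs that already have a corresponding .zip are not queued.
    await batch_enqueue(jobq, work_specification)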