Large objects as task parameters
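If a task's parameters are too large to fit in a queue message, you can “stash” the data away in a blob container and enqueue only a small reference to it (the blob's filename and MD5 checksum).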
class LargeBatchProcessor(WorkSpecification):
    """Example work specification that passes large batches via blob storage.

    ``enqueue_task`` stashes each batch in a blob container and returns only
    the blob's filename and MD5 checksum; ``__call__`` takes those two values
    and loads the batch back from the stash.
    """

    def __init__(self):
        """Create the stash blob container and register it as a context manager."""
        super().__init__()
        self.stashblobs = BlobContainer(
            storage_account="mystorageaccount",
            container="stash-container",
        )
        # Hand the container to the base class as a context manager --
        # presumably so it is opened/closed around the run; TODO confirm.
        self.register_context_manager(self.stashblobs)

    async def list_tasks(self, seed, force=False):
        """Yield one ``dict(batch=...)`` per object produced by ``some_generator()``.

        ``seed`` and ``force`` are accepted but not used by this example.
        """
        async for large_batch_object in some_generator():
            yield dict(batch=large_batch_object)

    async def enqueue_task(self, batch):
        """Stash ``batch`` as a pickle and return a small reference to it.

        Returns:
            A dict with the stash blob's ``filename`` and ``md5sum``.
        """
        # Stash the large object in a blob container
        stash = await self.stashblobs.stash_as_pickle(batch)
        return dict(filename=stash.filename, md5sum=stash.md5sum)

    async def __call__(self, filename, md5sum, **kwargs):
        """Load the stashed batch identified by ``filename`` and ``md5sum``.

        Any additional keyword arguments are accepted and ignored here.
        """
        # Retrieve the large object from the stash
        # NOTE(review): the name suggests the blob is unpickled -- only point
        # this at a stash container you trust, since unpickling untrusted data
        # can execute arbitrary code. ``md5sum`` is presumably used to verify
        # the blob's integrity; confirm against BlobContainer.
        batch = await self.stashblobs.unstash_from_pickle(filename, md5sum)
        # ... process ``batch`` here (elided in this example) ...
Ideally, set a retention policy on the stash-container so that stash blobs are cleaned up automatically after a certain period.