mscclpp.language.rank

Classes

BaseBuffer(rank, buffer_type, offset, size)

Base class for buffer objects in MSCCL++ programs.

Buffer(rank, size)

A scratch buffer for temporary data storage during communication operations.

Rank(rank)

Represents a single rank (GPU) in an MSCCL++ program.

Semaphore(rank, initial_value)

A semaphore for asynchronous synchronization between thread blocks.

class mscclpp.language.rank.BaseBuffer(rank, buffer_type, offset, size)

Bases: object

Base class for buffer objects in MSCCL++ programs.

BaseBuffer represents a memory buffer associated with a specific rank. Indexing it with a slice creates a Chunk object that can be used in communication operations.

Parameters:
  • rank (int)

  • buffer_type (BufferType)

  • offset (int)

  • size (int)

rank

The rank that owns this buffer.

Type:

int

buffer_type

The type of buffer (input, output, or scratch).

Type:

BufferType

offset

The starting offset of this buffer.

Type:

int

size

The total size of the buffer.

Type:

int
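The slice-based indexing described above can be pictured with a small stand-alone model. This is only a sketch: ToyBuffer and ToyChunk are hypothetical names, not MSCCL++ classes, and the choice to fold the buffer offset into the chunk's start index is an assumption made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyChunk:
    """Hypothetical stand-in for a Chunk: a contiguous range within a buffer."""
    rank: int
    buffer_type: str
    index: int  # start position, including the buffer offset (assumption)
    size: int   # number of elements covered by the chunk


class ToyBuffer:
    """Hypothetical model of a buffer that turns slices into chunks."""

    def __init__(self, rank, buffer_type, offset, size):
        self.rank = rank
        self.buffer_type = buffer_type
        self.offset = offset
        self.size = size

    def __getitem__(self, key):
        if not isinstance(key, slice):
            raise TypeError("only slice indexing is modeled here")
        # Resolve missing or negative bounds against the buffer size.
        start, stop, step = key.indices(self.size)
        if step != 1 or stop <= start:
            raise ValueError("only non-empty, contiguous slices are modeled here")
        return ToyChunk(self.rank, self.buffer_type, self.offset + start, stop - start)


buf = ToyBuffer(rank=0, buffer_type="input", offset=4, size=8)
chunk = buf[2:5]  # covers 3 elements starting at position 4 + 2
```

In this model the chunk keeps the owning rank and buffer type, so a later operation could check that both of its chunks belong to the rank that executes it.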

class mscclpp.language.rank.Buffer(rank, size)

Bases: BaseBuffer

A scratch buffer for temporary data storage during communication operations.

Buffer extends BaseBuffer to provide dynamically allocated scratch space for a specific rank. It automatically manages scratch buffer allocation within the GPU’s scratch memory space.

Parameters:
  • rank (int)

  • size (int)

rank

The rank that owns this buffer.

Type:

int

buffer_type

Always BufferType.scratch for Buffer instances.

Type:

BufferType

offset

The starting offset within the rank’s scratch space.

Type:

int

size

The total size of the allocated buffer.

Type:

int
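One way to picture the automatic scratch allocation is a per-rank counter that hands out consecutive regions. The sketch below assumes exactly that policy. The actual allocation strategy is not described here, and ToyScratchSpace is a hypothetical name.

```python
class ToyScratchSpace:
    """Hypothetical per-rank scratch space that hands out consecutive regions."""

    def __init__(self):
        self.used = 0  # total scratch size handed out so far

    def allocate(self, size):
        """Reserve `size` elements and return the offset of the new region."""
        if size <= 0:
            raise ValueError("size must be positive")
        offset = self.used
        self.used += size
        return offset


# One scratch space per rank, so offsets on different ranks are independent.
scratch = {0: ToyScratchSpace(), 1: ToyScratchSpace()}
first = scratch[0].allocate(4)       # rank 0, first buffer
second = scratch[0].allocate(2)      # rank 0, placed after the first buffer
other_rank = scratch[1].allocate(8)  # rank 1 starts at its own offset 0
```

Under this assumption, the offset attribute of each new scratch buffer equals the combined size of the buffers created before it on the same rank.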

class mscclpp.language.rank.Rank(rank)

Bases: object

Represents a single rank (GPU) in an MSCCL++ program.

Rank provides operations that can be performed locally on a single GPU, including copy operations, reduce operations, and synchronization barriers. It manages local buffer operations and coordinates with other ranks through the program context.

Parameters:

rank (int)

rank

The rank identifier for this GPU.

Type:

int

barrier(tb_list)

Create a synchronization barrier between thread blocks.

Synchronizes execution between multiple thread blocks on this rank. For a single thread block, creates a sync operation. For multiple thread blocks, creates a barrier operation.

Parameters:

tb_list (List[int]) – List of thread block IDs to synchronize.

Raises:

RuntimeError – If tb_list is empty.

Example

>>> rank0.barrier(tb_list=[0, 1, 2])
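The dispatch rule described for barrier (one thread block yields a sync, several yield a barrier, none is an error) can be sketched as a plain function. plan_barrier and the tuples it returns are hypothetical and only model the decision, not the operations MSCCL++ actually emits.

```python
def plan_barrier(tb_list):
    """Return a toy description of the operation a barrier call would create."""
    if len(tb_list) == 0:
        raise RuntimeError("tb_list must not be empty")
    if len(tb_list) == 1:
        # A single thread block only needs to synchronize its own threads.
        return ("sync", tuple(tb_list))
    # Several thread blocks must wait for each other.
    return ("barrier", tuple(tb_list))


single = plan_barrier([0])
several = plan_barrier([0, 1, 2])
```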
copy(dst_chunk, src_chunk, tb=None, tb_group=None)

Copy data from source chunk to destination chunk.

Performs a simple local copy operation between two chunks on this rank without any packet format conversion.

Parameters:
  • dst_chunk (Chunk) – The destination chunk to copy data to.

  • src_chunk (Chunk) – The source chunk to copy data from.

  • tb (int, optional) – The thread block ID that will execute this operation. Defaults to None.

  • tb_group (ThreadBlockGroup, optional) – The ThreadBlockGroup that will execute this operation. Defaults to None.

Example

>>> rank.copy(dst_chunk, src_chunk, tb=0)
copy_packets(dst_chunk, src_chunk, tb=None, tb_group=None)

Copy data from regular format to packet format.

Packs data from the source chunk and copies it to the destination scratch buffer in packet format.

Parameters:
  • dst_chunk (Chunk) – The destination scratch chunk to store packed data.

  • src_chunk (Chunk) – The source chunk containing data to pack.

  • tb (int, optional) – The thread block ID that will execute this operation. Defaults to None.

  • tb_group (ThreadBlockGroup, optional) – The ThreadBlockGroup that will execute this operation. Defaults to None.

Example

>>> rank.copy_packets(dst_chunk, src_chunk, tb=0)
get_input_buffer()

Get the input buffer for this rank.

Returns:

The input buffer associated with this rank.

Return type:

BaseBuffer

Example

>>> input_buf = rank.get_input_buffer()
get_output_buffer()

Get the output buffer for this rank.

Returns:

The output buffer associated with this rank.

Return type:

BaseBuffer

Example

>>> output_buf = rank.get_output_buffer()
reduce(src_chunk, other_chunks, tb=None, tb_group=None, dst_chunk=None, reduce_op=ReduceOperationType.sum, packet=False)

Perform a local reduction operation on this rank.

Reduces data from multiple chunks locally on this rank, combining the source chunk with other chunks using the specified reduction operation.

Parameters:
  • src_chunk (Chunk) – The primary source chunk to reduce.

  • other_chunks (List[Chunk]) – Additional chunks to include in the reduction.

  • tb (int, optional) – The thread block ID that will execute this operation. Defaults to None.

  • tb_group (ThreadBlockGroup, optional) – The ThreadBlockGroup that will execute this operation. Defaults to None.

  • dst_chunk (Chunk, optional) – The destination chunk for the result. If None, uses src_chunk. Defaults to None.

  • reduce_op (ReduceOperationType, optional) – The reduction operation to perform. Defaults to ReduceOperationType.sum.

  • packet (bool, optional) – Whether to operate in packet format. Defaults to False.

Raises:

RuntimeError – If chunk ranks don’t match this rank, if chunk sizes are inconsistent, or if other_chunks is empty.

Example

>>> rank.reduce(src_chunk, other_chunks, tb=0, dst_chunk=dst_chunk)
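The reduction semantics above can be illustrated on plain Python lists. This is a sketch under stated assumptions: toy_reduce is a hypothetical helper, chunks are modeled as lists, and the rank-mismatch check is omitted because lists carry no rank.

```python
import operator


def toy_reduce(src, others, dst=None, op=operator.add):
    """Combine src element-wise with every chunk in others (toy model)."""
    if not others:
        raise RuntimeError("other_chunks must not be empty")
    if any(len(chunk) != len(src) for chunk in others):
        raise RuntimeError("chunk sizes are inconsistent")
    if dst is not None and len(dst) != len(src):
        raise RuntimeError("chunk sizes are inconsistent")
    result = list(src)
    for chunk in others:
        result = [op(a, b) for a, b in zip(result, chunk)]
    # With no destination given, the result overwrites the source chunk.
    target = src if dst is None else dst
    target[:] = result
    return target


others = [[10, 20, 30], [100, 200, 300]]

in_place = [1, 2, 3]
toy_reduce(in_place, others)  # sum, written back into in_place

src = [1, 2, 3]
dst = [0, 0, 0]
toy_reduce(src, others, dst=dst, op=max)  # max, written into dst
```

Passing a separate destination leaves the source untouched, which mirrors the role of the optional dst_chunk parameter.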
unpack_packets(dst_chunk, src_chunk, tb=None, tb_group=None)

Copy data from packet format to regular format.

Unpacks data from packet format in the source scratch buffer and copies it to the destination chunk in regular format.

Parameters:
  • dst_chunk (Chunk) – The destination chunk to copy unpacked data to.

  • src_chunk (Chunk) – The source scratch chunk containing packed data.

  • tb (int, optional) – The thread block ID that will execute this operation. Defaults to None.

  • tb_group (ThreadBlockGroup, optional) – The ThreadBlockGroup that will execute this operation. Defaults to None.

Example

>>> rank.unpack_packets(dst_chunk, src_chunk, tb=0)
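copy_packets and unpack_packets are inverse operations, which a round trip on a toy layout can show. The layout below, where each data word is followed by a flag word, is an assumption chosen for illustration. The real packet format is not specified here, and toy_pack and toy_unpack are hypothetical helpers.

```python
def toy_pack(data, flag):
    """Store each data word next to a flag word (hypothetical layout)."""
    packets = []
    for word in data:
        packets.extend([word, flag])
    return packets


def toy_unpack(packets, flag):
    """Recover the data words, checking that every flag matches."""
    if len(packets) % 2 != 0:
        raise ValueError("packed data must consist of (word, flag) pairs")
    data = []
    for i in range(0, len(packets), 2):
        word, seen_flag = packets[i], packets[i + 1]
        if seen_flag != flag:
            raise ValueError("unexpected flag: data is not ready")
        data.append(word)
    return data


regular = [7, 8, 9]
packed = toy_pack(regular, flag=1)      # what a scratch chunk would hold
restored = toy_unpack(packed, flag=1)   # back to regular format
```

In this toy layout the packed form takes twice the space of the regular form, so the scratch chunk would need to be sized accordingly.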
class mscclpp.language.rank.Semaphore(rank, initial_value)

Bases: object

A semaphore for asynchronous synchronization between thread blocks.

Semaphore provides acquire and release operations for synchronization between thread blocks within a rank. Each semaphore has an initial value and supports typical semaphore semantics.

Parameters:
  • rank (int)

  • initial_value (int)

id

Unique identifier for this semaphore within its rank.

Type:

int

rank

The rank that owns this semaphore.

Type:

int

initial_value

The initial value of the semaphore.

Type:

int

acquire(tb, data_sync=SyncType.both)

Acquire the semaphore from a thread block.

Blocks the thread block until the semaphore can be acquired (value > 0), then decrements the semaphore value.

Parameters:
  • tb (int) – The thread block ID that will acquire the semaphore.

  • data_sync (SyncType, optional) – Specifies when the threads inside the thread block are synchronized (equivalent to __syncthreads()) relative to the acquire operation: before it, after it, or both. Defaults to SyncType.both.

Example

>>> sem.acquire(tb=0, data_sync=SyncType.before)
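The effect of data_sync can be read as an ordering of steps around the semaphore operation. The sketch below models that reading. ToySyncType and expand are hypothetical, and only the three values that appear in this reference are modeled.

```python
from enum import Enum


class ToySyncType(Enum):
    """Hypothetical stand-in for SyncType, limited to the values shown here."""
    before = 1
    after = 2
    both = 3


def expand(op_name, data_sync=ToySyncType.both):
    """List the steps a thread block would run for one semaphore operation."""
    steps = []
    if data_sync in (ToySyncType.before, ToySyncType.both):
        steps.append("syncthreads")  # threads synchronize before the operation
    steps.append(op_name)
    if data_sync in (ToySyncType.after, ToySyncType.both):
        steps.append("syncthreads")  # threads synchronize after the operation
    return steps


acquire_before = expand("acquire", ToySyncType.before)
release_after = expand("release", ToySyncType.after)
acquire_default = expand("acquire")
```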
release(tb, data_sync=SyncType.both)

Release the semaphore from a thread block.

Increments the semaphore value, potentially unblocking other thread blocks waiting to acquire the semaphore.

Parameters:
  • tb (int) – The thread block ID that will release the semaphore.

  • data_sync (SyncType, optional) – Specifies when the threads inside the thread block are synchronized (equivalent to __syncthreads()) relative to the release operation: before it, after it, or both. Defaults to SyncType.both.

Example

>>> sem.release(tb=0, data_sync=SyncType.after)
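The counting behavior of acquire and release matches that of a classic counting semaphore. As a host-side analogy only, the sketch below uses Python's threading.Semaphore, which is not part of MSCCL++, with non-blocking acquires so that the example cannot hang.

```python
import threading

# Analogy for Semaphore(rank, initial_value=1): one acquire can succeed at first.
sem = threading.Semaphore(1)

got_first = sem.acquire(blocking=False)   # value 1 -> 0, succeeds
got_second = sem.acquire(blocking=False)  # value is 0, a blocking acquire would wait
sem.release()                             # value 0 -> 1
got_third = sem.acquire(blocking=False)   # succeeds again after the release
```

With an initial value of 0, the first acquire would wait until another thread block releases, which is the usual pattern for signaling that data is ready.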