Working with Python API

We provide Python API which help to initialze and setup the channel easily. In this tutorial, you will write a simple program to initialize communication between eight GPUs using MSCCL++ Python API.

Setup Channel with Python API

We will setup a mesh topology with eight GPUs. Each GPU will be connected to its neighbors. The following code shows how to initialize communication with MSCCL++ Python API.

from mpi4py import MPI
import cupy as cp

from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm

def create_connection(group: mscclpp_comm.CommGroup, transport: str):
    remote_nghrs = list(range(group.nranks))
    remote_nghrs.remove(group.my_rank)
    if transport == "NVLink":
        tran = Transport.CudaIpc
    elif transport == "IB":
        tran = group.my_ib_device(group.my_rank % 8)
    else:
        assert False
    connections = group.make_connection(remote_nghrs, tran)
    return connections

if __name__ == "__main__":
    mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
    connections = create_connection(mscclpp_group, "NVLink")
    nelems = 1024
    memory = cp.zeros(nelem, dtype=cp.int32)
    proxy_service = ProxyService()
    simple_channels = group.make_proxy_channels(proxy_service, memory, connections)
    proxy_service.start_proxy()
    mscclpp_group.barrier()
    launch_kernel(mscclpp_group.my_rank, mscclpp_group.nranks, simple_channels, memory)
    cp.cuda.runtime.deviceSynchronize()
    mscclpp_group.barrier()

Launch Kernel with Python API

We provide some Python utils to help you launch kernel via python. Here is a exampl.

from mscclpp.utils import KernelBuilder, pack

def launch_kernel(my_rank: int, nranks: int, simple_channels: List[SimpleProxyChannel], memory: cp.ndarray):
    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(file="test.cu", kernel_name="test", file_dir=file_dir).get_compiled_kernel()
    params = b""
    first_arg = next(iter(simple_channels.values()))
    size_of_channels = len(first_arg.device_handle().raw)
    device_handles = []
    for rank in range(nranks):
        if rank == my_rank:
            device_handles.append(
                bytes(size_of_channels)
            )  # just zeros for semaphores that do not exist
        else:
            device_handles.append(simple_channels[rank].device_handle().raw)
    # keep a reference to the device handles so that they don't get garbage collected
    d_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8)
    params = pack(d_channels, my_rank, nranks, memory.size)

    nblocks = 1
    nthreads = 512
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)

The test kernel is defined in test.cu as follows:

#include <mscclpp/packet_device.hpp>
#include <mscclpp/proxy_channel_device.hpp>

// be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing
extern "C" __global__ void __launch_bounds__(1024, 1)
    simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks,
                         int num_elements) {
    int tid = threadIdx.x;
    int nthreads = blockDim.x;
    uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks;
    uint64_t my_offset = size_per_rank * my_rank;
    __syncthreads();
    if (tid < nranks && tid != my_rank) {
      channels[tid].putWithSignalAndFlush(my_offset, my_offset, size_per_rank);
      channels[tid].wait();
    }
}