// Lancelot / src / gpudb / test_broadcast.cu
#include <iostream>
#include <stdio.h>
#include <curand.h>
#include <cstdlib>
#include <cuda.h>
#include <cub/cub.cuh>
#include <assert.h>
#include <nccl.h>

using namespace std;

/* Report (without aborting) the most recent CUDA runtime error, if any.
   The macro captures the call site's file/line for the message. */
#define CHECK_LAST_CUDA_ERROR() checkLast(__FILE__, __LINE__)
void checkLast(const char* const file, const int line)
{
    const cudaError_t err = cudaGetLastError();
    if (err == cudaSuccess)
        return;
    std::cerr << "CUDA Runtime Error at: " << file << ":" << line
              << std::endl;
    std::cerr << cudaGetErrorString(err) << std::endl;
    // We don't exit when we encounter CUDA errors in this example.
    // std::exit(EXIT_FAILURE);
}

/* Debug kernel: each thread prints one element of `a`, tab-separated.
   Rank 0 prints the raw value; every other rank prints it scaled by 10.
   NOTE(review): no bounds check — the launch must supply exactly as many
   threads as there are valid elements in `a`. */
__global__ void kernel(int *a, int rank) {

  int value = a[threadIdx.x];
  if (rank != 0)
    value *= 10;
  printf("%d\t", value);
}
 
/* Print the first n ints of `in` on a single line, tab-separated,
   framed by blank lines. Host-side debugging helper. */
void show_all(int *in, int n){

 printf("\n");
 for (int idx = 0; idx < n; ++idx)
     printf("%d\t", in[idx]);
 printf("\n");

}/*show_all*/


/*
 * Driver: broadcasts a `size`-element int vector from GPU 0 to every
 * visible GPU with NCCL and reports the elapsed time (ms).
 *
 * Returns 0 on completion. CUDA errors are reported, not fatal, via
 * CHECK_LAST_CUDA_ERROR near the end of the run.
 */
int main(int argc, char* argv[]) {

  int size = 5000000;            /* elements (ints) per device buffer */

  /* Count the GPUs available to this process. */
  int nGPUs = 0;
  cudaGetDeviceCount(&nGPUs);
  printf("nGPUs = %d\n", nGPUs);

  /* Enable peer-to-peer access between every pair of devices. */
  for (int i = 0; i < nGPUs; i++) {
    cudaSetDevice(i);
    for (int j = 0; j < nGPUs; j++) {
      if (i != j) cudaDeviceEnablePeerAccess(j, 0);
    }
  }

  /* Device ordinals participating in the communicator. */
  int *DeviceList = (int *) malloc(nGPUs * sizeof(int));
  for (int i = 0; i < nGPUs; ++i)
      DeviceList[i] = i;

  /* NCCL init: one communicator and one stream per GPU. */
  ncclComm_t* comms         = (ncclComm_t*)  malloc(sizeof(ncclComm_t)  * nGPUs);
  cudaStream_t* s           = (cudaStream_t*)malloc(sizeof(cudaStream_t)* nGPUs);
  ncclCommInitAll(comms, nGPUs, DeviceList);

  /* Host source vector plus per-GPU send/recv device buffers. */
  int *host       = (int*) malloc(size      * sizeof(int));
  int **sendbuff  = (int**)malloc(nGPUs     * sizeof(int*));
  int **recvbuff  = (int**)malloc(nGPUs     * sizeof(int*));

  /* Populate the source vector with 1..size. */
  for (int i = 0; i < size; i++)
      host[i] = i + 1;

  // show_all(host, size);

  for (int g = 0; g < nGPUs; g++) {
      cudaSetDevice(DeviceList[g]);
      cudaStreamCreate(&s[g]);
      cudaMalloc(&sendbuff[g], size * sizeof(int));
      cudaMalloc(&recvbuff[g], size * sizeof(int));

      /* Only the root's send buffer is significant for ncclBroadcast. */
      if (g == 0)
        cudaMemcpy(sendbuff[g], host, size * sizeof(int), cudaMemcpyHostToDevice);
  }/*for*/

  cudaSetDevice(0);
  float time;
  cudaEvent_t start, stop; cudaEventCreate(&start); cudaEventCreate(&stop);
  cudaEventRecord(start, 0);
  printf("start\n");

  const int root = 0;
  /* One broadcast call per rank, wrapped in a NCCL group so the
     single-process multi-GPU launch cannot deadlock. */
  ncclGroupStart();
  for (int dst_gpu = 0; dst_gpu < nGPUs; dst_gpu++) {
      ncclBroadcast(sendbuff[root], recvbuff[dst_gpu], size, ncclInt32, root,
                    comms[dst_gpu], s[dst_gpu]);
  }
  ncclGroupEnd();

  /* BUGFIX: wait on EVERY rank's stream. The original synchronized only
     the root's stream, so the stop event could be recorded before the
     other GPUs finished receiving, under-reporting the elapsed time. */
  for (int g = 0; g < nGPUs; g++) {
      cudaSetDevice(DeviceList[g]);
      cudaStreamSynchronize(s[g]);
  }
  cudaSetDevice(0);

  printf("end\n");

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  printf("time: %f\n", time);

  CHECK_LAST_CUDA_ERROR();

  /* Per-GPU cleanup.
     BUGFIX: the original passed the host pointer ARRAYS (sendbuff,
     recvbuff) to cudaFree instead of freeing the device allocations
     they hold — leaking all device memory. */
  for (int g = 0; g < nGPUs; g++) {
      cudaSetDevice(DeviceList[g]);
      cudaFree(sendbuff[g]);
      cudaFree(recvbuff[g]);
      cudaStreamDestroy(s[g]);
  }

  for (int g = 0; g < nGPUs; g++) {
     ncclCommDestroy(comms[g]);
  }

  /* Host-side cleanup (original leaked DeviceList, comms, the two
     pointer arrays, and both timing events). */
  free(s);
  free(host);
  free(sendbuff);
  free(recvbuff);
  free(comms);
  free(DeviceList);
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

  return 0;

}/*main*/