/* * lbpi.c * * load balanced version of PI * * computes pi by a Montecarlo method * * usage: * * lbpi <no_blocks> <blocksize> * */ #include <stdio.h> #include <stdlib.h> #include <math.h> #include <sys/utsname.h> #include <mpi.h> #define TAG_WORK 1 #define TAG_RESULT 2 #define TAG_REQUEST 3 #define TAG_REPORT 4 #define DEFAULT_N_BLOCKS 10 /* default for number of blocks */ #define DEFAULT_BLOCKSIZE 200000 /* number of iterations per block */ /* * data to describe amount of work done by a process/host */ typedef struct { char hostname[MPI_MAX_PROCESSOR_NAME]; int id; long count; } hostinfo; void main(int argc, char *argv[]) { int myid; MPI_Datatype hostinfo_type; void create_hostinfo_type(MPI_Datatype *newtype); void master(int argc, char *argv[], MPI_Datatype hostinfo_type); void slave(MPI_Datatype hostinfo_type, int myid); /* start MPI */ MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &myid); /* create datatype for hostinfo structure */ create_hostinfo_type(&hostinfo_type); if (myid == 0) { master(argc, argv, hostinfo_type); } else { slave(hostinfo_type, myid); } /* leave MPI */ MPI_Finalize(); } void master(int argc, char *argv[], MPI_Datatype hostinfo_type) { long n_blocks; /* total number of blocks */ long blocksize; /* no. of points per block */ long blockcount; /* no. of blocks left */ int nproc; /* number of processes */ long hits; /* number of hits per process */ long totalhits = 0; /* total number of hits */ int slaveid; double pi; MPI_Status status; int i; void show_work(int nproc, MPI_Datatype hostinfo_type); /* get total work and work per job */ if (argc != 3) { n_blocks = DEFAULT_N_BLOCKS; blocksize = DEFAULT_BLOCKSIZE; } else { n_blocks = atol(argv[1]); blocksize = atol(argv[2]); } blockcount = n_blocks; MPI_Comm_size(MPI_COMM_WORLD, &nproc); /* start with one block per processor (assuming nproc < n_blocks!) */ for (i=1; i<nproc; i++) { MPI_Send(&blocksize, 1, MPI_LONG, i, TAG_WORK, MPI_COMM_WORLD); } blockcount -= (nproc - 1); /* receive results and send additional blocks */ printf("blocks received:\n"); while (blockcount > 0) { MPI_Recv(&hits, 1, MPI_LONG, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status); slaveid = status.MPI_SOURCE; printf("."); fflush(stdout); totalhits += hits; MPI_Send(&blocksize, 1, MPI_LONG, slaveid, TAG_WORK, MPI_COMM_WORLD); blockcount--; } /* get last results */ for (i = 1; i < nproc; i++) { MPI_Recv(&hits, 1, MPI_LONG, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status); printf("."); fflush(stdout); totalhits += hits; } /* print result */ pi = 4 * totalhits/(double)(n_blocks * blocksize); printf("\nPI = %lf\n", pi); /* get distribution protocol by using a new tag */ show_work(nproc, hostinfo_type); } #include <strings.h> #include <sys/types.h> void slave(MPI_Datatype hostinfo_type, int myid) { long mytotal; /* no. of points per block */ long myhits; /* no. of hits per block */ MPI_Status status; hostinfo myinfo; hostinfo * infos; /* for the slaves just a dummy */ int namelength; long calc(long total); /* initialize random generator */ srand(getpid()); /* fill in workload protocol */ myinfo.id = myid; MPI_Get_processor_name(myinfo.hostname, &namelength); myinfo.count = 0; /* get work from master until workload is requested */ do { MPI_Recv(&mytotal, 1, MPI_LONG, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status); if (status.MPI_TAG == TAG_REQUEST) { break; } /* compute partial result */ myhits = calc(mytotal); /* send result to master */ MPI_Send(&myhits, 1, MPI_LONG, 0, TAG_RESULT, MPI_COMM_WORLD); /* update workload */ myinfo.count++; } while (1); /* send workload */ MPI_Gather(&myinfo, 1, hostinfo_type, infos, 1, hostinfo_type, 0, MPI_COMM_WORLD); } long calc(long total) { /* * compute total random points in the unit square * and return the number of hits in the sector (x*x + y*y < 1) */ double x, y; /* random coordinates */ long hits = 0; /* number of hits */ int i; for(i=0; i<total; i++) { x = ((double) rand())/RAND_MAX; y = ((double) rand())/RAND_MAX; if ( x*x + y*y <= 1.0 ) { hits++; } } return(hits); } void show_work(int nproc, MPI_Datatype hostinfo_type) { /* sends request to slaves, gathers workload protocols and prints them */ int i; long dummy = 0; hostinfo myinfo; /* for root just a dummy */ hostinfo * infos; MPI_Status status; /* send request for workload to all slaves */ for (i=1; i<nproc; i++) { MPI_Send(&dummy, 1, MPI_LONG, i, TAG_REQUEST, MPI_COMM_WORLD); } /* dummy info[0] needed for gather operation */ infos = (hostinfo *) malloc(nproc * sizeof(hostinfo)); /* gather protocols */ MPI_Gather(&myinfo, 1, hostinfo_type, infos, 1, hostinfo_type, 0, MPI_COMM_WORLD); /* print them */ printf("\n\n workload distribution:\n\n"); printf(" machine id : # of blocks\n"); for(i=1; i<nproc; i++) { printf(" %-20s %2d : %4d\n", infos[i].hostname, infos[i].id, infos[i].count); } } void create_hostinfo_type(MPI_Datatype *newtype) { /* creates type with MPI_MAX_PROCESSOR_NAME chars, one int and one long */ hostinfo template; /* we need one element to compute displacements */ MPI_Datatype type[3]; int blocklength[3] = {MPI_MAX_PROCESSOR_NAME, 1, 1}; MPI_Aint disp[3]; type[0] = MPI_CHAR; type[1] = MPI_INT; type[2] = MPI_LONG; /* compute displacements of structure components */ MPI_Address(&template, disp); MPI_Address(&template.id, disp+1); MPI_Address(&template.count, disp+2); disp[2] -= disp[0]; disp[1] -= disp[0]; disp[0] = 0; MPI_Type_struct(3, blocklength, disp, type, newtype); MPI_Type_commit(newtype); }