Commit fe3dcd2f authored by Kirill Terekhov

Overcome 2GB limit in MPI communication

parent 1592c2a1
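Editor's note: MPI count arguments are plain int, so a single MPI_Isend/MPI_Irecv of MPI_PACKED bytes is capped at INT_MAX (about 2 GB). This commit splits each oversized buffer into chunks of at most INT_MAX bytes, posts one request per chunk with its own tag, and tracks the extra requests so a receive buffer is only reported complete once all of its chunks have arrived. A minimal standalone sketch of the splitting loop (the post_chunk callback is hypothetical, not part of INMOST):

```cpp
#include <climits>
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

// Split a buffer of 'datasize' bytes into chunks of at most INT_MAX bytes,
// invoking post_chunk(offset, size, chunk_index) once per chunk. This mirrors
// the do/while loop the commit adds around MPI_Isend/MPI_Irecv.
static void split_into_chunks(unsigned long long datasize,
                              const std::function<void(unsigned long long, int, int)> & post_chunk)
{
  unsigned long long shift = 0, chunk;
  int it = 0;
  do
  {
    chunk = std::min<unsigned long long>(INT_MAX, datasize - shift);
    post_chunk(shift, static_cast<int>(chunk), it);
    shift += chunk;
    it++;
  } while (shift != datasize);
}

int main()
{
  // 5 GB of packed data would need three MPI requests.
  unsigned long long datasize = 5ull * 1024 * 1024 * 1024;
  split_into_chunks(datasize, [](unsigned long long offset, int size, int idx)
  {
    std::cout << "chunk " << idx << ": offset " << offset << ", size " << size << "\n";
  });
  return 0;
}
```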
@@ -2254,10 +2254,33 @@ namespace INMOST
typedef std::map<int, element_set > proc_elements;
typedef std::pair<int, buffer_type > proc_buffer_type;
typedef std::vector< proc_buffer_type > exch_buffer_type;
typedef std::vector<INMOST_MPI_Request> exch_reqs_type;
typedef struct
{
std::vector<unsigned> buf;
std::vector<unsigned> cnt;
std::vector<INMOST_MPI_Request> requests;
unsigned count()
{
unsigned ret = 0;
for(size_t i = 0; i < cnt.size(); ++i)
ret += cnt[i];
return ret;
}
void clear()
{
buf.clear();
buf.push_back(0);
cnt.clear();
requests.clear();
}
} exch_recv_reqs_type;
class exchange_data
{
public:
std::vector<INMOST_MPI_Request> send_reqs, recv_reqs;
exch_reqs_type send_reqs;
exch_recv_reqs_type recv_reqs;
exch_buffer_type send_buffers, recv_buffers;
};
private:
@@ -2319,8 +2342,8 @@ namespace INMOST
void PrepareReceiveInner(Prepare todo, exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs);
void ExchangeDataInnerBegin(const tag_set & tag, const parallel_storage & from, const parallel_storage & to, ElementType mask, MarkerType select, exchange_data & storage);
void ExchangeDataInnerEnd(const tag_set & tag, const parallel_storage & from, const parallel_storage & to, ElementType mask, MarkerType select, ReduceOperation op, exchange_data & storage);
void ExchangeBuffersInner(exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs,std::vector<INMOST_MPI_Request> & send_reqs, std::vector<INMOST_MPI_Request> & recv_reqs);
std::vector<int> FinishRequests (std::vector<INMOST_MPI_Request> & recv_reqs);
void ExchangeBuffersInner(exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs, exch_reqs_type & send_reqs, exch_recv_reqs_type & recv_reqs);
std::vector<int> FinishRequests (exch_recv_reqs_type & recv_reqs);
void SortParallelStorage(parallel_storage & ghost, parallel_storage & shared,ElementType mask);
void ReportParallelStorage();
void GatherParallelStorage(parallel_storage & ghost, parallel_storage & shared, ElementType mask);
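Editor's note: the new exch_recv_reqs_type above keeps three parallel pieces of bookkeeping: requests holds one MPI request per posted chunk, cnt holds the number of chunks still outstanding for each receive buffer, and buf holds the running request count, so buffer k owns the request indices [buf[k], buf[k+1]). A small sketch of how buf and cnt evolve (the chunk counts are made up; no MPI involved):

```cpp
#include <iostream>
#include <vector>

// Stand-in for the bookkeeping part of exch_recv_reqs_type: one entry of 'cnt'
// per receive buffer, and 'buf' holding the cumulative request count, so
// buffer k owns the request indices in [buf[k], buf[k+1]).
int main()
{
  std::vector<unsigned> buf(1, 0), cnt;           // buf starts as {0}, as in clear()
  const unsigned chunks_per_buffer[] = {2, 1, 3}; // hypothetical chunk counts
  for (unsigned c : chunks_per_buffer)
  {
    cnt.push_back(0);
    buf.push_back(buf.back());
    for (unsigned j = 0; j < c; ++j) // one MPI request per chunk would be posted here
    {
      buf.back()++;
      cnt.back()++;
    }
  }
  // buf = {0, 2, 3, 6}: buffer 0 owns requests 0..1, buffer 1 owns 2, buffer 2 owns 3..5.
  for (size_t k = 0; k + 1 < buf.size(); ++k)
    std::cout << "buffer " << k << ": requests [" << buf[k] << "," << buf[k + 1] << ")\n";
  return 0;
}
```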
@@ -1448,7 +1448,8 @@ namespace INMOST
{
std::vector< MPI_Request > send_reqs, recv_reqs;
exch_reqs_type send_reqs;
exch_recv_reqs_type recv_reqs;
exch_buffer_type send_buffs(procs.size()), recv_buffs(procs.size());
std::vector<int> done;
@@ -1707,7 +1708,8 @@ namespace INMOST
std::vector< std::vector<int> > message_recv(procs.size());
std::vector< element_set > elements(procs.size());
std::vector< MPI_Request > send_reqs,recv_reqs;
exch_reqs_type send_reqs;
exch_recv_reqs_type recv_reqs;
exch_buffer_type send_buffs(procs.size()), recv_buffs(procs.size());
ENTER_BLOCK();
@@ -5634,7 +5636,7 @@ namespace INMOST
void Mesh::ExchangeBuffersInner(exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs,
std::vector<INMOST_MPI_Request> & send_reqs, std::vector<INMOST_MPI_Request> & recv_reqs)
exch_reqs_type & send_reqs, exch_recv_reqs_type & recv_reqs)
{
ENTER_FUNC();
REPORT_VAL("exchange number", ++num_exchanges);
@@ -5651,8 +5653,8 @@ namespace INMOST
MPI_Attr_get(comm,MPI_TAG_UB,&p_max_tag,&flag);
#endif //USE_MPI2
if( flag ) max_tag = *p_max_tag;
recv_reqs.resize(recv_bufs.size());
send_reqs.resize(send_bufs.size());
recv_reqs.clear();//resize(recv_bufs.size());
send_reqs.clear();//resize(send_bufs.size());
{
INMOST_DATA_BULK_TYPE stub;
REPORT_VAL("recv bufs size",recv_bufs.size());
@@ -5660,22 +5662,58 @@ namespace INMOST
{
mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (mpirank+mpisize+rand_num))%max_tag;
//mpi_tag = parallel_mesh_unique_id*mpisize*mpisize+recv_bufs[i].first*mpisize+mpirank;
INMOST_DATA_BIG_ENUM_TYPE shift = 0, chunk, datasize = recv_bufs[i].second.size();
int it = 0, mpi_tag_it; // for mpi tag
REPORT_VAL("mpi_tag",mpi_tag);
REPORT_VAL("size",recv_bufs[i].second.size());
REPORT_VAL("proc",recv_bufs[i].first);
REPORT_VAL("empty",recv_bufs[i].second.empty());
REPORT_MPI(MPI_Irecv(recv_bufs[i].second.empty()?&stub:&recv_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(recv_bufs[i].second.size()),MPI_PACKED,recv_bufs[i].first,mpi_tag,comm,&recv_reqs[i]));
REPORT_VAL("total size",datasize);
recv_reqs.cnt.push_back(0);
recv_reqs.buf.push_back(recv_reqs.buf.back());
do
{
MPI_Request req;
chunk = std::min(static_cast<INMOST_DATA_BIG_ENUM_TYPE>(INT_MAX),datasize - shift);
mpi_tag_it = (mpi_tag*1000 + it)%max_tag;
REPORT_VAL("it",it);
REPORT_VAL("using mpi_tag",mpi_tag_it);
REPORT_VAL("size",chunk);
REPORT_VAL("proc",recv_bufs[i].first);
REPORT_VAL("empty",recv_bufs[i].second.empty());
REPORT_MPI(MPI_Irecv(recv_bufs[i].second.empty()?&stub:&recv_bufs[i].second[shift],static_cast<INMOST_MPI_SIZE>(chunk),MPI_PACKED,recv_bufs[i].first,mpi_tag_it,comm,&req));
recv_reqs.requests.push_back(req);
recv_reqs.buf.back()++;
recv_reqs.cnt.back()++;
shift += chunk;
it++;
if( it >= 1000 )
std::cout << __FILE__ << ":" << __LINE__ << " too many iterations!!! " << it << " datasize " << datasize << std::endl;
} while( shift != datasize );
}
REPORT_VAL("send bufs size",send_bufs.size());
for(i = 0; i < send_bufs.size(); i++) //if( !send_bufs[i].second.empty() )
{
mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (send_bufs[i].first+mpisize+rand_num))%max_tag;
//mpi_tag = parallel_mesh_unique_id*mpisize*mpisize+mpirank*mpisize+send_bufs[i].first;
INMOST_DATA_BIG_ENUM_TYPE shift = 0, chunk, datasize = send_bufs[i].second.size();
int it = 0, mpi_tag_it; // for mpi tag
REPORT_VAL("mpi_tag",mpi_tag);
REPORT_VAL("size",send_bufs[i].second.size());
REPORT_VAL("proc",send_bufs[i].first);
REPORT_VAL("empty",send_bufs[i].second.empty());
REPORT_MPI(MPI_Isend(send_bufs[i].second.empty()?&stub:&send_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(send_bufs[i].second.size()),MPI_PACKED,send_bufs[i].first,mpi_tag,comm,&send_reqs[i]));
REPORT_VAL("total size",datasize);
do
{
MPI_Request req;
chunk = std::min(static_cast<INMOST_DATA_BIG_ENUM_TYPE>(INT_MAX),datasize - shift);
mpi_tag_it = (mpi_tag*1000 + it)%max_tag;
REPORT_VAL("it",it);
REPORT_VAL("using mpi_tag",mpi_tag_it);
REPORT_VAL("size",chunk);
REPORT_VAL("proc",send_bufs[i].first);
REPORT_VAL("empty",send_bufs[i].second.empty());
REPORT_MPI(MPI_Isend(send_bufs[i].second.empty()?&stub:&send_bufs[i].second[shift],static_cast<INMOST_MPI_SIZE>(chunk),MPI_PACKED,send_bufs[i].first,mpi_tag_it,comm,&req));
send_reqs.push_back(req);
shift += chunk;
it++;
if( it >= 1000 )
std::cout << __FILE__ << ":" << __LINE__ << " too many iterations!!! " << it << " datasize " << datasize << std::endl;
} while( shift != datasize );
}
}
#else //USE_MPI
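Editor's note: for reference, a self-contained sketch of the chunked exchange pattern used above; this is not INMOST code. The buffer size, the stand-in for INT_MAX, and the base tag are invented to keep it runnable on two ranks, and the real code additionally wraps every tag with % max_tag.

```cpp
#include <mpi.h>
#include <algorithm>
#include <cstdio>
#include <vector>

// Chunked point-to-point transfer between ranks 0 and 1. Both sides split the
// same datasize into pieces no larger than max_chunk and derive the per-chunk
// tag the same way, so the k-th Isend matches the k-th Irecv.
int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size >= 2 && rank < 2)
  {
    const unsigned long long datasize = 10; // pretend this could exceed INT_MAX
    const unsigned long long max_chunk = 4; // stand-in for INT_MAX, forces 3 chunks
    std::vector<char> data(datasize, rank == 0 ? 'x' : '?');
    std::vector<MPI_Request> reqs;
    unsigned long long shift = 0;
    int it = 0, base_tag = 7;
    do
    {
      unsigned long long chunk = std::min(max_chunk, datasize - shift);
      int tag = base_tag * 1000 + it; // both sides derive the same tag sequence
      MPI_Request req;
      if (rank == 0)
        MPI_Isend(&data[shift], static_cast<int>(chunk), MPI_BYTE, 1, tag, MPI_COMM_WORLD, &req);
      else
        MPI_Irecv(&data[shift], static_cast<int>(chunk), MPI_BYTE, 0, tag, MPI_COMM_WORLD, &req);
      reqs.push_back(req);
      shift += chunk;
      it++;
    } while (shift != datasize);
    MPI_Waitall(static_cast<int>(reqs.size()), reqs.data(), MPI_STATUSES_IGNORE);
    if (rank == 1)
      printf("received '%c' x %llu in %d chunks\n", data[0], datasize, it);
  }
  MPI_Finalize();
  return 0;
}
```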
@@ -5691,7 +5729,7 @@ namespace INMOST
{
ENTER_FUNC();
#if defined(USE_MPI)
int mpirank = GetProcessorRank();
int mpirank = GetProcessorRank(), mpisize = GetProcessorsNumber();
#if defined(USE_MPI_P2P) && defined(PREFFER_MPI_P2P)
unsigned i, end = send_bufs.size();
REPORT_MPI(MPI_Win_fence(MPI_MODE_NOPRECEDE,window)); //start exchange session
@@ -5700,6 +5738,9 @@ namespace INMOST
for(i = 0; i < end; i++) shared_space[mpisize+i] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size()+1); //put data to special part of the memory
for(i = 0; i < end; i++)
{
REPORT_VAL("put value", shared_space[mpisize+i]);
REPORT_VAL("destination", send_bufs[i].first);
REPORT_VAL("displacement", mpirank);
assert( send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber() );
REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,send_bufs[i].first,mpirank,1,INMOST_MPI_DATA_BIG_ENUM_TYPE,window)); //request rdma
}
@@ -5736,10 +5777,10 @@ namespace INMOST
#endif //USE_MPI2
if( flag ) max_tag = *p_max_tag;
REPORT_VAL("max_tag",max_tag);
std::vector<int> send_recv_size(send_bufs.size()+recv_bufs.size());
std::vector<INMOST_DATA_BIG_ENUM_TYPE> send_recv_size(send_bufs.size()+recv_bufs.size());
std::vector<INMOST_MPI_Request> reqs(send_bufs.size()+recv_bufs.size());
for(i = 0; i < send_bufs.size(); i++)
send_recv_size[i+recv_bufs.size()] = static_cast<int>(send_bufs[i].second.size());
send_recv_size[i+recv_bufs.size()] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size());
REPORT_VAL("recv buffers size",recv_bufs.size());
for(i = 0; i < recv_bufs.size(); i++)
{
@@ -5747,7 +5788,7 @@ namespace INMOST
REPORT_VAL("origin",recv_bufs[i].first);
REPORT_VAL("mpi_tag",mpi_tag);
//mpi_tag = parallel_mesh_unique_id*mpisize*mpisize+recv_bufs[i].first*mpisize+mpirank;
REPORT_MPI(MPI_Irecv(&send_recv_size[i],1,MPI_INT,recv_bufs[i].first,mpi_tag,comm,&reqs[i]));
REPORT_MPI(MPI_Irecv(&send_recv_size[i],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,recv_bufs[i].first,mpi_tag,comm,&reqs[i]));
}
REPORT_VAL("send buffers size",send_bufs.size());
for(i = 0; i < send_bufs.size(); i++)
@@ -5757,7 +5798,7 @@ namespace INMOST
REPORT_VAL("mpi_tag",mpi_tag);
REPORT_VAL("size",send_recv_size[i+recv_bufs.size()]);
//mpi_tag = parallel_mesh_unique_id*mpisize*mpisize+mpirank*mpisize+send_bufs[i].first;
REPORT_MPI(MPI_Isend(&send_recv_size[i+recv_bufs.size()],1,MPI_INT,send_bufs[i].first,mpi_tag,comm,&reqs[i+recv_bufs.size()]));
REPORT_MPI(MPI_Isend(&send_recv_size[i+recv_bufs.size()],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,send_bufs[i].first,mpi_tag,comm,&reqs[i+recv_bufs.size()]));
}
if( !recv_bufs.empty() )
{
@@ -5780,50 +5821,37 @@ namespace INMOST
{
REPORT_STR("Unknown source");
#if defined(USE_MPI_P2P)
int mpirank = GetProcessorRank(),mpisize = GetProcessorsNumber();
unsigned i, end = static_cast<unsigned>(send_bufs.size());
REPORT_MPI(MPI_Win_fence(MPI_MODE_NOPRECEDE,window)); //start exchange session
memset(shared_space,0,sizeof(INMOST_DATA_BIG_ENUM_TYPE)*mpisize); //zero bits where we receive data
//REPORT_MPI(MPI_Win_fence( MPI_MODE_NOPRECEDE,window)); //start exchange session
REPORT_MPI(MPI_Win_fence( 0,window)); //wait memset finish
REPORT_MPI(MPI_Win_fence( 0,window)); //wait memset finish
for(i = 0; i < end; i++) shared_space[mpisize+i] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size()+1); //put data to special part of the memory
for(i = 0; i < end; i++)
{
REPORT_VAL("put value", shared_space[mpisize+i]);
REPORT_VAL("destination", send_bufs[i].first);
REPORT_VAL("displacement", mpirank);
//~ if( !(send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber()) )
//~ std::cout << "requested put to " << send_bufs[i].first << "/" << GetProcessorsNumber() << std::endl;
assert( send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber() );
REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,send_bufs[i].first,mpirank,1,INMOST_MPI_DATA_BIG_ENUM_TYPE,window)); //request rdma to target processors for each value
}
REPORT_MPI(MPI_Win_fence(MPI_MODE_NOSTORE | MPI_MODE_NOSUCCEED,window)); //end exchange session
if( todo == UnknownSize )
{
end = static_cast<unsigned>(recv_bufs.size());
for(i = 0; i < end; i++)
recv_bufs[i].second.resize(shared_space[recv_bufs[i].first]-1);
}
else if( todo == UnknownSource )
{
recv_bufs.clear();
for(int ii = 0; ii < mpisize; ii++)
if( shared_space[ii] > 0 )
{
REPORT_VAL("position", ii);
REPORT_VAL("value", shared_space[ii]);
recv_bufs.push_back(proc_buffer_type(ii,std::vector<INMOST_DATA_BULK_TYPE>(shared_space[ii]-1))); // this call would be optimized by compiler
}
REPORT_VAL("recvs",recv_bufs.size());
}
recv_bufs.clear();
for(int ii = 0; ii < mpisize; ii++)
if( shared_space[ii] > 0 )
{
REPORT_VAL("position", ii);
REPORT_VAL("value", shared_space[ii]);
recv_bufs.push_back(proc_buffer_type(ii,std::vector<INMOST_DATA_BULK_TYPE>(shared_space[ii]-1))); // this call would be optimized by compiler
}
REPORT_VAL("recvs",recv_bufs.size());
#else //USE_MPI_P2P
int mpisize = GetProcessorsNumber(),mpirank = GetProcessorRank();
{
std::vector< unsigned > sends_dest_and_size(send_bufs.size()*2+1);
std::vector< INMOST_DATA_BIG_ENUM_TYPE > sends_dest_and_size(send_bufs.size()*2+1);
for(int i = 0; i < static_cast<int>(send_bufs.size()); i++)
{
sends_dest_and_size[i*2+0] = send_bufs[i].first;
sends_dest_and_size[i*2+1] = static_cast<unsigned>(send_bufs[i].second.size());
sends_dest_and_size[i*2+0] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].first);
sends_dest_and_size[i*2+1] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size());
}
unsigned recvsize = 0;
int k,j;
@@ -5835,8 +5863,8 @@ namespace INMOST
recvsize += allsize[k];
for(k = 1; k < mpisize+1; k++)
displs[k] = displs[k-1]+allsize[k-1];
std::vector<unsigned> recvs_dest_and_size(recvsize+1);
REPORT_MPI(MPI_Allgatherv(&sends_dest_and_size[0],static_cast<INMOST_MPI_SIZE>(send_bufs.size()*2),MPI_UNSIGNED,&recvs_dest_and_size[0],&allsize[0],&displs[0],MPI_UNSIGNED,comm));
std::vector<INMOST_DATA_BIG_ENUM_TYPE> recvs_dest_and_size(recvsize+1);
REPORT_MPI(MPI_Allgatherv(&sends_dest_and_size[0],static_cast<INMOST_MPI_SIZE>(send_bufs.size()*2),INMOST_MPI_DATA_BIG_ENUM_TYPE,&recvs_dest_and_size[0],&allsize[0],&displs[0],INMOST_MPI_DATA_BIG_ENUM_TYPE,comm));
recv_bufs.clear();
for(k = 0; k < mpisize; k++)
for(j = displs[k]; j < displs[k+1]; j+=2)
@@ -5848,7 +5876,6 @@ namespace INMOST
recv_bufs.back().second.resize(recvs_dest_and_size[j+1]);
}
}
}
#endif //USE_MPI_P2P
}
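Editor's note: the size handshake above switches from int/MPI_INT (and unsigned/MPI_UNSIGNED in the Allgatherv path) to INMOST_DATA_BIG_ENUM_TYPE with INMOST_MPI_DATA_BIG_ENUM_TYPE, because a buffer larger than 2 GB has a byte count that no longer fits in 32 bits. A minimal sketch of the same idea, with MPI_UNSIGNED_LONG_LONG as a stand-in for the INMOST typedef:

```cpp
#include <mpi.h>
#include <cstdio>

// Size handshake with a 64-bit payload: once buffers can exceed 2 GB, their
// byte counts no longer fit in the int that used to be exchanged with MPI_INT.
int main(int argc, char ** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size >= 2 && rank < 2)
  {
    unsigned long long bytes = 5ull * 1024 * 1024 * 1024; // > INT_MAX, would overflow an int
    if (rank == 0)
      MPI_Send(&bytes, 1, MPI_UNSIGNED_LONG_LONG, 1, 99, MPI_COMM_WORLD);
    else
    {
      unsigned long long incoming = 0;
      MPI_Recv(&incoming, 1, MPI_UNSIGNED_LONG_LONG, 0, 99, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      printf("will receive %llu bytes\n", incoming); // receiver can now size its buffer
    }
  }
  MPI_Finalize();
  return 0;
}
```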
@@ -5860,25 +5887,41 @@ namespace INMOST
EXIT_FUNC();
}
std::vector<int> Mesh::FinishRequests(std::vector<INMOST_MPI_Request> & recv_reqs)
std::vector<int> Mesh::FinishRequests(exch_recv_reqs_type & recv_reqs)
{
std::vector<int> ret;
ENTER_FUNC();
#if defined(USE_MPI)
int outcount = 0;
REPORT_VAL("requests",recv_reqs.size());
if( recv_reqs.empty() )
int outcount = 0, ierr;
REPORT_VAL("requests",recv_reqs.requests.size());
if( recv_reqs.requests.empty() )
ret.clear();
else
{
ret.resize(recv_reqs.size(),-1);
if( !recv_reqs.empty() )
std::vector<int> retreq;
while( ret.empty() && recv_reqs.count() )
{
REPORT_MPI(MPI_Waitsome(static_cast<INMOST_MPI_SIZE>(recv_reqs.size()),&recv_reqs[0],&outcount,&ret[0],MPI_STATUSES_IGNORE));
retreq.resize(recv_reqs.requests.size(),-1);
REPORT_MPI(ierr = MPI_Waitsome(static_cast<INMOST_MPI_SIZE>(recv_reqs.requests.size()),&recv_reqs.requests[0],&outcount,&retreq[0],MPI_STATUSES_IGNORE));
if( ierr != MPI_SUCCESS ) MPI_Abort(GetCommunicator(),__LINE__);
if( outcount == MPI_UNDEFINED )
{
std::cout << __FILE__ << ":" << __LINE__ << " no more requests, error? " << std::endl;
retreq.clear();
break;
}
else retreq.resize(outcount);
for(size_t i = 0; i < retreq.size(); ++i)
{
size_t p = retreq[i]; //index of a completed request
//determine the buffer that owns this request: buffer k owns indices [buf[k], buf[k+1])
size_t k = std::upper_bound(recv_reqs.buf.begin(),recv_reqs.buf.end(),p) - recv_reqs.buf.begin() - 1;
//decrease the number of messages we still wait for
recv_reqs.cnt[k]--;
if( recv_reqs.cnt[k] == 0 ) // all messages received
ret.push_back(k);
}
}
else outcount = MPI_UNDEFINED;
if( outcount == MPI_UNDEFINED ) ret.clear();
else ret.resize(outcount);
}
#else
(void) recv_reqs;
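Editor's note: FinishRequests now waits on individual chunk requests and reports a receive buffer only after its last chunk completes; the cumulative buf array is what maps a finished request index back to its buffer. A non-MPI sketch of that mapping, reusing the layout from the earlier example (buffers with 2, 1 and 3 chunks):

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

// Map completed request indices back to their receive buffers, mimicking what
// FinishRequests does after MPI_Waitsome. Buffer k owns request indices
// [buf[k], buf[k+1]); cnt[k] counts its chunks that are still outstanding, and
// the buffer is reported only when its last chunk completes.
int main()
{
  std::vector<unsigned> buf = {0, 2, 3, 6}; // buffers with 2, 1 and 3 chunks
  std::vector<unsigned> cnt = {2, 1, 3};
  int completed[] = {2, 0, 4, 1, 5, 3};     // order in which requests finish
  for (int p : completed)
  {
    size_t k = std::upper_bound(buf.begin(), buf.end(), (unsigned)p) - buf.begin() - 1;
    if (--cnt[k] == 0)
      std::cout << "buffer " << k << " fully received\n"; // would be appended to 'ret'
  }
  return 0;
}
```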