Commit 8ac91d56 authored by Kirill Terekhov

remove choice for parallel communication strategy; change type of P2P communications to support large message sizes
parent bc6e4561
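
Background for the type change below: the P2P bookkeeping previously stored message sizes in 32-bit unsigned counters, which wrap around once a packed buffer exceeds 2^32 bytes. A minimal sketch of that overflow, assuming INMOST_DATA_BIG_ENUM_TYPE is a 64-bit unsigned integer (the real typedef lives in the INMOST headers):

#include <cstdint>
#include <cstdio>

typedef uint64_t INMOST_DATA_BIG_ENUM_TYPE; // assumption: 64-bit unsigned, see the INMOST headers for the actual typedef

int main()
{
    const uint64_t packed_bytes = 5ull * 1024 * 1024 * 1024;          // a 5 GiB packed exchange buffer
    unsigned size32 = static_cast<unsigned>(packed_bytes);            // wraps modulo 2^32: 1073741824
    INMOST_DATA_BIG_ENUM_TYPE size64 = packed_bytes;                  // exact: 5368709120
    std::printf("32-bit size: %u, 64-bit size: %llu\n", size32, (unsigned long long)size64);
    return 0;
}
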
@@ -2446,9 +2446,9 @@ int main(int argc, char ** argv)
 mesh = new Mesh();
 printf("Started loading mesh.\n");
 tt = Timer();
-//mesh->SetCommunicator(INMOST_MPI_COMM_WORLD);
-mesh->SetParallelFileStrategy(0);
-mesh->SetParallelStrategy(1);
+//mesh->SetCommunicator(INMOST_MPI_COMM_WORLD);
+mesh->SetParallelFileStrategy(0);
+//mesh->SetParallelStrategy(1);
 mesh->SetFileOption("VERBOSITY","2");
 if( argc < 2 )
 {
......
@@ -450,11 +450,11 @@ int main(int argc,char ** argv)
 BARRIER
 if( m->GetProcessorRank() == 0 ) std::cout << "Exchange phi: " << Timer()-ttt << std::endl;
-for(int s = 0; s < 3; s++)
+//for(int s = 0; s < 3; s++)
 {
-if( m->GetProcessorRank() == 0 ) std::cout << "strategy: " << s << std::endl;
+// if( m->GetProcessorRank() == 0 ) std::cout << "strategy: " << s << std::endl;
-m->SetParallelStrategy(s);
+//m->SetParallelStrategy(s);
 ttt = Timer();
 m->ExchangeData(tensor_K,CELL,0);
......
@@ -2300,9 +2300,8 @@ namespace INMOST
 #endif
 #if defined(USE_MPI_P2P)
 INMOST_MPI_Win window;
-unsigned * shared_space;
+INMOST_DATA_BIG_ENUM_TYPE * shared_space;
 #endif
-int parallel_strategy;
 int parallel_file_strategy;
 private:
 void ListTags (tag_set & list);
@@ -2419,10 +2418,12 @@ namespace INMOST
 /// Algorithms above are implemented in Mesh::ExchangeBuffersInner
 /// @see Mesh::PrepareReceiveInner
 /// @see Mesh::ExchangeBuffersInner
-void SetParallelStrategy(int strategy){assert( !(strategy < 0 || strategy > 3) ); parallel_strategy = strategy;}
+//no longer used
+//void SetParallelStrategy(int strategy){assert( !(strategy < 0 || strategy > 3) ); parallel_strategy = strategy;}
 /// Retrieve currently set parallel strategy.
 /// @see Mesh::SetParallelStrategy
-int GetParallelStrategy() {return parallel_strategy;}
+//no longer used
+//int GetParallelStrategy() {return parallel_strategy;}
 /// This strategy correspond only to internal ".pmf" mesh format.
 /// There are two availible strategies for ".pmf" files loading and saving:
 ///
......
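
With the setter commented out, calling code that used to pick a strategy (as in the two example programs above) simply drops that call. A minimal usage sketch, assuming the usual "inmost.h" umbrella header and caller-managed MPI initialization; only SetCommunicator, SetParallelFileStrategy and the commented-out SetParallelStrategy come from this diff, the rest is illustrative:

#include "inmost.h"   // assumed umbrella header for the INMOST library
#include <mpi.h>
using namespace INMOST;

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);
    Mesh * m = new Mesh();
    m->SetCommunicator(INMOST_MPI_COMM_WORLD);
    m->SetParallelFileStrategy(0);   // file strategy for ".pmf" I/O is still selectable
    //m->SetParallelStrategy(1);     // no longer used: the single remaining exchange strategy is always taken
    // ... load the mesh and call ExchangeData as before ...
    delete m;
    MPI_Finalize();
    return 0;
}
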
@@ -599,7 +599,6 @@ namespace INMOST
 }
 }
 }
-//std::cout << "parallel strategy " << parallel_strategy << " file strategy " << parallel_file_strategy << std::endl;
 io_converter<INMOST_DATA_INTEGER_TYPE ,INMOST_DATA_REAL_TYPE> iconv;
 io_converter<INMOST_DATA_ENUM_TYPE ,INMOST_DATA_REAL_TYPE> uconv;
 io_converter<INMOST_DATA_BIG_ENUM_TYPE,INMOST_DATA_REAL_TYPE> buconv;
......
@@ -946,7 +946,6 @@ namespace INMOST
 #if defined(USE_MPI)
 randomizer = Random();
-parallel_strategy = 1;
 parallel_file_strategy = 1;
@@ -964,7 +963,7 @@ namespace INMOST
 #if defined(USE_MPI_P2P)
 int err;
-REPORT_MPI(err = MPI_Alloc_mem(GetProcessorsNumber()*sizeof(unsigned)*2,MPI_INFO_NULL,&shared_space));
+REPORT_MPI(err = MPI_Alloc_mem(GetProcessorsNumber()*sizeof(INMOST_DATA_BIG_ENUM_TYPE)*2,MPI_INFO_NULL,&shared_space));
 if( err )
 {
 int errclass;
@@ -980,7 +979,7 @@ namespace INMOST
 std::cout << std::endl;
 MPI_Abort(comm,err);
 }
-REPORT_MPI(MPI_Win_create(shared_space,sizeof(unsigned)*GetProcessorsNumber()*2,sizeof(unsigned),MPI_INFO_NULL,comm,&window));
+REPORT_MPI(MPI_Win_create(shared_space,sizeof(INMOST_DATA_BIG_ENUM_TYPE)*GetProcessorsNumber()*2,sizeof(INMOST_DATA_BIG_ENUM_TYPE),MPI_INFO_NULL,comm,&window));
 #endif //USE_MPI_P2P
 #else //USE_MPI
 (void) _comm;
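
The two hunks above allocate the RMA buffer and expose it as an MPI window; the diff suggests the buffer apparently holds two slots per processor (a lower half that peers write into and an upper half staging this rank's outgoing sizes), and the displacement unit grows with the element type so a displacement of "rank" still addresses one whole slot. A self-contained sketch of that allocation, assuming a 64-bit INMOST_DATA_BIG_ENUM_TYPE and MPI_UINT64_T as its MPI counterpart:

#include <mpi.h>
#include <cstdint>

typedef uint64_t INMOST_DATA_BIG_ENUM_TYPE;              // assumed 64-bit unsigned
#define INMOST_MPI_DATA_BIG_ENUM_TYPE MPI_UINT64_T       // assumed matching MPI datatype

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);
    int mpisize; MPI_Comm_size(MPI_COMM_WORLD, &mpisize);

    // 2*mpisize slots: [0,mpisize) is the RDMA target half, [mpisize,2*mpisize) stages outgoing sizes.
    INMOST_DATA_BIG_ENUM_TYPE * shared_space = NULL;
    MPI_Alloc_mem((MPI_Aint)mpisize * sizeof(INMOST_DATA_BIG_ENUM_TYPE) * 2, MPI_INFO_NULL, &shared_space);

    // Displacement unit equals the new element size, so MPI_Put displacements count whole slots.
    MPI_Win window;
    MPI_Win_create(shared_space, (MPI_Aint)sizeof(INMOST_DATA_BIG_ENUM_TYPE) * mpisize * 2,
                   sizeof(INMOST_DATA_BIG_ENUM_TYPE), MPI_INFO_NULL, MPI_COMM_WORLD, &window);

    MPI_Win_free(&window);
    MPI_Free_mem(shared_space);
    MPI_Finalize();
    return 0;
}
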
@@ -3360,7 +3359,7 @@ namespace INMOST
 DeleteTag(pack_position);
 storage.send_buffers.resize(num_send);
 storage.recv_buffers.resize(num_recv);
-if( unknown_size && parallel_strategy != 0 )
+if( unknown_size )
 {
 REPORT_STR("prepare receive with unknown size");
 PrepareReceiveInner(UnknownSize,storage.send_buffers,storage.recv_buffers);
@@ -5654,27 +5653,6 @@ namespace INMOST
 if( flag ) max_tag = *p_max_tag;
 recv_reqs.resize(recv_bufs.size());
 send_reqs.resize(send_bufs.size());
-REPORT_VAL("strategy",parallel_strategy);
-if( parallel_strategy == 0 )
-{
-for(i = 0; i < send_bufs.size(); i++)
-{
-mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (send_bufs[i].first+mpisize+rand_num))%max_tag;
-REPORT_MPI(MPI_Isend(&send_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(send_bufs[i].second.size()),MPI_PACKED,send_bufs[i].first,mpi_tag,comm,&send_reqs[i]));
-}
-for(i = 0; i < recv_bufs.size(); i++)
-{
-int buffer_size;
-MPI_Status recv_stat;
-mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (mpirank+mpisize+rand_num))%max_tag;
-REPORT_MPI(MPI_Probe(MPI_ANY_SOURCE,mpi_tag,comm,&recv_stat));
-REPORT_MPI(MPI_Get_count(&recv_stat,MPI_PACKED,&buffer_size));
-recv_bufs[i].second.resize(buffer_size);
-recv_bufs[i].first = recv_stat.MPI_SOURCE;
-REPORT_MPI(MPI_Irecv(&recv_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(recv_bufs[i].second.size()),MPI_PACKED,recv_stat.MPI_SOURCE,mpi_tag,comm,&recv_reqs[i]));
-}
-}
-else if( parallel_strategy == 1 )
 {
 INMOST_DATA_BULK_TYPE stub;
 REPORT_VAL("recv bufs size",recv_bufs.size());
@@ -5700,23 +5678,6 @@ namespace INMOST
 REPORT_MPI(MPI_Isend(send_bufs[i].second.empty()?&stub:&send_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(send_bufs[i].second.size()),MPI_PACKED,send_bufs[i].first,mpi_tag,comm,&send_reqs[i]));
 }
 }
-else if( parallel_strategy == 2 )
-{
-INMOST_DATA_BULK_TYPE stub;
-REPORT_VAL("recv bufs size",recv_bufs.size());
-for(i = 0; i < recv_bufs.size(); i++) //if( !recv_bufs[i].second.empty() )
-{
-mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (mpirank+mpisize+rand_num))%max_tag;
-REPORT_MPI(MPI_Irecv(recv_bufs[i].second.empty()? &stub : &recv_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(recv_bufs[i].second.size()),MPI_PACKED,recv_bufs[i].first,mpi_tag,comm,&recv_reqs[i]));
-}
-REPORT_MPI(MPI_Barrier(comm));
-REPORT_VAL("send bufs size",send_bufs.size());
-for(i = 0; i < send_bufs.size(); i++)// if( !send_bufs[i].second.empty() )
-{
-mpi_tag = ((parallel_mesh_unique_id+1)*mpisize*mpisize + (send_bufs[i].first+mpisize+rand_num))%max_tag;
-REPORT_MPI(MPI_Irsend(send_bufs[i].second.empty()? &stub : &send_bufs[i].second[0],static_cast<INMOST_MPI_SIZE>(send_bufs[i].second.size()),MPI_PACKED,send_bufs[i].first,mpi_tag,comm,&send_reqs[i]));
-}
-}
 #else //USE_MPI
 (void) send_bufs;
 (void) recv_bufs;
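
The two hunks above delete the MPI_Probe-based and MPI_Irsend-based code paths, leaving the single pattern of pre-posted receives followed by sends. A condensed, self-contained sketch of that retained pattern, loosely mirroring what the remaining branch of Mesh::ExchangeBuffersInner does: the buffer container, the stub type, and the mpi_tag parameter are simplified stand-ins (the real code derives the tag from the mesh id, the ranks, and a random number), and receive buffers are assumed to be already sized by PrepareReceiveInner:

#include <mpi.h>
#include <vector>
#include <utility>

typedef std::vector< std::pair<int, std::vector<char> > > exch_buffer_type; // simplified stand-in: (peer rank, packed bytes)

void exchange_buffers(exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs, int mpi_tag, MPI_Comm comm)
{
    char stub = 0; // one-byte stand-in address for empty buffers
    std::vector<MPI_Request> recv_reqs(recv_bufs.size()), send_reqs(send_bufs.size());
    for(size_t i = 0; i < recv_bufs.size(); ++i)  // receives are posted first, into pre-sized buffers
        MPI_Irecv(recv_bufs[i].second.empty() ? &stub : &recv_bufs[i].second[0],
                  (int)recv_bufs[i].second.size(), MPI_PACKED,
                  recv_bufs[i].first, mpi_tag, comm, &recv_reqs[i]);
    for(size_t i = 0; i < send_bufs.size(); ++i)  // then the matching sends
        MPI_Isend(send_bufs[i].second.empty() ? &stub : &send_bufs[i].second[0],
                  (int)send_bufs[i].second.size(), MPI_PACKED,
                  send_bufs[i].first, mpi_tag, comm, &send_reqs[i]);
    if( !recv_reqs.empty() ) MPI_Waitall((int)recv_reqs.size(), &recv_reqs[0], MPI_STATUSES_IGNORE);
    if( !send_reqs.empty() ) MPI_Waitall((int)send_reqs.size(), &send_reqs[0], MPI_STATUSES_IGNORE);
}
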
@@ -5728,30 +5689,22 @@ namespace INMOST
 void Mesh::PrepareReceiveInner(Prepare todo, exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs)
 {
-if( parallel_strategy == 0 && todo == UnknownSize ) return; //in this case we know all we need
 ENTER_FUNC();
 #if defined(USE_MPI)
 int mpirank = GetProcessorRank();
 #if defined(USE_MPI_P2P) && defined(PREFFER_MPI_P2P)
 unsigned i, end = send_bufs.size();
 REPORT_MPI(MPI_Win_fence(MPI_MODE_NOPRECEDE,window)); //start exchange session
-memset(shared_space,0,sizeof(unsigned)*mpisize); //zero bits where we receive data
+memset(shared_space,0,sizeof(INMOST_DATA_BIG_ENUM_TYPE)*mpisize); //zero bits where we receive data
 REPORT_MPI(MPI_Win_fence(0,window)); //wait memset finish
-for(i = 0; i < end; i++) shared_space[mpisize+i] = send_bufs[i].second.size()+1; //put data to special part of the memory
+for(i = 0; i < end; i++) shared_space[mpisize+i] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size()+1); //put data to special part of the memory
 for(i = 0; i < end; i++)
 {
 assert( send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber() );
-REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,MPI_UNSIGNED,send_bufs[i].first,mpirank,1,MPI_UNSIGNED,window)); //request rdma
+REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,send_bufs[i].first,mpirank,1,INMOST_MPI_DATA_BIG_ENUM_TYPE,window)); //request rdma
 }
 REPORT_MPI(MPI_Win_fence(MPI_MODE_NOSTORE | MPI_MODE_NOSUCCEED,window)); //end exchange session
-if( parallel_strategy == 0 )
-{
-unsigned num = 0;
-for(int ii = 0; ii < mpisize; ii++) if( shared_space[ii] > 0 ) num++;
-recv_bufs.resize(num);
-}
-else if( todo == UnknownSize )
+if( todo == UnknownSize )
 {
 end = recv_bufs.size();
 for(i = 0; i < end; i++)
@@ -5768,7 +5721,6 @@ namespace INMOST
 if( todo == UnknownSize )
 {
 REPORT_STR("Unknown size");
-if( parallel_strategy != 0 )
 {
 REPORT_VAL("exchange number", ++num_exchanges);
 unsigned i;
@@ -5831,10 +5783,10 @@ namespace INMOST
 int mpirank = GetProcessorRank(),mpisize = GetProcessorsNumber();
 unsigned i, end = static_cast<unsigned>(send_bufs.size());
 REPORT_MPI(MPI_Win_fence(MPI_MODE_NOPRECEDE,window)); //start exchange session
-memset(shared_space,0,sizeof(unsigned)*mpisize); //zero bits where we receive data
+memset(shared_space,0,sizeof(INMOST_DATA_BIG_ENUM_TYPE)*mpisize); //zero bits where we receive data
 //REPORT_MPI(MPI_Win_fence( MPI_MODE_NOPRECEDE,window)); //start exchange session
 REPORT_MPI(MPI_Win_fence( 0,window)); //wait memset finish
-for(i = 0; i < end; i++) shared_space[mpisize+i] = static_cast<unsigned>(send_bufs[i].second.size()+1); //put data to special part of the memory
+for(i = 0; i < end; i++) shared_space[mpisize+i] = static_cast<INMOST_DATA_BIG_ENUM_TYPE>(send_bufs[i].second.size()+1); //put data to special part of the memory
 for(i = 0; i < end; i++)
 {
 REPORT_VAL("put value", shared_space[mpisize+i]);
@@ -5843,16 +5795,10 @@ namespace INMOST
 //~ if( !(send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber()) )
 //~ std::cout << "requested put to " << send_bufs[i].first << "/" << GetProcessorsNumber() << std::endl;
 assert( send_bufs[i].first >= 0 && send_bufs[i].first < GetProcessorsNumber() );
-REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,MPI_UNSIGNED,send_bufs[i].first,mpirank,1,MPI_UNSIGNED,window)); //request rdma to target processors for each value
+REPORT_MPI(MPI_Put(&shared_space[mpisize+i],1,INMOST_MPI_DATA_BIG_ENUM_TYPE,send_bufs[i].first,mpirank,1,INMOST_MPI_DATA_BIG_ENUM_TYPE,window)); //request rdma to target processors for each value
 }
 REPORT_MPI(MPI_Win_fence(MPI_MODE_NOSTORE | MPI_MODE_NOSUCCEED,window)); //end exchange session
-if( parallel_strategy == 0 )
-{
-unsigned num = 0;
-for(int ii = 0; ii < mpisize; ii++) if( shared_space[ii] > 0 ) num++;
-recv_bufs.resize(num);
-}
-else if( todo == UnknownSize )
+if( todo == UnknownSize )
 {
 end = static_cast<unsigned>(recv_bufs.size());
 for(i = 0; i < end; i++)
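
The hunks above keep the fence/put/fence size exchange but switch it to the wider datatype. A self-contained sketch of that one-sided pattern, following the structure visible in the diff: each rank stages its outgoing sizes (biased by +1 so that 0 means "nothing sent") in the upper half of shared_space and puts them into its own slot on every target, so after the closing fence the lower half holds the incoming sizes. The buffer container, the assumed 64-bit typedef, the MPI datatype mapping, and the "-1" on the receive side are inferences, not the actual INMOST code, which also guards this path with USE_MPI_P2P and only resizes when sizes are unknown:

#include <mpi.h>
#include <cstring>
#include <cstdint>
#include <vector>
#include <utility>

typedef uint64_t INMOST_DATA_BIG_ENUM_TYPE;              // assumed 64-bit unsigned
#define INMOST_MPI_DATA_BIG_ENUM_TYPE MPI_UINT64_T       // assumed matching MPI datatype

void exchange_sizes(std::vector< std::pair<int, std::vector<char> > > & send_bufs,
                    std::vector< std::pair<int, std::vector<char> > > & recv_bufs,
                    INMOST_DATA_BIG_ENUM_TYPE * shared_space, MPI_Win window,
                    int mpirank, int mpisize)
{
    MPI_Win_fence(MPI_MODE_NOPRECEDE, window);                                   // start exchange session
    std::memset(shared_space, 0, sizeof(INMOST_DATA_BIG_ENUM_TYPE)*mpisize);     // zero the slots peers write into
    MPI_Win_fence(0, window);                                                    // wait for the memset to finish
    for(size_t i = 0; i < send_bufs.size(); ++i)                                 // stage outgoing sizes, biased by +1
        shared_space[mpisize+i] = (INMOST_DATA_BIG_ENUM_TYPE)(send_bufs[i].second.size()+1);
    for(size_t i = 0; i < send_bufs.size(); ++i)                                 // write my size into my slot on each target
        MPI_Put(&shared_space[mpisize+i], 1, INMOST_MPI_DATA_BIG_ENUM_TYPE,
                send_bufs[i].first, mpirank, 1, INMOST_MPI_DATA_BIG_ENUM_TYPE, window);
    MPI_Win_fence(MPI_MODE_NOSTORE | MPI_MODE_NOSUCCEED, window);                // end exchange session
    for(size_t i = 0; i < recv_bufs.size(); ++i)                                 // lower half now holds peers' sizes
        recv_bufs[i].second.resize((size_t)shared_space[recv_bufs[i].first] - 1);
}
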
@@ -5872,17 +5818,6 @@ namespace INMOST
 }
 #else //USE_MPI_P2P
 int mpisize = GetProcessorsNumber(),mpirank = GetProcessorRank();
-if( parallel_strategy == 0 )
-{
-std::vector<unsigned> recvs_at_proc_temp(mpisize,0);
-std::vector<unsigned> recvs_at_proc(mpisize,0);
-for(unsigned i = 0; i < send_bufs.size(); i++)
-recvs_at_proc_temp[send_bufs[i].first]++;
-REPORT_MPI(MPI_Allreduce(&recvs_at_proc_temp[0],&recvs_at_proc[0],mpisize,MPI_UNSIGNED,MPI_SUM,comm));
-recv_bufs.resize(recvs_at_proc[mpirank]);
-}
-else if( parallel_strategy == 1 || parallel_strategy == 2 )
-{
 std::vector< unsigned > sends_dest_and_size(send_bufs.size()*2+1);
 for(int i = 0; i < static_cast<int>(send_bufs.size()); i++)
......
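
For reference, the deleted "strategy 0" fallback above learned how many messages each rank will receive by summing per-destination counters over all ranks; the retained path instead exchanges destinations together with sizes (sends_dest_and_size). A condensed, self-contained restatement of the removed counting pattern, with a simplified buffer type:

#include <mpi.h>
#include <vector>
#include <utility>

int count_incoming(const std::vector< std::pair<int, std::vector<char> > > & send_bufs,
                   int mpirank, int mpisize, MPI_Comm comm)
{
    std::vector<unsigned> recvs_at_proc_temp(mpisize, 0), recvs_at_proc(mpisize, 0);
    for(size_t i = 0; i < send_bufs.size(); ++i)
        recvs_at_proc_temp[send_bufs[i].first]++;                 // one future receive at each destination
    MPI_Allreduce(&recvs_at_proc_temp[0], &recvs_at_proc[0], mpisize, MPI_UNSIGNED, MPI_SUM, comm);
    return (int)recvs_at_proc[mpirank];                           // number of messages headed for this rank
}
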