Commit 367fc060 authored by Kirill Terekhov

Fix DATA_REFERENCE exchange

Add another round of communication to inform owner processors about
elements referenced by DATA_REFERENCE tags that were sent to other processors.
parent bdebc580
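
Below is a minimal, self-contained sketch of the idea behind the extra round, added here for orientation only. It is not the commit's code: the element ids are invented, and it uses collective MPI_Alltoall/MPI_Alltoallv calls where the commit routes the same information through PackElementsData/UnpackElementsData and the point-to-point buffers managed by InformElementsOwners. The sketch only shows the protocol: a rank that received ghost elements while unpacking DATA_REFERENCE data reports their ids to the owning ranks, so each owner learns every rank that now references its elements.

```cpp
// Hedged sketch (not INMOST code): each rank reports ghost elements it now
// references back to their owners; owners then know every rank that shares them.
#include <mpi.h>
#include <cstdio>
#include <map>
#include <vector>

int main(int argc, char ** argv)
{
	MPI_Init(&argc, &argv);
	int rank, size;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	MPI_Comm_size(MPI_COMM_WORLD, &size);

	// Pretend that unpacking a DATA_REFERENCE tag created one ghost element on
	// this rank, owned by the next rank; its (made-up) global id is owner*100.
	std::map<int, std::vector<int> > send_elements; // owner rank -> ids to report
	int owner = (rank + 1) % size;
	send_elements[owner].push_back(owner * 100);

	// Flatten the per-owner lists into counts and a contiguous id buffer.
	std::vector<int> send_counts(size, 0), recv_counts(size, 0), send_ids;
	for (int p = 0; p < size; ++p)
	{
		std::map<int, std::vector<int> >::iterator it = send_elements.find(p);
		if (it == send_elements.end()) continue;
		send_counts[p] = (int)it->second.size();
		send_ids.insert(send_ids.end(), it->second.begin(), it->second.end());
	}

	// First exchange: how many ids each rank will receive from every other rank.
	MPI_Alltoall(&send_counts[0], 1, MPI_INT, &recv_counts[0], 1, MPI_INT, MPI_COMM_WORLD);

	// Second exchange: the ids themselves.
	std::vector<int> sdispls(size, 0), rdispls(size, 0);
	for (int p = 1; p < size; ++p)
	{
		sdispls[p] = sdispls[p - 1] + send_counts[p - 1];
		rdispls[p] = rdispls[p - 1] + recv_counts[p - 1];
	}
	std::vector<int> recv_ids(rdispls[size - 1] + recv_counts[size - 1]);
	MPI_Alltoallv(&send_ids[0], &send_counts[0], &sdispls[0], MPI_INT,
	              &recv_ids[0], &recv_counts[0], &rdispls[0], MPI_INT, MPI_COMM_WORLD);

	// The owner would now add the reporting rank to each element's processor
	// list and mark the element Shared; here we only print what gets recorded.
	for (int p = 0; p < size; ++p)
		for (int k = 0; k < recv_counts[p]; ++k)
			printf("rank %d: element %d is now also referenced on rank %d\n",
			       rank, recv_ids[rdispls[p] + k], p);

	MPI_Finalize();
	return 0;
}
```

Compiled with an MPI wrapper (e.g. mpicxx) and run on a few ranks, each owner prints the ranks that newly reference its element, which mirrors what InformElementsOwners records in tag_processors and in the element status.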
@@ -2258,7 +2258,7 @@ namespace INMOST
void ComputeSharedProcs ();
proc_elements ComputeSharedSkinSet(ElementType bridge, MarkerType marker = 0);
void PackTagData (const Tag & tag, const elements_by_type & elements, int destination, ElementType mask, MarkerType select, buffer_type & buffer);
void UnpackTagData (const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op);
void UnpackTagData (const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op, proc_elements * send_elements = NULL);
void PackElementsData (element_set & input, buffer_type & buffer, int destination, const std::vector<std::string> & tag_list);
void UnpackElementsData (element_set & output, buffer_type & buffer, int source, int & position, std::vector<std::string> & tag_list);
void PrepareReceiveInner(Prepare todo, exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs);
@@ -2268,8 +2268,9 @@ namespace INMOST
std::vector<int> FinishRequests (std::vector<INMOST_MPI_Request> & recv_reqs);
void SortParallelStorage(parallel_storage & ghost, parallel_storage & shared,ElementType mask);
void GatherParallelStorage(parallel_storage & ghost, parallel_storage & shared, ElementType mask);
void InformElementsOwners(proc_elements & send_elements, exchange_data & storage);
public:
bool FindSharedGhost(int global_id, INMOST_DATA_INTEGER_TYPE el_type_num, HandleType& res);
//bool FindSharedGhost(int global_id, INMOST_DATA_INTEGER_TYPE el_type_num, HandleType& res);
#if defined(USE_PARALLEL_WRITE_TIME)
//this part is needed to test parallel performance
void Enter ();
@@ -2519,7 +2519,24 @@ namespace INMOST
if( !array_size_send.empty() ) MPI_Pack(&array_size_send[0],static_cast<INMOST_MPI_SIZE>(array_size_send.size()),INMOST_MPI_DATA_ENUM_TYPE,&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
if( !array_data_send.empty() ) MPI_Pack(&array_data_send[0],static_cast<INMOST_MPI_SIZE>(array_data_send.size()/bytes),tag.GetBulkDataType(),&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
if( tag.GetDataType() == DATA_REFERENCE )
PackElementsData(pack_elements,buffer,destination,std::vector<std::string>());
{
std::vector<std::string> tag_list;
ListTagNames(tag_list);
{
std::vector<std::string>::iterator it = tag_list.begin();
while( it != tag_list.end() )
{
if( it->substr(0,9) == "PROTECTED" )
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REFERENCE)
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REMOTE_REFERENCE)
it = tag_list.erase(it);
else it++;
}
}
PackElementsData(pack_elements,buffer,destination,tag_list);
}
buffer.resize(position);
REPORT_VAL("Buffer size after pack",buffer.size());
if( size_send < 6 )
@@ -2542,7 +2559,7 @@ namespace INMOST
}
/*
bool Mesh::FindSharedGhost(int global_id, INMOST_DATA_INTEGER_TYPE el_type_num, HandleType& res)
{
int dim = el_type_num;
@@ -2569,10 +2586,11 @@ namespace INMOST
}
}
return false;
}
}
*/
void Mesh::UnpackTagData(const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op)
void Mesh::UnpackTagData(const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op, proc_elements * send_elements)
{
(void) mask;
//if( tag.GetDataType() == DATA_REMOTE_REFERENCE) return; //NOT IMPLEMENTED TODO 14
@@ -2619,8 +2637,22 @@ namespace INMOST
}
if( tag.GetDataType() == DATA_REFERENCE )
{
std::vector<std::string> tag_recv; //should remain empty
int mpirank = GetProcessorRank();
std::vector<std::string> tag_recv;
UnpackElementsData(unpack_elements,buffer,source,position,tag_recv);
assert(send_elements);
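//remember which received elements we do not own, grouped by their owner,
//so that InformElementsOwners can notify the owners in an extra round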
for(element_set::iterator it = unpack_elements.begin(); it != unpack_elements.end(); it++)
{
Storage::integer owner = IntegerDF(*it,tag_owner);
if( owner == mpirank ) continue;
Storage::integer_array proc = IntegerArrayDV(*it,tag_processors);
Storage::integer_array::iterator ip = std::lower_bound(proc.begin(),proc.end(),mpirank);
if( ip == proc.end() || (*ip) != mpirank )
{
proc.insert(ip,mpirank);
(*send_elements)[owner].push_back(*it);
}
}
}
REPORT_VAL("Position after unpack",position);
for(int i = ElementNum(NODE); i <= ElementNum(ESET); i++) if( (recv_mask[0] & ElementTypeFromDim(i)) )
@@ -2926,18 +2958,81 @@ namespace INMOST
}
void Mesh::InformElementsOwners(proc_elements & send_elements, exchange_data & storage)
{
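//pack the lists of newly referenced elements and send them to their owners;
//owners add the sender to the processor list and update Shared/Ghost statuses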
ENTER_FUNC();
#if defined(USE_MPI)
int mpirank = GetProcessorRank();
std::vector<std::string> tag_list_recv;
std::vector<int> done;
storage.send_buffers.clear();
storage.send_reqs.clear();
storage.recv_buffers.clear();
storage.recv_reqs.clear();
storage.send_buffers.resize(send_elements.size());
int num_wait = 0;
for(proc_elements::iterator it = send_elements.begin(); it != send_elements.end(); it++)
if( !it->second.empty() )
{
PackElementsData(it->second,storage.send_buffers[num_wait].second,it->first,std::vector<std::string>());
storage.send_buffers[num_wait].first = it->first;
num_wait++;
}
storage.send_buffers.resize(num_wait);
PrepareReceiveInner(UnknownSource, storage.send_buffers,storage.recv_buffers);
ExchangeBuffersInner(storage.send_buffers,storage.recv_buffers,storage.send_reqs,storage.recv_reqs);
send_elements.clear();
while( !(done = FinishRequests(storage.recv_reqs)).empty() )
{
for(std::vector<int>::iterator qt = done.begin(); qt != done.end(); qt++)
{
element_set recv_elements;
int position = 0;
UnpackElementsData(recv_elements,storage.recv_buffers[*qt].second,storage.recv_buffers[*qt].first,position,tag_list_recv);
for(element_set::iterator it = recv_elements.begin(); it != recv_elements.end(); it++)
{
Storage::integer_array proc = IntegerArrayDV(*it,tag_processors);
Storage::integer_array::iterator ip = std::lower_bound(proc.begin(),proc.end(),storage.recv_buffers[*qt].first);
if( ip == proc.end() || (*ip) != storage.recv_buffers[*qt].first ) proc.insert(ip,storage.recv_buffers[*qt].first);
if( IntegerDF(*it,tag_owner) == mpirank )
SetStatus(*it,Element::Shared);
else
SetStatus(*it,Element::Ghost);
}
}
}
//wait for second round of communication
if( !storage.send_reqs.empty() )
{
REPORT_MPI(MPI_Waitall(num_wait,&storage.send_reqs[0],MPI_STATUSES_IGNORE));
}
#endif
EXIT_FUNC();
}
void Mesh::ExchangeDataInnerEnd(const tag_set & tags, const parallel_storage & from, const parallel_storage & to, ElementType mask, MarkerType select, ReduceOperation op, exchange_data & storage)
{
(void) from;
bool have_reference_tag = false;
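//set when any exchanged tag is of type DATA_REFERENCE; triggers the extra owner-notification round below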
{ // checks for bad input
if( mask == NONE ) return;
for(tag_set::const_iterator it = tags.begin(); it != tags.end(); ++it) assert( it->isValid() );
for(tag_set::const_iterator it = tags.begin(); it != tags.end(); ++it)
{
assert( it->isValid() );
if( it->GetDataType() == DATA_REFERENCE )
have_reference_tag = true;
}
if( tags.empty() ) return;
}
if( m_state == Serial ) return;
ENTER_FUNC();
#if defined(USE_MPI)
std::vector<int> done;
proc_elements send_elements; //for DATA_REFERENCE
while( !(done = FinishRequests(storage.recv_reqs)).empty() )
{
for(std::vector<int>::iterator qt = done.begin(); qt != done.end(); qt++)
@@ -2946,7 +3041,7 @@ namespace INMOST
for(unsigned int k = 0; k < tags.size(); k++)
{
REPORT_VAL("processor",storage.recv_buffers[*qt].first);
UnpackTagData(tags[k],to.find(storage.recv_buffers[*qt].first)->second,storage.recv_buffers[*qt].first,mask,select,storage.recv_buffers[*qt].second,position,op);
UnpackTagData(tags[k],to.find(storage.recv_buffers[*qt].first)->second,storage.recv_buffers[*qt].first,mask,select,storage.recv_buffers[*qt].second,position,op, &send_elements);
}
}
}
@@ -2954,6 +3049,14 @@ namespace INMOST
{
REPORT_MPI(MPI_Waitall(static_cast<INMOST_MPI_SIZE>(storage.send_reqs.size()),&storage.send_reqs[0],MPI_STATUSES_IGNORE));
}
//additional round of communication to tell element owners about the exchanged reference elements
if( have_reference_tag )
{
InformElementsOwners(send_elements,storage);
RecomputeParallelStorage(ESET | CELL | FACE | EDGE | NODE);
ExchangeData(tag_processors,ESET | CELL | FACE | EDGE | NODE,0);
ComputeSharedProcs();
}
#else //USE_MPI
(void) tags;
(void) from;
@@ -4696,11 +4799,12 @@ namespace INMOST
#if defined(USE_MPI)
INMOST_DATA_BIG_ENUM_TYPE num_wait;
int mpirank = GetProcessorRank();
std::vector<MPI_Request> send_reqs, recv_reqs;
std::vector<std::string> tag_list, tag_list_recv;
std::vector<std::string> tag_list_empty;
exch_buffer_type send_bufs;
exch_buffer_type recv_bufs;
//std::vector<MPI_Request> send_reqs, recv_reqs;
//exch_buffer_type send_bufs;
//exch_buffer_type recv_bufs;
exchange_data storage;
proc_elements send_elements;
std::vector<int> done;
@@ -4716,6 +4820,8 @@ namespace INMOST
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REFERENCE)
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REMOTE_REFERENCE)
it = tag_list.erase(it);
else it++;
}
}
@@ -4731,7 +4837,7 @@ namespace INMOST
}
}
num_wait = 0;
send_bufs.resize(send_elements.size());
storage.send_buffers.resize(send_elements.size());
//std::cout << mpirank << ": Send size: " << send_elements.size() << std::endl;
REPORT_STR("Packing elements to send");
for(proc_elements::iterator it = send_elements.begin(); it != send_elements.end(); it++)
@@ -4740,42 +4846,15 @@ namespace INMOST
REPORT_VAL("pack for", it->first);
REPORT_VAL("number of provided elements",it->second.size());
// std::cout << mpirank << "Number of provided els " << it->second.size() << std::endl;
PackElementsData(it->second,send_bufs[num_wait].second,it->first,tag_list);
PackElementsData(it->second,storage.send_buffers[num_wait].second,it->first,tag_list);
REPORT_VAL("number of got elements",it->second.size());
send_bufs[num_wait].first = it->first;
storage.send_buffers[num_wait].first = it->first;
num_wait++;
}
send_bufs.resize(num_wait);
//NEW ALGORITHM
if( false )
{ //in most cases the exchange of elements will affect only nearest neighbours, we want to detect this
Storage::integer_array p = IntegerArrayDV(GetHandle(),tag_processors);
int halo_exchange = 0, test = 1;
for(unsigned k = 0; k < send_bufs.size(); k++)
if( !std::binary_search(p.begin(),p.end(),send_bufs[k].first) ) test = 0;
//we should exchange, because another processor may want to send something to us
REPORT_MPI(MPI_Allreduce(&test,&halo_exchange,1,MPI_INT,MPI_MIN,GetCommunicator()));
if( halo_exchange )
{
//prepare missing send buffers
{
std::vector<int> present(send_bufs.size()), missing(p.size());
for(unsigned k = 0; k < send_bufs.size(); k++) present[k] = send_bufs[k].first;
missing.resize(std::set_difference(p.begin(),p.end(),present.begin(),present.end(),missing.begin())-missing.begin());
for(unsigned k = 0; k < missing.size(); k++) send_bufs.push_back(proc_buffer_type(missing[k],std::vector<INMOST_DATA_BULK_TYPE>()));
}
//prepare recv buffers
{
recv_bufs.resize(p.size());
for(integer_array::size_type k = 0; k < p.size(); k++) recv_bufs[k].first = p[k];
}
PrepareReceiveInner(UnknownSize, send_bufs,recv_bufs);
}
else PrepareReceiveInner(UnknownSource, send_bufs,recv_bufs);
}
else PrepareReceiveInner(UnknownSource, send_bufs,recv_bufs);
ExchangeBuffersInner(send_bufs,recv_bufs,send_reqs,recv_reqs);
storage.send_buffers.resize(num_wait);
PrepareReceiveInner(UnknownSource, storage.send_buffers,storage.recv_buffers);
ExchangeBuffersInner(storage.send_buffers,storage.recv_buffers,storage.send_reqs,storage.recv_reqs);
if( action == AMigrate ) // delete packed elements
{
@@ -4835,14 +4914,14 @@ namespace INMOST
}
send_elements.clear();
REPORT_STR("Unpacking received data");
while( !(done = FinishRequests(recv_reqs)).empty() )
while( !(done = FinishRequests(storage.recv_reqs)).empty() )
{
for(std::vector<int>::iterator qt = done.begin(); qt != done.end(); qt++)
{
element_set recv_elements;
REPORT_STR("call unpack");
int position = 0;
UnpackElementsData(recv_elements,recv_bufs[*qt].second,recv_bufs[*qt].first,position,tag_list_recv);
UnpackElementsData(recv_elements,storage.recv_buffers[*qt].second,storage.recv_buffers[*qt].first,position,tag_list_recv);
if( action == AGhost )
{
for(element_set::iterator it = recv_elements.begin(); it != recv_elements.end(); it++)
@@ -4866,58 +4945,34 @@ namespace INMOST
if( action == AGhost ) //second round to inform owner about ghosted elements
{
REPORT_STR("Second round for ghost exchange");
if( !send_reqs.empty() )
if( !storage.send_reqs.empty() )
{
REPORT_MPI(MPI_Waitall(static_cast<INMOST_MPI_SIZE>(send_reqs.size()),&send_reqs[0],MPI_STATUSES_IGNORE));
REPORT_MPI(MPI_Waitall(static_cast<INMOST_MPI_SIZE>(storage.send_reqs.size()),&storage.send_reqs[0],MPI_STATUSES_IGNORE));
}
//Inform owners about the fact that we have received their elements
InformElementsOwners(send_elements,storage);
/*
tag_list.clear();
tag_list_recv.clear();
send_bufs.clear();
send_buffers.clear();
send_reqs.clear();
recv_bufs.clear();
recv_buffers.clear();
recv_reqs.clear();
send_bufs.resize(send_elements.size());
send_buffers.resize(send_elements.size());
num_wait = 0;
for(proc_elements::iterator it = send_elements.begin(); it != send_elements.end(); it++)
if( !it->second.empty() )
{
PackElementsData(it->second,send_bufs[num_wait].second,it->first,tag_list);
send_bufs[num_wait].first = it->first;
PackElementsData(it->second,send_buffers[num_wait].second,it->first,tag_list);
send_buffers[num_wait].first = it->first;
num_wait++;
}
send_bufs.resize(num_wait);
send_buffers.resize(num_wait);
if( false )
{ //in most cases the exchange of elements will affect only nearest neighbours, we want to detect this
Storage::integer_array p = IntegerArrayDV(GetHandle(),tag_processors);
int halo_exchange = 0, test = 1;
for(unsigned k = 0; k < send_bufs.size(); k++)
if( !std::binary_search(p.begin(),p.end(),send_bufs[k].first) ) test = 0;
//we should exchange, because another processor may want to send something to us
REPORT_MPI(MPI_Allreduce(&test,&halo_exchange,1,MPI_INT,MPI_MIN,GetCommunicator()));
if( halo_exchange )
{
//prepare missing send buffers
{
std::vector<int> present(send_bufs.size()), missing(p.size());
for(unsigned k = 0; k < send_bufs.size(); k++) present[k] = send_bufs[k].first;
missing.resize(std::set_difference(p.begin(),p.end(),present.begin(),present.end(),missing.begin())-missing.begin());
for(unsigned k = 0; k < missing.size(); k++) send_bufs.push_back(proc_buffer_type(missing[k],std::vector<INMOST_DATA_BULK_TYPE>()));
}
//prepare recv buffers
{
recv_bufs.resize(p.size());
for(integer_array::size_type k = 0; k < p.size(); k++) recv_bufs[k].first = p[k];
}
PrepareReceiveInner(UnknownSize, send_bufs,recv_bufs);
}
else PrepareReceiveInner(UnknownSource, send_bufs,recv_bufs);
}
else PrepareReceiveInner(UnknownSource, send_bufs,recv_bufs);
ExchangeBuffersInner(send_bufs,recv_bufs,send_reqs,recv_reqs);
PrepareReceiveInner(UnknownSource, send_buffers,recv_buffers);
ExchangeBuffersInner(send_buffers,recv_buffers,send_reqs,recv_reqs);
send_elements.clear();
@@ -4927,12 +4982,12 @@ namespace INMOST
{
element_set recv_elements;
int position = 0;
UnpackElementsData(recv_elements,recv_bufs[*qt].second,recv_bufs[*qt].first,position,tag_list_recv);
UnpackElementsData(recv_elements,recv_buffers[*qt].second,recv_buffers[*qt].first,position,tag_list_recv);
for(element_set::iterator it = recv_elements.begin(); it != recv_elements.end(); it++)
{
Storage::integer_array proc = IntegerArrayDV(*it,tag_processors);
Storage::integer_array::iterator ip = std::lower_bound(proc.begin(),proc.end(),recv_bufs[*qt].first);
if( ip == proc.end() || (*ip) != recv_bufs[*qt].first ) proc.insert(ip,recv_bufs[*qt].first);
Storage::integer_array::iterator ip = std::lower_bound(proc.begin(),proc.end(),recv_buffers[*qt].first);
if( ip == proc.end() || (*ip) != recv_buffers[*qt].first ) proc.insert(ip,recv_buffers[*qt].first);
if( IntegerDF(*it,tag_owner) == mpirank )
SetStatus(*it,Element::Shared);
else
@@ -4940,6 +4995,7 @@ namespace INMOST
}
}
}
*/
}
if( action == AMigrate ) //Compute new values
@@ -4950,7 +5006,6 @@ namespace INMOST
Tag tag_new_processors = GetTag("TEMPORARY_NEW_PROCESSORS");
for(ElementType etype = NODE; etype <= CELL; etype = NextElementType(etype))
for(iteratorElement it = BeginElement(etype); it != EndElement(); it++)
//for(iteratorElement it = BeginElement(CELL | FACE | EDGE | NODE); it != EndElement(); it++)
{
Storage::integer new_owner = it->Integer(tag_new_owner);
it->IntegerDF(tag_owner) = new_owner;
@@ -4969,25 +5024,20 @@ namespace INMOST
}
}
RecomputeParallelStorage(CELL | FACE | EDGE | NODE);
RecomputeParallelStorage(ESET | CELL | FACE | EDGE | NODE);
if( action == AGhost )
{
//wait for second round of communication
if( !send_reqs.empty() )
{
REPORT_MPI(MPI_Waitall(num_wait,&send_reqs[0],MPI_STATUSES_IGNORE));
}
//Probably now owner should send processors_tag data
ExchangeData(tag_processors,CELL | FACE | EDGE | NODE,0);
ExchangeData(tag_processors,ESET | CELL | FACE | EDGE | NODE,0);
}
ComputeSharedProcs();
if( action == AMigrate )
{
if( !send_reqs.empty() )
if( !storage.send_reqs.empty() )
{
REPORT_MPI(MPI_Waitall(static_cast<INMOST_MPI_SIZE>(send_reqs.size()),&send_reqs[0],MPI_STATUSES_IGNORE));
REPORT_MPI(MPI_Waitall(static_cast<INMOST_MPI_SIZE>(storage.send_reqs.size()),&storage.send_reqs[0],MPI_STATUSES_IGNORE));
}
}
#else