Commit c5e58b47 authored by Kirill Terekhov's avatar Kirill Terekhov
Browse files

Change the way messages are packed to overcome the 2 GB MPI message size limit.

parent 86377f47
......@@ -2336,9 +2336,9 @@ namespace INMOST
void PackElementsEnumerate (elements_by_type & selems, TagInteger pack_position);
void PackElementsUnenumerate (elements_by_type & selems, TagInteger pack_position);
void PackTagData (const Tag & tag, const elements_by_type & elements, int destination, ElementType mask, MarkerType select, buffer_type & buffer, TagInteger pack_position);
void UnpackTagData (const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op, const elements_by_type & unpack_elements);//, proc_elements_by_type * send_elements = NULL);
void UnpackTagData (const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, size_t & buffer_position, ReduceOperation op, const elements_by_type & unpack_elements);//, proc_elements_by_type * send_elements = NULL);
void PackElementsData (elements_by_type & input, buffer_type & buffer, int destination, const tag_set & tag_list,TagInteger pack_position, bool pack_gids);
void UnpackElementsData (elements_by_type & output, buffer_type & buffer, int source, int & position, tag_set & tag_list);
void UnpackElementsData (elements_by_type & output, buffer_type & buffer, int source, size_t & buffer_position, tag_set & tag_list);
void PrepareReceiveInner(Prepare todo, exch_buffer_type & send_bufs, exch_buffer_type & recv_bufs);
void ExchangeDataInnerBegin(const tag_set & tag, const parallel_storage & from, const parallel_storage & to, ElementType mask, MarkerType select, exchange_data & storage);
void ExchangeDataInnerEnd(const tag_set & tag, const parallel_storage & from, const parallel_storage & to, ElementType mask, MarkerType select, ReduceOperation op, exchange_data & storage);
......
......@@ -101,6 +101,7 @@ namespace INMOST
template<typename T> struct MPIType;
template<> struct MPIType<bool> {static MPI_Datatype Type() {return MPI_C_BOOL;}};
template<> struct MPIType<char> {static MPI_Datatype Type() {return MPI_CHAR;}};
template<> struct MPIType<const char> {static MPI_Datatype Type() {return MPI_CHAR;}};
template<> struct MPIType<int> {static MPI_Datatype Type() {return MPI_INT;}};
template<> struct MPIType<short> {static MPI_Datatype Type() {return MPI_SHORT;}};
template<> struct MPIType<long> {static MPI_Datatype Type() {return MPI_LONG;}};
......@@ -116,51 +117,158 @@ namespace INMOST
//template<> struct MPIType<size_t> {static MPI_Datatype Type() {return MPI_SIZE_T;}};
template<typename T>
// Pack a single value of type T onto the end of the byte buffer using MPI_Pack.
// The buffer is first grown by the upper bound from MPI_Pack_size, then shrunk
// to the number of bytes MPI actually wrote (shift), which may be smaller.
// Aborts the communicator on any MPI failure, reporting the source line.
void pack_data(Mesh::buffer_type & buf, const T & data, INMOST_MPI_Comm comm)
{
	int ierr;
	POS_TYPE pack_size = 0, shift = 0;
	size_t write_pos = buf.size(); //append at the current end of the buffer
	ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&pack_size);
	if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
	buf.resize(write_pos+pack_size);
	ierr = MPI_Pack_call((void*)&data,1,MPIType<T>::Type(),&buf[write_pos],pack_size,&shift,comm);
	if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
	buf.resize(write_pos+shift); //trim to bytes actually packed
}
template<typename T>
// Upper bound, in bytes, of the packed size of one value of type T,
// as reported by MPI_Pack_size. Aborts the communicator on MPI failure.
size_t pack_data_size(INMOST_MPI_Comm comm)
{
	int ierr;
	POS_TYPE pack_size = 0;
	ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&pack_size);
	if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
	return static_cast<size_t>(pack_size);
}
template<typename T>
void pack_data_array(Mesh::exch_buffer_type & buf, const std::vector<T> & data, INMOST_MPI_Comm comm)
void pack_data_array(Mesh::buffer_type & buf, const T * data, size_t N, INMOST_MPI_Comm comm)
{
int ierr;
if( N )
{
POS_TYPE pack_size = 0;
ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&pack_size);
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
size_t rec_bytes = static_cast<size_t>(pack_size);
size_t max_bytes = rec_bytes*N;
size_t chunk_bytes = std::min(max_bytes,static_cast<size_t>(INT_MAX));
size_t chunk_size = chunk_bytes / rec_bytes;
size_t offset = 0;
while( offset != N )
{
size_t write_pos = buf.size();
size_t chunk = std::min(chunk_size, N-offset);
ierr = MPI_Pack_size_call(static_cast<POS_TYPE>(chunk),MPIType<T>::Type(),comm,&pack_size); //pack_size is expected to be within INT_MAX
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
buf.resize(write_pos+pack_size);
POS_TYPE shift = 0;
ierr = MPI_Pack_call((void *)&data[offset],static_cast<POS_TYPE>(chunk),MPIType<T>::Type(),&buf[write_pos],pack_size,&shift,comm);
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
buf.resize(write_pos+shift);
offset += chunk;
}
}
}
template<typename T>
// Pack a std::vector: first its element count (as size_t), then the payload
// via chunked pack_data_array. Unpacked by unpack_data_vector.
void pack_data_vector(Mesh::buffer_type & buf, const std::vector<T> & data, INMOST_MPI_Comm comm)
{
	pack_data(buf,data.size(),comm);
	if( !data.empty() )
		pack_data_array(buf,&data[0],data.size(),comm);
}
template<typename T>
size_t pack_data_array_size(size_t N, INMOST_MPI_Comm comm)
{
int ierr;
size_t ret = 0;
if( N )
{
POS_TYPE pack_size = 0;
ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&pack_size);
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
size_t rec_bytes = static_cast<size_t>(pack_size);
size_t max_bytes = rec_bytes*N;
size_t chunk_bytes = std::min(max_bytes,static_cast<size_t>(INT_MAX));
size_t chunk_size = chunk_bytes / rec_bytes;
size_t offset = 0;
while( offset != N )
{
size_t chunk = std::min(chunk_size, N-offset);
ierr = MPI_Pack_size_call(static_cast<POS_TYPE>(chunk),MPIType<T>::Type(),comm,&pack_size); //pack_size is expected to be within INT_MAX
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
ret += pack_size;
offset += chunk;
}
}
return ret;
}
template<typename T>
// Bytes required by pack_data_vector for N elements: the size_t count header
// plus, when non-empty, the chunked array payload.
size_t pack_data_vector_size(size_t N, INMOST_MPI_Comm comm)
{
	size_t bytes = pack_data_size<size_t>(comm);
	if( N ) bytes += pack_data_array_size<T>(N,comm);
	return bytes;
}
template<typename T>
void unpack_data(Mesh::exch_buffer_type & buf, int & pos, T & data, INMOST_MPI_Comm comm)
void unpack_data(Mesh::buffer_type & buf, size_t & buf_pos, T & data, INMOST_MPI_Comm comm)
{
POS_TYPE lpos = (POS_TYPE)pos;
MPI_Unpack_call((void*)&buf[0],(int)buf.size(),&lpos,&data,1,MPIType<T>::Type(),comm);
pos = (int)lpos;
int ierr;
POS_TYPE shift = 0, pack_size;
ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&pack_size);
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
ierr = MPI_Unpack_call((void*)&buf[buf_pos],pack_size,&shift,&data,1,MPIType<T>::Type(),comm);
if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
buf_pos += shift;
}
template<typename T>
// Unpack N contiguous values of type T from buf starting at buf_pos, advancing
// buf_pos by the bytes consumed. Uses the same chunking as pack_data_array so
// every MPI_Unpack call stays below the INT_MAX limit of MPI's int arguments.
// Aborts the communicator on any MPI failure.
void unpack_data_array(Mesh::buffer_type & buf, size_t & buf_pos, T * data, size_t N, INMOST_MPI_Comm comm)
{
	if( N == 0 ) return;
	POS_TYPE one_size = 0;
	int ierr = MPI_Pack_size_call(1,MPIType<T>::Type(),comm,&one_size);
	if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
	const size_t bytes_per_record = static_cast<size_t>(one_size);
	const size_t limit_bytes = std::min(bytes_per_record*N,static_cast<size_t>(INT_MAX));
	const size_t records_per_chunk = limit_bytes / bytes_per_record;
	for(size_t done = 0; done < N; )
	{
		const size_t batch = std::min(records_per_chunk, N-done);
		POS_TYPE batch_bytes = 0;
		ierr = MPI_Pack_size_call(static_cast<POS_TYPE>(batch),MPIType<T>::Type(),comm,&batch_bytes); //expected to be within INT_MAX
		if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
		POS_TYPE consumed = 0;
		ierr = MPI_Unpack_call((void *)&buf[buf_pos],batch_bytes,&consumed,&data[done],static_cast<POS_TYPE>(batch),MPIType<T>::Type(),comm);
		if( ierr != MPI_SUCCESS ) MPI_Abort(comm,__LINE__);
		buf_pos += consumed;
		done += batch;
	}
}
template<typename T>
void unpack_data_array(Mesh::exch_buffer_type & buf, int & pos, std::vector<T> & data, INMOST_MPI_Comm comm)
void unpack_data_vector(Mesh::buffer_type & buf, size_t & buf_pos, std::vector<T> & data, INMOST_MPI_Comm comm)
{
int ierr;
size_t unpack_size;
unpack_data(buf,pos,unpack_size,comm);
unpack_data(buf,buf_pos,unpack_size,comm);
data.resize(unpack_size);
POS_TYPE lpos = (POS_TYPE)pos;
MPI_Unpack_call((void*)&buf[0],(int)buf.size(),&lpos,&data[0],(int)unpack_size,MPIType<T>::Type(),comm);
pos = (int)lpos;
if( !data.empty() )
unpack_data_array(buf,buf_pos,&data[0],data.size(),comm);
}
......@@ -1267,7 +1375,6 @@ namespace INMOST
if( m_state == Mesh::Serial ) SetCommunicator(INMOST_MPI_COMM_WORLD);
if( tag_global_id.isValid() ) tag_global_id = DeleteTag(tag_global_id,CELL | EDGE | FACE | NODE);
integer dim = GetDimensions();
int sendsize;
int mpirank = GetProcessorRank(),mpisize = GetProcessorsNumber();
int rank = mpirank;
#if defined(USE_PARALLEL_STORAGE)
......@@ -1516,13 +1623,11 @@ namespace INMOST
EXIT_BLOCK();
int position = 0;
//~ int position = 0;
ENTER_BLOCK();
//TODO: overflow
MPI_Pack_size(static_cast<int>(pack_real.size()),INMOST_MPI_DATA_REAL_TYPE,comm,&sendsize);
exch_data.resize(sendsize);
if( sendsize > 0 ) MPI_Pack(&pack_real[0],static_cast<INMOST_MPI_SIZE>(pack_real.size()),INMOST_MPI_DATA_REAL_TYPE,&exch_data[0],static_cast<INMOST_MPI_SIZE>(exch_data.size()),&position,comm);
pack_data_vector(exch_data,pack_real,GetCommunicator());
......@@ -1537,19 +1642,15 @@ namespace INMOST
exch_buffer_type send_buffs(procs.size()), recv_buffs(procs.size());
std::vector<int> done;
std::vector<unsigned> sendsizeall(mpisize*2);
int pack_size2 = 0;
unsigned usend[2] = {static_cast<unsigned>(sendsize),static_cast<unsigned>(pack_real.size())};
std::vector<size_t> sendsizeall(mpisize);
ENTER_BLOCK();
MPI_Pack_size(2,MPI_UNSIGNED,comm,&pack_size2);
for(dynarray<integer,64>::size_type k = 0; k < procs.size(); k++)
{
send_buffs[k].first = procs[k];
send_buffs[k].second.resize(pack_size2);
position = 0;
MPI_Pack(usend,2,MPI_UNSIGNED,&send_buffs[k].second[0],static_cast<INMOST_MPI_SIZE>(send_buffs[k].second.size()),&position,comm);
pack_data(send_buffs[k].second,exch_data.size(),GetCommunicator());
recv_buffs[k].first = procs[k];
recv_buffs[k].second.resize(pack_size2);
recv_buffs[k].second.resize(send_buffs[k].second.size());
}
EXIT_BLOCK();
ExchangeBuffersInner(send_buffs,recv_buffs,send_reqs,recv_reqs);
......@@ -1558,8 +1659,10 @@ namespace INMOST
{
for(std::vector<int>::iterator qt = done.begin(); qt != done.end(); qt++)
{
position = 0;
MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&sendsizeall[procs[*qt]*2],2,MPI_UNSIGNED,comm);
//~ position = 0;
//~ MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&sendsizeall[procs[*qt]*2],2,MPI_UNSIGNED,comm);
size_t buf_pos = 0;
unpack_data(recv_buffs[*qt].second,buf_pos,sendsizeall[procs[*qt]],GetCommunicator());
}
}
if( !send_reqs.empty() )
......@@ -1577,7 +1680,7 @@ namespace INMOST
send_buffs[k].first = procs[k];
send_buffs[k].second = exch_data;
recv_buffs[k].first = procs[k];
recv_buffs[k].second.resize(sendsizeall[procs[k]*2]);
recv_buffs[k].second.resize(sendsizeall[procs[k]]);
}
EXIT_BLOCK();
//PrepareReceiveInner(send_buffs,recv_buffs);
......@@ -1591,12 +1694,14 @@ namespace INMOST
{
REPORT_STR("receive node coordinates");
REPORT_VAL("processor",recv_buffs[*qt].first);
int count = 0;
int position = 0;
unpack_real.resize(sendsizeall[recv_buffs[*qt].first*2+1]);
size_t count = 0;
//~ int position = 0;
//~ unpack_real.resize(sendsizeall[recv_buffs[*qt].first*2+1]);
//TODO: overflow
MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&unpack_real[0],static_cast<INMOST_MPI_SIZE>(unpack_real.size()),INMOST_MPI_DATA_REAL_TYPE,comm);
//~ MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&unpack_real[0],static_cast<INMOST_MPI_SIZE>(unpack_real.size()),INMOST_MPI_DATA_REAL_TYPE,comm);
size_t buf_pos = 0;
unpack_data_vector(recv_buffs[*qt].second,buf_pos,unpack_real,GetCommunicator());
std::vector<Storage::real>::iterator it1 = pack_real.begin() , it2 = unpack_real.begin();
while(it1 != pack_real.end() && it2 != unpack_real.end() )
{
......@@ -1805,7 +1910,6 @@ namespace INMOST
{
message_send.clear();
message_send.push_back(0);
message_send.push_back(0);
for(Mesh::iteratorElement it = BeginElement(current_mask); it != EndElement(); it++)
{
if (only_new && !it->GetMarker(new_lc)) continue;
......@@ -1826,21 +1930,17 @@ namespace INMOST
ElementByLocalID(PrevElementType(current_mask),GetHandleID(*kt))->Centroid(cnt);
//REPORT_STR("global id " << GlobalID(*kt) << " local id " << GetHandleID(*kt) << " " << Element::StatusName(Element(this,*kt)->GetStatus()) << " cnt " << cnt[0] << " " << cnt[1] << " " << cnt[2]);
}
message_send[1]++;
//~ message_send[1]++;
message_send[0]++;
elements[m].push_back(*it);
}
}
REPORT_VAL("gathered elements",elements[m].size());
message_send[0] = static_cast<int>(message_send.size());
//TODO: overflow
MPI_Pack_size(static_cast<INMOST_MPI_SIZE>(message_send.size()),MPI_INT,comm,&sendsize);
send_buffs[m].first = *p;
send_buffs[m].second.resize(sendsize);
int position = 0;
MPI_Pack(&message_send[0],static_cast<INMOST_MPI_SIZE>(message_send.size()),MPI_INT,&send_buffs[m].second[0],static_cast<INMOST_MPI_SIZE>(send_buffs[m].second.size()),&position,comm);
send_buffs[m].second.resize(position);
pack_data_vector(send_buffs[m].second,message_send,GetCommunicator());
recv_buffs[m].first = *p;
}
}
......@@ -1859,8 +1959,8 @@ namespace INMOST
{
for(std::vector<int>::iterator qt = done.begin(); qt != done.end(); qt++)
{
int position = 0;
int size;
//~ int position = 0;
//~ int size;
int pos = -1;
for(p = procs.begin(); p != procs.end(); p++)
if( *p == recv_buffs[*qt].first )
......@@ -1869,11 +1969,8 @@ namespace INMOST
break;
}
if( pos == -1 ) throw Impossible;
//TODO: overflow
MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&size,1,MPI_INT,comm);
REPORT_VAL("unpacked message size",size-1);
message_recv[pos].resize(size-1);
MPI_Unpack(&recv_buffs[*qt].second[0],static_cast<INMOST_MPI_SIZE>(recv_buffs[*qt].second.size()),&position,&message_recv[pos][0],static_cast<INMOST_MPI_SIZE>(message_recv[pos].size()),MPI_INT,comm);
size_t buf_pos = 0;
unpack_data_vector(recv_buffs[*qt].second,buf_pos,message_recv[pos],GetCommunicator());
}
}
......@@ -2825,7 +2922,7 @@ namespace INMOST
array_size_send.push_back(static_cast<INMOST_DATA_ENUM_TYPE>(eit-elements[i].begin()));
array_size_send[count]++;
INMOST_DATA_ENUM_TYPE s = GetDataSize(*eit,tag);
INMOST_DATA_ENUM_TYPE had_s = static_cast<INMOST_DATA_ENUM_TYPE>(array_data_send.size());
size_t had_s = array_data_send.size();
if( s )
{
array_data_send.resize(had_s+GetDataCapacity(*eit,tag));
......@@ -2856,7 +2953,7 @@ namespace INMOST
for(eit = elements[i].begin(); eit != elements[i].end(); eit++) if( !select || GetMarker(*eit,select) )
{
INMOST_DATA_ENUM_TYPE s = GetDataSize(*eit,tag);
INMOST_DATA_ENUM_TYPE had_s = static_cast<INMOST_DATA_ENUM_TYPE>(array_data_send.size());
size_t had_s = array_data_send.size();
if( s )
{
array_data_send.resize(had_s+GetDataCapacity(*eit,tag));
......@@ -2880,57 +2977,15 @@ namespace INMOST
}
REPORT_VAL("total packed records",total_packed);
}
//if( pack_position.isValid() ) DeleteTag(pack_position);
INMOST_DATA_ENUM_TYPE size_send, data_send;
size_send = static_cast<INMOST_DATA_ENUM_TYPE>(array_size_send.size());
data_send = static_cast<INMOST_DATA_ENUM_TYPE>(array_data_send.size());
REPORT_VAL("tag defined on",static_cast<int>(pack_types[0]));
REPORT_VAL("tag sparse on",static_cast<int>(pack_types[1]));
REPORT_VAL("size_send",size_send);
REPORT_VAL("data_send",data_send);
int buffer_size = 0,position = static_cast<int>(buffer.size()),temp,bytes;
bytes = tag.GetPackedBytesSize();
MPI_Pack_size(2 ,INMOST_MPI_DATA_BULK_TYPE,comm,&temp); buffer_size+= temp;
MPI_Pack_size(1 ,INMOST_MPI_DATA_ENUM_TYPE,comm,&temp); buffer_size+= temp;
MPI_Pack_size(1 ,INMOST_MPI_DATA_ENUM_TYPE,comm,&temp); buffer_size+= temp;
MPI_Pack_size(static_cast<INMOST_MPI_SIZE>(array_size_send.size()) ,INMOST_MPI_DATA_ENUM_TYPE,comm,&temp); buffer_size+= temp;
MPI_Pack_size(static_cast<INMOST_MPI_SIZE>(array_data_send.size()/bytes),tag.GetBulkDataType() ,comm,&temp); buffer_size+= temp;
buffer.resize(position+buffer_size);
MPI_Pack(pack_types,2,INMOST_MPI_DATA_BULK_TYPE,&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
MPI_Pack(&size_send,1,INMOST_MPI_DATA_ENUM_TYPE,&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
MPI_Pack(&data_send,1,INMOST_MPI_DATA_ENUM_TYPE,&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
if( !array_size_send.empty() ) MPI_Pack(&array_size_send[0],static_cast<INMOST_MPI_SIZE>(array_size_send.size()),INMOST_MPI_DATA_ENUM_TYPE,&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
if( !array_data_send.empty() ) MPI_Pack(&array_data_send[0],static_cast<INMOST_MPI_SIZE>(array_data_send.size()/bytes),tag.GetBulkDataType(),&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,comm);
buffer.resize(position);
/*
if( tag.GetDataType() == DATA_REFERENCE )
{
std::vector<std::string> tag_list;
ListTagNames(tag_list);
{
std::vector<std::string>::iterator it = tag_list.begin();
while( it != tag_list.end() )
{
if( it->substr(0,9) == "PROTECTED" )
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REFERENCE)
it = tag_list.erase(it);
else if(GetTag(*it).GetDataType() == DATA_REMOTE_REFERENCE)
it = tag_list.erase(it);
else it++;
}
}
PackElementsData(pack_elements,buffer,destination,tag_list);
}
*/
REPORT_VAL("size_send",array_size_send.size());
REPORT_VAL("data_send",array_data_send.size());
pack_data_array(buffer,pack_types,2,GetCommunicator());
pack_data_vector(buffer,array_size_send,GetCommunicator());
pack_data_vector(buffer,array_data_send,GetCommunicator());
REPORT_VAL("Buffer size after pack",buffer.size());
if( size_send < 6 )
{
for(INMOST_DATA_ENUM_TYPE qq = 0; qq < size_send; ++qq)
{
REPORT_VAL("array_size_send["<<qq<<"]",array_size_send[qq]);
}
}
#else
(void) tag;
(void) elements;
......@@ -2983,7 +3038,7 @@ namespace INMOST
void Mesh::UnpackTagData(const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, int & position, ReduceOperation op, const elements_by_type & unpack_elements)//, proc_elements_by_type * send_elements)
void Mesh::UnpackTagData(const Tag & tag, const elements_by_type & elements, int source, ElementType mask, MarkerType select, buffer_type & buffer, size_t & buffer_position, ReduceOperation op, const elements_by_type & unpack_elements)//, proc_elements_by_type * send_elements)
{
(void) mask;
//if( tag.GetDataType() == DATA_REMOTE_REFERENCE) return; //NOT IMPLEMENTED TODO 14
......@@ -2991,68 +3046,22 @@ namespace INMOST
REPORT_VAL("TagName",tag.GetTagName());
REPORT_VAL("select marker",select);
#if defined(USE_MPI)
REPORT_VAL("Position before unpack",position);
REPORT_VAL("Position before unpack",buffer_position);
if( !buffer.empty() )
{
int pos = 0, k = 0;
ElementType recv_mask[2] = {NONE,NONE};
INMOST_DATA_ENUM_TYPE data_recv, size_recv;
element_set::const_iterator eit;
//elements_by_type unpack_elements;
unsigned int size = tag.GetSize();
std::vector<INMOST_DATA_BULK_TYPE> array_data_recv;
buffer_type array_data_recv;
std::vector<INMOST_DATA_ENUM_TYPE> array_size_recv;
MPI_Unpack(&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,recv_mask ,2,INMOST_MPI_DATA_BULK_TYPE,comm);
//assert(recv_mask[0] == mask);//Element types mask is not synchronized among processors, this may lead to nasty errors
MPI_Unpack(&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,&size_recv,1,INMOST_MPI_DATA_ENUM_TYPE,comm);
MPI_Unpack(&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,&data_recv,1,INMOST_MPI_DATA_ENUM_TYPE,comm);
REPORT_VAL("size of size array",size_recv);
REPORT_VAL("size of data array",data_recv);
array_size_recv.resize(size_recv);
array_data_recv.resize(data_recv);
if( !array_size_recv.empty() )
{
MPI_Unpack(&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,&array_size_recv[0],static_cast<INMOST_MPI_SIZE>(array_size_recv.size()),INMOST_MPI_DATA_ENUM_TYPE,comm);
REPORT_VAL("size array last", array_size_recv.back());
}
if( !array_data_recv.empty() )
{
int bytes = tag.GetPackedBytesSize();
REPORT_VAL("occupied by type",bytes);
REPORT_VAL("bytes in entry",sizeof(Sparse::Row::entry));
REPORT_VAL("stored in type",tag.GetBytesSize());
REPORT_VAL("all size recv",array_data_recv.size());
REPORT_VAL("incoming size of data",data_recv);
REPORT_VAL("calculated size of data",array_data_recv.size()/bytes);
REPORT_VAL("calculated size of data",array_data_recv.size()/sizeof(Sparse::Row::entry));
MPI_Unpack(&buffer[0],static_cast<INMOST_MPI_SIZE>(buffer.size()),&position,&array_data_recv[0],static_cast<INMOST_MPI_SIZE>(array_data_recv.size()/bytes),tag.GetBulkDataType(),comm);
}
/*
ENTER_BLOCK();
if( tag.GetDataType() == DATA_REFERENCE )
{
int mpirank = GetProcessorRank();
std::vector<std::string> tag_recv;
UnpackElementsData(unpack_elements,buffer,source,position,tag_recv);
assert(send_elements);
for(int k = 0; k < 5; ++k)
for(element_set::iterator it = unpack_elements[k].begin(); it != unpack_elements[k].end(); it++)
{
Storage::integer owner = IntegerDF(*it,tag_owner);
if( owner == mpirank ) continue;
Storage::integer_array proc = IntegerArrayDV(*it,tag_processors);
Storage::integer_array::iterator ip = std::lower_bound(proc.begin(),proc.end(),mpirank);
if( ip == proc.end() || (*ip) != mpirank )
{
proc.insert(ip,mpirank);
(*send_elements)[owner][GetHandleElementNum(*it)].push_back(*it);
}
}
}
EXIT_BLOCK();
*/
REPORT_VAL("Position after unpack",position);
unpack_data_array(buffer,buffer_position,recv_mask,2,GetCommunicator());
unpack_data_vector(buffer,buffer_position,array_size_recv,GetCommunicator());
unpack_data_vector(buffer,buffer_position,array_data_recv,GetCommunicator());
REPORT_VAL("size of size array",array_size_recv.size());
REPORT_VAL("size of data array",array_data_recv.size());
REPORT_VAL("Position after unpack",buffer_position);
for(int i = ElementNum(NODE); i <= ElementNum(ESET); i++) if( (recv_mask[0] & ElementTypeFromDim(i)) )
{
REPORT_VAL("unpack for type",ElementTypeName(ElementTypeFromDim(i)));
......@@ -3236,7 +3245,7 @@ namespace INMOST
REPORT_VAL("total unpacked records",total_unpacked);
}
}
REPORT_VAL("Position after unpack",position);
REPORT_VAL("Position after unpack",buffer_position);
#else
(void) tag;
(void) elements;
......@@ -3268,7 +3277,7 @@ namespace INMOST
REPORT_VAL("recv buffers size",storage.recv_buffers.size());
std::vector<int> done;
parallel_storage::const_iterator find;
std::vector<INMOST_DATA_ENUM_TYPE> send_size(procs.size(),0), recv_size(procs.size(),0);
std::vector<size_t> send_size(procs.size(),0), recv_size(procs.size(),0);
bool unknown_size = false;
bool have_reference_tag = false;
......@@ -3322,62 +3331,6 @@ namespace INMOST
}
int num_send = 0, num_recv = 0;
///////////
/*
if ( block_recursion == 0)
{
int call_exchange = 0;
for(unsigned int k = 0; k < tags.size(); k++)
{
if(tags[k].GetDataType() == DATA_REFERENCE)
{
for(p = procs.begin(); p != procs.end(); p++ )
{
if (from.find(*p) == from.end()) continue;
const elements_by_type& elements = from.find(*p)->second;
for(int i = ElementNum(NODE); i <= ElementNum(ESET); i++) if( (mask & ElementTypeFromDim(i)) && tags[k].isDefinedByDim(i) )
{
for (int j = 0; j < elements[i].size(); j++)
{
if (!isValidHandleRange(elements[i][j])) continue;
//if (!isValidElement(ElementTypeFromDim(i),elements[i][j])) continue;
//cout << GlobalID(elements[i][j]) << " <-=-" << endl;
reference_array refs = ReferenceArray(elements[i][j], tags[k]);
if (refs.size() == 0) continue;
if (tags[k] == HighConnTag())
{
assert(i == ElementNum(ESET));
ElementSet set(this,elements[i][j]);
for(ElementSet child = set.GetChild(); child.isValid(); child = child.GetSibling())
{
child.IntegerArray(tag_sendto).push_back(*p);
call_exchange = 1;
}
}
else
{
for(Storage::reference_array::size_type i = 0; i < refs.size(); ++i)
{
if (refs[i] == InvalidElement()) continue;
refs[i].IntegerArray(tag_sendto).push_back(*p);
call_exchange = 1;
}
}
}
}
}
}
}
call_exchange = Integrate(call_exchange);
if( call_exchange )