Commit 85fedb59 authored by Kirill Terekhov's avatar Kirill Terekhov

Structure for safer OpenMP

Added linked_array structure that represents linked list with subarray of constant size. This elemenates possible crash with openmp when mesh is simultaneously modified and data is accessed. Previously when array of links was reallocated in chunk_array due to mesh growth, the pointer may be simultaneously changed due to reallocation and be accessed by another thread. Now reallocation adds another segment of the array in linked list making previous records intact. To restore previous behavior undefine NEW_CHUNKS in container.hpp.
parent 09636f8d
......@@ -14,6 +14,8 @@
#include <limits>
#include "inmost_common.h"
#define NEW_CHUNKS
#define __VOLATILE volatile
//#define OUT_OF_RANGE
......@@ -2129,7 +2131,113 @@ namespace INMOST
}
};
//this is supposed to provide thread-safe storage type for chunks in chunk_array
//the idea is that pointer to block is never moved
//so that access to any entry before size() is correct on resize
//critical section is required on resize()
template<typename element, size_t base = 512>
class linked_array
{
element e[base]; //storage of current node
size_t ne; //number of elements in array
linked_array * next; //link to next node
linked_array(const linked_array & b) {} //don't copy me
linked_array & operator = (linked_array const & b) { return *this; } //don't copy me
public:
linked_array() : ne(0), next(NULL) {}
void clear()
{
if (next)
next->clear();
for (int k = 0; k < ne; ++k)
e[k].~element();
ne = 0;
}
element & operator [](size_t n)
{
if (n < base)
{
assert(n < ne);
return e[n];
}
else
{
assert(next);
return next->operator [](n-base);
}
}
const element & operator [](size_t n) const
{
if (n < base)
{
assert(n < ne);
return e[n];
}
else
{
assert(next);
return next->operator [](n - base);
}
}
int size() const
{
if( ne == base && next )
return base+next()->size();
else
return ne;
}
void resize(size_t new_n, const element & c = element())
{
if (new_n <= base)
{
for (size_t k = new_n; k < ne; ++k)
e[k].~element();
for (size_t k = ne; k < new_n; ++k)
new (&e[k]) element(c);
ne = new_n;
if( next ) next->clear();
}
else
{
if( !next )
next = new linked_array;
if (ne < base)
{
for (size_t k = ne; k < base; ++k)
new (&e[k]) element(c);
ne = base;
}
next->resize(new_n-base);
}
}
/*
//not needed
void push_back(const element & c)
{
if (ne < base)
new (&e[ne++])(c);
else
{
if( !next )
next = new linked_array;
next->push_back(c);
}
}
//not needed
void pop_back()
{
if( ne == base && next && next->ne != 0)
next->pop_back();
else
e[--ne].~element();
}
*/
~linked_array()
{
clear();
if( next ) delete next;
}
};
template<typename element, int block_bits>
class chunk_array
......@@ -2145,7 +2253,12 @@ namespace INMOST
static size_type const fwd_alloc_chunk_bits_mask = (1 << (fwd_alloc_chunk_bits))-1;
static size_type const fwd_alloc_chunk_val = 1 << fwd_alloc_chunk_bits;
static size_type const fwd_alloc_chunk_size = sizeof(element *)*fwd_alloc_chunk_val;
#if defined(NEW_CHUNKS)
linked_array<element *> chunks;
#else
element ** __VOLATILE chunks;
#endif
size_type m_size;
//This neads static_cast to unsigned to
__INLINE size_type GetChunkNumber(size_type k) const {return static_cast<uenum>(k) >> block_bits;}
......@@ -2272,6 +2385,9 @@ namespace INMOST
{
if( newn > 0 )
{
#if defined(NEW_CHUNKS)
chunks.resize(fwd_alloc_chunk_size*newn);
#else
element ** __VOLATILE chunks_old = chunks;
element ** __VOLATILE chunks_new = (element **)malloc(fwd_alloc_chunk_size*newn);
assert(chunks_new != NULL);
......@@ -2279,15 +2395,19 @@ namespace INMOST
memset(chunks_new + oldn*fwd_alloc_chunk_val, 0, fwd_alloc_chunk_size*(newn - oldn));
chunks = chunks_new; //this must be atomic
free(chunks_old); //hopefully no other thread use it
#endif
//chunks = (element **) realloc(chunks,fwd_alloc_chunk_size*newn);
//assert(chunks != NULL);
//if( newn > oldn ) memset(chunks+oldn*fwd_alloc_chunk_val,0,fwd_alloc_chunk_size*(newn-oldn));
}
else
{
#if defined(NEW_CHUNKS)
chunks.clear();
#else
free((void *)chunks);
chunks = NULL;
#endif
}
}
for(size_type q = oldnchunks2; q < newnchunks2; q++)
......@@ -2316,18 +2436,26 @@ namespace INMOST
free(chunks[q]);
chunks[q] = NULL;
}
#if defined(NEW_CHUNKS)
chunks.clear();
#else
free((void *)chunks);
chunks = NULL;
#endif
m_size = 0;
}
chunk_array()
{
m_size = 0;
#if !defined(NEW_CHUNKS)
chunks = NULL;
#endif
}
chunk_array(const chunk_array & other)
{
#if !defined(NEW_CHUNKS)
chunks = NULL;
#endif
m_size = 0;
inner_resize(other.size());
for(size_type k = 0; k < other.size(); k++)
......@@ -2431,7 +2559,12 @@ namespace INMOST
static size_type const fwd_alloc_chunk_bits_mask = (1 << (fwd_alloc_chunk_bits))-1;
static size_type const fwd_alloc_chunk_val = 1 << fwd_alloc_chunk_bits;
static size_type const fwd_alloc_chunk_size = sizeof(char *)*fwd_alloc_chunk_val;
#if defined(NEW_CHUNKS)
linked_array<char *> chunks;
#else
char ** __VOLATILE chunks;
#endif
size_type record_size;
size_type m_size;
......@@ -2478,6 +2611,9 @@ namespace INMOST
{
if( newn > 0 )
{
#if defined(NEW_CHUNKS)
chunks.resize(fwd_alloc_chunk_size*newn);
#else
char ** __VOLATILE chunks_old = chunks;
char ** __VOLATILE chunks_new = (char **)malloc(fwd_alloc_chunk_size*newn);
assert(chunks_new != NULL);
......@@ -2485,14 +2621,19 @@ namespace INMOST
memset(chunks_new + oldn*fwd_alloc_chunk_val, 0, fwd_alloc_chunk_size*(newn - oldn));
chunks = chunks_new; //this must be atomic
free(chunks_old); //hopefully no other thread use it
#endif
//chunks = reinterpret_cast<char **>(realloc(chunks,fwd_alloc_chunk_size*newn));
//assert(chunks != NULL);
//if( newn > oldn ) memset(chunks+oldn*fwd_alloc_chunk_val,0,fwd_alloc_chunk_size*(newn-oldn));
}
else
{
#if defined(NEW_CHUNKS)
chunks.clear();
#else
free((void *)chunks);
chunks = NULL;
#endif
}
}
for(size_type q = oldnchunks2; q < newnchunks2; q++)
......@@ -2520,19 +2661,27 @@ namespace INMOST
free(chunks[q]);
chunks[q] = NULL;
}
#if defined(NEW_CHUNKS)
chunks.clear();
#else
free((void *)chunks);
chunks = NULL;
#endif
m_size = 0;
}
chunk_bulk_array(size_type set_record_size = 1)
{
record_size = set_record_size;
m_size = 0;
#if !defined(NEW_CHUNKS)
chunks = NULL;
#endif
}
chunk_bulk_array(const chunk_bulk_array & other)
{
#if !defined(NEW_CHUNKS)
chunks = NULL;
#endif
record_size = other.record_size;
m_size = 0;
inner_resize(other.size());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment