#include "tbb/tbb_stddef.h"
#include "tbb/tbb_machine.h"
#include "tbb/tbb_exception.h"
#define __TBB_concurrent_queue_H
#include "tbb/internal/_concurrent_queue_impl.h"
#include "concurrent_monitor.h"
#include "itt_notify.h"
#include <new>
#if !TBB_USE_EXCEPTIONS && _MSC_VER
#pragma warning (push)
#pragma warning (disable: 4530)
#endif
#include <cstring>
#if !TBB_USE_EXCEPTIONS && _MSC_VER
#pragma warning (pop)
#endif
using namespace std;
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (disable: 4267)
#endif
#define RECORD_EVENTS 0
namespace tbb {
namespace internal {

typedef concurrent_queue_base_v3 concurrent_queue_base;

//! A FIFO ticket; the k-th push and the k-th pop both carry ticket k.
typedef size_t ticket;
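
//! One of the n_queue independent lanes that make up the queue.
/** Items with ticket k live in lane index(k). Each lane keeps its own head and tail
    counters, which advance in steps of n_queue, and a singly linked list of pages
    guarded by page_mutex. Spreading consecutive tickets across lanes reduces
    contention between concurrent pushes (and between concurrent pops). */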
struct micro_queue {
    typedef concurrent_queue_base::page page;

    friend class micro_queue_pop_finalizer;

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    //! Guards modification of the page list (head_page/tail_page linkage).
    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base& base );

    //! Push a dummy entry for ticket k so that a waiting pop can skip it.
    void abort_push( ticket k, concurrent_queue_base& base );

    bool pop( void* dst, ticket k, concurrent_queue_base& base );

    micro_queue& assign( const micro_queue& src, concurrent_queue_base& base );

    page* make_copy( concurrent_queue_base& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index );

    //! Mark the lane broken after an exception; rethrows the pending exception.
    void make_invalid( ticket k );
};
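
//! RAII helper used by micro_queue::pop.
/** On destruction it unlinks the exhausted page (if any) from the lane, publishes
    the new head_counter so the next pop of the lane may proceed, and only then
    returns the page to the allocator, outside the page_mutex critical section. */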
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_base::page page;
    ticket my_ticket;
    micro_queue& my_queue;
    page* my_page;
    concurrent_queue_base& base;
public:
    micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), base(b)
    {}
    ~micro_queue_pop_finalizer() {
        page* p = my_page;
        if( p ) {
            spin_mutex::scoped_lock lock( my_queue.page_mutex );
            page* q = p->next;
            my_queue.head_page = q;
            if( !q ) {
                // The lane is now empty.
                my_queue.tail_page = NULL;
            }
        }
        // Publishing the new head counter is what allows the next pop of this lane to run.
        my_queue.head_counter = my_ticket;
        if( p )
            // Deallocate outside the critical section above.
            base.deallocate_page( p );
    }
};
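
//! Wakeup predicate for concurrent_monitor: matches waiting contexts whose ticket is <= t.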
struct predicate_leq {
    ticket t;
    predicate_leq( ticket t_ ) : t(t_) {}
    bool operator() ( uintptr_t p ) const { return (ticket)p <= t; }
};
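
//! Internal representation of a queue: counters, monitors, and the lane array.
/** The head counter and the tail counter (each with its monitor) are padded onto
    separate cache lines so that pushers and poppers do not false-share. */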
class concurrent_queue_rep {
private:
    friend struct micro_queue;

    //! Stride for mapping tickets to lanes; coprime with n_queue, so the mapping is a bijection.
    static const size_t phi = 3;

public:
    //! Number of micro_queue lanes.
    static const size_t n_queue = 8;

    //! Map a ticket to the lane that holds its item.
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    atomic<ticket> head_counter;
    concurrent_monitor items_avail;
    atomic<size_t> n_invalid_entries;
    char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];

    atomic<ticket> tail_counter;
    concurrent_monitor slots_avail;
    char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];

    micro_queue array[n_queue];

    micro_queue& choose( ticket k ) {
        return array[index(k)];
    }

    //! Value of my_capacity that represents "unbounded".
    static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // unary minus operator applied to unsigned type, result still unsigned
    #pragma warning( push )
    #pragma warning( disable: 4146 )
#endif

//! Sentinel page installed by make_invalid; internal_finish_clear must never deallocate it.
static void* invalid_page;
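
//! Complete the push of the item with ticket k into this lane.
/** Waits until the lane's tail_counter reaches k, allocating a fresh page first if
    the item lands on a page boundary. A NULL item records an aborted push: the
    slot's mask bit stays clear and n_invalid_entries is bumped so pop will skip it. */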
void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base ) {
    k &= -concurrent_queue_rep::n_queue;   // lane-local counters advance in steps of n_queue
    page* p = NULL;
    size_t index = k/concurrent_queue_rep::n_queue & (base.items_per_page-1);
    if( !index ) {
        // First slot of a page: allocate the page before claiming the slot.
        __TBB_TRY {
            p = base.allocate_page();
        } __TBB_CATCH(...) {
            ++base.my_rep->n_invalid_entries;
            make_invalid( k );   // rethrows, so p is not used below
        }
        p->mask = 0;
        p->next = NULL;
    }
    // Wait for our turn, i.e. until the preceding push to this lane completes.
    if( tail_counter!=k ) {
        atomic_backoff backoff;
        do {
            backoff.pause();
            // An odd tail_counter means make_invalid has poisoned the lane.
            if( tail_counter&0x1 ) {
                ++base.my_rep->n_invalid_entries;
                throw_exception( eid_bad_last_alloc );
            }
        } while( tail_counter!=k );
    }
    if( p ) {
        // Link the new page at the end of the lane's page list.
        spin_mutex::scoped_lock lock( page_mutex );
        if( page* q = tail_page )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    }
    if( item ) {
        p = tail_page;
        ITT_NOTIFY( sync_acquired, p );
        __TBB_TRY {
            base.copy_item( *p, index, item );
        } __TBB_CATCH(...) {
            // The item's copy constructor threw: count the slot as invalid, but still
            // advance tail_counter so later pushes are not blocked forever.
            ++base.my_rep->n_invalid_entries;
            tail_counter += concurrent_queue_rep::n_queue;
            __TBB_RETHROW();
        }
        ITT_NOTIFY( sync_releasing, p );
        // Setting the mask bit publishes the item to pop.
        p->mask |= uintptr_t(1)<<index;
    }
    else
        ++base.my_rep->n_invalid_entries;
    tail_counter += concurrent_queue_rep::n_queue;
}
void micro_queue::abort_push( ticket k, concurrent_queue_base& base ) {
    // A NULL item makes push record the slot as invalid instead of storing data.
    push(NULL, k, base);
}
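
//! Complete the pop of the item with ticket k from this lane.
/** Returns false if the slot was invalid (aborted push, or a push whose copy
    constructor threw), in which case the caller retries with the next ticket. */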
bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
    k &= -concurrent_queue_rep::n_queue;
    // Wait until the preceding pop finishes and the matching push completes.
    spin_wait_until_eq( head_counter, k );
    spin_wait_while_eq( tail_counter, k );
    __TBB_ASSERT( head_page, NULL );
    page& p = *head_page;
    size_t index = k/concurrent_queue_rep::n_queue & (base.items_per_page-1);
    bool success = false;
    {
        // If this is the last slot of the page, the finalizer also retires the page.
        micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            ITT_NOTIFY( sync_acquired, dst );
            ITT_NOTIFY( sync_acquired, head_page );
            base.assign_and_destroy_item( dst, p, index );
            ITT_NOTIFY( sync_releasing, head_page );
        } else {
            // Skipping an invalid slot consumes one recorded invalid entry.
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
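
//! Copy the contents of lane src; the source queue must not be concurrently modified.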
micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base& base )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;   // benign: the source lane must be quiescent, so the mutex is unlocked

    const page* srcp = src.head_page;
    if( srcp ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep::n_queue;
            size_t index = head_counter/concurrent_queue_rep::n_queue & (base.items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;

            // Copy the (possibly partial) first page, then whole middle pages, then the partial last page.
            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index );
                    cur_page = cur_page->next;
                }
                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep::n_queue & (base.items_per_page-1);
                if( last_index==0 ) last_index = base.items_per_page;
                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH(...) {
            make_invalid( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}
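
//! Allocate a new page and copy the valid items in [begin_in_page, end_in_page) from src_page.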
concurrent_queue_base::page* micro_queue::make_copy( concurrent_queue_base& base, const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index )
{
    page* new_page = base.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        // Copy only slots whose mask bit says they hold a constructed item.
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}
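
//! Poison this lane after an exception and rethrow the exception in flight.
/** Sets tail_counter to an odd value, which waiting pushes report as
    eid_bad_last_alloc, and terminates the page list with a static dummy page. */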
void micro_queue::make_invalid( ticket k )
{
    // Static dummy page: the non-null sentinel next pointer and zero mask mark it as unusable.
    static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
    invalid_page = &dummy;
    {
        spin_mutex::scoped_lock lock( page_mutex );
        tail_counter = k+concurrent_queue_rep::n_queue+1;   // odd value poisons the lane
        if( page* q = tail_page )
            q->next = static_cast<page*>(invalid_page);
        else
            head_page = static_cast<page*>(invalid_page);
        tail_page = static_cast<page*>(invalid_page);
    }
    __TBB_RETHROW();   // only called from a catch block, so an exception is in flight
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning( pop )
#endif
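
//! Construct the shared representation.
/** items_per_page is chosen so that a full page carries roughly 256 bytes of items;
    my_capacity defaults to "effectively unbounded". */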
concurrent_queue_base_v3::concurrent_queue_base_v3( size_t item_sz ) {
    items_per_page = item_sz<= 8 ? 32 :
                     item_sz<= 16 ? 16 :
                     item_sz<= 32 ? 8 :
                     item_sz<= 64 ? 4 :
                     item_sz<=128 ? 2 :
                     1;
    my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
    my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep));
    // The monitors have non-trivial constructors, so construct them in place after the memset.
    new ( &my_rep->items_avail ) concurrent_monitor();
    new ( &my_rep->slots_avail ) concurrent_monitor();
    this->item_size = item_sz;
}
concurrent_queue_base_v3::~concurrent_queue_base_v3() {
    size_t nq = my_rep->n_queue;
    for( size_t i=0; i<nq; i++ )
        __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
    cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
}
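
//! Push an item, blocking while the queue is at capacity.
/** Claims a ticket first, then waits on slots_avail until the ticket is within
    capacity of the head counter; a user_abort received while waiting converts the
    claimed ticket into an invalid entry so that poppers are not blocked. */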
void concurrent_queue_base_v3::internal_push( const void* src ) {
    concurrent_queue_rep& r = *my_rep;
    ticket k = r.tail_counter++;   // claim a ticket
    ptrdiff_t e = my_capacity;
#if DO_ITT_NOTIFY
    bool sync_prepare_done = false;
#endif
    if( (ptrdiff_t)(k-r.head_counter)>=e ) {
        // The queue is full: wait until a pop makes room for ticket k.
#if DO_ITT_NOTIFY
        if( !sync_prepare_done ) {
            ITT_NOTIFY( sync_prepare, &sync_prepare_done );
            sync_prepare_done = true;
        }
#endif
        bool slept = false;
        concurrent_monitor::thread_context thr_ctx;
        r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
        // Re-read my_capacity on every iteration: a concurrent set_capacity may enlarge it.
        while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
            __TBB_TRY {
                slept = r.slots_avail.commit_wait( thr_ctx );
            } __TBB_CATCH( tbb::user_abort& ) {
                // Abandon the claimed ticket before propagating the abort.
                r.choose(k).abort_push(k, *this);
                __TBB_RETHROW();
            } __TBB_CATCH(...) {
                __TBB_RETHROW();
            }
            if( slept ) break;
            r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
        }
        if( !slept )
            r.slots_avail.cancel_wait( thr_ctx );
    }
    ITT_NOTIFY( sync_acquired, &sync_prepare_done );
    __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
    r.choose( k ).push( src, k, *this );
    // Wake poppers waiting for tickets up to and including k.
    r.items_avail.notify( predicate_leq(k) );
}
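
//! Pop an item, blocking while the queue is empty.
/** Claims head tickets until one of them yields a valid item; invalid entries
    (from aborted or failed pushes) are skipped. A user_abort received while
    waiting returns the claimed ticket by decrementing head_counter. */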
void concurrent_queue_base_v3::internal_pop( void* dst ) {
    concurrent_queue_rep& r = *my_rep;
    ticket k;
#if DO_ITT_NOTIFY
    bool sync_prepare_done = false;
#endif
    do {
        k = r.head_counter++;   // claim a ticket
        if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
            // The queue is empty: wait until a push supplies ticket k.
#if DO_ITT_NOTIFY
            if( !sync_prepare_done ) {
                ITT_NOTIFY( sync_prepare, dst );
                sync_prepare_done = true;
            }
#endif
            bool slept = false;
            concurrent_monitor::thread_context thr_ctx;
            r.items_avail.prepare_wait( thr_ctx, k );
            while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                __TBB_TRY {
                    slept = r.items_avail.commit_wait( thr_ctx );
                } __TBB_CATCH( tbb::user_abort& ) {
                    // Return the claimed ticket before propagating the abort.
                    r.head_counter--;
                    __TBB_RETHROW();
                } __TBB_CATCH(...) {
                    __TBB_RETHROW();
                }
                if( slept ) break;
                r.items_avail.prepare_wait( thr_ctx, k );
            }
            if( !slept )
                r.items_avail.cancel_wait( thr_ctx );
        }
        __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
    } while( !r.choose(k).pop(dst,k,*this) );   // retry if slot k was invalid
    // Wake pushers waiting for the slot freed by ticket k.
    r.slots_avail.notify( predicate_leq(k) );
}
void concurrent_queue_base_v3::internal_abort() {
    concurrent_queue_rep& r = *my_rep;
    // Wake every blocked pusher and popper with tbb::user_abort.
    r.items_avail.abort_all();
    r.slots_avail.abort_all();
}
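
//! Non-blocking pop: claim a head ticket with CAS only if an item is available.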
bool concurrent_queue_base_v3::internal_pop_if_present( void* dst ) {
    concurrent_queue_rep& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue was empty at this instant; fail.
                return false;
            }
            // The queue had an item with ticket k when we looked; try to claim it.
            ticket tk = k;
            k = r.head_counter.compare_and_swap( tk+1, tk );
            if( k==tk )
                break;
            // Another thread claimed ticket k; retry with the updated value.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    r.slots_avail.notify( predicate_leq(k) );
    return true;
}
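
//! Non-blocking push: claim a tail ticket with CAS only if the queue is below capacity.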
bool concurrent_queue_base_v3::internal_push_if_not_full( const void* src ) {
    concurrent_queue_rep& r = *my_rep;
    ticket k = r.tail_counter;
    for(;;) {
        if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
            // Queue was full at this instant; fail.
            return false;
        }
        // The queue had an empty slot with ticket k when we looked; try to claim it.
        ticket tk = k;
        k = r.tail_counter.compare_and_swap( tk+1, tk );
        if( k==tk )
            break;
        // Another thread claimed ticket k; retry with the updated value.
    }
    r.choose(k).push(src,k,*this);
    r.items_avail.notify( predicate_leq(k) );
    return true;
}
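
//! Return the number of items in the queue; may be negative while pops are pending.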
ptrdiff_t concurrent_queue_base_v3::internal_size() const {
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
}

bool concurrent_queue_base_v3::internal_empty() const {
    // Re-read tail_counter afterwards to validate that the three reads form a consistent snapshot.
    ticket tc = my_rep->tail_counter;
    ticket hc = my_rep->head_counter;
    return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
}
void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t ) {
    my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
}

void concurrent_queue_base_v3::internal_finish_clear() {
    // Called after clear() has popped all items; frees the one page each lane may still hold.
    size_t nq = my_rep->n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = my_rep->array[i].tail_page;
        __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
        if( tp!=NULL ) {
            if( tp!=invalid_page ) deallocate_page( tp );   // never deallocate the static sentinel
            my_rep->array[i].tail_page = NULL;
        }
    }
}

void concurrent_queue_base_v3::internal_throw_exception() const {
    throw_exception( eid_bad_alloc );
}
void concurrent_queue_base_v3::assign( const concurrent_queue_base& src ) {
    items_per_page = src.items_per_page;
    my_capacity = src.my_capacity;

    // Copy the counters, then the contents of each lane.
    my_rep->head_counter = src.my_rep->head_counter;
    my_rep->tail_counter = src.my_rep->tail_counter;
    my_rep->n_invalid_entries = src.my_rep->n_invalid_entries;

    for( size_t i = 0; i<my_rep->n_queue; ++i )
        my_rep->array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( my_rep->head_counter==src.my_rep->head_counter && my_rep->tail_counter==src.my_rep->tail_counter,
                  "the source concurrent queue should not be concurrently modified." );
}
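
//! Snapshot used by iterators: the head ticket plus each lane's current head page.
/** Queue iterators are intended for debugging and are not thread safe with
    respect to concurrent modification of the queue. */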
class concurrent_queue_iterator_rep: no_assign {
public:
    ticket head_counter;
    const concurrent_queue_base& my_queue;
    //! Byte offset from the start of a page to its item storage.
    const size_t offset_of_last;
    concurrent_queue_base::page* array[concurrent_queue_rep::n_queue];

    concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue),
        offset_of_last(offset_of_last_)
    {
        const concurrent_queue_rep& rep = *queue.my_rep;
        for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
            array[k] = rep.array[k].head_page;
    }

    //! Set item to the address of the slot for ticket k; return true if the slot is valid (or k is the end).
    bool get_item( void*& item, size_t k ) {
        if( k==my_queue.my_rep->tail_counter ) {
            item = NULL;   // end of queue
            return true;
        } else {
            concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
            __TBB_ASSERT(p,NULL);
            size_t i = k/concurrent_queue_rep::n_queue & (my_queue.items_per_page-1);
            item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
            return (p->mask & uintptr_t(1)<<i)!=0;
        }
    }
};
void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
    size_t k = my_rep->head_counter;
    // Position the iterator on the first valid item (or at the end).
    if( !my_rep->get_item(my_item, k) ) advance();
}

concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue ) {
    // Overload retained for binary compatibility with headers that did not pass offset_of_last.
    initialize(queue,0);
}

concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue, size_t offset_of_last ) {
    initialize(queue,offset_of_last);
}
void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            // concurrent_queue_iterator_rep has no destructor to run, so raw deallocation suffices.
            cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep( *other.my_rep );
        }
    }
    my_item = other.my_item;
}
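
//! Advance to the next slot that holds a valid item, skipping invalid entries.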
void concurrent_queue_iterator_base_v3::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    void* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif
    size_t i = k/concurrent_queue_rep::n_queue & (queue.items_per_page-1);
    if( i==queue.items_per_page-1 ) {
        // Finished the last slot of this lane's page; move to the next page.
        concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
        root = root->next;
    }
    my_rep->head_counter = ++k;
    // Recurse past any invalid slot.
    if( !my_rep->get_item(my_item, k) ) advance();
}

concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3() {
    cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
    my_rep = NULL;
}
} // namespace internal

} // namespace tbb