#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H
#include "tbb/tbb_stddef.h"
#include "tbb/atomic.h"
#include "tbb/spin_mutex.h"
#include "tbb/tbb_exception.h"
#include "tbb/aligned_space.h"
#include "semaphore.h"
namespace tbb {
namespace internal {
//! Intrusive circular doubly-linked list with the sentinel node stored inside the list object.
/** The list only links and unlinks caller-supplied nodes; it never allocates or frees them.
    NOTE(review): the element counter is updated with a separate relaxed load and relaxed
    store (not an atomic read-modify-write), so mutations are presumably serialized by the
    caller - confirm against the users of this class. */
class circular_doubly_linked_list_with_sentinel : no_copy {
public:
    //! Link fields carried by every element of the list.
    struct node_t {
        node_t* next;
        node_t* prev;
        //! Both links start as the 0xcdcdcdcd pattern (presumably a poison value for a node never linked).
        explicit node_t()
            : next( reinterpret_cast<node_t*>( static_cast<uintptr_t>(0xcdcdcdcd) ) )
            , prev( reinterpret_cast<node_t*>( static_cast<uintptr_t>(0xcdcdcdcd) ) )
        {}
    };

    //! Creates an empty list: the sentinel points at itself and the counter is zero.
    circular_doubly_linked_list_with_sentinel() { clear(); }

    //! Asserts that the sentinel is self-linked, i.e. that no node is still attached.
    ~circular_doubly_linked_list_with_sentinel() {
        __TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );
    }

    //! Number of nodes as recorded by the counter.
    size_t size() const { return count; }
    //! True when the counter is zero.
    bool empty() const { return count==0; }
    //! First node, or the sentinel itself when the list is empty.
    node_t* front() const { return head.next; }
    //! Last node, or the sentinel itself when the list is empty.
    node_t* last() const { return head.prev; }
    //! Same node as front().
    node_t* begin() const { return head.next; }
    //! Address of the sentinel; traversal in either direction stops here.
    const node_t* end() const { return &head; }

    //! Appends n at the tail (immediately before the sentinel).
    void add( node_t* n ) {
        __TBB_store_relaxed( count, __TBB_load_relaxed(count) + 1 );
        node_t* const tail = head.prev;
        n->prev = tail;
        n->next = &head;
        tail->next = n;
        head.prev = n;
    }

    //! Unlinks n by joining its two neighbours; n's own link fields are left untouched.
    void remove( node_t& n ) {
        __TBB_store_relaxed( count, __TBB_load_relaxed(count) - 1 );
        node_t* const before = n.prev;
        node_t* const after = n.next;
        before->next = after;
        after->prev = before;
    }

    //! Hands the whole chain of nodes over to lst and leaves this list empty.
    /** Nothing happens when this list has no nodes. The previous head links and counter of
        lst are overwritten, so lst is expected to be empty on entry. */
    void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        const size_t l_count = __TBB_load_relaxed(count);
        if( !l_count )
            return;
        __TBB_store_relaxed( lst.count, l_count );
        // Point lst's sentinel at both ends of the chain ...
        lst.head.next = head.next;
        lst.head.prev = head.prev;
        // ... and make both ends of the chain point back at lst's sentinel.
        head.next->prev = &lst.head;
        head.prev->next = &lst.head;
        clear();
    }

    //! Resets the sentinel to self-linked and the counter to zero without touching any node.
    void clear() {
        head.prev = &head;
        head.next = &head;
        __TBB_store_relaxed( count, 0 );
    }

private:
    //! Number of linked nodes.
    __TBB_atomic size_t count;
    //! Sentinel: head.next is the first node and head.prev is the last one.
    node_t head;
};
typedef circular_doubly_linked_list_with_sentinel waitset_t;
typedef circular_doubly_linked_list_with_sentinel dllist_t;
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
//! Monitor on which threads block until another thread notifies them.
/** Waiting is a two-step protocol: prepare_wait() followed by either commit_wait() or
    cancel_wait(). The wait() template wraps this protocol around a user condition. */
class concurrent_monitor : no_copy {
public:
    //! Per-thread waiting state; the object is itself the node that gets linked into the wait set.
    class thread_context : waitset_node_t, no_copy {
        friend class concurrent_monitor;
    public:
        thread_context() : spurious(false), aborted(false), ready(false), context(0) {
            // These two members are assigned in the body rather than in the initializer list.
            epoch = 0;
            in_waitset = false;
        }

        //! Destroys the semaphore, but only if the ready flag says it is in use.
        ~thread_context() {
            if( !ready )
                return;
            // NOTE(review): presumably this consumes a signal that was posted but never
            // waited for, so the semaphore is not destroyed with a pending V() - confirm.
            if( spurious )
                semaphore().P();
            semaphore().~binary_semaphore();
        }

        //! Views the raw storage as the semaphore object.
        binary_semaphore& semaphore() { return *sema.begin(); }

    private:
        //! Out-of-line initialization; defined elsewhere (presumably constructs the semaphore and sets ready).
        __TBB_NOINLINE( void init() );
        //! Raw storage for the semaphore; destroyed explicitly in the destructor.
        tbb::aligned_space<binary_semaphore, 1> sema;
        //! Compared against the monitor's epoch in commit_wait().
        __TBB_atomic unsigned epoch;
        //! Cleared by the notifier when it takes this context out of the wait set.
        tbb::atomic<bool> in_waitset;
        bool spurious;
        //! When set, commit_wait() throws eid_user_abort after waking up.
        bool aborted;
        //! Checked by the destructor and asserted by commit_wait().
        bool ready;
        //! Caller-supplied value handed to the predicate of notify()/notify_relaxed().
        uintptr_t context;
    };

    //! Starts with epoch zero.
    concurrent_monitor() { __TBB_store_relaxed( epoch, 0 ); }
    ~concurrent_monitor() ;

    //! First step of waiting; defined elsewhere. ctx is the value later shown to notify predicates.
    void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );

    //! Second step of waiting: block on the semaphore, unless the epoch moved since thr recorded it.
    /** @return true if the thread blocked and was released; false if the monitor's epoch no
                longer matches thr.epoch, in which case the wait was cancelled via cancel_wait().
        Raises eid_user_abort through throw_exception() if thr.aborted is set after waking. */
    inline bool commit_wait( thread_context& thr ) {
        if( thr.epoch != __TBB_load_relaxed(epoch) ) {
            // The epoch changed after thr recorded it: do not block, just undo the preparation.
            cancel_wait( thr );
            return false;
        }
        __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
        thr.semaphore().P();
        __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
        if( thr.aborted )
            throw_exception( eid_user_abort );
        return true;
    }

    //! Undoes a prepare_wait() that is not followed by commit_wait(); defined elsewhere.
    void cancel_wait( thread_context& thr );

    //! Blocks the calling thread until until() returns true; on() supplies the wait context.
    template<typename WaitUntil, typename Context>
    void wait( WaitUntil until, Context on );

    //! Calls atomic_fence() and then notify_one_relaxed().
    void notify_one() {
        atomic_fence();
        notify_one_relaxed();
    }
    //! Variant of notify_one() without the leading fence; defined elsewhere.
    void notify_one_relaxed();

    //! Calls atomic_fence() and then notify_all_relaxed().
    void notify_all() {
        atomic_fence();
        notify_all_relaxed();
    }
    //! Variant of notify_all() without the leading fence; defined elsewhere.
    void notify_all_relaxed();

    //! Calls atomic_fence() and then notify_relaxed( predicate ).
    template<typename P> void notify( const P& predicate ) {
        atomic_fence();
        notify_relaxed( predicate );
    }
    //! Signals every waiting thread whose context value satisfies predicate, without a leading fence.
    template<typename P> void notify_relaxed( const P& predicate );

    //! Calls atomic_fence() and then abort_all_relaxed().
    void abort_all() {
        atomic_fence();
        abort_all_relaxed();
    }
    //! Variant of abort_all() without the leading fence; defined elsewhere.
    void abort_all_relaxed();

private:
    //! Lock taken while the wait set and the epoch are modified.
    tbb::spin_mutex mutex_ec;
    //! Contexts of the threads currently registered as waiting.
    waitset_t waitset_ec;
    //! Incremented by notifications; commit_wait() compares it with the waiter's recorded epoch.
    __TBB_atomic unsigned epoch;
    //! Converts a wait-set node back to the thread_context it is the base sub-object of.
    thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
};
//! Blocks the calling thread until the condition until() holds.
/** @param until  callable; returns true once waiting should stop.
    @param on     callable; its result is passed to prepare_wait() as the wait context.
    The condition is evaluated after each prepare_wait(), so a wait that is already
    prepared is either committed or cancelled before returning. An exception thrown by
    commit_wait() propagates to the caller. */
template<typename WaitUntil, typename Context>
void concurrent_monitor::wait( WaitUntil until, Context on )
{
    thread_context thr_ctx;
    prepare_wait( thr_ctx, on() );
    while( !until() ) {
        // commit_wait() returns true only when the thread really blocked and was released;
        // if the condition holds at that point there is no prepared wait left to cancel.
        if( commit_wait( thr_ctx ) && until() )
            return;
        // Either the commit was refused (and already cancelled) or the wake-up did not
        // satisfy the condition: register again before re-testing it.
        prepare_wait( thr_ctx, on() );
    }
    // The condition became true while a wait was prepared but not committed.
    cancel_wait( thr_ctx );
}
template<typename P>
void concurrent_monitor::notify_relaxed( const P& predicate ) {
if( waitset_ec.empty() )
return;
dllist_t temp;
waitset_node_t* nxt;
const waitset_node_t* end = waitset_ec.end();
{
tbb::spin_mutex::scoped_lock l( mutex_ec );
__TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
nxt = n->prev;
thread_context* thr = to_thread_context( n );
if( predicate( thr->context ) ) {
waitset_ec.remove( *n );
thr->in_waitset = false;
temp.add( n );
}
}
}
end = temp.end();
for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
nxt = n->next;
to_thread_context(n)->semaphore().V();
}
#if TBB_USE_ASSERT
temp.clear();
#endif
}
}
}
#endif