#include "concurrent_monitor.h"
namespace tbb {
namespace internal {
// One-time setup of a thread context: builds the binary semaphore in the
// storage exposed by `sema` and then marks the context as ready.
void concurrent_monitor::thread_context::init() {
    // Placement-new: the semaphore is constructed at the address returned by
    // sema.begin(); no separate allocation happens here.
    new (sema.begin()) binary_semaphore;

    // Raise the flag only after the semaphore exists; prepare_wait() tests
    // `ready` to decide whether init() still has to be called.
    ready = true;
}
// Destructor: calls abort_all() and then, in assert-enabled builds, checks
// that the wait set is empty.
concurrent_monitor::~concurrent_monitor() {
    // abort_all() is declared outside this file section; presumably it
    // releases every thread still parked on this monitor -- confirm in header.
    this->abort_all();
    __TBB_ASSERT( waitset_ec.empty(), "waitset not empty?" );
}
// Registers `thr` as a waiter on this monitor.
//
// thr - the calling thread's context; initialised here on first use.
// ctx - caller-defined value stored in thr.context.
//
// On return the context has been appended to the wait set and carries a
// snapshot of the monitor's epoch taken under mutex_ec.
void concurrent_monitor::prepare_wait( thread_context& thr, uintptr_t ctx ) {
    if( thr.ready ) {
        // A previous cancel_wait() left `spurious` set when it could not pull
        // the node out of the wait set itself. Clear the flag and do P() on
        // the semaphore here -- presumably to consume the signal a notifier
        // posted for that cancelled wait.
        if( thr.spurious ) {
            thr.spurious = false;
            thr.semaphore().P();
        }
    } else {
        // First use of this context: construct its semaphore.
        thr.init();
    }

    thr.context = ctx;
    thr.in_waitset = true;
    {
        // Epoch snapshot and enqueue happen under the same lock that the
        // notify/abort paths take when they bump the epoch.
        tbb::spin_mutex::scoped_lock guard( mutex_ec );
        __TBB_store_relaxed( thr.epoch, __TBB_load_relaxed(epoch) );
        waitset_ec.add( (waitset_t::node_t*)&thr );
    }
    // Fence issued after the lock is released; presumably orders the enqueue
    // before the caller's subsequent re-check of its wait condition.
    atomic_fence();
}
// Withdraws a wait previously announced with prepare_wait().
//
// thr - the calling thread's context.
//
// If the node is still in the wait set it is removed and `spurious` is
// cleared. Otherwise `spurious` stays set, and the next prepare_wait() on
// this context will do a P() on the semaphore.
void concurrent_monitor::cancel_wait( thread_context& thr ) {
    // Assume the worst first: somebody else already dequeued us.
    thr.spurious = true;

    // Unlocked peek; skips taking the mutex when the node is already gone.
    const bool maybe_queued = thr.in_waitset;
    if( !maybe_queued )
        return;

    tbb::spin_mutex::scoped_lock guard( mutex_ec );
    // Re-test under the lock: the notify/abort paths clear in_waitset while
    // holding mutex_ec, so this value is the one to act on.
    if( !thr.in_waitset )
        return;

    // We dequeue ourselves, so the spurious flag set above is withdrawn.
    thr.in_waitset = false;
    thr.spurious = false;
    waitset_ec.remove( (waitset_t::node_t&)thr );
}
void concurrent_monitor::notify_one_relaxed() {
if( waitset_ec.empty() )
return;
waitset_node_t* n;
const waitset_node_t* end = waitset_ec.end();
{
tbb::spin_mutex::scoped_lock l( mutex_ec );
__TBB_store_relaxed( epoch, __TBB_load_relaxed(epoch) + 1 );
n = waitset_ec.front();
if( n!=end ) {
waitset_ec.remove( *n );
to_thread_context(n)->in_waitset = false;
}
}
if( n!=end )
to_thread_context(n)->semaphore().V();
}
void concurrent_monitor::notify_all_relaxed() {
if( waitset_ec.empty() )
return;
dllist_t temp;
const waitset_node_t* end;
{
tbb::spin_mutex::scoped_lock l( mutex_ec );
__TBB_store_relaxed( epoch, __TBB_load_relaxed(epoch) + 1 );
waitset_ec.flush_to( temp );
end = temp.end();
for( waitset_node_t* n=temp.front(); n!=end; n=n->next )
to_thread_context(n)->in_waitset = false;
}
waitset_node_t* nxt;
for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
nxt = n->next;
to_thread_context(n)->semaphore().V();
}
#if TBB_USE_ASSERT
temp.clear();
#endif
}
void concurrent_monitor::abort_all_relaxed() {
if( waitset_ec.empty() )
return;
dllist_t temp;
const waitset_node_t* end;
{
tbb::spin_mutex::scoped_lock l( mutex_ec );
__TBB_store_relaxed( epoch, __TBB_load_relaxed(epoch) + 1 );
waitset_ec.flush_to( temp );
end = temp.end();
for( waitset_node_t* n=temp.front(); n!=end; n=n->next )
to_thread_context(n)->in_waitset = false;
}
waitset_node_t* nxt;
for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
nxt = n->next;
to_thread_context(n)->aborted = true;
to_thread_context(n)->semaphore().V();
}
#if TBB_USE_ASSERT
temp.clear();
#endif
}
}
}