<tb@panthema.net>
<http://www.gnu.org/licenses/>
#ifndef JOBQUEUE_H_
#define JOBQUEUE_H_
#include <iostream>
#include <assert.h>
#include <omp.h>
#include <numa.h>
#include "src/config.h"
#if defined(HAVE_ATOMIC_H)
#include <atomic>
#elif defined(HAVE_CSTDATOMIC_H)
#include <cstdatomic>
#endif
#include <tbb/concurrent_queue.h>
#include "../tools/debug.h"
#undef DBGX
#define DBGX DBGX_OMP
#include "../tools/agglogger.h"
#include "../tools/timer.h"
#include "../tools/timer_array.h"
extern std::string gopt_memory_type;
extern size_t g_numa_nodes;
namespace jobqueue {
//! Compile-time switch for the job queue's DBG() trace messages.
static constexpr bool debug_queue = false;
/*!
 * Abstract unit of work executed by a job queue.
 *
 * Subclasses implement run(), which receives the queue's cookie object by
 * reference. The boolean result of run() tells the queue whether it should
 * delete the job afterwards (true) or leave it alone (false).
 */
template <typename CookieType>
class JobT
{
public:
    using cookie_type = CookieType;

    //! Virtual because jobs are destroyed through JobT pointers.
    virtual ~JobT() = default;

    //! Execute the job; return true to have the queue delete it.
    virtual bool run(cookie_type& cookie) = 0;
};
template <typename CookieType>
class DefaultJobQueueGroup;

/*!
 * Multi-threaded job queue backed by a tbb::concurrent_queue.
 *
 * Jobs are heap-allocated JobT<CookieType> objects handed in via enqueue().
 * Each job's run() receives the cookie given to the constructor; when run()
 * returns true the queue deletes the job.
 *
 * loop() / numaLoop() open an OpenMP parallel region in which every thread
 * pops and runs jobs. A thread that finds the queue empty becomes idle; once
 * all threads of the region are idle and the queue is still empty, each
 * thread asks the queue group to assist other queues and then returns.
 */
template <typename CookieType,
          template <typename> class JobQueueGroupType = DefaultJobQueueGroup>
class JobQueueT
{
public:
    typedef CookieType cookie_type;
    typedef JobT<CookieType> job_type;
    typedef JobQueueGroupType<CookieType> jobqueuegroup_type;

private:
    //! pending jobs
    tbb::concurrent_queue<job_type*> m_queue;

    //! Number of threads in the running parallel region. Atomic because every
    //! worker stores it and threads of other queues read it in try_run();
    //! a plain integer would be a data race.
    std::atomic<unsigned int> m_numthrs;

    //! number of this queue's threads currently waiting for work
    std::atomic<unsigned int> m_idle_count;

    //! context object passed to every job's run()
    cookie_type& m_cookie;

    //! group asked for assistance once this queue is drained (may be null)
    jobqueuegroup_type* m_group;

    //! index of this queue within its group, see set_id()
    unsigned m_id;

    typedef AggregateLogger<unsigned int> IntLogger;
    typedef IntLogger::DummyLogger logger_type;
    logger_type m_logger, m_work_logger;

public:
    typedef TimerArrayDummy TimerArrayMT;
    enum { TM_WORK, TM_IDLE };
    TimerArrayMT m_timers;

public:
    //! Create an empty queue whose jobs receive cookie; group may be null.
    JobQueueT(cookie_type& cookie,
              jobqueuegroup_type* group)
        : m_queue(),
          m_numthrs(0),
          m_idle_count(0),
          m_cookie(cookie),
          m_group(group),
          m_id(0), // was left uninitialized until set_id() was called
          m_logger("jobqueue.txt", 0.005, 10000),
          m_work_logger("worker_count.txt", 0.005, 10000),
          m_timers(2)
    { }

    //! True if at least one thread of this queue is currently idle.
    bool has_idle() const
    {
        return (m_idle_count != 0);
    }

    //! Append a job; the queue deletes it after run() if run() returns true.
    void enqueue(job_type* job)
    {
        m_queue.push(job);
        m_logger << m_queue.unsafe_size();
    }

    //! Set the index used in debug output and passed to the group's assist().
    void set_id(unsigned id)
    {
        m_id = id;
    }

    /*!
     * Run at most one job from this queue (used by other queues' threads).
     *
     * Returns true if a job was run. If the queue was empty, returns whether
     * not all of this queue's threads are idle, i.e. whether it is still
     * worth asking again later.
     */
    bool try_run()
    {
        job_type* job = NULL;
        if (!m_queue.try_pop(job))
            return (m_idle_count != m_numthrs);

        m_logger << m_queue.unsafe_size();

        if (job->run(m_cookie))
            delete job;

        return true;
    }

    //! Worker loop executed by every thread of the parallel region.
    inline void executeThreadWork()
    {
        job_type* job = NULL;

        // every thread of the team stores the same value
        m_numthrs.store(static_cast<unsigned int>(omp_get_num_threads()));

        m_timers.change(TM_WORK);
        m_logger.start();
        m_work_logger.start();

        while (true)
        {
            // busy phase: drain the queue
            while (m_queue.try_pop(job))
            {
                m_logger << m_queue.unsafe_size();
                if (job->run(m_cookie))
                    delete job;
            }

            DBG(debug_queue, "Queue" << m_id << " is empty");

            // idle phase: register as idle and spin for new work
            m_timers.change(TM_IDLE);
            ++m_idle_count;
            m_logger << m_queue.unsafe_size();
            m_work_logger << (m_numthrs - m_idle_count);

            while (!m_queue.try_pop(job))
            {
                DBG(debug_queue, "Idle thread - m_idle_count: " << m_idle_count);

                if (m_idle_count == m_numthrs)
                {
                    // All of this queue's threads are idle and the queue is
                    // empty: help the other queues of the group, then leave.
                    // m_group is null for stand-alone queues (e.g. JobQueue).
                    while (m_group != NULL && m_group->assist(m_id)) { }
                    return;
                }
            }

            // got a job while idle: back to busy
            m_timers.change(TM_WORK);
            --m_idle_count;
            m_logger << m_queue.unsafe_size();
            m_work_logger << (m_numthrs - m_idle_count);

            if (job->run(m_cookie))
                delete job;
        }
    }

    //! Process all jobs using the default OpenMP team size.
    void loop()
    {
        // Reset the idle counter left at m_numthrs by a previous run; otherwise
        // the termination test m_idle_count == m_numthrs is off by that amount.
        m_idle_count = 0;

        m_timers.start(omp_get_max_threads());

#pragma omp parallel
        {
            if (gopt_memory_type == "mmap_node0")
            {
                numa_run_on_node(0);
                numa_set_preferred(0);
            }

            executeThreadWork();
        }

        m_timers.stop();

        assert(m_queue.unsafe_size() == 0);
    }

    //! Process all jobs with numberOfThreads threads, each restricted via
    //! numa_run_on_node() / numa_set_preferred() to node numaNode.
    void numaLoop(int numaNode, int numberOfThreads)
    {
        // see loop(): make the queue reusable across runs
        m_idle_count = 0;

        m_timers.start(omp_get_max_threads());

#pragma omp parallel num_threads(numberOfThreads)
        {
            numa_run_on_node(numaNode);
            numa_set_preferred(numaNode);

            executeThreadWork();
        }

        m_timers.stop();

        assert(m_queue.unsafe_size() == 0);
    }
};
/*!
 * Queue group used by stand-alone job queues: there are no sibling queues,
 * so there is never anything to assist.
 */
template <typename CookieType>
class DefaultJobQueueGroup
{
public:
    using jobqueue_type = JobQueueT<CookieType, DefaultJobQueueGroup>;
    using job_type = JobT<CookieType>;

public:
    //! Always reports that no other queue could be helped.
    static inline bool assist(unsigned /* qid */)
    {
        return false;
    }
};
template <typename CookieType>
class NumaJobQueueGroup
{
public:
typedef JobQueueT<CookieType, NumaJobQueueGroup> jobqueue_type;
typedef JobT<CookieType> job_type;
protected:
std::vector<jobqueue_type*> m_queues;
public:
void add_jobqueue(jobqueue_type* jq)
{
jq->set_id(m_queues.size());
m_queues.push_back(jq);
}
static unsigned calcThreadNum(int k, int numJobQueues)
{
int realNumaNodes = numa_num_configured_nodes();
if (realNumaNodes < 1) realNumaNodes = 1;
int numThreadsPerNode = omp_get_max_threads() / numJobQueues;
int remainThreads = omp_get_max_threads() % numJobQueues;
int nodeThreads = numThreadsPerNode;
if (k < remainThreads) nodeThreads++;
DBG(1, "JobQueue[" << k << "] prospective " << nodeThreads << " threads");
return nodeThreads;
}
void numaLaunch()
{
int realNumaNodes = numa_num_configured_nodes();
if (realNumaNodes < 1) realNumaNodes = 1;
if (realNumaNodes == 1) {
DBG(1, "No or just one NUMA nodes detected on the system.");
DBG(1, "Continuing anyway, at your own peril!");
}
g_stats >> "num_real_numa_nodes" << realNumaNodes;
if ((int)m_queues.size() != realNumaNodes || g_numa_nodes == 0)
{
DBG(1, "!!! WARNING !!! emulating NUMA nodes! "
<< "Remove --numa-nodes for REAL EXPERIMENTS.");
}
g_stats >> "num_jobqueues" << m_queues.size();
int numJobQueues = m_queues.size();
int numThreadsPerNode = omp_get_max_threads() / numJobQueues;
int remainThreads = omp_get_max_threads() % numJobQueues;
if (numThreadsPerNode == 0)
{
DBG(1, "Fewer threads than NUMA nodes detected.");
DBG(1, "Strange things may happen, continuing anyway, at your own peril!");
}
int runThreads = std::min(omp_get_max_threads(), numJobQueues);
omp_set_nested(true);
ClockTimer timer;
#pragma omp parallel for num_threads(runThreads) schedule(dynamic)
for (int k = 0; k < numJobQueues; k++)
{
int nodeThreads = numThreadsPerNode;
int numaNode = k % realNumaNodes;
if (k < remainThreads) nodeThreads++;
DBG(1, "JobQueue[" << k << "] gets " << nodeThreads << " threads");
if (nodeThreads == 0) nodeThreads = 1;
ClockTimer timer;
m_queues[k]->numaLoop(numaNode, nodeThreads);
DBG(1, "JobQueue[" << k << "] took : " << timer.elapsed() << " s");
}
}
bool assist(unsigned qid)
{
unsigned id = qid;
for (unsigned i = 1; i < m_queues.size(); ++i)
{
if (++id >= m_queues.size()) id = 0;
if (m_queues[id]->try_run())
{
DBG(debug_queue, "JobQueue[" << qid << "] assisted " << id);
return true;
}
}
return false;
}
};
/*!
 * Stand-alone job queue whose cookie is the queue itself: every Job's run()
 * receives a reference to the JobQueue executing it. It belongs to no
 * queue group (the group pointer is null).
 */
class JobQueue : public JobQueueT<JobQueue>
{
public:
    using super_type = JobQueueT<JobQueue>;

    JobQueue()
        : super_type(*this, nullptr)
    { }

    //! Run all queued jobs using the default OpenMP team size.
    void loop()
    {
        super_type::loop();
    }

    //! Run all queued jobs with numberOfThreads threads placed on numaNode.
    void numaLoop(int numaNode, int numberOfThreads)
    {
        super_type::numaLoop(numaNode, numberOfThreads);
    }
};

//! Job type whose run() receives the executing JobQueue as cookie.
using Job = JobT<JobQueue>;
}
#endif