/*******************************************************************************
 * eberle-parallel-lcp-merge-binary-splitting.h
 *
 * Parallel LCP-aware merging with binary splitting for dynamic load balancing.
 *
 * Copyright (C) Andreas Eberle <email@andreas-eberle.com>
 *
 * This program is free software, distributed under the terms of the GNU
 * General Public License; see <http://www.gnu.org/licenses/>.
 ******************************************************************************/
#ifndef EBERLE_PARALLEL_LCP_MERGE_BINARY_SPLITTING_H_
#define EBERLE_PARALLEL_LCP_MERGE_BINARY_SPLITTING_H_
#include "eberle-parallel-lcp-merge.h"
#include "../tools/debug.h"
#undef DBGX
#define DBGX DBGX_OMP
namespace eberle_parallel_lcp_merge
{
static const bool debug_binary_splitting = false;
static const bool debug_binary_splitting_splits_count = true;
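
// Forward declaration: splits the remaining inputs at a median splitter and
// enqueues the two resulting halves as independent merge jobs.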
static inline void
createJobsBinarySplitting(JobQueue &jobQueue, const LcpCacheStringPtr* inputStreams, unsigned numInputs, string* output, size_t numberOfElements);
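
// Merge job that merges up to K sorted input streams into the output array
// using an LCP-aware loser tree. When idle threads appear, a splittable job
// stops merging and splits its remaining work in two.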
template <unsigned K>
struct MergeJobBinarySplitting : public Job
{
    LcpStringLoserTree<K> loserTree;

    string* output;
    size_t length;
    bool splittable;

    MergeJobBinarySplitting(const LcpCacheStringPtr* inputs, unsigned numInputs, string* output, size_t length, bool splittable)
        : loserTree(inputs, numInputs), output(output), length(length), splittable(splittable)
    {
        g_mergeJobsCreated++;
        DBG(debug_jobtype_on_creation, "MergeJobBinarySplitting<" << K << "> (output: " << (output - g_outputBase) << ", length: " << length << ")");
    }
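
    // Work is shared only if work sharing is enabled, this job may still be
    // split, another thread is idle, and enough elements remain to make a
    // split worthwhile.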
    inline bool shouldShareWork(JobQueue& jobQueue)
    {
        return USE_WORK_SHARING && splittable && jobQueue.has_idle() && length > SHARE_WORK_THRESHOLD;
    }
    inline bool
    mergeToOutput(JobQueue& jobQueue)
    {
        // Merge in bulks of MERGE_BULK_SIZE elements and check between bulks
        // whether this job should stop and share its remaining work.
        for (size_t lastLength = length; length >= MERGE_BULK_SIZE; length -= MERGE_BULK_SIZE, output += MERGE_BULK_SIZE)
        {
            // If this job was recorded as the longest one, update the global
            // record to the length still remaining.
            if (g_lengthOfLongestJob == lastLength)
                g_lengthOfLongestJob = length;

            if (g_lengthOfLongestJob < length)
                g_lengthOfLongestJob = length; // this job is now the longest one
            else if (shouldShareWork(jobQueue) && g_lengthOfLongestJob == length)
                return false; // longest job with idle threads: stop and split

            loserTree.writeElementsToStream(output, MERGE_BULK_SIZE);
            lastLength = length;
        }

        // Write the remaining tail of fewer than MERGE_BULK_SIZE elements.
        loserTree.writeElementsToStream(output, length);
        return true;
    }
    virtual bool
    run(JobQueue& jobQueue)
    {
        if (shouldShareWork(jobQueue))
        {
            // Split immediately without merging anything ourselves.
            createJobsBinarySplitting(jobQueue, loserTree.getRemaining(), K, output, length);
        }
        else
        {
            loserTree.initTree(0);

            // mergeToOutput() returns false if the job decided mid-merge to
            // share its work; split the remaining input into new jobs.
            if (!mergeToOutput(jobQueue))
            {
                createJobsBinarySplitting(jobQueue, loserTree.getRemaining(), K, output, length);

                if (g_lengthOfLongestJob == length)
                    g_lengthOfLongestJob = 0;
            }
        }
        return true;
    }
};
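
// Dispatches a job suited to the number of input streams: a plain copy job
// for one stream, a binary merge for two, and a K-way loser-tree merge job
// (K = 4..64) otherwise.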
static inline void
enqueueBinarySplittingJob(JobQueue &jobQueue, const LcpCacheStringPtr* inputs, unsigned numInputs, string* output, size_t jobLength, bool splittable)
{
    if (numInputs == 1)
        jobQueue.enqueue(new CopyDataJob(inputs[0], output));
    else if (numInputs <= 2)
        jobQueue.enqueue(new BinaryMergeJob(inputs[0], inputs[1], 0, output));
    else if (numInputs <= 4)
        jobQueue.enqueue(new MergeJobBinarySplitting<4>(inputs, numInputs, output, jobLength, splittable));
    else if (numInputs <= 8)
        jobQueue.enqueue(new MergeJobBinarySplitting<8>(inputs, numInputs, output, jobLength, splittable));
    else if (numInputs <= 16)
        jobQueue.enqueue(new MergeJobBinarySplitting<16>(inputs, numInputs, output, jobLength, splittable));
    else if (numInputs <= 32)
        jobQueue.enqueue(new MergeJobBinarySplitting<32>(inputs, numInputs, output, jobLength, splittable));
    else if (numInputs <= 64)
        jobQueue.enqueue(new MergeJobBinarySplitting<64>(inputs, numInputs, output, jobLength, splittable));
    else
    {
        DBG(1, "Can't create job with that many streams. Add more cases.");
        abort();
    }
}
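
// Binary splitting: pick the median string of a random non-empty input stream
// as splitter, divide every stream at that splitter via binary search, and
// enqueue the two resulting stream sets as independent jobs.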
static inline void
createJobsBinarySplitting(JobQueue &jobQueue, const LcpCacheStringPtr* inputStreams, unsigned numInputs, string* output, size_t numberOfElements)
{
DBG(debug_binary_splitting, "CREATING JOBS for numberOfElements: " << numberOfElements);
g_splittingsExecuted++;
ClockTimer splittingTimer;
splittingTimer.start();
    // Collect the median string of each non-empty input stream as a splitter
    // candidate.
    const unsigned numSplitters = numInputs;
    string splitters[numSplitters];
    LcpCacheStringPtr streams[numSplitters];

    unsigned nonEmptyStreams = 0;
    for (unsigned i = 0; i < numInputs; i++)
    {
        streams[i] = inputStreams[i];

        if (!streams[i].empty())
        {
            splitters[nonEmptyStreams] = streams[i].strings[streams[i].size / 2];
            ++nonEmptyStreams;
        }
    }
    // Choose one candidate at random; at least one stream is non-empty,
    // because the job still has elements to merge.
    string splitterString = splitters[unsigned(rand() % nonEmptyStreams)];
    DBG(debug_binary_splitting, "SplitterString: " << splitterString);
    // Partition each stream at the splitter via binary search: elements before
    // the splitter position go to job 0, the rest to job 1.
    LcpCacheStringPtr jobStreams[2][numInputs];
    unsigned nonEmptyCtr[2] = {0, 0};
    size_t jobLength[2] = {0, 0};

    for (unsigned i = 0; i < numInputs; i++)
    {
        LcpCacheStringPtr stream = streams[i];

        if (!stream.empty())
        {
            const size_t idx = stream.binarySearch(splitterString);

            jobStreams[0][nonEmptyCtr[0]] = stream.sub(0, idx);
            nonEmptyCtr[0]++;
            jobLength[0] += idx;

            DBG(debug_binary_splitting, "Split found at index [" << idx << "]");

            const size_t restLength = stream.size - idx;
            if (restLength > 0)
            {
                jobStreams[1][nonEmptyCtr[1]] = stream.sub(idx, restLength);
                nonEmptyCtr[1]++;
                jobLength[1] += restLength;
            }
        }
    }
    // Enqueue the two halves. A half remains splittable only if the other half
    // is non-empty, i.e. the splitter actually divided the work.
    if (jobLength[0] > 0)
    {
        enqueueBinarySplittingJob(jobQueue, jobStreams[0], nonEmptyCtr[0], output, jobLength[0], jobLength[1] > 0);
        output += jobLength[0];
    }
    if (jobLength[1] > 0)
    {
        enqueueBinarySplittingJob(jobQueue, jobStreams[1], nonEmptyCtr[1], output, jobLength[1], jobLength[0] > 0);
    }

    g_splittingTime += splittingTimer.elapsed();
}
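
// Top-level entry point: merges numInputs sorted, LCP-annotated input streams
// with 'length' elements in total into 'output', using all available OpenMP
// threads and binary splitting for load balancing.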
static inline void
parallelLcpMergeBinarySplitting(const LcpCacheStringPtr* inputs, unsigned numInputs, string* output, size_t length)
{
    g_outputBase = output;
    g_splittingsExecuted = 0;
    g_mergeJobsCreated = 0;
    g_splittingTime = 0;

    ClockTimer timer;
    timer.start();

    JobQueue jobQueue;
    DBG(debug_merge_start_message, "doing parallel lcp merge for " << numInputs << " input streams using " << omp_get_max_threads() << " threads with binary splitting");
    enqueueBinarySplittingJob(jobQueue, inputs, numInputs, output, length, true);
    jobQueue.numaLoop(-1, omp_get_max_threads());

    DBG(debug_binary_splitting_splits_count, "Binary Splitting executed " << g_splittingsExecuted << " splittings; created " << g_mergeJobsCreated << " jobs");

    g_stats >> "toplevelmerge_time" << timer.elapsed();
    g_stats >> "splittings_executed" << g_splittingsExecuted;
    g_stats >> "mergejobs_created" << g_mergeJobsCreated;
    g_stats >> "splitting_time" << g_splittingTime;
}
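
/* Usage sketch (hypothetical caller; assumes the input streams come from an
 * LCP-aware string sorter and that 'string' is the character-pointer typedef
 * used throughout this codebase):
 *
 *   LcpCacheStringPtr streams[4];   // four sorted, LCP-cached input runs
 *   // ... fill streams, e.g. with the sorted output of four worker threads ...
 *   size_t total = streams[0].size + streams[1].size
 *                + streams[2].size + streams[3].size;
 *   string* output = new string[total];
 *   parallelLcpMergeBinarySplitting(streams, 4, output, total);
 */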
} // namespace eberle_parallel_lcp_merge

#endif // EBERLE_PARALLEL_LCP_MERGE_BINARY_SPLITTING_H_