// http://stxxl.sourceforge.net
// <dementiev@mpi-sb.mpg.de>
// <beckmann@cs.uni-frankfurt.de>
// http://www.boost.org/LICENSE_1_0.txt
#include <stxxl/bits/algo/async_schedule.h>
#include <stxxl/bits/common/types.h>
#include <stxxl/bits/io/file.h>
#include <stxxl/bits/namespace.h>
#include <stxxl/bits/parallel.h>
#include <stxxl/bits/unused.h>
#include <stxxl/bits/verbose.h>
#include <algorithm>
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>
STXXL_BEGIN_NAMESPACE
namespace async_schedule_local {
// One event of the write simulation: block `iblock` is put out at
// simulated time step `timestamp`.
struct sim_event
{
    int_type timestamp;  // simulated time step of the event
    int_type iblock;     // index of the block the event refers to

    // Creates the event "block b at time t".
    sim_event(int_type t, int_type b)
        : timestamp(t),
          iblock(b)
    { }
};
// Strict weak ordering on sim_event by *descending* timestamp. Used as the
// comparator of a std::priority_queue, it makes top() return the event with
// the smallest timestamp.
struct sim_event_cmp
{
    // std::binary_function was deprecated in C++11 and removed in C++17, so
    // the nested typedefs it used to provide are spelled out directly. This
    // keeps code relying on them working without depending on the base class.
    typedef sim_event first_argument_type;
    typedef sim_event second_argument_type;
    typedef bool result_type;

    // Returns true if `a` happens later than `b`.
    inline bool operator () (const sim_event& a, const sim_event& b) const
    {
        return a.timestamp > b.timestamp;
    }
};
// Pair compared by write_time_cmp on its `second` member. In the array filled
// by simulate_async_write(), `first` is the block index and `second` the
// simulated time step at which that block was put out.
typedef std::pair<int_type, int_type> write_time_pair;
// Strict weak ordering on write_time_pair by *descending* `second` member,
// i.e. sorting with it places the pairs with the largest `second` first.
struct write_time_cmp
{
    // std::binary_function was deprecated in C++11 and removed in C++17, so
    // the nested typedefs it used to provide are spelled out directly. This
    // keeps code relying on them working without depending on the base class.
    typedef write_time_pair first_argument_type;
    typedef write_time_pair second_argument_type;
    typedef bool result_type;

    // Returns true if `a.second` is greater than `b.second`.
    inline bool operator () (const write_time_pair& a, const write_time_pair& b) const
    {
        return a.second > b.second;
    }
};
// Returns the queue slot used for block `i`: the value stored in `disks[i]`,
// or the extra slot `D` when that value equals file::NO_ALLOCATOR.
// The result is asserted to lie in [0, D].
static inline int_type get_disk(int_type i, const int_type* disks, int_type D)
{
    const int_type assigned = disks[i];
    // Blocks without an allocator all share the additional slot D.
    const int_type slot = (assigned == file::NO_ALLOCATOR) ? D : assigned;
    assert(0 <= slot && slot <= D);
    return slot;
}
// Simulates putting out the L blocks described by `disks` in discrete time
// steps, walking the blocks from index L-1 down to 0.
//
// disks   per-block slot information, read through get_disk(); must hold L entries
// L       number of blocks (asserted to be >= D)
// m_init  upper bound on the number of blocks queued before the simulation
//         starts (presumably the number of available buffers -- confirm with callers)
// D       highest regular slot index; slot D itself collects the blocks that
//         get_disk() maps to it, so D + 1 queues are maintained
// o_time  output array with L entries; o_time[b] is set to (b, time step at
//         which block b was put out)
//
// Returns the timestamp of the last processed event minus one (-1 if no event
// was ever scheduled).
int_type simulate_async_write(
    const int_type* disks,
    const int_type L,
    const int_type m_init,
    const int_type D,
    std::pair<int_type, int_type>* o_time)
{
    typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type;
    typedef std::queue<int_type> disk_queue_type;
    typedef std::vector<disk_queue_type> disk_queues_type;
    typedef std::vector<char> busy_flags_type;

    assert(L >= D);

    // The containers own their storage, so nothing is leaked if one of the
    // queue operations below throws (the former new[]/delete[] pair did leak).
    disk_queues_type disk_queues(static_cast<disk_queues_type::size_type>(D + 1));
    // char instead of bool sidesteps the std::vector<bool> proxy specialisation.
    // The flags are reset at the start of every new time step before being read.
    busy_flags_type disk_busy(static_cast<busy_flags_type::size_type>(D + 1));

    event_queue_type event_queue;
    int_type m = m_init;
    int_type i = L - 1;     // next block that has not been handed out yet
    int_type oldtime = 0;   // timestamp of the most recently processed event

    // Pre-fill the per-slot queues with up to m_init blocks, taken from the back.
    while (m && (i >= 0))
    {
        int_type disk = get_disk(i, disks, D);
        disk_queues[disk].push(i);
        i--;
        m--;
    }

    // Every non-empty queue puts out its first block at time step 1.
    for (int_type d = 0; d <= D; d++)
        if (!disk_queues[d].empty())
        {
            int_type j = disk_queues[d].front();
            disk_queues[d].pop();
            event_queue.push(sim_event(1, j));
        }

    while (!event_queue.empty())
    {
        sim_event cur = event_queue.top();
        event_queue.pop();
        if (oldtime != cur.timestamp)
        {
            // A new time step has begun: no slot has scheduled anything for it yet.
            std::fill(disk_busy.begin(), disk_busy.end(), false);
            oldtime = cur.timestamp;
        }

        STXXL_VERBOSE1("Block " << cur.iblock << " put out, time " << cur.timestamp << " disk: " << disks[cur.iblock]);
        o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp);

        // The finished block makes room for the next pending block i (if any).
        if (i >= 0)
        {
            int_type disk = get_disk(i, disks, D);
            if (disk_busy[disk])
            {
                // Slot already has work for the next step: just enqueue block i.
                disk_queues[disk].push(i--);
            }
            else
            {
                if (!disk_queues[disk].empty())
                {
                    // Older queued blocks go first; block i stays pending and
                    // is looked at again when the next event is processed.
                    STXXL_VERBOSE1("c Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1);
                    event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
                    disk_queues[disk].pop();
                }
                else
                {
                    STXXL_VERBOSE1("a Block " << i << " scheduled for time " << cur.timestamp + 1);
                    event_queue.push(sim_event(cur.timestamp + 1, i--));
                }
                disk_busy[disk] = true;
            }
        }

        // Keep the slot of the block just put out working if it has queued blocks.
        int_type disk = get_disk(cur.iblock, disks, D);
        if (!disk_busy[disk] && !disk_queues[disk].empty())
        {
            STXXL_VERBOSE1("b Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1);
            event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
            disk_queues[disk].pop();
            disk_busy[disk] = true;
        }
    }

    // All blocks must have been handed out and all queues drained.
    assert(i == -1);
    for (int_type d = 0; d <= D; d++)
        assert(disk_queues[d].empty());

    return (oldtime - 1);
}
}
// Computes an order over the blocks described by [first, last) and stores it
// as block indices in out_first[0 .. last - first).
//
// first, last  range of per-block entries passed on to
//              async_schedule_local::simulate_async_write() as its `disks` array
// out_first    output array with room for (last - first) indices
// m            forwarded as `m_init` to the write simulation
// D            forwarded as `D` to the write simulation
//
// If there are at most D blocks the identity order 0, 1, ... is produced.
// Otherwise the write simulation is run and the block indices are emitted in
// order of descending simulated write time (stable for equal times).
void compute_prefetch_schedule(
    const int_type* first,
    const int_type* last,
    int_type* out_first,
    int_type m,
    int_type D)
{
    typedef std::pair<int_type, int_type> pair_type;
    typedef std::vector<pair_type> write_order_type;

    int_type L = last - first;
    if (L <= D)
    {
        for (int_type i = 0; i < L; ++i)
            out_first[i] = i;
        return;
    }

    // The vector owns the scratch array, so it is released even if the
    // simulation or the sort throws (the former new[]/delete[] pair leaked).
    write_order_type write_order(static_cast<write_order_type::size_type>(L));
    // Never form &write_order[0] on an empty vector.
    pair_type* const write_order_data = write_order.empty() ? 0 : &write_order[0];

    int_type w_steps = async_schedule_local::simulate_async_write(first, L, m, D, write_order_data);

    STXXL_VERBOSE1("Write steps: " << w_steps);

    for (int_type i = 0; i < L; i++)
        STXXL_VERBOSE1(first[i] << " " << write_order[i].first << " " << write_order[i].second);

    // Latest simulated write first; equal times keep their block-index order.
    std::stable_sort(write_order.begin(), write_order.end(), async_schedule_local::write_time_cmp() _STXXL_FORCE_SEQUENTIAL);

    for (int_type i = 0; i < L; i++)
    {
        out_first[i] = write_order[i].first;
        STXXL_VERBOSE1(i << " " << out_first[i]);
    }

    // w_steps is only read by the verbose output, which may be compiled out.
    STXXL_UNUSED(w_steps);
}
STXXL_END_NAMESPACE