#ifndef _FILE_OFFSET_BITS
#define _FILE_OFFSET_BITS 64
#endif
#include "stx-execpipe.h"
#include <stdexcept>
#include <sstream>
#include <iostream>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/select.h>
#define LOG_OUTPUT(msg, level) \
do { \
if (m_debug_level >= level) { \
std::ostringstream oss; \
oss << msg; \
if (m_debug_output) \
m_debug_output(oss.str().c_str()); \
else \
std::cout << oss.str() << std::endl; \
} \
} while (0)
#define LOG_ERROR(msg) LOG_OUTPUT(msg, ExecPipe::DL_ERROR)
#define LOG_INFO(msg) LOG_OUTPUT(msg, ExecPipe::DL_INFO)
#define LOG_DEBUG(msg) LOG_OUTPUT(msg, ExecPipe::DL_DEBUG)
#define LOG_TRACE(msg) LOG_OUTPUT(msg, ExecPipe::DL_TRACE)
namespace stx {
#ifndef _STX_RINGBUFFER_H_
#define _STX_RINGBUFFER_H_
namespace {
<pre>
</pre>
<pre>
</pre>
class RingBuffer
{
private:
char* m_data;
unsigned int m_buffsize;
unsigned int m_size;
unsigned int m_bottom;
public:
inline RingBuffer()
: m_data(NULL),
m_buffsize(0), m_size(0), m_bottom(0)
{
}
inline ~RingBuffer()
{
if (m_data) free(m_data);
}
inline unsigned int size() const
{
return m_size;
}
inline unsigned int buffsize() const
{
return m_buffsize;
}
inline void clear()
{
m_size = m_bottom = 0;
}
inline char* bottom() const
{
return m_data + m_bottom;
}
inline unsigned int bottomsize() const
{
return (m_bottom + m_size > m_buffsize)
? (m_buffsize - m_bottom)
: (m_size);
}
inline void advance(unsigned int n)
{
assert(m_size >= n);
m_bottom += n;
m_size -= n;
if (m_bottom >= m_buffsize) m_bottom -= m_buffsize;
}
void write(const void *src, unsigned int len)
{
if (len == 0) return;
if (m_buffsize < m_size + len)
{
unsigned int newbuffsize = m_buffsize;
while (newbuffsize < m_size + len)
{
if (newbuffsize == 0) newbuffsize = 1024;
else newbuffsize = newbuffsize * 2;
}
m_data = static_cast<char*>(realloc(m_data, newbuffsize));
if (m_bottom + m_size > m_buffsize)
{
unsigned int taillen = m_buffsize - m_bottom;
memcpy(m_data + newbuffsize - taillen,
m_data + m_bottom, taillen);
m_bottom = newbuffsize - taillen;
}
m_buffsize = newbuffsize;
}
if (m_bottom + m_size > m_buffsize)
{
memcpy(m_data + m_bottom + m_size - m_buffsize, src, len);
m_size += len;
}
else
{
unsigned int tailfit = m_buffsize - (m_bottom + m_size);
if (tailfit >= len)
{
memcpy(m_data + m_bottom + m_size, src, len);
m_size += len;
}
else
{
memcpy(m_data + m_bottom + m_size, src, tailfit);
memcpy(m_data, reinterpret_cast<const char*>(src) + tailfit,
len - tailfit);
m_size += len;
}
}
}
};
}
#endif
class ExecPipeImpl
{
private:
unsigned int m_refs;
private:
enum ExecPipe::DebugLevel m_debug_level;
void (*m_debug_output)(const char* line);
public:
void set_debug_level(enum ExecPipe::DebugLevel dl)
{
m_debug_level = dl;
}
void set_debug_output(void (*output)(const char *line))
{
m_debug_output = output;
}
private:
enum StreamType
{
ST_NONE = 0,
ST_FD,
ST_FILE,
ST_STRING,
ST_OBJECT
};
StreamType m_input;
int m_input_fd;
const char* m_input_file;
const std::string* m_input_string;
std::string::size_type m_input_string_pos;
PipeSource* m_input_source;
RingBuffer m_input_rbuffer;
StreamType m_output;
int m_output_fd;
const char* m_output_file;
int m_output_file_mode;
std::string* m_output_string;
PipeSink* m_output_sink;
struct Stage
{
std::vector<std::string> args;
const char* prog;
const std::vector<std::string>* argsp;
const std::vector<std::string>* envp;
PipeFunction* func;
RingBuffer outbuffer;
bool withpath;
pid_t pid;
int retstatus;
int stdin_fd;
int stdout_fd;
Stage()
: prog(NULL), argsp(NULL), envp(NULL), func(NULL),
withpath(false), pid(0), retstatus(0),
stdin_fd(-1), stdout_fd(-1)
{
}
};
typedef std::vector<Stage> stagelist_type;
stagelist_type m_stages;
char m_buffer[4096];
public:
ExecPipeImpl()
: m_refs(0),
m_debug_level(ExecPipe::DL_ERROR),
m_debug_output(NULL),
m_input(ST_NONE),
m_input_fd(-1),
m_output(ST_NONE),
m_output_fd(-1)
{
}
unsigned int& refs()
{
return m_refs;
}
void set_input_fd(int fd)
{
assert(m_input == ST_NONE);
if (m_input != ST_NONE) return;
m_input = ST_FD;
m_input_fd = fd;
}
void set_input_file(const char* path)
{
assert(m_input == ST_NONE);
if (m_input != ST_NONE) return;
m_input = ST_FILE;
m_input_file = path;
}
void set_input_string(const std::string* input)
{
assert(m_input == ST_NONE);
if (m_input != ST_NONE) return;
m_input = ST_STRING;
m_input_string = input;
m_input_string_pos = 0;
}
void set_input_source(PipeSource* source)
{
assert(m_input == ST_NONE);
if (m_input != ST_NONE) return;
m_input = ST_OBJECT;
m_input_source = source;
source->m_impl = this;
}
void input_source_write(const void* data, unsigned int datalen)
{
m_input_rbuffer.write(data, datalen);
}
void set_output_fd(int fd)
{
assert(m_output == ST_NONE);
if (m_output != ST_NONE) return;
m_output = ST_FD;
m_output_fd = fd;
}
void set_output_file(const char* path, int mode = 0666)
{
assert(m_output == ST_NONE);
if (m_output != ST_NONE) return;
m_output = ST_FILE;
m_output_file = path;
m_output_file_mode = mode;
}
void set_output_string(std::string* output)
{
assert(m_output == ST_NONE);
if (m_output != ST_NONE) return;
m_output = ST_STRING;
m_output_string = output;
}
void set_output_sink(PipeSink* sink)
{
assert(m_output == ST_NONE);
if (m_output != ST_NONE) return;
m_output = ST_OBJECT;
m_output_sink = sink;
}
unsigned int size() const
{
return m_stages.size();
}
void add_exec(const char* prog)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
m_stages.push_back(newstage);
}
void add_exec(const char* prog, const char* arg1)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
m_stages.push_back(newstage);
}
void add_exec(const char* prog, const char* arg1, const char* arg2)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
newstage.args.push_back(arg2);
m_stages.push_back(newstage);
}
void add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
newstage.args.push_back(arg2);
newstage.args.push_back(arg3);
m_stages.push_back(newstage);
}
void add_exec(const std::vector<std::string>* args)
{
assert(args->size() > 0);
if (args->size() == 0) return;
struct Stage newstage;
newstage.prog = (*args)[0].c_str();
newstage.argsp = args;
m_stages.push_back(newstage);
}
void add_execp(const char* prog)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.withpath = true;
m_stages.push_back(newstage);
}
void add_execp(const char* prog, const char* arg1)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
newstage.withpath = true;
m_stages.push_back(newstage);
}
void add_execp(const char* prog, const char* arg1, const char* arg2)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
newstage.args.push_back(arg2);
newstage.withpath = true;
m_stages.push_back(newstage);
}
void add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
{
struct Stage newstage;
newstage.prog = prog;
newstage.args.push_back(prog);
newstage.args.push_back(arg1);
newstage.args.push_back(arg2);
newstage.args.push_back(arg3);
newstage.withpath = true;
m_stages.push_back(newstage);
}
void add_execp(const std::vector<std::string>* args)
{
assert(args->size() > 0);
if (args->size() == 0) return;
struct Stage newstage;
newstage.prog = (*args)[0].c_str();
newstage.argsp = args;
newstage.withpath = true;
m_stages.push_back(newstage);
}
void add_exece(const char* path,
const std::vector<std::string>* argsp,
const std::vector<std::string>* envp)
{
assert(path && argsp);
assert(argsp->size() > 0);
if (argsp->size() == 0) return;
struct Stage newstage;
newstage.prog = path;
newstage.argsp = argsp;
newstage.envp = envp;
m_stages.push_back(newstage);
}
void add_function(PipeFunction* func)
{
assert(func);
if (!func) return;
func->m_impl = this;
func->m_stageid = m_stages.size();
struct Stage newstage;
newstage.func = func;
m_stages.push_back(newstage);
}
void stage_function_write(unsigned int st, const void* data, unsigned int datalen)
{
assert(st < m_stages.size());
return m_stages[st].outbuffer.write(data, datalen);
}
void run();
int get_return_status(unsigned int stageid) const
{
assert(stageid < m_stages.size());
assert(!m_stages[stageid].func);
return m_stages[stageid].retstatus;
}
int get_return_code(unsigned int stageid) const
{
assert(stageid < m_stages.size());
assert(!m_stages[stageid].func);
if (WIFEXITED(m_stages[stageid].retstatus))
return WEXITSTATUS(m_stages[stageid].retstatus);
else
return -1;
}
int get_return_signal(unsigned int stageid) const
{
assert(stageid < m_stages.size());
assert(!m_stages[stageid].func);
if (WIFSIGNALED(m_stages[stageid].retstatus))
return WTERMSIG(m_stages[stageid].retstatus);
else
return -1;
}
bool all_return_codes_zero() const
{
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (m_stages[i].func) continue;
if (get_return_code(i) != 0)
return false;
}
return true;
}
protected:
void exec_stage(const Stage& stage);
void print_exec(const std::vector<std::string>& args);
void sclose(int fd);
};
void ExecPipeImpl::print_exec(const std::vector<std::string>& args)
{
std::ostringstream oss;
oss << "Exec()";
for (unsigned ai = 0; ai < args.size(); ++ai)
{
oss << " " << args[ai];
}
LOG_INFO(oss.str());
}
void ExecPipeImpl::exec_stage(const Stage& stage)
{
const std::vector<std::string>& args = stage.argsp ? *stage.argsp : stage.args;
const char* cargs[args.size()+1];
for (unsigned ai = 0; ai < args.size(); ++ai)
{
cargs[ai] = args[ai].c_str();
}
cargs[ args.size() ] = NULL;
if (!stage.envp)
{
if (stage.withpath)
execvp(stage.prog, (char* const*)cargs);
else
execv(stage.prog, (char* const*)cargs);
}
else
{
const char* cenv[args.size()+1];
for (unsigned ei = 0; ei < stage.envp->size(); ++ei)
{
cenv[ei] = (*stage.envp)[ei].c_str();
}
cenv[ stage.envp->size() ] = NULL;
execve(stage.prog, (char* const*)cargs, (char* const*)cenv);
}
LOG_ERROR("Error executing child process: " << strerror(errno));
}
void ExecPipeImpl::sclose(int fd)
{
int r = close(fd);
if (r != 0) {
LOG_ERROR("Could not correctly close fd: " << strerror(errno));
}
}
void ExecPipeImpl::run()
{
if (m_stages.size() == 0)
throw(std::runtime_error("No stages to in exec pipe."));
switch(m_input)
{
case ST_NONE:
m_stages[0].stdin_fd = -1;
break;
case ST_STRING:
case ST_OBJECT: {
int pipefd[2];
if (pipe(pipefd) != 0)
throw(std::runtime_error(std::string("Could not create an input pipe: ") + strerror(errno)));
if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) != 0)
throw(std::runtime_error(std::string("Could not set non-block mode on input pipe: ") + strerror(errno)));
m_input_fd = pipefd[1];
m_stages[0].stdin_fd = pipefd[0];
break;
}
case ST_FILE: {
int infd = open(m_input_file, O_RDONLY);
if (infd < 0)
throw(std::runtime_error(std::string("Could not open input file: ") + strerror(errno)));
m_stages[0].stdin_fd = infd;
break;
}
case ST_FD:
m_stages[0].stdin_fd = m_input_fd;
m_input_fd = -1;
break;
}
for (unsigned int i = 0; i < m_stages.size() - 1; ++i)
{
int pipefd[2];
if (pipe(pipefd) != 0)
throw(std::runtime_error(std::string("Could not create a stage pipe: ") + strerror(errno)));
m_stages[i].stdout_fd = pipefd[1];
m_stages[i+1].stdin_fd = pipefd[0];
if (m_stages[i].func)
{
if (fcntl(m_stages[i].stdout_fd, F_SETFL, O_NONBLOCK) != 0)
throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
}
if (m_stages[i+1].func)
{
if (fcntl(m_stages[i+1].stdin_fd, F_SETFL, O_NONBLOCK) != 0)
throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
}
}
switch(m_output)
{
case ST_NONE:
m_stages.back().stdout_fd = -1;
break;
case ST_STRING:
case ST_OBJECT: {
int pipefd[2];
if (pipe(pipefd) != 0)
throw(std::runtime_error(std::string("Could not create an output pipe: ") + strerror(errno)));
if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) != 0)
throw(std::runtime_error(std::string("Could not set non-block mode on output pipe: ") + strerror(errno)));
m_stages.back().stdout_fd = pipefd[1];
m_output_fd = pipefd[0];
break;
}
case ST_FILE: {
int outfd = open(m_output_file, O_WRONLY | O_CREAT | O_TRUNC, m_output_file_mode);
if (outfd < 0)
throw(std::runtime_error(std::string("Could not open output file: ") + strerror(errno)));
m_stages.back().stdout_fd = outfd;
break;
}
case ST_FD:
m_stages.back().stdout_fd = m_output_fd;
m_output_fd = -1;
break;
}
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (m_stages[i].func) continue;
print_exec(m_stages[i].args);
pid_t child = fork();
if (child == 0)
{
if (m_input_fd >= 0)
sclose(m_input_fd);
for (unsigned int j = 0; j < m_stages.size(); ++j)
{
if (i == j)
{
if (m_stages[i].stdin_fd >= 0)
{
if (dup2(m_stages[i].stdin_fd, STDIN_FILENO) == -1) {
LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
exit(255);
}
}
if (m_stages[i].stdout_fd >= 0)
{
if (dup2(m_stages[i].stdout_fd, STDOUT_FILENO) == -1) {
LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
exit(255);
}
}
}
else
{
if (m_stages[j].stdin_fd >= 0)
sclose(m_stages[j].stdin_fd);
if (m_stages[j].stdout_fd >= 0)
sclose(m_stages[j].stdout_fd);
}
}
if (m_output_fd >= 0)
sclose(m_output_fd);
exec_stage(m_stages[i]);
exit(255);
}
m_stages[i].pid = child;
}
for (stagelist_type::const_iterator st = m_stages.begin();
st != m_stages.end(); ++st)
{
if (st->func) continue;
if (st->stdin_fd >= 0)
sclose(st->stdin_fd);
if (st->stdout_fd >= 0)
sclose(st->stdout_fd);
}
while(1)
{
int max_fds = -1;
fd_set read_fds, write_fds;
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
if (m_input_fd >= 0)
{
if (m_input == ST_OBJECT)
{
assert(m_input_source);
if (!m_input_rbuffer.size() && !m_input_source->poll() && !m_input_rbuffer.size())
{
sclose(m_input_fd);
m_input_fd = -1;
LOG_INFO("Closing input file descriptor: " << strerror(errno));
}
else
{
FD_SET(m_input_fd, &write_fds);
if (max_fds < m_input_fd) max_fds = m_input_fd;
LOG_DEBUG("Select on input file descriptor");
}
}
else
{
FD_SET(m_input_fd, &write_fds);
if (max_fds < m_input_fd) max_fds = m_input_fd;
LOG_DEBUG("Select on input file descriptor");
}
}
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (!m_stages[i].func) continue;
if (m_stages[i].stdin_fd >= 0)
{
FD_SET(m_stages[i].stdin_fd, &read_fds);
if (max_fds < m_stages[i].stdin_fd) max_fds = m_stages[i].stdin_fd;
LOG_DEBUG("Select on stage input file descriptor");
}
if (m_stages[i].stdout_fd >= 0)
{
if (m_stages[i].outbuffer.size())
{
FD_SET(m_stages[i].stdout_fd, &write_fds);
if (max_fds < m_stages[i].stdout_fd) max_fds = m_stages[i].stdout_fd;
LOG_DEBUG("Select on stage output file descriptor");
}
else if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
{
sclose(m_stages[i].stdout_fd);
m_stages[i].stdout_fd = -1;
LOG_INFO("Close stage output file descriptor");
}
}
}
if (m_output_fd >= 0)
{
FD_SET(m_output_fd, &read_fds);
if (max_fds < m_output_fd) max_fds = m_output_fd;
LOG_DEBUG("Select on output file descriptor");
}
if (max_fds < 0)
break;
int retval = select(max_fds+1, &read_fds, &write_fds, NULL, NULL);
if (retval < 0)
throw(std::runtime_error(std::string("Error during select() on file descriptors: ") + strerror(errno)));
LOG_TRACE("select() on " << retval << " file descriptors: " << strerror(errno));
if (m_input_fd >= 0 && FD_ISSET(m_input_fd, &write_fds))
{
if (m_input == ST_STRING)
{
assert(m_input_string);
assert(m_input_string_pos < m_input_string->size());
ssize_t wb;
do
{
wb = write(m_input_fd,
m_input_string->data() + m_input_string_pos,
m_input_string->size() - m_input_string_pos);
LOG_TRACE("Write on input fd: " << wb);
if (wb < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
}
else
{
LOG_DEBUG("Error writing to input file descriptor: " << strerror(errno));
sclose(m_input_fd);
m_input_fd = -1;
LOG_INFO("Closing input file descriptor: " << strerror(errno));
}
}
else if (wb > 0)
{
m_input_string_pos += wb;
if (m_input_string_pos >= m_input_string->size())
{
sclose(m_input_fd);
m_input_fd = -1;
LOG_INFO("Closing input file descriptor: " << strerror(errno));
break;
}
}
} while (wb > 0);
}
else if (m_input == ST_OBJECT)
{
ssize_t wb;
do
{
wb = write(m_input_fd,
m_input_rbuffer.bottom(),
m_input_rbuffer.bottomsize());
LOG_TRACE("Write on input fd: " << wb);
if (wb < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
}
else
{
LOG_INFO("Error writing to input file descriptor: " << strerror(errno));
sclose(m_input_fd);
m_input_fd = -1;
LOG_INFO("Closing input file descriptor: " << strerror(errno));
}
}
else if (wb > 0)
{
m_input_rbuffer.advance(wb);
}
} while (wb > 0);
}
}
if (m_output_fd >= 0 && FD_ISSET(m_output_fd, &read_fds))
{
ssize_t rb;
do
{
errno = 0;
rb = read(m_output_fd,
m_buffer, sizeof(m_buffer));
LOG_TRACE("Read on output fd: " << rb);
if (rb <= 0)
{
if (rb == 0 && errno == 0)
{
LOG_INFO("Closing output file descriptor: " << strerror(errno));
if (m_output == ST_OBJECT)
{
assert(m_output_sink);
m_output_sink->eof();
}
sclose(m_output_fd);
m_output_fd = -1;
}
else if (errno == EAGAIN || errno == EINTR)
{
}
else
{
LOG_ERROR("Error reading from output file descriptor: " << strerror(errno));
}
}
else
{
if (m_output == ST_STRING)
{
assert(m_output_string);
m_output_string->append(m_buffer, rb);
}
else if (m_output == ST_OBJECT)
{
assert(m_output_sink);
m_output_sink->process(m_buffer, rb);
}
}
} while (rb > 0);
}
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (!m_stages[i].func) continue;
if (m_stages[i].stdin_fd >= 0 && FD_ISSET(m_stages[i].stdin_fd, &read_fds))
{
ssize_t rb;
do
{
errno = 0;
rb = read(m_stages[i].stdin_fd,
m_buffer, sizeof(m_buffer));
LOG_TRACE("Read on stage fd: " << rb);
if (rb <= 0)
{
if (rb == 0 && errno == 0)
{
LOG_INFO("Closing stage input file descriptor: " << strerror(errno));
m_stages[i].func->eof();
sclose(m_stages[i].stdin_fd);
m_stages[i].stdin_fd = -1;
}
else if (errno == EAGAIN || errno == EINTR)
{
}
else
{
LOG_ERROR("Error reading from stage input file descriptor: " << strerror(errno));
}
}
else
{
m_stages[i].func->process(m_buffer, rb);
}
} while (rb > 0);
}
if (m_stages[i].stdout_fd >= 0 && FD_ISSET(m_stages[i].stdout_fd, &write_fds))
{
while (m_stages[i].outbuffer.size() > 0)
{
ssize_t wb = write(m_stages[i].stdout_fd,
m_stages[i].outbuffer.bottom(),
m_stages[i].outbuffer.bottomsize());
LOG_TRACE("Write on stage fd: " << wb);
if (wb < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
}
else
{
LOG_INFO("Error writing to stage output file descriptor: " << strerror(errno));
}
break;
}
else if (wb > 0)
{
m_stages[i].outbuffer.advance(wb);
}
}
if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
{
LOG_INFO("Closing stage output file descriptor: " << strerror(errno));
sclose(m_stages[i].stdout_fd);
m_stages[i].stdout_fd = -1;
}
}
}
}
unsigned int donepid = 0;
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (!m_stages[i].func) continue;
++donepid;
}
while (donepid != m_stages.size())
{
int status;
int p = wait(&status);
if (p < 0)
{
LOG_ERROR("Error calling wait(): " << strerror(errno));
break;
}
bool found = false;
for (unsigned int i = 0; i < m_stages.size(); ++i)
{
if (p == m_stages[i].pid)
{
m_stages[i].retstatus = status;
if (WIFEXITED(status))
{
LOG_INFO("Finished exec() stage " << p << " with retcode " << WEXITSTATUS(status));
}
else if (WIFSIGNALED(status))
{
LOG_INFO("Finished exec() stage " << p << " with signal " << WTERMSIG(status));
}
else
{
LOG_ERROR("Error in wait(): unknown return status for pid " << p);
}
++donepid;
found = true;
break;
}
}
if (!found)
{
LOG_ERROR("Error in wait(): syscall returned an unknown child pid.");
}
}
LOG_INFO("Finished running pipe.");
}
ExecPipe::ExecPipe()
: m_impl(new ExecPipeImpl)
{
++m_impl->refs();
}
ExecPipe::~ExecPipe()
{
if (--m_impl->refs() == 0)
delete m_impl;
}
ExecPipe::ExecPipe(const ExecPipe& ep)
: m_impl(ep.m_impl)
{
++m_impl->refs();
}
ExecPipe& ExecPipe::operator=(const ExecPipe& ep)
{
if (this != &ep)
{
if (--m_impl->refs() == 0)
delete m_impl;
m_impl = ep.m_impl;
++m_impl->refs();
}
return *this;
}
void ExecPipe::set_debug_level(enum DebugLevel dl)
{
return m_impl->set_debug_level(dl);
}
void ExecPipe::set_debug_output(void (*output)(const char *line))
{
return m_impl->set_debug_output(output);
}
void ExecPipe::set_input_fd(int fd)
{
return m_impl->set_input_fd(fd);
}
void ExecPipe::set_input_file(const char* path)
{
return m_impl->set_input_file(path);
}
void ExecPipe::set_input_string(const std::string* input)
{
return m_impl->set_input_string(input);
}
void ExecPipe::set_input_source(PipeSource* source)
{
return m_impl->set_input_source(source);
}
void ExecPipe::set_output_fd(int fd)
{
return m_impl->set_output_fd(fd);
}
void ExecPipe::set_output_file(const char* path, int mode)
{
return m_impl->set_output_file(path, mode);
}
void ExecPipe::set_output_string(std::string* output)
{
return m_impl->set_output_string(output);
}
void ExecPipe::set_output_sink(PipeSink* sink)
{
return m_impl->set_output_sink(sink);
}
unsigned int ExecPipe::size() const
{
return m_impl->size();
}
void ExecPipe::add_exec(const char* prog)
{
return m_impl->add_exec(prog);
}
void ExecPipe::add_exec(const char* prog, const char* arg1)
{
return m_impl->add_exec(prog, arg1);
}
void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2)
{
return m_impl->add_exec(prog, arg1, arg2);
}
void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
{
return m_impl->add_exec(prog, arg1, arg2, arg3);
}
void ExecPipe::add_exec(const std::vector<std::string>* args)
{
return m_impl->add_exec(args);
}
void ExecPipe::add_execp(const char* prog)
{
return m_impl->add_execp(prog);
}
void ExecPipe::add_execp(const char* prog, const char* arg1)
{
return m_impl->add_execp(prog, arg1);
}
void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2)
{
return m_impl->add_execp(prog, arg1, arg2);
}
void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
{
return m_impl->add_execp(prog, arg1, arg2, arg3);
}
void ExecPipe::add_execp(const std::vector<std::string>* args)
{
return m_impl->add_execp(args);
}
void ExecPipe::add_exece(const char* path,
const std::vector<std::string>* args,
const std::vector<std::string>* env)
{
return m_impl->add_exece(path, args, env);
}
void ExecPipe::add_function(PipeFunction* func)
{
return m_impl->add_function(func);
}
ExecPipe& ExecPipe::run()
{
m_impl->run();
return *this;
}
int ExecPipe::get_return_status(unsigned int stageid) const
{
return m_impl->get_return_status(stageid);
}
int ExecPipe::get_return_code(unsigned int stageid) const
{
return m_impl->get_return_code(stageid);
}
int ExecPipe::get_return_signal(unsigned int stageid) const
{
return m_impl->get_return_signal(stageid);
}
bool ExecPipe::all_return_codes_zero() const
{
return m_impl->all_return_codes_zero();
}
PipeSource::PipeSource()
: m_impl(NULL)
{
}
void PipeSource::write(const void* data, unsigned int datalen)
{
assert(m_impl);
return m_impl->input_source_write(data, datalen);
}
PipeFunction::PipeFunction()
: m_impl(NULL), m_stageid(0)
{
}
void PipeFunction::write(const void* data, unsigned int datalen)
{
assert(m_impl);
return m_impl->stage_function_write(m_stageid, data, datalen);
}
}