00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "stx-execpipe.h"
00024
00025 #include <stdexcept>
00026 #include <sstream>
00027 #include <iostream>
00028
00029 #include <assert.h>
00030 #include <stdlib.h>
00031 #include <string.h>
00032 #include <unistd.h>
00033 #include <errno.h>
00034 #include <fcntl.h>
00035 #include <sys/types.h>
00036 #include <sys/wait.h>
00037 #include <sys/select.h>
00038
00039 #define LOG_OUTPUT(msg, level) \
00040 do { \
00041 if (m_debug_level >= level) { \
00042 std::ostringstream oss; \
00043 oss << msg; \
00044 if (m_debug_output) \
00045 m_debug_output(oss.str().c_str()); \
00046 else \
00047 std::cout << oss.str() << std::endl; \
00048 } \
00049 } while (0)
00050
00051 #define LOG_ERROR(msg) LOG_OUTPUT(msg, ExecPipe::DL_ERROR)
00052 #define LOG_INFO(msg) LOG_OUTPUT(msg, ExecPipe::DL_INFO)
00053 #define LOG_DEBUG(msg) LOG_OUTPUT(msg, ExecPipe::DL_DEBUG)
00054 #define LOG_TRACE(msg) LOG_OUTPUT(msg, ExecPipe::DL_TRACE)
00055
00056 namespace stx {
00057
00058 #ifndef _STX_RINGBUFFER_H_
00059 #define _STX_RINGBUFFER_H_
00060
00062 namespace {
00063
00098 class RingBuffer
00099 {
00100 private:
00102 char* m_data;
00103
00105 unsigned int m_buffsize;
00106
00108 unsigned int m_size;
00109
00111 unsigned int m_bottom;
00112
00113 public:
00115 inline RingBuffer()
00116 : m_data(NULL),
00117 m_buffsize(0), m_size(0), m_bottom(0)
00118 {
00119 }
00120
00122 inline ~RingBuffer()
00123 {
00124 if (m_data) free(m_data);
00125 }
00126
00128 inline unsigned int size() const
00129 {
00130 return m_size;
00131 }
00132
00134 inline unsigned int buffsize() const
00135 {
00136 return m_buffsize;
00137 }
00138
00140 inline void clear()
00141 {
00142 m_size = m_bottom = 0;
00143 }
00144
00150 inline char* bottom() const
00151 {
00152 return m_data + m_bottom;
00153 }
00154
00156 inline unsigned int bottomsize() const
00157 {
00158 return (m_bottom + m_size > m_buffsize)
00159 ? (m_buffsize - m_bottom)
00160 : (m_size);
00161 }
00162
00167 inline void advance(unsigned int n)
00168 {
00169 assert(m_size >= n);
00170 m_bottom += n;
00171 m_size -= n;
00172 if (m_bottom >= m_buffsize) m_bottom -= m_buffsize;
00173 }
00174
00179 void write(const void *src, unsigned int len)
00180 {
00181 if (len == 0) return;
00182
00183 if (m_buffsize < m_size + len)
00184 {
00185
00186
00187
00188 unsigned int newbuffsize = m_buffsize;
00189 while (newbuffsize < m_size + len)
00190 {
00191 if (newbuffsize == 0) newbuffsize = 1024;
00192 else newbuffsize = newbuffsize * 2;
00193 }
00194
00195 m_data = static_cast<char*>(realloc(m_data, newbuffsize));
00196
00197 if (m_bottom + m_size > m_buffsize)
00198 {
00199
00200
00201
00202 unsigned int taillen = m_buffsize - m_bottom;
00203
00204 memcpy(m_data + newbuffsize - taillen,
00205 m_data + m_bottom, taillen);
00206
00207 m_bottom = newbuffsize - taillen;
00208 }
00209
00210 m_buffsize = newbuffsize;
00211 }
00212
00213
00214
00215
00216 if (m_bottom + m_size > m_buffsize)
00217 {
00218 memcpy(m_data + m_bottom + m_size - m_buffsize, src, len);
00219 m_size += len;
00220 }
00221 else
00222 {
00223
00224 unsigned int tailfit = m_buffsize - (m_bottom + m_size);
00225
00226 if (tailfit >= len)
00227 {
00228 memcpy(m_data + m_bottom + m_size, src, len);
00229 m_size += len;
00230 }
00231 else
00232 {
00233
00234 memcpy(m_data + m_bottom + m_size, src, tailfit);
00235 memcpy(m_data, reinterpret_cast<const char*>(src) + tailfit,
00236 len - tailfit);
00237 m_size += len;
00238 }
00239 }
00240 }
00241 };
00242
00243 }
00244
00245 #endif // _STX_RINGBUFFER_H_
00246
00253 class ExecPipeImpl
00254 {
00255 private:
00256
00258 unsigned int m_refs;
00259
00260 private:
00261
00262
00263
00265 enum ExecPipe::DebugLevel m_debug_level;
00266
00268 void (*m_debug_output)(const char* line);
00269
00270 public:
00271
00273 void set_debug_level(enum ExecPipe::DebugLevel dl)
00274 {
00275 m_debug_level = dl;
00276 }
00277
00280 void set_debug_output(void (*output)(const char *line))
00281 {
00282 m_debug_output = output;
00283 }
00284
00285 private:
00286
00288 enum StreamType
00289 {
00290 ST_NONE = 0,
00291 ST_FD,
00292 ST_FILE,
00293 ST_STRING,
00294 ST_OBJECT
00295 };
00296
00298 StreamType m_input;
00299
00300
00301
00304 int m_input_fd;
00305
00307 const char* m_input_file;
00308
00311 const std::string* m_input_string;
00312
00314 std::string::size_type m_input_string_pos;
00315
00317 PipeSource* m_input_source;
00318
00320 RingBuffer m_input_rbuffer;
00321
00322
00323
00325 StreamType m_output;
00326
00329 int m_output_fd;
00330
00332 const char* m_output_file;
00333
00335 int m_output_file_mode;
00336
00339 std::string* m_output_string;
00340
00342 PipeSink* m_output_sink;
00343
00344
00345
00350 struct Stage
00351 {
00353 std::vector<std::string> args;
00354
00356 const char* prog;
00357
00359 const std::vector<std::string>* argsp;
00360
00362 const std::vector<std::string>* envp;
00363
00365 PipeFunction* func;
00366
00368 RingBuffer outbuffer;
00369
00370
00371
00373 bool withpath;
00374
00376 pid_t pid;
00377
00379 int retstatus;
00380
00382 int stdin_fd;
00383
00385 int stdout_fd;
00386
00388 Stage()
00389 : prog(NULL), argsp(NULL), envp(NULL), func(NULL),
00390 withpath(false), pid(0), retstatus(0),
00391 stdin_fd(-1), stdout_fd(-1)
00392 {
00393 }
00394 };
00395
00397 typedef std::vector<Stage> stagelist_type;
00398
00400 stagelist_type m_stages;
00401
00403 char m_buffer[4096];
00404
00405 public:
00406
00408 ExecPipeImpl()
00409 : m_refs(0),
00410 m_debug_level(ExecPipe::DL_ERROR),
00411 m_debug_output(NULL),
00412 m_input(ST_NONE),
00413 m_input_fd(-1),
00414 m_output(ST_NONE),
00415 m_output_fd(-1)
00416 {
00417 }
00418
00420 unsigned int& refs()
00421 {
00422 return m_refs;
00423 }
00424
00425
00426
00428
00433 void set_input_fd(int fd)
00434 {
00435 assert(m_input == ST_NONE);
00436 if (m_input != ST_NONE) return;
00437
00438 m_input = ST_FD;
00439 m_input_fd = fd;
00440 }
00441
00446 void set_input_file(const char* path)
00447 {
00448 assert(m_input == ST_NONE);
00449 if (m_input != ST_NONE) return;
00450
00451 m_input = ST_FILE;
00452 m_input_file = path;
00453 }
00454
00460 void set_input_string(const std::string* input)
00461 {
00462 assert(m_input == ST_NONE);
00463 if (m_input != ST_NONE) return;
00464
00465 m_input = ST_STRING;
00466 m_input_string = input;
00467 m_input_string_pos = 0;
00468 }
00469
00475 void set_input_source(PipeSource* source)
00476 {
00477 assert(m_input == ST_NONE);
00478 if (m_input != ST_NONE) return;
00479
00480 m_input = ST_OBJECT;
00481 m_input_source = source;
00482 source->m_impl = this;
00483 }
00484
00486
00491 void input_source_write(const void* data, unsigned int datalen)
00492 {
00493 m_input_rbuffer.write(data, datalen);
00494 }
00495
00496
00497
00499
00504 void set_output_fd(int fd)
00505 {
00506 assert(m_output == ST_NONE);
00507 if (m_output != ST_NONE) return;
00508
00509 m_output = ST_FD;
00510 m_output_fd = fd;
00511 }
00512
00517 void set_output_file(const char* path, int mode = 0666)
00518 {
00519 assert(m_output == ST_NONE);
00520 if (m_output != ST_NONE) return;
00521
00522 m_output = ST_FILE;
00523 m_output_file = path;
00524 m_output_file_mode = mode;
00525 }
00526
00532 void set_output_string(std::string* output)
00533 {
00534 assert(m_output == ST_NONE);
00535 if (m_output != ST_NONE) return;
00536
00537 m_output = ST_STRING;
00538 m_output_string = output;
00539 }
00540
00545 void set_output_sink(PipeSink* sink)
00546 {
00547 assert(m_output == ST_NONE);
00548 if (m_output != ST_NONE) return;
00549
00550 m_output = ST_OBJECT;
00551 m_output_sink = sink;
00552 }
00553
00555
00556
00557
00559
00563 unsigned int size() const
00564 {
00565 return m_stages.size();
00566 }
00567
00572 void add_exec(const char* prog)
00573 {
00574 struct Stage newstage;
00575 newstage.prog = prog;
00576 newstage.args.push_back(prog);
00577 m_stages.push_back(newstage);
00578 }
00579
00584 void add_exec(const char* prog, const char* arg1)
00585 {
00586 struct Stage newstage;
00587 newstage.prog = prog;
00588 newstage.args.push_back(prog);
00589 newstage.args.push_back(arg1);
00590 m_stages.push_back(newstage);
00591 }
00592
00597 void add_exec(const char* prog, const char* arg1, const char* arg2)
00598 {
00599 struct Stage newstage;
00600 newstage.prog = prog;
00601 newstage.args.push_back(prog);
00602 newstage.args.push_back(arg1);
00603 newstage.args.push_back(arg2);
00604 m_stages.push_back(newstage);
00605 }
00606
00611 void add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
00612 {
00613 struct Stage newstage;
00614 newstage.prog = prog;
00615 newstage.args.push_back(prog);
00616 newstage.args.push_back(arg1);
00617 newstage.args.push_back(arg2);
00618 newstage.args.push_back(arg3);
00619 m_stages.push_back(newstage);
00620 }
00621
00627 void add_exec(const std::vector<std::string>* args)
00628 {
00629 assert(args->size() > 0);
00630 if (args->size() == 0) return;
00631
00632 struct Stage newstage;
00633 newstage.prog = (*args)[0].c_str();
00634 newstage.argsp = args;
00635 m_stages.push_back(newstage);
00636 }
00637
00643 void add_execp(const char* prog)
00644 {
00645 struct Stage newstage;
00646 newstage.prog = prog;
00647 newstage.args.push_back(prog);
00648 newstage.withpath = true;
00649 m_stages.push_back(newstage);
00650 }
00651
00657 void add_execp(const char* prog, const char* arg1)
00658 {
00659 struct Stage newstage;
00660 newstage.prog = prog;
00661 newstage.args.push_back(prog);
00662 newstage.args.push_back(arg1);
00663 newstage.withpath = true;
00664 m_stages.push_back(newstage);
00665 }
00666
00672 void add_execp(const char* prog, const char* arg1, const char* arg2)
00673 {
00674 struct Stage newstage;
00675 newstage.prog = prog;
00676 newstage.args.push_back(prog);
00677 newstage.args.push_back(arg1);
00678 newstage.args.push_back(arg2);
00679 newstage.withpath = true;
00680 m_stages.push_back(newstage);
00681 }
00682
00688 void add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
00689 {
00690 struct Stage newstage;
00691 newstage.prog = prog;
00692 newstage.args.push_back(prog);
00693 newstage.args.push_back(arg1);
00694 newstage.args.push_back(arg2);
00695 newstage.args.push_back(arg3);
00696 newstage.withpath = true;
00697 m_stages.push_back(newstage);
00698 }
00699
00706 void add_execp(const std::vector<std::string>* args)
00707 {
00708 assert(args->size() > 0);
00709 if (args->size() == 0) return;
00710
00711 struct Stage newstage;
00712 newstage.prog = (*args)[0].c_str();
00713 newstage.argsp = args;
00714 newstage.withpath = true;
00715 m_stages.push_back(newstage);
00716 }
00717
00726 void add_exece(const char* path,
00727 const std::vector<std::string>* argsp,
00728 const std::vector<std::string>* envp)
00729 {
00730 assert(path && argsp);
00731 assert(argsp->size() > 0);
00732 if (argsp->size() == 0) return;
00733
00734 struct Stage newstage;
00735 newstage.prog = path;
00736 newstage.argsp = argsp;
00737 newstage.envp = envp;
00738 m_stages.push_back(newstage);
00739 }
00740
00746 void add_function(PipeFunction* func)
00747 {
00748 assert(func);
00749 if (!func) return;
00750
00751 func->m_impl = this;
00752 func->m_stageid = m_stages.size();
00753
00754 struct Stage newstage;
00755 newstage.func = func;
00756 m_stages.push_back(newstage);
00757 }
00758
00760
00765 void stage_function_write(unsigned int st, const void* data, unsigned int datalen)
00766 {
00767 assert(st < m_stages.size());
00768
00769 return m_stages[st].outbuffer.write(data, datalen);
00770 }
00771
00772
00773
00781 void run();
00782
00783
00784
00786
00791 int get_return_status(unsigned int stageid) const
00792 {
00793 assert(stageid < m_stages.size());
00794 assert(!m_stages[stageid].func);
00795
00796 return m_stages[stageid].retstatus;
00797 }
00798
00803 int get_return_code(unsigned int stageid) const
00804 {
00805 assert(stageid < m_stages.size());
00806 assert(!m_stages[stageid].func);
00807
00808 if (WIFEXITED(m_stages[stageid].retstatus))
00809 return WEXITSTATUS(m_stages[stageid].retstatus);
00810 else
00811 return -1;
00812 }
00813
00818 int get_return_signal(unsigned int stageid) const
00819 {
00820 assert(stageid < m_stages.size());
00821 assert(!m_stages[stageid].func);
00822
00823 if (WIFSIGNALED(m_stages[stageid].retstatus))
00824 return WTERMSIG(m_stages[stageid].retstatus);
00825 else
00826 return -1;
00827 }
00828
00832 bool all_return_codes_zero() const
00833 {
00834 for (unsigned int i = 0; i < m_stages.size(); ++i)
00835 {
00836 if (m_stages[i].func) continue;
00837
00838 if (get_return_code(i) != 0)
00839 return false;
00840 }
00841
00842 return true;
00843 }
00844
00846
00847 protected:
00848
00849
00850
00853 void exec_stage(const Stage& stage);
00854
00856 void print_exec(const std::vector<std::string>& args);
00857
00859 void sclose(int fd);
00860 };
00861
00862
00863
00864 void ExecPipeImpl::print_exec(const std::vector<std::string>& args)
00865 {
00866 std::ostringstream oss;
00867 oss << "Exec()";
00868 for (unsigned ai = 0; ai < args.size(); ++ai)
00869 {
00870 oss << " " << args[ai];
00871 }
00872 LOG_INFO(oss.str());
00873 }
00874
00875 void ExecPipeImpl::exec_stage(const Stage& stage)
00876 {
00877
00878 const std::vector<std::string>& args = stage.argsp ? *stage.argsp : stage.args;
00879
00880
00881
00882 const char* cargs[args.size()+1];
00883
00884 for (unsigned ai = 0; ai < args.size(); ++ai)
00885 {
00886 cargs[ai] = args[ai].c_str();
00887 }
00888 cargs[ args.size() ] = NULL;
00889
00890 if (!stage.envp)
00891 {
00892 if (stage.withpath)
00893 execvp(stage.prog, (char* const*)cargs);
00894 else
00895 execv(stage.prog, (char* const*)cargs);
00896 }
00897 else
00898 {
00899
00900
00901 const char* cenv[args.size()+1];
00902
00903 for (unsigned ei = 0; ei < stage.envp->size(); ++ei)
00904 {
00905 cenv[ei] = (*stage.envp)[ei].c_str();
00906 }
00907 cenv[ stage.envp->size() ] = NULL;
00908
00909 execve(stage.prog, (char* const*)cargs, (char* const*)cenv);
00910 }
00911
00912 LOG_ERROR("Error executing child process: " << strerror(errno));
00913 }
00914
00915 void ExecPipeImpl::sclose(int fd)
00916 {
00917 int r = close(fd);
00918
00919 if (r != 0) {
00920 LOG_ERROR("Could not correctly close fd: " << strerror(errno));
00921 }
00922 }
00923
00924
00925
00926 void ExecPipeImpl::run()
00927 {
00928 if (m_stages.size() == 0)
00929 throw(std::runtime_error("No stages to in exec pipe."));
00930
00931
00932
00933
00934 switch(m_input)
00935 {
00936 case ST_NONE:
00937
00938 m_stages[0].stdin_fd = -1;
00939 break;
00940
00941 case ST_STRING:
00942 case ST_OBJECT: {
00943
00944 int pipefd[2];
00945
00946 if (pipe(pipefd) != 0)
00947 throw(std::runtime_error(std::string("Could not create an input pipe: ") + strerror(errno)));
00948
00949 if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) != 0)
00950 throw(std::runtime_error(std::string("Could not set non-block mode on input pipe: ") + strerror(errno)));
00951
00952 m_input_fd = pipefd[1];
00953 m_stages[0].stdin_fd = pipefd[0];
00954 break;
00955 }
00956 case ST_FILE: {
00957
00958
00959 int infd = open(m_input_file, O_RDONLY);
00960 if (infd < 0)
00961 throw(std::runtime_error(std::string("Could not open input file: ") + strerror(errno)));
00962
00963 m_stages[0].stdin_fd = infd;
00964 break;
00965 }
00966 case ST_FD:
00967
00968 m_stages[0].stdin_fd = m_input_fd;
00969 m_input_fd = -1;
00970 break;
00971 }
00972
00973
00974 for (unsigned int i = 0; i < m_stages.size() - 1; ++i)
00975 {
00976 int pipefd[2];
00977
00978 if (pipe(pipefd) != 0)
00979 throw(std::runtime_error(std::string("Could not create a stage pipe: ") + strerror(errno)));
00980
00981 m_stages[i].stdout_fd = pipefd[1];
00982 m_stages[i+1].stdin_fd = pipefd[0];
00983
00984 if (m_stages[i].func)
00985 {
00986 if (fcntl(m_stages[i].stdout_fd, F_SETFL, O_NONBLOCK) != 0)
00987 throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
00988 }
00989 if (m_stages[i+1].func)
00990 {
00991 if (fcntl(m_stages[i+1].stdin_fd, F_SETFL, O_NONBLOCK) != 0)
00992 throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
00993 }
00994 }
00995
00996
00997 switch(m_output)
00998 {
00999 case ST_NONE:
01000
01001 m_stages.back().stdout_fd = -1;
01002 break;
01003
01004 case ST_STRING:
01005 case ST_OBJECT: {
01006
01007 int pipefd[2];
01008
01009 if (pipe(pipefd) != 0)
01010 throw(std::runtime_error(std::string("Could not create an output pipe: ") + strerror(errno)));
01011
01012 if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) != 0)
01013 throw(std::runtime_error(std::string("Could not set non-block mode on output pipe: ") + strerror(errno)));
01014
01015 m_stages.back().stdout_fd = pipefd[1];
01016 m_output_fd = pipefd[0];
01017 break;
01018 }
01019 case ST_FILE: {
01020
01021
01022 int outfd = open(m_output_file, O_WRONLY | O_CREAT | O_TRUNC, m_output_file_mode);
01023 if (outfd < 0)
01024 throw(std::runtime_error(std::string("Could not open output file: ") + strerror(errno)));
01025
01026 m_stages.back().stdout_fd = outfd;
01027 break;
01028 }
01029 case ST_FD:
01030
01031 m_stages.back().stdout_fd = m_output_fd;
01032 m_output_fd = -1;
01033 break;
01034 }
01035
01036
01037
01038 for (unsigned int i = 0; i < m_stages.size(); ++i)
01039 {
01040 if (m_stages[i].func) continue;
01041
01042 print_exec(m_stages[i].args);
01043
01044 pid_t child = fork();
01045 if (child == 0)
01046 {
01047
01048
01049
01050 if (m_input_fd >= 0)
01051 sclose(m_input_fd);
01052
01053 for (unsigned int j = 0; j < m_stages.size(); ++j)
01054 {
01055 if (i == j)
01056 {
01057
01058
01059 if (m_stages[i].stdin_fd >= 0)
01060 {
01061 if (dup2(m_stages[i].stdin_fd, STDIN_FILENO) == -1) {
01062 LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
01063 exit(255);
01064 }
01065 }
01066
01067 if (m_stages[i].stdout_fd >= 0)
01068 {
01069 if (dup2(m_stages[i].stdout_fd, STDOUT_FILENO) == -1) {
01070 LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
01071 exit(255);
01072 }
01073 }
01074 }
01075 else
01076 {
01077
01078
01079 if (m_stages[j].stdin_fd >= 0)
01080 sclose(m_stages[j].stdin_fd);
01081
01082 if (m_stages[j].stdout_fd >= 0)
01083 sclose(m_stages[j].stdout_fd);
01084 }
01085 }
01086
01087 if (m_output_fd >= 0)
01088 sclose(m_output_fd);
01089
01090
01091 exec_stage(m_stages[i]);
01092
01093 exit(255);
01094 }
01095
01096 m_stages[i].pid = child;
01097 }
01098
01099
01100
01101 for (stagelist_type::const_iterator st = m_stages.begin();
01102 st != m_stages.end(); ++st)
01103 {
01104 if (st->func) continue;
01105
01106 if (st->stdin_fd >= 0)
01107 sclose(st->stdin_fd);
01108
01109 if (st->stdout_fd >= 0)
01110 sclose(st->stdout_fd);
01111 }
01112
01113
01114
01115 while(1)
01116 {
01117
01118
01119 int max_fds = -1;
01120 fd_set read_fds, write_fds;
01121
01122 FD_ZERO(&read_fds);
01123 FD_ZERO(&write_fds);
01124
01125 if (m_input_fd >= 0)
01126 {
01127 if (m_input == ST_OBJECT)
01128 {
01129 assert(m_input_source);
01130
01131 if (!m_input_rbuffer.size() && !m_input_source->poll() && !m_input_rbuffer.size())
01132 {
01133 sclose(m_input_fd);
01134 m_input_fd = -1;
01135
01136 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01137 }
01138 else
01139 {
01140 FD_SET(m_input_fd, &write_fds);
01141 if (max_fds < m_input_fd) max_fds = m_input_fd;
01142
01143 LOG_DEBUG("Select on input file descriptor");
01144 }
01145 }
01146 else
01147 {
01148 FD_SET(m_input_fd, &write_fds);
01149 if (max_fds < m_input_fd) max_fds = m_input_fd;
01150
01151 LOG_DEBUG("Select on input file descriptor");
01152 }
01153 }
01154
01155 for (unsigned int i = 0; i < m_stages.size(); ++i)
01156 {
01157 if (!m_stages[i].func) continue;
01158
01159 if (m_stages[i].stdin_fd >= 0)
01160 {
01161 FD_SET(m_stages[i].stdin_fd, &read_fds);
01162 if (max_fds < m_stages[i].stdin_fd) max_fds = m_stages[i].stdin_fd;
01163
01164 LOG_DEBUG("Select on stage input file descriptor");
01165 }
01166
01167 if (m_stages[i].stdout_fd >= 0)
01168 {
01169 if (m_stages[i].outbuffer.size())
01170 {
01171 FD_SET(m_stages[i].stdout_fd, &write_fds);
01172 if (max_fds < m_stages[i].stdout_fd) max_fds = m_stages[i].stdout_fd;
01173
01174 LOG_DEBUG("Select on stage output file descriptor");
01175 }
01176 else if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
01177 {
01178 sclose(m_stages[i].stdout_fd);
01179 m_stages[i].stdout_fd = -1;
01180
01181 LOG_INFO("Close stage output file descriptor");
01182 }
01183 }
01184 }
01185
01186 if (m_output_fd >= 0)
01187 {
01188 FD_SET(m_output_fd, &read_fds);
01189 if (max_fds < m_output_fd) max_fds = m_output_fd;
01190
01191 LOG_DEBUG("Select on output file descriptor");
01192 }
01193
01194
01195
01196 if (max_fds < 0)
01197 break;
01198
01199 int retval = select(max_fds+1, &read_fds, &write_fds, NULL, NULL);
01200 if (retval < 0)
01201 throw(std::runtime_error(std::string("Error during select() on file descriptors: ") + strerror(errno)));
01202
01203 LOG_TRACE("select() on " << retval << " file descriptors: " << strerror(errno));
01204
01205
01206
01207 if (m_input_fd >= 0 && FD_ISSET(m_input_fd, &write_fds))
01208 {
01209 if (m_input == ST_STRING)
01210 {
01211
01212
01213 assert(m_input_string);
01214 assert(m_input_string_pos < m_input_string->size());
01215
01216 ssize_t wb;
01217
01218 do
01219 {
01220 wb = write(m_input_fd,
01221 m_input_string->data() + m_input_string_pos,
01222 m_input_string->size() - m_input_string_pos);
01223
01224 LOG_TRACE("Write on input fd: " << wb);
01225
01226 if (wb < 0)
01227 {
01228 if (errno == EAGAIN || errno == EINTR)
01229 {
01230 }
01231 else
01232 {
01233 LOG_DEBUG("Error writing to input file descriptor: " << strerror(errno));
01234
01235 sclose(m_input_fd);
01236 m_input_fd = -1;
01237
01238 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01239 }
01240 }
01241 else if (wb > 0)
01242 {
01243 m_input_string_pos += wb;
01244
01245 if (m_input_string_pos >= m_input_string->size())
01246 {
01247 sclose(m_input_fd);
01248 m_input_fd = -1;
01249
01250 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01251 break;
01252 }
01253 }
01254 } while (wb > 0);
01255
01256 }
01257 else if (m_input == ST_OBJECT)
01258 {
01259
01260
01261 ssize_t wb;
01262
01263 do
01264 {
01265 wb = write(m_input_fd,
01266 m_input_rbuffer.bottom(),
01267 m_input_rbuffer.bottomsize());
01268
01269 LOG_TRACE("Write on input fd: " << wb);
01270
01271 if (wb < 0)
01272 {
01273 if (errno == EAGAIN || errno == EINTR)
01274 {
01275 }
01276 else
01277 {
01278 LOG_INFO("Error writing to input file descriptor: " << strerror(errno));
01279
01280 sclose(m_input_fd);
01281 m_input_fd = -1;
01282
01283 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01284 }
01285 }
01286 else if (wb > 0)
01287 {
01288 m_input_rbuffer.advance(wb);
01289 }
01290 } while (wb > 0);
01291 }
01292 }
01293
01294 if (m_output_fd >= 0 && FD_ISSET(m_output_fd, &read_fds))
01295 {
01296
01297
01298 ssize_t rb;
01299
01300 do
01301 {
01302 errno = 0;
01303
01304 rb = read(m_output_fd,
01305 m_buffer, sizeof(m_buffer));
01306
01307 LOG_TRACE("Read on output fd: " << rb);
01308
01309 if (rb <= 0)
01310 {
01311 if (rb == 0 && errno == 0)
01312 {
01313
01314
01315 LOG_INFO("Closing output file descriptor: " << strerror(errno));
01316
01317 if (m_output == ST_OBJECT)
01318 {
01319 assert(m_output_sink);
01320 m_output_sink->eof();
01321 }
01322
01323 sclose(m_output_fd);
01324 m_output_fd = -1;
01325 }
01326 else if (errno == EAGAIN || errno == EINTR)
01327 {
01328 }
01329 else
01330 {
01331 LOG_ERROR("Error reading from output file descriptor: " << strerror(errno));
01332 }
01333 }
01334 else
01335 {
01336 if (m_output == ST_STRING)
01337 {
01338 assert(m_output_string);
01339 m_output_string->append(m_buffer, rb);
01340 }
01341 else if (m_output == ST_OBJECT)
01342 {
01343 assert(m_output_sink);
01344 m_output_sink->process(m_buffer, rb);
01345 }
01346 }
01347 } while (rb > 0);
01348 }
01349
01350 for (unsigned int i = 0; i < m_stages.size(); ++i)
01351 {
01352 if (!m_stages[i].func) continue;
01353
01354 if (m_stages[i].stdin_fd >= 0 && FD_ISSET(m_stages[i].stdin_fd, &read_fds))
01355 {
01356 ssize_t rb;
01357
01358 do
01359 {
01360 errno = 0;
01361
01362 rb = read(m_stages[i].stdin_fd,
01363 m_buffer, sizeof(m_buffer));
01364
01365 LOG_TRACE("Read on stage fd: " << rb);
01366
01367 if (rb <= 0)
01368 {
01369 if (rb == 0 && errno == 0)
01370 {
01371
01372
01373 LOG_INFO("Closing stage input file descriptor: " << strerror(errno));
01374
01375 m_stages[i].func->eof();
01376
01377 sclose(m_stages[i].stdin_fd);
01378 m_stages[i].stdin_fd = -1;
01379 }
01380 else if (errno == EAGAIN || errno == EINTR)
01381 {
01382 }
01383 else
01384 {
01385 LOG_ERROR("Error reading from stage input file descriptor: " << strerror(errno));
01386 }
01387 }
01388 else
01389 {
01390 m_stages[i].func->process(m_buffer, rb);
01391 }
01392 } while (rb > 0);
01393 }
01394
01395 if (m_stages[i].stdout_fd >= 0 && FD_ISSET(m_stages[i].stdout_fd, &write_fds))
01396 {
01397 while (m_stages[i].outbuffer.size() > 0)
01398 {
01399 ssize_t wb = write(m_stages[i].stdout_fd,
01400 m_stages[i].outbuffer.bottom(),
01401 m_stages[i].outbuffer.bottomsize());
01402
01403 LOG_TRACE("Write on stage fd: " << wb);
01404
01405 if (wb < 0)
01406 {
01407 if (errno == EAGAIN || errno == EINTR)
01408 {
01409 }
01410 else
01411 {
01412 LOG_INFO("Error writing to stage output file descriptor: " << strerror(errno));
01413 }
01414 break;
01415 }
01416 else if (wb > 0)
01417 {
01418 m_stages[i].outbuffer.advance(wb);
01419 }
01420 }
01421
01422 if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
01423 {
01424 LOG_INFO("Closing stage output file descriptor: " << strerror(errno));
01425
01426 sclose(m_stages[i].stdout_fd);
01427 m_stages[i].stdout_fd = -1;
01428 }
01429 }
01430 }
01431 }
01432
01433
01434
01435 unsigned int donepid = 0;
01436
01437 for (unsigned int i = 0; i < m_stages.size(); ++i)
01438 {
01439 if (!m_stages[i].func) continue;
01440 ++donepid;
01441 }
01442
01443 while (donepid != m_stages.size())
01444 {
01445 int status;
01446 int p = wait(&status);
01447
01448 if (p < 0)
01449 {
01450 LOG_ERROR("Error calling wait(): " << strerror(errno));
01451 break;
01452 }
01453
01454 bool found = false;
01455
01456 for (unsigned int i = 0; i < m_stages.size(); ++i)
01457 {
01458 if (p == m_stages[i].pid)
01459 {
01460 m_stages[i].retstatus = status;
01461
01462 if (WIFEXITED(status))
01463 {
01464 LOG_INFO("Finished exec() stage " << p << " with retcode " << WEXITSTATUS(status));
01465 }
01466 else if (WIFSIGNALED(status))
01467 {
01468 LOG_INFO("Finished exec() stage " << p << " with signal " << WTERMSIG(status));
01469 }
01470 else
01471 {
01472 LOG_ERROR("Error in wait(): unknown return status for pid " << p);
01473 }
01474
01475 ++donepid;
01476 found = true;
01477 break;
01478 }
01479 }
01480
01481 if (!found)
01482 {
01483 LOG_ERROR("Error in wait(): syscall returned an unknown child pid.");
01484 }
01485 }
01486
01487 LOG_INFO("Finished running pipe.");
01488 }
01489
01490
01491
01492 ExecPipe::ExecPipe()
01493 : m_impl(new ExecPipeImpl)
01494 {
01495 ++m_impl->refs();
01496 }
01497
01498 ExecPipe::~ExecPipe()
01499 {
01500 if (--m_impl->refs() == 0)
01501 delete m_impl;
01502 }
01503
01504 ExecPipe::ExecPipe(const ExecPipe& ep)
01505 : m_impl(ep.m_impl)
01506 {
01507 ++m_impl->refs();
01508 }
01509
01510 ExecPipe& ExecPipe::operator=(const ExecPipe& ep)
01511 {
01512 if (this != &ep)
01513 {
01514 if (--m_impl->refs() == 0)
01515 delete m_impl;
01516
01517 m_impl = ep.m_impl;
01518 ++m_impl->refs();
01519 }
01520 return *this;
01521 }
01522
01523 void ExecPipe::set_debug_level(enum DebugLevel dl)
01524 {
01525 return m_impl->set_debug_level(dl);
01526 }
01527
01528 void ExecPipe::set_debug_output(void (*output)(const char *line))
01529 {
01530 return m_impl->set_debug_output(output);
01531 }
01532
01533 void ExecPipe::set_input_fd(int fd)
01534 {
01535 return m_impl->set_input_fd(fd);
01536 }
01537
01538 void ExecPipe::set_input_file(const char* path)
01539 {
01540 return m_impl->set_input_file(path);
01541 }
01542
01543 void ExecPipe::set_input_string(const std::string* input)
01544 {
01545 return m_impl->set_input_string(input);
01546 }
01547
01548 void ExecPipe::set_input_source(PipeSource* source)
01549 {
01550 return m_impl->set_input_source(source);
01551 }
01552
01553 void ExecPipe::set_output_fd(int fd)
01554 {
01555 return m_impl->set_output_fd(fd);
01556 }
01557
01558 void ExecPipe::set_output_file(const char* path, int mode)
01559 {
01560 return m_impl->set_output_file(path, mode);
01561 }
01562
01563 void ExecPipe::set_output_string(std::string* output)
01564 {
01565 return m_impl->set_output_string(output);
01566 }
01567
01568 void ExecPipe::set_output_sink(PipeSink* sink)
01569 {
01570 return m_impl->set_output_sink(sink);
01571 }
01572
01573 unsigned int ExecPipe::size() const
01574 {
01575 return m_impl->size();
01576 }
01577
01578 void ExecPipe::add_exec(const char* prog)
01579 {
01580 return m_impl->add_exec(prog);
01581 }
01582
01583 void ExecPipe::add_exec(const char* prog, const char* arg1)
01584 {
01585 return m_impl->add_exec(prog, arg1);
01586 }
01587
01588 void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2)
01589 {
01590 return m_impl->add_exec(prog, arg1, arg2);
01591 }
01592
01593 void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
01594 {
01595 return m_impl->add_exec(prog, arg1, arg2, arg3);
01596 }
01597
01598 void ExecPipe::add_exec(const std::vector<std::string>* args)
01599 {
01600 return m_impl->add_exec(args);
01601 }
01602
01603 void ExecPipe::add_execp(const char* prog)
01604 {
01605 return m_impl->add_execp(prog);
01606 }
01607
01608 void ExecPipe::add_execp(const char* prog, const char* arg1)
01609 {
01610 return m_impl->add_execp(prog, arg1);
01611 }
01612
01613 void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2)
01614 {
01615 return m_impl->add_execp(prog, arg1, arg2);
01616 }
01617
01618 void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
01619 {
01620 return m_impl->add_execp(prog, arg1, arg2, arg3);
01621 }
01622
01623 void ExecPipe::add_execp(const std::vector<std::string>* args)
01624 {
01625 return m_impl->add_execp(args);
01626 }
01627
01628 void ExecPipe::add_exece(const char* path,
01629 const std::vector<std::string>* args,
01630 const std::vector<std::string>* env)
01631 {
01632 return m_impl->add_exece(path, args, env);
01633 }
01634
01635 void ExecPipe::add_function(PipeFunction* func)
01636 {
01637 return m_impl->add_function(func);
01638 }
01639
01640 ExecPipe& ExecPipe::run()
01641 {
01642 m_impl->run();
01643 return *this;
01644 }
01645
01646 int ExecPipe::get_return_status(unsigned int stageid) const
01647 {
01648 return m_impl->get_return_status(stageid);
01649 }
01650
01651 int ExecPipe::get_return_code(unsigned int stageid) const
01652 {
01653 return m_impl->get_return_code(stageid);
01654 }
01655
01656 int ExecPipe::get_return_signal(unsigned int stageid) const
01657 {
01658 return m_impl->get_return_signal(stageid);
01659 }
01660
01661 bool ExecPipe::all_return_codes_zero() const
01662 {
01663 return m_impl->all_return_codes_zero();
01664 }
01665
01666
01667
01668 PipeSource::PipeSource()
01669 : m_impl(NULL)
01670 {
01671 }
01672
01673 void PipeSource::write(const void* data, unsigned int datalen)
01674 {
01675 assert(m_impl);
01676 return m_impl->input_source_write(data, datalen);
01677 }
01678
01679
01680
01681 PipeFunction::PipeFunction()
01682 : m_impl(NULL), m_stageid(0)
01683 {
01684 }
01685
01686 void PipeFunction::write(const void* data, unsigned int datalen)
01687 {
01688 assert(m_impl);
01689 return m_impl->stage_function_write(m_stageid, data, datalen);
01690 }
01691
01692
01693
01694 }