wibble 0.1.28
pipe.h
Go to the documentation of this file.
00001 // -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net>
00002 
00003 #include <wibble/sys/macros.h>
00004 
00005 #ifdef POSIX
00006 #include <fcntl.h>
00007 #include <sys/select.h>
00008 
00009 #include <deque>
00010 #include <cerrno>
00011 
00012 #include <wibble/exception.h>
00013 #include <wibble/sys/thread.h>
00014 #include <wibble/sys/mutex.h>
00015 
00016 #ifndef WIBBLE_SYS_PIPE_H
00017 #define WIBBLE_SYS_PIPE_H
00018 
00019 namespace wibble {
00020 namespace sys {
00021 
00022 namespace wexcept = wibble::exception;
00023 
00024 struct Pipe {
00025 
00026     struct Writer : wibble::sys::Thread {
00027         int fd;
00028         bool close;
00029         std::string data;
00030         bool running;
00031         bool closed;
00032         wibble::sys::Mutex mutex;
00033 
00034         Writer() : fd( -1 ), close( false ), running( false ) {}
00035 
00036         void *main() {
00037             do {
00038                 int wrote = 0;
00039 
00040                 {
00041                     wibble::sys::MutexLock __l( mutex );
00042                     wrote = ::write( fd, data.c_str(), data.length() );
00043                     if ( wrote > 0 )
00044                         data.erase( data.begin(), data.begin() + wrote );
00045                 }
00046 
00047                 if ( wrote == -1 ) {
00048                     if ( errno == EAGAIN || errno == EWOULDBLOCK )
00049                         sched_yield();
00050                     else
00051                         throw wexcept::System( "writing to pipe" );
00052                 }
00053             } while ( !done() );
00054 
00055             if ( close )
00056                 ::close( fd );
00057 
00058             return 0;
00059         }
00060 
00061         bool done() {
00062             wibble::sys::MutexLock __l( mutex );
00063             if ( data.empty() )
00064                 running = false;
00065             return !running;
00066         }
00067 
00068         void run( int _fd, std::string what ) {
00069             wibble::sys::MutexLock __l( mutex );
00070 
00071             if ( running )
00072                 assert_eq( _fd, fd );
00073             fd = _fd;
00074             assert_neq( fd, -1 );
00075 
00076             data += what;
00077             if ( running )
00078                 return;
00079             running = true;
00080             start();
00081         }
00082     };
00083 
00084     typedef std::deque< char > Buffer;
00085     Buffer buffer;
00086     int fd;
00087     bool _eof;
00088     Writer writer;
00089 
00090     Pipe( int p ) : fd( p ), _eof( false )
00091     {
00092         if ( p == -1 )
00093             return;
00094         if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 )
00095             throw wexcept::System( "fcntl on a pipe" );
00096     }
00097     Pipe() : fd( -1 ), _eof( false ) {}
00098 
00099     /* Writes data to the pipe, asynchronously. */
00100     void write( std::string what ) {
00101         writer.run( fd, what );
00102     }
00103 
00104     void close() {
00105         writer.close = true;
00106         writer.run( fd, "" );
00107     }
00108 
00109     bool valid() {
00110         return fd != -1;
00111     }
00112 
00113     bool active() {
00114         return valid() && !eof();
00115     }
00116 
00117     bool eof() {
00118         return _eof;
00119     }
00120 
00121     int readMore() {
00122         assert( valid() );
00123         char _buffer[1024];
00124         int r = ::read( fd, _buffer, 1023 );
00125         if ( r == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
00126             throw wexcept::System( "reading from pipe" );
00127         else if ( r == -1 )
00128             return 0;
00129         if ( r == 0 )
00130             _eof = true;
00131         else
00132             std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) );
00133         return r;
00134     }
00135 
00136     std::string nextChunk() {
00137         std::string line( buffer.begin(), buffer.end() );
00138         buffer.clear();
00139         return line;
00140     }
00141 
00142     std::string nextLine() {
00143         assert( valid() );
00144         Buffer::iterator nl =
00145             std::find( buffer.begin(), buffer.end(), '\n' );
00146         while ( nl == buffer.end() ) {
00147             if ( !readMore() )
00148                 return ""; // would block, so give up
00149             nl = std::find( buffer.begin(), buffer.end(), '\n' );
00150         }
00151         std::string line( buffer.begin(), nl );
00152 
00153         if ( nl != buffer.end() )
00154             ++ nl;
00155         buffer.erase( buffer.begin(), nl );
00156 
00157         return line;
00158     }
00159 
00160     /* Only returns on eof() or when data is buffered. */
00161     void wait() {
00162         assert( valid() );
00163         fd_set fds;
00164         FD_ZERO( &fds );
00165         while ( buffer.empty() && !eof() ) {
00166             if ( readMore() )
00167                 return;
00168             if ( eof() )
00169                 return;
00170             FD_SET( fd, &fds );
00171             select( fd + 1, &fds, 0, 0, 0 );
00172         }
00173     }
00174     std::string nextLineBlocking() {
00175         assert( valid() );
00176         std::string l;
00177         while ( !eof() ) {
00178             l = nextLine();
00179             if ( !l.empty() )
00180                 return l;
00181             if ( eof() )
00182                 return std::string( buffer.begin(), buffer.end() );
00183             wait();
00184         }
00185         return l;
00186     }
00187 
00188 };
00189 
00190 }
00191 }
00192 #endif
00193 #endif