diff options
Diffstat (limited to 'common/rdr/FdOutStream.cxx')
-rw-r--r-- | common/rdr/FdOutStream.cxx | 77 |
1 files changed, 68 insertions, 9 deletions
diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx index c84c946f..a6b85e21 100644 --- a/common/rdr/FdOutStream.cxx +++ b/common/rdr/FdOutStream.cxx @@ -50,17 +50,18 @@ using namespace rdr; enum { DEFAULT_BUF_SIZE = 16384 }; -FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_) - : fd(fd_), timeoutms(timeoutms_), +FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_) + : fd(fd_), blocking(blocking_), timeoutms(timeoutms_), bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0) { - ptr = start = new U8[bufSize]; + ptr = start = sentUpTo = new U8[bufSize]; end = start + bufSize; } FdOutStream::~FdOutStream() { try { + blocking = true; flush(); } catch (Exception&) { } @@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) { timeoutms = timeoutms_; } +void FdOutStream::setBlocking(bool blocking_) { + blocking = blocking_; +} + int FdOutStream::length() { - return offset + ptr - start; + return offset + ptr - sentUpTo; +} + +int FdOutStream::bufferUsage() +{ + return ptr - sentUpTo; } void FdOutStream::flush() { - U8* sentUpTo = start; + int timeoutms_; + + if (blocking) + timeoutms_ = timeoutms; + else + timeoutms_ = 0; + while (sentUpTo < ptr) { - int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo); + int n = writeWithTimeout((const void*) sentUpTo, + ptr - sentUpTo, timeoutms_); + + // Timeout? + if (n == 0) { + // If non-blocking then we're done here + if (!blocking) + break; + + // Otherwise try blocking (with possible timeout) + if ((timeoutms_ == 0) && (timeoutms != 0)) { + timeoutms_ = timeoutms; + break; + } + + // Proper timeout + throw TimedOut(); + } + sentUpTo += n; offset += n; } - ptr = start; + // Managed to flush everything? + if (sentUpTo == ptr) + ptr = sentUpTo = start; } @@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems) if (itemSize > bufSize) throw Exception("FdOutStream overrun: max itemSize exceeded"); + // First try to get rid of the data we have flush(); + // Still not enough space? + if (itemSize > end - ptr) { + // Can we shuffle things around? + // (don't do this if it gains us less than 25%) + if ((sentUpTo - start > bufSize / 4) && + (itemSize < bufSize - (ptr - sentUpTo))) { + memmove(start, sentUpTo, ptr - sentUpTo); + ptr = start + (ptr - sentUpTo); + sentUpTo = start; + } else { + // Have to get rid of more data, so turn off non-blocking + // for a bit... + bool realBlocking; + + realBlocking = blocking; + blocking = true; + flush(); + blocking = realBlocking; + } + } + + // Can we fit all the items asked for? if (itemSize * nItems > end - ptr) nItems = (end - ptr) / itemSize; @@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems) // select() and write() returning EINTR. // -int FdOutStream::writeWithTimeout(const void* data, int length) +int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms) { int n; @@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length) if (n < 0) throw SystemException("select",errno); - if (n == 0) throw TimedOut(); + if (n == 0) return 0; do { n = ::write(fd, data, length); |