summaryrefslogtreecommitdiffstats
path: root/common/rdr/FdOutStream.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'common/rdr/FdOutStream.cxx')
-rw-r--r--common/rdr/FdOutStream.cxx77
1 files changed, 68 insertions, 9 deletions
diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx
index c84c946f..a6b85e21 100644
--- a/common/rdr/FdOutStream.cxx
+++ b/common/rdr/FdOutStream.cxx
@@ -50,17 +50,18 @@ using namespace rdr;
enum { DEFAULT_BUF_SIZE = 16384 };
-FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_)
- : fd(fd_), timeoutms(timeoutms_),
+FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_)
+ : fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
{
- ptr = start = new U8[bufSize];
+ ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
}
FdOutStream::~FdOutStream()
{
try {
+ blocking = true;
flush();
} catch (Exception&) {
}
@@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) {
timeoutms = timeoutms_;
}
+void FdOutStream::setBlocking(bool blocking_) {
+ blocking = blocking_;
+}
+
int FdOutStream::length()
{
- return offset + ptr - start;
+ return offset + ptr - sentUpTo;
+}
+
+int FdOutStream::bufferUsage()
+{
+ return ptr - sentUpTo;
}
void FdOutStream::flush()
{
- U8* sentUpTo = start;
+ int timeoutms_;
+
+ if (blocking)
+ timeoutms_ = timeoutms;
+ else
+ timeoutms_ = 0;
+
while (sentUpTo < ptr) {
- int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo);
+ int n = writeWithTimeout((const void*) sentUpTo,
+ ptr - sentUpTo, timeoutms_);
+
+ // Timeout?
+ if (n == 0) {
+ // If non-blocking then we're done here
+ if (!blocking)
+ break;
+
+ // Otherwise try blocking (with possible timeout)
+ if ((timeoutms_ == 0) && (timeoutms != 0)) {
+ timeoutms_ = timeoutms;
+ break;
+ }
+
+ // Proper timeout
+ throw TimedOut();
+ }
+
sentUpTo += n;
offset += n;
}
- ptr = start;
+ // Managed to flush everything?
+ if (sentUpTo == ptr)
+ ptr = sentUpTo = start;
}
@@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems)
if (itemSize > bufSize)
throw Exception("FdOutStream overrun: max itemSize exceeded");
+ // First try to get rid of the data we have
flush();
+ // Still not enough space?
+ if (itemSize > end - ptr) {
+ // Can we shuffle things around?
+ // (don't do this if it gains us less than 25%)
+ if ((sentUpTo - start > bufSize / 4) &&
+ (itemSize < bufSize - (ptr - sentUpTo))) {
+ memmove(start, sentUpTo, ptr - sentUpTo);
+ ptr = start + (ptr - sentUpTo);
+ sentUpTo = start;
+ } else {
+ // Have to get rid of more data, so turn off non-blocking
+ // for a bit...
+ bool realBlocking;
+
+ realBlocking = blocking;
+ blocking = true;
+ flush();
+ blocking = realBlocking;
+ }
+ }
+
+ // Can we fit all the items asked for?
if (itemSize * nItems > end - ptr)
nItems = (end - ptr) / itemSize;
@@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems)
// select() and write() returning EINTR.
//
-int FdOutStream::writeWithTimeout(const void* data, int length)
+int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms)
{
int n;
@@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length)
if (n < 0) throw SystemException("select",errno);
- if (n == 0) throw TimedOut();
+ if (n == 0) return 0;
do {
n = ::write(fd, data, length);