From d8b432cf26b835e52a087de9747a51d2401c07e3 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Sat, 16 May 2020 12:14:43 +0200 Subject: [PATCH] Dynamically allocate stream buffers This allows us to handle peaks in input and output streams gracefully without having to block processing. --- common/rdr/BufferedInStream.cxx | 59 +++++++++++++++++++-- common/rdr/BufferedInStream.h | 5 ++ common/rdr/BufferedOutStream.cxx | 91 +++++++++++++++++++++++--------- common/rdr/BufferedOutStream.h | 5 ++ 4 files changed, 132 insertions(+), 28 deletions(-) diff --git a/common/rdr/BufferedInStream.cxx b/common/rdr/BufferedInStream.cxx index 967e5a54..14b73563 100644 --- a/common/rdr/BufferedInStream.cxx +++ b/common/rdr/BufferedInStream.cxx @@ -27,11 +27,14 @@ using namespace rdr; static const size_t DEFAULT_BUF_SIZE = 8192; +static const size_t MAX_BUF_SIZE = 4 * 1024 * 1024; BufferedInStream::BufferedInStream() : bufSize(DEFAULT_BUF_SIZE), offset(0) { ptr = end = start = new U8[bufSize]; + gettimeofday(&lastSizeCheck, NULL); + peakUsage = 0; } BufferedInStream::~BufferedInStream() @@ -46,10 +49,58 @@ size_t BufferedInStream::pos() bool BufferedInStream::overrun(size_t needed, bool wait) { - if (needed > bufSize) - throw Exception("BufferedInStream overrun: " - "requested size of %lu bytes exceeds maximum of %lu bytes", - (long unsigned)needed, (long unsigned)bufSize); + struct timeval now; + + if (needed > bufSize) { + size_t newSize; + U8* newBuffer; + + if (needed > MAX_BUF_SIZE) + throw Exception("BufferedInStream overrun: requested size of " + "%lu bytes exceeds maximum of %lu bytes", + (long unsigned)needed, (long unsigned)MAX_BUF_SIZE); + + newSize = DEFAULT_BUF_SIZE; + while (newSize < needed) + newSize *= 2; + + newBuffer = new U8[newSize]; + memcpy(newBuffer, ptr, end - ptr); + delete [] start; + bufSize = newSize; + + offset += ptr - start; + end = newBuffer + (end - ptr); + ptr = start = newBuffer; + + gettimeofday(&lastSizeCheck, NULL); + peakUsage = needed; + } + + if (needed > peakUsage) + peakUsage = needed; + + // Time to shrink an excessive buffer? + gettimeofday(&now, NULL); + if ((avail() == 0) && (bufSize > DEFAULT_BUF_SIZE) && + ((now.tv_sec < lastSizeCheck.tv_sec) || + (now.tv_sec > (lastSizeCheck.tv_sec + 5)))) { + if (peakUsage < (bufSize / 2)) { + size_t newSize; + + newSize = DEFAULT_BUF_SIZE; + while (newSize < peakUsage) + newSize *= 2; + + // We know the buffer is empty, so just reset everything + delete [] start; + ptr = end = start = new U8[newSize]; + bufSize = newSize; + } + + gettimeofday(&lastSizeCheck, NULL); + peakUsage = needed; + } // Do we need to shuffle things around? if ((bufSize - (ptr - start)) < needed) { diff --git a/common/rdr/BufferedInStream.h b/common/rdr/BufferedInStream.h index acf2b927..24b5a23c 100644 --- a/common/rdr/BufferedInStream.h +++ b/common/rdr/BufferedInStream.h @@ -24,6 +24,8 @@ #ifndef __RDR_BUFFEREDINSTREAM_H__ #define __RDR_BUFFEREDINSTREAM_H__ +#include + #include namespace rdr { @@ -45,6 +47,9 @@ namespace rdr { size_t offset; U8* start; + struct timeval lastSizeCheck; + size_t peakUsage; + protected: BufferedInStream(); }; diff --git a/common/rdr/BufferedOutStream.cxx b/common/rdr/BufferedOutStream.cxx index ac76f6a7..cb2a0b0b 100644 --- a/common/rdr/BufferedOutStream.cxx +++ b/common/rdr/BufferedOutStream.cxx @@ -29,12 +29,15 @@ using namespace rdr; static const size_t DEFAULT_BUF_SIZE = 16384; +static const size_t MAX_BUF_SIZE = 32 * 1024 * 1024; BufferedOutStream::BufferedOutStream() : bufSize(DEFAULT_BUF_SIZE), offset(0) { ptr = start = sentUpTo = new U8[bufSize]; end = start + bufSize; + gettimeofday(&lastSizeCheck, NULL); + peakUsage = 0; } BufferedOutStream::~BufferedOutStream() @@ -55,6 +58,8 @@ size_t BufferedOutStream::bufferUsage() void BufferedOutStream::flush() { + struct timeval now; + while (sentUpTo < ptr) { size_t len; @@ -69,40 +74,78 @@ void BufferedOutStream::flush() // Managed to flush everything? if (sentUpTo == ptr) ptr = sentUpTo = start; + + // Time to shrink an excessive buffer? + gettimeofday(&now, NULL); + if ((sentUpTo == ptr) && (bufSize > DEFAULT_BUF_SIZE) && + ((now.tv_sec < lastSizeCheck.tv_sec) || + (now.tv_sec > (lastSizeCheck.tv_sec + 5)))) { + if (peakUsage < (bufSize / 2)) { + size_t newSize; + + newSize = DEFAULT_BUF_SIZE; + while (newSize < peakUsage) + newSize *= 2; + + // We know the buffer is empty, so just reset everything + delete [] start; + ptr = start = sentUpTo = new U8[newSize]; + end = start + newSize; + bufSize = newSize; + } + + gettimeofday(&lastSizeCheck, NULL); + peakUsage = 0; + } } void BufferedOutStream::overrun(size_t needed) { - if (needed > bufSize) - throw Exception("BufferedOutStream overrun: " - "requested size of %lu bytes exceeds maximum of %lu bytes", - (long unsigned)needed, (long unsigned)bufSize); + size_t totalNeeded, newSize; + U8* newBuffer; // First try to get rid of the data we have flush(); - // Still not enough space? - while (needed > avail()) { - // Can we shuffle things around? - // (don't do this if it gains us less than 25%) - if (((size_t)(sentUpTo - start) > bufSize / 4) && - (needed < bufSize - (ptr - sentUpTo))) { - memmove(start, sentUpTo, ptr - sentUpTo); - ptr = start + (ptr - sentUpTo); - sentUpTo = start; - } else { - size_t len; - - len = bufferUsage(); + // Make note of the total needed space + totalNeeded = needed + (ptr - sentUpTo); - // Have to get rid of more data, so allow the flush to wait... - flushBuffer(true); + if (totalNeeded > peakUsage) + peakUsage = totalNeeded; - offset += len - bufferUsage(); + // Enough free space now? + if (avail() > needed) + return; - // Managed to flush everything? - if (sentUpTo == ptr) - ptr = sentUpTo = start; - } + // Can we shuffle things around? + if (needed < bufSize - (ptr - sentUpTo)) { + memmove(start, sentUpTo, ptr - sentUpTo); + ptr = start + (ptr - sentUpTo); + sentUpTo = start; + return; } + + // We'll need to allocate more buffer space... + + if (totalNeeded > MAX_BUF_SIZE) + throw Exception("BufferedOutStream overrun: requested size of " + "%lu bytes exceeds maximum of %lu bytes", + (long unsigned)totalNeeded, + (long unsigned)MAX_BUF_SIZE); + + newSize = DEFAULT_BUF_SIZE; + while (newSize < totalNeeded) + newSize *= 2; + + newBuffer = new U8[newSize]; + memcpy(newBuffer, sentUpTo, ptr - sentUpTo); + delete [] start; + bufSize = newSize; + + ptr = newBuffer + (ptr - sentUpTo); + sentUpTo = start = newBuffer; + end = newBuffer + newSize; + + gettimeofday(&lastSizeCheck, NULL); + peakUsage = totalNeeded; } diff --git a/common/rdr/BufferedOutStream.h b/common/rdr/BufferedOutStream.h index 8e3229d5..05727f6e 100644 --- a/common/rdr/BufferedOutStream.h +++ b/common/rdr/BufferedOutStream.h @@ -24,6 +24,8 @@ #ifndef __RDR_BUFFEREDOUTSTREAM_H__ #define __RDR_BUFFEREDOUTSTREAM_H__ +#include + #include namespace rdr { @@ -53,6 +55,9 @@ namespace rdr { size_t offset; U8* start; + struct timeval lastSizeCheck; + size_t peakUsage; + protected: U8* sentUpTo; -- 2.39.5