From 79329c0a8fc6889ec2f70db5992db74f5f92c573 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Wed, 24 Aug 2022 10:31:42 +0200 Subject: [PATCH] Use BufferedOutStream in more streams Avoid duplicating all the memory mangement, and instead use the BufferedOutStream as a base clase for all out streams that need an intermediate buffer. --- common/rdr/BufferedOutStream.cxx | 14 +++++- common/rdr/BufferedOutStream.h | 4 +- common/rdr/FdOutStream.cxx | 2 +- common/rdr/HexOutStream.cxx | 47 ++++++-------------- common/rdr/HexOutStream.h | 13 ++---- common/rdr/TLSOutStream.cxx | 50 ++++------------------ common/rdr/TLSOutStream.h | 14 ++---- common/rdr/ZlibOutStream.cxx | 73 +++++++++----------------------- common/rdr/ZlibOutStream.h | 13 ++---- 9 files changed, 69 insertions(+), 161 deletions(-) diff --git a/common/rdr/BufferedOutStream.cxx b/common/rdr/BufferedOutStream.cxx index c8f6ce9c..ff7b6b51 100644 --- a/common/rdr/BufferedOutStream.cxx +++ b/common/rdr/BufferedOutStream.cxx @@ -31,8 +31,8 @@ using namespace rdr; static const size_t DEFAULT_BUF_SIZE = 16384; static const size_t MAX_BUF_SIZE = 32 * 1024 * 1024; -BufferedOutStream::BufferedOutStream() - : bufSize(DEFAULT_BUF_SIZE), offset(0) +BufferedOutStream::BufferedOutStream(bool emulateCork) + : bufSize(DEFAULT_BUF_SIZE), offset(0), emulateCork(emulateCork) { ptr = start = sentUpTo = new U8[bufSize]; end = start + bufSize; @@ -55,6 +55,10 @@ void BufferedOutStream::flush() { struct timeval now; + // Only give larger chunks if corked to minimize overhead + if (corked && emulateCork && ((ptr - sentUpTo) < 1024)) + return; + while (sentUpTo < ptr) { size_t len; @@ -101,11 +105,17 @@ bool BufferedOutStream::hasBufferedData() void BufferedOutStream::overrun(size_t needed) { + bool oldCorked; size_t totalNeeded, newSize; U8* newBuffer; // First try to get rid of the data we have + // (use corked to make things a bit more efficient since we're not + // trying to flush out everything, just make some room) + oldCorked = corked; + cork(true); flush(); + cork(oldCorked); // Make note of the total needed space totalNeeded = needed + (ptr - sentUpTo); diff --git a/common/rdr/BufferedOutStream.h b/common/rdr/BufferedOutStream.h index b01d1fee..94707118 100644 --- a/common/rdr/BufferedOutStream.h +++ b/common/rdr/BufferedOutStream.h @@ -59,11 +59,13 @@ namespace rdr { struct timeval lastSizeCheck; size_t peakUsage; + bool emulateCork; + protected: U8* sentUpTo; protected: - BufferedOutStream(); + BufferedOutStream(bool emulateCork=true); }; } diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx index ed6535a8..f89fd345 100644 --- a/common/rdr/FdOutStream.cxx +++ b/common/rdr/FdOutStream.cxx @@ -52,7 +52,7 @@ using namespace rdr; FdOutStream::FdOutStream(int fd_) - : fd(fd_) + : BufferedOutStream(false), fd(fd_) { gettimeofday(&lastWrite, NULL); } diff --git a/common/rdr/HexOutStream.cxx b/common/rdr/HexOutStream.cxx index 85aff47b..4bcd8ba4 100644 --- a/common/rdr/HexOutStream.cxx +++ b/common/rdr/HexOutStream.cxx @@ -25,21 +25,15 @@ using namespace rdr; -const int DEFAULT_BUF_LEN = 16384; - static inline size_t min(size_t a, size_t b) {return a> 4) & 0xf); - optr[i*2+1] = intToHex(pos[i] & 0xf); + optr[i*2] = intToHex((sentUpTo[i] >> 4) & 0xf); + optr[i*2+1] = intToHex(sentUpTo[i] & 0xf); } out_stream.setptr(length*2); - pos += length; + sentUpTo += length; } - offset += ptr - start; - ptr = start; -} -size_t HexOutStream::length() -{ - return offset + ptr - start; + return true; } void HexOutStream::flush() { - writeBuffer(); + BufferedOutStream::flush(); out_stream.flush(); } void HexOutStream::cork(bool enable) { - OutStream::cork(enable); - + BufferedOutStream::cork(enable); out_stream.cork(enable); } -void HexOutStream::overrun(size_t needed) { - if (needed > bufSize) - throw Exception("HexOutStream overrun: buffer size exceeded"); - - writeBuffer(); -} - diff --git a/common/rdr/HexOutStream.h b/common/rdr/HexOutStream.h index e591dd88..8b1ab9ec 100644 --- a/common/rdr/HexOutStream.h +++ b/common/rdr/HexOutStream.h @@ -19,32 +19,27 @@ #ifndef __RDR_HEX_OUTSTREAM_H__ #define __RDR_HEX_OUTSTREAM_H__ -#include +#include namespace rdr { - class HexOutStream : public OutStream { + class HexOutStream : public BufferedOutStream { public: HexOutStream(OutStream& os); virtual ~HexOutStream(); - void flush(); - size_t length(); + virtual void flush(); virtual void cork(bool enable); static char intToHex(int i); static char* binToHexStr(const char* data, size_t length); private: + virtual bool flushBuffer(); void writeBuffer(); - virtual void overrun(size_t needed); OutStream& out_stream; - - U8* start; - size_t offset; - size_t bufSize; }; } diff --git a/common/rdr/TLSOutStream.cxx b/common/rdr/TLSOutStream.cxx index 63bdeef1..cdfe9b12 100644 --- a/common/rdr/TLSOutStream.cxx +++ b/common/rdr/TLSOutStream.cxx @@ -34,8 +34,6 @@ using namespace rdr; static rfb::LogWriter vlog("TLSOutStream"); -enum { DEFAULT_BUF_SIZE = 16384 }; - ssize_t TLSOutStream::push(gnutls_transport_ptr_t str, const void* data, size_t size) { @@ -64,14 +62,10 @@ ssize_t TLSOutStream::push(gnutls_transport_ptr_t str, const void* data, } TLSOutStream::TLSOutStream(OutStream* _out, gnutls_session_t _session) - : session(_session), out(_out), bufSize(DEFAULT_BUF_SIZE), offset(0), - saved_exception(NULL) + : session(_session), out(_out), saved_exception(NULL) { gnutls_transport_ptr_t recv, send; - ptr = start = new U8[bufSize]; - end = start + bufSize; - gnutls_transport_set_push_function(session, push); gnutls_transport_get_ptr2(session, &recv, &send); gnutls_transport_set_ptr2(session, recv, this); @@ -87,55 +81,29 @@ TLSOutStream::~TLSOutStream() #endif gnutls_transport_set_push_function(session, NULL); - delete [] start; delete saved_exception; } -size_t TLSOutStream::length() -{ - return offset + ptr - start; -} - void TLSOutStream::flush() { - U8* sentUpTo; - - // Only give GnuTLS larger chunks if corked to minimize overhead - if (corked && ((ptr - start) < 1024)) - return; - - sentUpTo = start; - while (sentUpTo < ptr) { - size_t n = writeTLS(sentUpTo, ptr - sentUpTo); - sentUpTo += n; - offset += n; - } - - ptr = start; + BufferedOutStream::flush(); out->flush(); } void TLSOutStream::cork(bool enable) { - OutStream::cork(enable); - + BufferedOutStream::cork(enable); out->cork(enable); } -void TLSOutStream::overrun(size_t needed) +bool TLSOutStream::flushBuffer() { - bool oldCorked; - - if (needed > bufSize) - throw Exception("TLSOutStream overrun: buffer size exceeded"); - - // A cork might prevent the flush, so disable it temporarily - oldCorked = corked; - corked = false; - - flush(); + while (sentUpTo < ptr) { + size_t n = writeTLS(sentUpTo, ptr - sentUpTo); + sentUpTo += n; + } - corked = oldCorked; + return true; } size_t TLSOutStream::writeTLS(const U8* data, size_t length) diff --git a/common/rdr/TLSOutStream.h b/common/rdr/TLSOutStream.h index 5e725bd5..276e5c78 100644 --- a/common/rdr/TLSOutStream.h +++ b/common/rdr/TLSOutStream.h @@ -22,31 +22,25 @@ #ifdef HAVE_GNUTLS #include -#include +#include namespace rdr { - class TLSOutStream : public OutStream { + class TLSOutStream : public BufferedOutStream { public: TLSOutStream(OutStream* out, gnutls_session_t session); virtual ~TLSOutStream(); - void flush(); - size_t length(); + virtual void flush(); virtual void cork(bool enable); - protected: - virtual void overrun(size_t needed); - private: + virtual bool flushBuffer(); size_t writeTLS(const U8* data, size_t length); static ssize_t push(gnutls_transport_ptr_t str, const void* data, size_t size); gnutls_session_t session; OutStream* out; - size_t bufSize; - U8* start; - size_t offset; Exception* saved_exception; }; diff --git a/common/rdr/ZlibOutStream.cxx b/common/rdr/ZlibOutStream.cxx index 1ff234ce..63820b8e 100644 --- a/common/rdr/ZlibOutStream.cxx +++ b/common/rdr/ZlibOutStream.cxx @@ -35,11 +35,8 @@ static rfb::LogWriter vlog("ZlibOutStream"); using namespace rdr; -enum { DEFAULT_BUF_SIZE = 16384 }; - ZlibOutStream::ZlibOutStream(OutStream* os, int compressLevel) - : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel), - bufSize(DEFAULT_BUF_SIZE), offset(0) + : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel) { zs = new z_stream; zs->zalloc = Z_NULL; @@ -51,8 +48,6 @@ ZlibOutStream::ZlibOutStream(OutStream* os, int compressLevel) delete zs; throw Exception("ZlibOutStream: deflateInit failed"); } - ptr = start = new U8[bufSize]; - end = start + bufSize; } ZlibOutStream::~ZlibOutStream() @@ -61,7 +56,6 @@ ZlibOutStream::~ZlibOutStream() flush(); } catch (Exception&) { } - delete [] start; deflateEnd(zs); delete zs; } @@ -69,6 +63,8 @@ ZlibOutStream::~ZlibOutStream() void ZlibOutStream::setUnderlying(OutStream* os) { underlying = os; + if (underlying) + underlying->cork(corked); } void ZlibOutStream::setCompressionLevel(int level) @@ -79,68 +75,37 @@ void ZlibOutStream::setCompressionLevel(int level) newLevel = level; } -size_t ZlibOutStream::length() -{ - return offset + ptr - start; -} - void ZlibOutStream::flush() { - checkCompressionLevel(); - - zs->next_in = start; - zs->avail_in = ptr - start; - -#ifdef ZLIBOUT_DEBUG - vlog.debug("flush: avail_in %d",zs->avail_in); -#endif - - // Force out everything from the zlib encoder - deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH); - - if (zs->avail_in == 0) { - offset += ptr - start; - ptr = start; - } else { - // didn't consume all the data? try shifting what's left to the - // start of the buffer. - memmove(start, zs->next_in, ptr - zs->next_in); - offset += zs->next_in - start; - ptr -= zs->next_in - start; - } + BufferedOutStream::flush(); + if (underlying != NULL) + underlying->flush(); } void ZlibOutStream::cork(bool enable) { - OutStream::cork(enable); - - underlying->cork(enable); + BufferedOutStream::cork(enable); + if (underlying != NULL) + underlying->cork(enable); } -void ZlibOutStream::overrun(size_t needed) +bool ZlibOutStream::flushBuffer() { -#ifdef ZLIBOUT_DEBUG - vlog.debug("overrun"); -#endif - - if (needed > bufSize) - throw Exception("ZlibOutStream overrun: buffer size exceeded"); - checkCompressionLevel(); - while (avail() < needed) { - bool oldCorked; + zs->next_in = sentUpTo; + zs->avail_in = ptr - sentUpTo; - // use corked to make zlib a bit more efficient since we're not trying - // to end the stream here, just make some room +#ifdef ZLIBOUT_DEBUG + vlog.debug("flush: avail_in %d",zs->avail_in); +#endif - oldCorked = corked; - corked = true; + // Force out everything from the zlib encoder + deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH); - flush(); + sentUpTo = zs->next_in; - corked = oldCorked; - } + return true; } void ZlibOutStream::deflate(int flush) diff --git a/common/rdr/ZlibOutStream.h b/common/rdr/ZlibOutStream.h index af917d89..8061a58c 100644 --- a/common/rdr/ZlibOutStream.h +++ b/common/rdr/ZlibOutStream.h @@ -25,13 +25,13 @@ #ifndef __RDR_ZLIBOUTSTREAM_H__ #define __RDR_ZLIBOUTSTREAM_H__ -#include +#include struct z_stream_s; namespace rdr { - class ZlibOutStream : public OutStream { + class ZlibOutStream : public BufferedOutStream { public: @@ -40,23 +40,18 @@ namespace rdr { void setUnderlying(OutStream* os); void setCompressionLevel(int level=-1); - void flush(); - size_t length(); + virtual void flush(); virtual void cork(bool enable); private: - - virtual void overrun(size_t needed); + virtual bool flushBuffer(); void deflate(int flush); void checkCompressionLevel(); OutStream* underlying; int compressionLevel; int newLevel; - size_t bufSize; - size_t offset; z_stream_s* zs; - U8* start; }; } // end of namespace rdr -- 2.39.5