]> source.dussan.org Git - tigervnc.git/commitdiff
Use BufferedOutStream in more streams
authorPierre Ossman <ossman@cendio.se>
Wed, 24 Aug 2022 08:31:42 +0000 (10:31 +0200)
committerPierre Ossman <ossman@cendio.se>
Thu, 25 Aug 2022 10:21:15 +0000 (12:21 +0200)
Avoid duplicating all the memory mangement, and instead use the
BufferedOutStream as a base clase for all out streams that need an
intermediate buffer.

common/rdr/BufferedOutStream.cxx
common/rdr/BufferedOutStream.h
common/rdr/FdOutStream.cxx
common/rdr/HexOutStream.cxx
common/rdr/HexOutStream.h
common/rdr/TLSOutStream.cxx
common/rdr/TLSOutStream.h
common/rdr/ZlibOutStream.cxx
common/rdr/ZlibOutStream.h

index c8f6ce9cb6d733f0bfcea49f86f77e18c71a15f0..ff7b6b51b1b0301b62dc018c4b06a5776771a133 100644 (file)
@@ -31,8 +31,8 @@ using namespace rdr;
 static const size_t DEFAULT_BUF_SIZE = 16384;
 static const size_t MAX_BUF_SIZE = 32 * 1024 * 1024;
 
-BufferedOutStream::BufferedOutStream()
-  : bufSize(DEFAULT_BUF_SIZE), offset(0)
+BufferedOutStream::BufferedOutStream(bool emulateCork)
+  : bufSize(DEFAULT_BUF_SIZE), offset(0), emulateCork(emulateCork)
 {
   ptr = start = sentUpTo = new U8[bufSize];
   end = start + bufSize;
@@ -55,6 +55,10 @@ void BufferedOutStream::flush()
 {
   struct timeval now;
 
+  // Only give larger chunks if corked to minimize overhead
+  if (corked && emulateCork && ((ptr - sentUpTo) < 1024))
+    return;
+
   while (sentUpTo < ptr) {
     size_t len;
 
@@ -101,11 +105,17 @@ bool BufferedOutStream::hasBufferedData()
 
 void BufferedOutStream::overrun(size_t needed)
 {
+  bool oldCorked;
   size_t totalNeeded, newSize;
   U8* newBuffer;
 
   // First try to get rid of the data we have
+  // (use corked to make things a bit more efficient since we're not
+  // trying to flush out everything, just make some room)
+  oldCorked = corked;
+  cork(true);
   flush();
+  cork(oldCorked);
 
   // Make note of the total needed space
   totalNeeded = needed + (ptr - sentUpTo);
index b01d1fee3137a74bfa1ae97214068ec6da9c1fc7..94707118616017a524bb783e29b8d36f45e09aba 100644 (file)
@@ -59,11 +59,13 @@ namespace rdr {
     struct timeval lastSizeCheck;
     size_t peakUsage;
 
+    bool emulateCork;
+
   protected:
     U8* sentUpTo;
 
   protected:
-    BufferedOutStream();
+    BufferedOutStream(bool emulateCork=true);
   };
 
 }
index ed6535a8d12b07b5c627f16b2e5dfe88fa4c1819..f89fd34530cacfd83e5a02afceeaa9a17b8ff6ce 100644 (file)
@@ -52,7 +52,7 @@
 using namespace rdr;
 
 FdOutStream::FdOutStream(int fd_)
-  : fd(fd_)
+  : BufferedOutStream(false), fd(fd_)
 {
   gettimeofday(&lastWrite, NULL);
 }
index 85aff47ba9b6b938322217562fa6c1f73f733248..4bcd8ba48430234799645e6aa134a74f3d049838 100644 (file)
 
 using namespace rdr;
 
-const int DEFAULT_BUF_LEN = 16384;
-
 static inline size_t min(size_t a, size_t b) {return a<b ? a : b;}
 
 HexOutStream::HexOutStream(OutStream& os)
-  : out_stream(os), offset(0), bufSize(DEFAULT_BUF_LEN)
+  : out_stream(os)
 {
-  if (bufSize % 2)
-    bufSize--;
-  ptr = start = new U8[bufSize];
-  end = start + bufSize;
 }
 
-HexOutStream::~HexOutStream() {
-  delete [] start;
+HexOutStream::~HexOutStream()
+{
 }
 
 char HexOutStream::intToHex(int i) {
@@ -65,48 +59,33 @@ char* HexOutStream::binToHexStr(const char* data, size_t length) {
   return buffer;
 }
 
-
-void
-HexOutStream::writeBuffer() {
-  U8* pos = start;
-  while (pos != ptr) {
+bool HexOutStream::flushBuffer()
+{
+  while (sentUpTo != ptr) {
     U8* optr = out_stream.getptr(2);
-    size_t length = min(ptr-pos, out_stream.avail()/2);
+    size_t length = min(ptr-sentUpTo, out_stream.avail()/2);
 
     for (size_t i=0; i<length; i++) {
-      optr[i*2] = intToHex((pos[i] >> 4) & 0xf);
-      optr[i*2+1] = intToHex(pos[i] & 0xf);
+      optr[i*2] = intToHex((sentUpTo[i] >> 4) & 0xf);
+      optr[i*2+1] = intToHex(sentUpTo[i] & 0xf);
     }
 
     out_stream.setptr(length*2);
-    pos += length;
+    sentUpTo += length;
   }
-  offset += ptr - start;
-  ptr = start;
-}
 
-size_t HexOutStream::length()
-{
-  return offset + ptr - start;
+  return true;
 }
 
 void
 HexOutStream::flush() {
-  writeBuffer();
+  BufferedOutStream::flush();
   out_stream.flush();
 }
 
 void HexOutStream::cork(bool enable)
 {
-  OutStream::cork(enable);
-
+  BufferedOutStream::cork(enable);
   out_stream.cork(enable);
 }
 
-void HexOutStream::overrun(size_t needed) {
-  if (needed > bufSize)
-    throw Exception("HexOutStream overrun: buffer size exceeded");
-
-  writeBuffer();
-}
-
index e591dd886e27fcfc5948955adfe6d936bc983883..8b1ab9ec316162694ba8a00d0445c59a2cd72a01 100644 (file)
 #ifndef __RDR_HEX_OUTSTREAM_H__
 #define __RDR_HEX_OUTSTREAM_H__
 
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
 
 namespace rdr {
 
-  class HexOutStream : public OutStream {
+  class HexOutStream : public BufferedOutStream {
   public:
 
     HexOutStream(OutStream& os);
     virtual ~HexOutStream();
 
-    void flush();
-    size_t length();
+    virtual void flush();
     virtual void cork(bool enable);
 
     static char intToHex(int i);
     static char* binToHexStr(const char* data, size_t length);
 
   private:
+    virtual bool flushBuffer();
     void writeBuffer();
-    virtual void overrun(size_t needed);
 
     OutStream& out_stream;
-
-    U8* start;
-    size_t offset;
-    size_t bufSize;
   };
 
 }
index 63bdeef145b3168e93410511cbfcc3331e676452..cdfe9b1217068be0b3c2673bc383d53b2d903b36 100644 (file)
@@ -34,8 +34,6 @@ using namespace rdr;
 
 static rfb::LogWriter vlog("TLSOutStream");
 
-enum { DEFAULT_BUF_SIZE = 16384 };
-
 ssize_t TLSOutStream::push(gnutls_transport_ptr_t str, const void* data,
                                   size_t size)
 {
@@ -64,14 +62,10 @@ ssize_t TLSOutStream::push(gnutls_transport_ptr_t str, const void* data,
 }
 
 TLSOutStream::TLSOutStream(OutStream* _out, gnutls_session_t _session)
-  : session(_session), out(_out), bufSize(DEFAULT_BUF_SIZE), offset(0),
-    saved_exception(NULL)
+  : session(_session), out(_out), saved_exception(NULL)
 {
   gnutls_transport_ptr_t recv, send;
 
-  ptr = start = new U8[bufSize];
-  end = start + bufSize;
-
   gnutls_transport_set_push_function(session, push);
   gnutls_transport_get_ptr2(session, &recv, &send);
   gnutls_transport_set_ptr2(session, recv, this);
@@ -87,55 +81,29 @@ TLSOutStream::~TLSOutStream()
 #endif
   gnutls_transport_set_push_function(session, NULL);
 
-  delete [] start;
   delete saved_exception;
 }
 
-size_t TLSOutStream::length()
-{
-  return offset + ptr - start;
-}
-
 void TLSOutStream::flush()
 {
-  U8* sentUpTo;
-
-  // Only give GnuTLS larger chunks if corked to minimize overhead
-  if (corked && ((ptr - start) < 1024))
-    return;
-
-  sentUpTo = start;
-  while (sentUpTo < ptr) {
-    size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
-    sentUpTo += n;
-    offset += n;
-  }
-
-  ptr = start;
+  BufferedOutStream::flush();
   out->flush();
 }
 
 void TLSOutStream::cork(bool enable)
 {
-  OutStream::cork(enable);
-
+  BufferedOutStream::cork(enable);
   out->cork(enable);
 }
 
-void TLSOutStream::overrun(size_t needed)
+bool TLSOutStream::flushBuffer()
 {
-  bool oldCorked;
-
-  if (needed > bufSize)
-    throw Exception("TLSOutStream overrun: buffer size exceeded");
-
-  // A cork might prevent the flush, so disable it temporarily
-  oldCorked = corked;
-  corked = false;
-
-  flush();
+  while (sentUpTo < ptr) {
+    size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
+    sentUpTo += n;
+  }
 
-  corked = oldCorked;
+  return true;
 }
 
 size_t TLSOutStream::writeTLS(const U8* data, size_t length)
index 5e725bd51f5c45126a5133fb52373a50fa884357..276e5c7830ab345d34951f836bb6212a050a87ce 100644 (file)
 
 #ifdef HAVE_GNUTLS
 #include <gnutls/gnutls.h>
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
 
 namespace rdr {
 
-  class TLSOutStream : public OutStream {
+  class TLSOutStream : public BufferedOutStream {
   public:
     TLSOutStream(OutStream* out, gnutls_session_t session);
     virtual ~TLSOutStream();
 
-    void flush();
-    size_t length();
+    virtual void flush();
     virtual void cork(bool enable);
 
-  protected:
-    virtual void overrun(size_t needed);
-
   private:
+    virtual bool flushBuffer();
     size_t writeTLS(const U8* data, size_t length);
     static ssize_t push(gnutls_transport_ptr_t str, const void* data, size_t size);
 
     gnutls_session_t session;
     OutStream* out;
-    size_t bufSize;
-    U8* start;
-    size_t offset;
 
     Exception* saved_exception;
   };
index 1ff234cee8cf72a3e28316944a8a678a29c9e5a9..63820b8ecba629d9e929ab2240617207516fd454 100644 (file)
@@ -35,11 +35,8 @@ static rfb::LogWriter vlog("ZlibOutStream");
 
 using namespace rdr;
 
-enum { DEFAULT_BUF_SIZE = 16384 };
-
 ZlibOutStream::ZlibOutStream(OutStream* os, int compressLevel)
-  : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel),
-    bufSize(DEFAULT_BUF_SIZE), offset(0)
+  : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel)
 {
   zs = new z_stream;
   zs->zalloc    = Z_NULL;
@@ -51,8 +48,6 @@ ZlibOutStream::ZlibOutStream(OutStream* os, int compressLevel)
     delete zs;
     throw Exception("ZlibOutStream: deflateInit failed");
   }
-  ptr = start = new U8[bufSize];
-  end = start + bufSize;
 }
 
 ZlibOutStream::~ZlibOutStream()
@@ -61,7 +56,6 @@ ZlibOutStream::~ZlibOutStream()
     flush();
   } catch (Exception&) {
   }
-  delete [] start;
   deflateEnd(zs);
   delete zs;
 }
@@ -69,6 +63,8 @@ ZlibOutStream::~ZlibOutStream()
 void ZlibOutStream::setUnderlying(OutStream* os)
 {
   underlying = os;
+  if (underlying)
+    underlying->cork(corked);
 }
 
 void ZlibOutStream::setCompressionLevel(int level)
@@ -79,68 +75,37 @@ void ZlibOutStream::setCompressionLevel(int level)
   newLevel = level;
 }
 
-size_t ZlibOutStream::length()
-{
-  return offset + ptr - start;
-}
-
 void ZlibOutStream::flush()
 {
-  checkCompressionLevel();
-
-  zs->next_in = start;
-  zs->avail_in = ptr - start;
-
-#ifdef ZLIBOUT_DEBUG
-  vlog.debug("flush: avail_in %d",zs->avail_in);
-#endif
-
-  // Force out everything from the zlib encoder
-  deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
-
-  if (zs->avail_in == 0) {
-    offset += ptr - start;
-    ptr = start;
-  } else {
-    // didn't consume all the data?  try shifting what's left to the
-    // start of the buffer.
-    memmove(start, zs->next_in, ptr - zs->next_in);
-    offset += zs->next_in - start;
-    ptr -= zs->next_in - start;
-  }
+  BufferedOutStream::flush();
+  if (underlying != NULL)
+    underlying->flush();
 }
 
 void ZlibOutStream::cork(bool enable)
 {
-  OutStream::cork(enable);
-
-  underlying->cork(enable);
+  BufferedOutStream::cork(enable);
+  if (underlying != NULL)
+    underlying->cork(enable);
 }
 
-void ZlibOutStream::overrun(size_t needed)
+bool ZlibOutStream::flushBuffer()
 {
-#ifdef ZLIBOUT_DEBUG
-  vlog.debug("overrun");
-#endif
-
-  if (needed > bufSize)
-    throw Exception("ZlibOutStream overrun: buffer size exceeded");
-
   checkCompressionLevel();
 
-  while (avail() < needed) {
-    bool oldCorked;
+  zs->next_in = sentUpTo;
+  zs->avail_in = ptr - sentUpTo;
 
-    // use corked to make zlib a bit more efficient since we're not trying
-    // to end the stream here, just make some room
+#ifdef ZLIBOUT_DEBUG
+  vlog.debug("flush: avail_in %d",zs->avail_in);
+#endif
 
-    oldCorked = corked;
-    corked = true;
+  // Force out everything from the zlib encoder
+  deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
 
-    flush();
+  sentUpTo = zs->next_in;
 
-    corked = oldCorked;
-  }
+  return true;
 }
 
 void ZlibOutStream::deflate(int flush)
index af917d89d5e6c5bb4c8bddbee633e8f34cf20e3d..8061a58c697a4431cd354ab746b8ed9979fbcf4d 100644 (file)
 #ifndef __RDR_ZLIBOUTSTREAM_H__
 #define __RDR_ZLIBOUTSTREAM_H__
 
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
 
 struct z_stream_s;
 
 namespace rdr {
 
-  class ZlibOutStream : public OutStream {
+  class ZlibOutStream : public BufferedOutStream {
 
   public:
 
@@ -40,23 +40,18 @@ namespace rdr {
 
     void setUnderlying(OutStream* os);
     void setCompressionLevel(int level=-1);
-    void flush();
-    size_t length();
+    virtual void flush();
     virtual void cork(bool enable);
 
   private:
-
-    virtual void overrun(size_t needed);
+    virtual bool flushBuffer();
     void deflate(int flush);
     void checkCompressionLevel();
 
     OutStream* underlying;
     int compressionLevel;
     int newLevel;
-    size_t bufSize;
-    size_t offset;
     z_stream_s* zs;
-    U8* start;
   };
 
 } // end of namespace rdr