]> source.dussan.org Git - tigervnc.git/commitdiff
Generalise corking to all output streams
authorPierre Ossman <ossman@cendio.se>
Fri, 15 May 2020 20:49:47 +0000 (22:49 +0200)
committerPierre Ossman <ossman@cendio.se>
Thu, 21 May 2020 09:34:22 +0000 (11:34 +0200)
The principle can be used in a more general fashion than just TCP
streams.

16 files changed:
common/network/Socket.h
common/network/TcpSocket.cxx
common/network/TcpSocket.h
common/network/UnixSocket.cxx
common/network/UnixSocket.h
common/rdr/FdOutStream.cxx
common/rdr/FdOutStream.h
common/rdr/HexOutStream.cxx
common/rdr/HexOutStream.h
common/rdr/OutStream.h
common/rdr/TLSOutStream.cxx
common/rdr/TLSOutStream.h
common/rdr/ZlibOutStream.cxx
common/rdr/ZlibOutStream.h
common/rfb/TightEncoder.cxx
common/rfb/VNCSConnectionST.cxx

index d38feba4deda74d72a31978ec2ca7cbf2d49e3ed..901bab1331ebd04ea24c7f487fea4fa828150c17 100644 (file)
@@ -46,7 +46,7 @@ namespace network {
     void shutdown();
     bool isShutdown() const;
 
-    virtual bool cork(bool enable) = 0;
+    void cork(bool enable) { outstream->cork(enable); }
 
     // information about the remote end of the socket
     virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1"
index 07c8d2cdc1394bd5c2043f47ce065ee21264085b..a1677c8693ad978d442896d6a52e1a7395a878a6 100644 (file)
@@ -287,17 +287,6 @@ bool TcpSocket::enableNagles(bool enable) {
   return true;
 }
 
-bool TcpSocket::cork(bool enable) {
-#ifndef TCP_CORK
-  return false;
-#else
-  int one = enable ? 1 : 0;
-  if (setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)) < 0)
-    return false;
-  return true;
-#endif
-}
-
 TcpListener::TcpListener(int sock) : SocketListener(sock)
 {
 }
index eb6c0958d8481de646402e1bf179baa7371b2a5a..787de6292fe7b0e3c0d60278531119bd3cce6e97 100644 (file)
@@ -58,8 +58,6 @@ namespace network {
     virtual char* getPeerAddress();
     virtual char* getPeerEndpoint();
 
-    virtual bool cork(bool enable);
-
   protected:
     bool enableNagles(bool enable);
   };
index bfabc141d42fb4e3a22adb1f287e8272af4c1458..71f8495007af209745a9ee8f880f0d2be7bebb7d 100644 (file)
@@ -109,11 +109,6 @@ char* UnixSocket::getPeerEndpoint() {
   return getPeerAddress();
 }
 
-bool UnixSocket::cork(bool enable)
-{
-  return true;
-}
-
 UnixListener::UnixListener(const char *path, int mode)
 {
   struct sockaddr_un addr;
index 1ffca456df8ff21e076500ab98a5ebf7fd5d3997..d7c70005ff7770e22e0f8b5fba401b8d9fce7cc0 100644 (file)
@@ -40,8 +40,6 @@ namespace network {
 
     virtual char* getPeerAddress();
     virtual char* getPeerEndpoint();
-
-    virtual bool cork(bool enable);
   };
 
   class UnixListener : public SocketListener {
index d7da71038b2b99d3bd578d9177e3e1c91dcdd116..3405838dfa82aff22f2f235db257bd764a99a022 100644 (file)
@@ -77,6 +77,16 @@ unsigned FdOutStream::getIdleTime()
   return rfb::msSince(&lastWrite);
 }
 
+void FdOutStream::cork(bool enable)
+{
+  BufferedOutStream::cork(enable);
+
+#ifdef TCP_CORK
+  int one = enable ? 1 : 0;
+  setsockopt(fd, IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one));
+#endif
+}
+
 bool FdOutStream::flushBuffer(bool wait)
 {
   size_t n = writeWithTimeout((const void*) sentUpTo,
index 24327972817fb0e8637e7d5b988e7b8a8e5afc78..b1fb74c0f40daa583a6281b76fd60f49c3413754 100644 (file)
@@ -43,6 +43,8 @@ namespace rdr {
 
     unsigned getIdleTime();
 
+    virtual void cork(bool enable);
+
   private:
     virtual bool flushBuffer(bool wait);
     size_t writeWithTimeout(const void* data, size_t length, int timeoutms);
index 19c66851e24325bc800c601c542c46ec7ada651c..88153c54ab5dc9dc870c5f4503abb08864670b56 100644 (file)
@@ -38,7 +38,6 @@ HexOutStream::~HexOutStream() {
   delete [] start;
 }
 
-
 char HexOutStream::intToHex(int i) {
   if ((i>=0) && (i<=9))
     return '0'+i;
@@ -95,6 +94,13 @@ HexOutStream::flush() {
   out_stream.flush();
 }
 
+void HexOutStream::cork(bool enable)
+{
+  OutStream::cork(enable);
+
+  out_stream.cork(enable);
+}
+
 void HexOutStream::overrun(size_t needed) {
   if (needed > bufSize)
     throw Exception("HexOutStream overrun: buffer size exceeded");
index 02366478807e92ea9f0633ec52b2752b1eed6561..e591dd886e27fcfc5948955adfe6d936bc983883 100644 (file)
@@ -31,6 +31,7 @@ namespace rdr {
 
     void flush();
     size_t length();
+    virtual void cork(bool enable);
 
     static char intToHex(int i);
     static char* binToHexStr(const char* data, size_t length);
index bb88f325a56cf1f7fbd82f9625c95d8ca09ac0f9..63c431690fd5deec6c894d4b4616d4a117a7b9da 100644 (file)
@@ -34,7 +34,7 @@ namespace rdr {
 
   protected:
 
-    OutStream() {}
+    OutStream() : ptr(NULL), end(NULL), corked(false) {}
 
   public:
 
@@ -128,6 +128,10 @@ namespace rdr {
 
     virtual void flush() {}
 
+    // cork() requests that the stream coalesces flushes in an efficient way
+
+    virtual void cork(bool enable) { corked = enable; flush(); }
+
     // getptr(), getend() and setptr() are "dirty" methods which allow you to
     // manipulate the buffer directly.  This is useful for a stream which is a
     // wrapper around an underlying stream.
@@ -147,6 +151,8 @@ namespace rdr {
 
     U8* ptr;
     U8* end;
+
+    bool corked;
   };
 
 }
index 76f5dffe2584dc38b0458db2340f43fb6693f98c..7d59a9c789a6660cfd6ccaac2a17854cfcaba70d 100644 (file)
@@ -82,7 +82,13 @@ size_t TLSOutStream::length()
 
 void TLSOutStream::flush()
 {
-  U8* sentUpTo = start;
+  U8* sentUpTo;
+
+  // Only give GnuTLS larger chunks if corked to minimize overhead
+  if (corked && ((ptr - start) < 1024))
+    return;
+
+  sentUpTo = start;
   while (sentUpTo < ptr) {
     size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
     sentUpTo += n;
@@ -93,12 +99,22 @@ void TLSOutStream::flush()
   out->flush();
 }
 
+void TLSOutStream::cork(bool enable)
+{
+  OutStream::cork(enable);
+
+  out->cork(enable);
+}
+
 void TLSOutStream::overrun(size_t needed)
 {
   if (needed > bufSize)
     throw Exception("TLSOutStream overrun: buffer size exceeded");
 
+  // A cork might prevent the flush, so disable it temporarily
+  corked = false;
   flush();
+  corked = true;
 }
 
 size_t TLSOutStream::writeTLS(const U8* data, size_t length)
index 9ce6eb6ddf8cdf53cc3104f7e4c386fdd2f03402..c97504635bbb6deb32fe108e1f7aff25dac8118d 100644 (file)
@@ -37,6 +37,7 @@ namespace rdr {
 
     void flush();
     size_t length();
+    virtual void cork(bool enable);
 
   protected:
     virtual void overrun(size_t needed);
index 8f7170dae9b568257409c7fa33b09677cab2e339..0eb89222747826b6935a5327190ca71d95bd50a3 100644 (file)
@@ -92,10 +92,25 @@ void ZlibOutStream::flush()
 #endif
 
   // Force out everything from the zlib encoder
-  deflate(Z_SYNC_FLUSH);
+  deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
+
+  if (zs->avail_in == 0) {
+    offset += ptr - start;
+    ptr = start;
+  } else {
+    // didn't consume all the data?  try shifting what's left to the
+    // start of the buffer.
+    memmove(start, zs->next_in, ptr - zs->next_in);
+    offset += zs->next_in - start;
+    ptr -= zs->next_in - start;
+  }
+}
+
+void ZlibOutStream::cork(bool enable)
+{
+  OutStream::cork(enable);
 
-  offset += ptr - start;
-  ptr = start;
+  underlying->cork(enable);
 }
 
 void ZlibOutStream::overrun(size_t needed)
@@ -110,24 +125,11 @@ void ZlibOutStream::overrun(size_t needed)
   checkCompressionLevel();
 
   while (avail() < needed) {
-    zs->next_in = start;
-    zs->avail_in = ptr - start;
-
-    deflate(Z_NO_FLUSH);
-
-    // output buffer not full
-
-    if (zs->avail_in == 0) {
-      offset += ptr - start;
-      ptr = start;
-    } else {
-      // but didn't consume all the data?  try shifting what's left to the
-      // start of the buffer.
-      vlog.info("z out buf not full, but in data not consumed");
-      memmove(start, zs->next_in, ptr - zs->next_in);
-      offset += zs->next_in - start;
-      ptr -= zs->next_in - start;
-    }
+    // use corked to make zlib a bit more efficient since we're not trying
+    // to end the stream here, just make some room
+    corked = true;
+    flush();
+    corked = false;
   }
 }
 
index 2b08f8d5de60f27a9303938e2509e521d744aa24..af917d89d5e6c5bb4c8bddbee633e8f34cf20e3d 100644 (file)
@@ -42,6 +42,7 @@ namespace rdr {
     void setCompressionLevel(int level=-1);
     void flush();
     size_t length();
+    virtual void cork(bool enable);
 
   private:
 
index 1b0792c4dba5d454716481e99777ee70543c35a3..5cefa5c5fabc9f32c7b95049b4ab15d716c28727 100644 (file)
@@ -244,6 +244,7 @@ rdr::OutStream* TightEncoder::getZlibOutStream(int streamId, int level, size_t l
 
   zlibStreams[streamId].setUnderlying(&memStream);
   zlibStreams[streamId].setCompressionLevel(level);
+  zlibStreams[streamId].cork(true);
 
   return &zlibStreams[streamId];
 }
@@ -257,6 +258,7 @@ void TightEncoder::flushZlibOutStream(rdr::OutStream* os_)
   if (zos == NULL)
     return;
 
+  zos->cork(false);
   zos->flush();
   zos->setUnderlying(NULL);
 
index a00ae435b1fdf50c7c8af6341121859788d000b8..e23e9b875c8c985f8cc657623c78871c063f350b 100644 (file)
@@ -116,7 +116,7 @@ void VNCSConnectionST::close(const char* reason)
 
   try {
     if (sock->outStream().bufferUsage() > 0) {
-      sock->cork(false);
+      sock->outStream().cork(false);
       sock->outStream().flush();
       if (sock->outStream().bufferUsage() > 0)
         vlog.error("Failed to flush remaining socket data on close");
@@ -157,9 +157,9 @@ void VNCSConnectionST::processMessages()
 
     inProcessMessages = true;
 
-    // Get the underlying TCP layer to build large packets if we send
+    // Get the underlying transport to build large packets if we send
     // multiple small responses.
-    sock->cork(true);
+    getOutStream()->cork(true);
 
     while (getInStream()->checkNoWait(1)) {
       if (pendingSyncFence) {
@@ -176,7 +176,7 @@ void VNCSConnectionST::processMessages()
     }
 
     // Flush out everything in case we go idle after this.
-    sock->cork(false);
+    getOutStream()->cork(false);
 
     inProcessMessages = false;
 
@@ -880,7 +880,7 @@ void VNCSConnectionST::writeFramebufferUpdate()
   // mode, we will also have small fence messages around the update. We
   // need to aggregate these in order to not clog up TCP's congestion
   // window.
-  sock->cork(true);
+  getOutStream()->cork(true);
 
   // First take care of any updates that cannot contain framebuffer data
   // changes.
@@ -889,7 +889,7 @@ void VNCSConnectionST::writeFramebufferUpdate()
   // Then real data (if possible)
   writeDataUpdate();
 
-  sock->cork(false);
+  getOutStream()->cork(false);
 
   congestion.updatePosition(sock->outStream().length());
 }