From d7a61c7df39c25945e1aa5f7843e8f3681ed7169 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Fri, 15 May 2020 22:49:47 +0200 Subject: [PATCH] Generalise corking to all output streams The principle can be used in a more general fashion than just TCP streams. --- common/network/Socket.h | 2 +- common/network/TcpSocket.cxx | 11 --------- common/network/TcpSocket.h | 2 -- common/network/UnixSocket.cxx | 5 ---- common/network/UnixSocket.h | 2 -- common/rdr/FdOutStream.cxx | 10 ++++++++ common/rdr/FdOutStream.h | 2 ++ common/rdr/HexOutStream.cxx | 8 +++++- common/rdr/HexOutStream.h | 1 + common/rdr/OutStream.h | 8 +++++- common/rdr/TLSOutStream.cxx | 18 +++++++++++++- common/rdr/TLSOutStream.h | 1 + common/rdr/ZlibOutStream.cxx | 44 +++++++++++++++++---------------- common/rdr/ZlibOutStream.h | 1 + common/rfb/TightEncoder.cxx | 2 ++ common/rfb/VNCSConnectionST.cxx | 12 ++++----- 16 files changed, 78 insertions(+), 51 deletions(-) diff --git a/common/network/Socket.h b/common/network/Socket.h index d38feba4..901bab13 100644 --- a/common/network/Socket.h +++ b/common/network/Socket.h @@ -46,7 +46,7 @@ namespace network { void shutdown(); bool isShutdown() const; - virtual bool cork(bool enable) = 0; + void cork(bool enable) { outstream->cork(enable); } // information about the remote end of the socket virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1" diff --git a/common/network/TcpSocket.cxx b/common/network/TcpSocket.cxx index 07c8d2cd..a1677c86 100644 --- a/common/network/TcpSocket.cxx +++ b/common/network/TcpSocket.cxx @@ -287,17 +287,6 @@ bool TcpSocket::enableNagles(bool enable) { return true; } -bool TcpSocket::cork(bool enable) { -#ifndef TCP_CORK - return false; -#else - int one = enable ? 1 : 0; - if (setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)) < 0) - return false; - return true; -#endif -} - TcpListener::TcpListener(int sock) : SocketListener(sock) { } diff --git a/common/network/TcpSocket.h b/common/network/TcpSocket.h index eb6c0958..787de629 100644 --- a/common/network/TcpSocket.h +++ b/common/network/TcpSocket.h @@ -58,8 +58,6 @@ namespace network { virtual char* getPeerAddress(); virtual char* getPeerEndpoint(); - virtual bool cork(bool enable); - protected: bool enableNagles(bool enable); }; diff --git a/common/network/UnixSocket.cxx b/common/network/UnixSocket.cxx index bfabc141..71f84950 100644 --- a/common/network/UnixSocket.cxx +++ b/common/network/UnixSocket.cxx @@ -109,11 +109,6 @@ char* UnixSocket::getPeerEndpoint() { return getPeerAddress(); } -bool UnixSocket::cork(bool enable) -{ - return true; -} - UnixListener::UnixListener(const char *path, int mode) { struct sockaddr_un addr; diff --git a/common/network/UnixSocket.h b/common/network/UnixSocket.h index 1ffca456..d7c70005 100644 --- a/common/network/UnixSocket.h +++ b/common/network/UnixSocket.h @@ -40,8 +40,6 @@ namespace network { virtual char* getPeerAddress(); virtual char* getPeerEndpoint(); - - virtual bool cork(bool enable); }; class UnixListener : public SocketListener { diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx index d7da7103..3405838d 100644 --- a/common/rdr/FdOutStream.cxx +++ b/common/rdr/FdOutStream.cxx @@ -77,6 +77,16 @@ unsigned FdOutStream::getIdleTime() return rfb::msSince(&lastWrite); } +void FdOutStream::cork(bool enable) +{ + BufferedOutStream::cork(enable); + +#ifdef TCP_CORK + int one = enable ? 1 : 0; + setsockopt(fd, IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)); +#endif +} + bool FdOutStream::flushBuffer(bool wait) { size_t n = writeWithTimeout((const void*) sentUpTo, diff --git a/common/rdr/FdOutStream.h b/common/rdr/FdOutStream.h index 24327972..b1fb74c0 100644 --- a/common/rdr/FdOutStream.h +++ b/common/rdr/FdOutStream.h @@ -43,6 +43,8 @@ namespace rdr { unsigned getIdleTime(); + virtual void cork(bool enable); + private: virtual bool flushBuffer(bool wait); size_t writeWithTimeout(const void* data, size_t length, int timeoutms); diff --git a/common/rdr/HexOutStream.cxx b/common/rdr/HexOutStream.cxx index 19c66851..88153c54 100644 --- a/common/rdr/HexOutStream.cxx +++ b/common/rdr/HexOutStream.cxx @@ -38,7 +38,6 @@ HexOutStream::~HexOutStream() { delete [] start; } - char HexOutStream::intToHex(int i) { if ((i>=0) && (i<=9)) return '0'+i; @@ -95,6 +94,13 @@ HexOutStream::flush() { out_stream.flush(); } +void HexOutStream::cork(bool enable) +{ + OutStream::cork(enable); + + out_stream.cork(enable); +} + void HexOutStream::overrun(size_t needed) { if (needed > bufSize) throw Exception("HexOutStream overrun: buffer size exceeded"); diff --git a/common/rdr/HexOutStream.h b/common/rdr/HexOutStream.h index 02366478..e591dd88 100644 --- a/common/rdr/HexOutStream.h +++ b/common/rdr/HexOutStream.h @@ -31,6 +31,7 @@ namespace rdr { void flush(); size_t length(); + virtual void cork(bool enable); static char intToHex(int i); static char* binToHexStr(const char* data, size_t length); diff --git a/common/rdr/OutStream.h b/common/rdr/OutStream.h index bb88f325..63c43169 100644 --- a/common/rdr/OutStream.h +++ b/common/rdr/OutStream.h @@ -34,7 +34,7 @@ namespace rdr { protected: - OutStream() {} + OutStream() : ptr(NULL), end(NULL), corked(false) {} public: @@ -128,6 +128,10 @@ namespace rdr { virtual void flush() {} + // cork() requests that the stream coalesces flushes in an efficient way + + virtual void cork(bool enable) { corked = enable; flush(); } + // getptr(), getend() and setptr() are "dirty" methods which allow you to // manipulate the buffer directly. This is useful for a stream which is a // wrapper around an underlying stream. @@ -147,6 +151,8 @@ namespace rdr { U8* ptr; U8* end; + + bool corked; }; } diff --git a/common/rdr/TLSOutStream.cxx b/common/rdr/TLSOutStream.cxx index 76f5dffe..7d59a9c7 100644 --- a/common/rdr/TLSOutStream.cxx +++ b/common/rdr/TLSOutStream.cxx @@ -82,7 +82,13 @@ size_t TLSOutStream::length() void TLSOutStream::flush() { - U8* sentUpTo = start; + U8* sentUpTo; + + // Only give GnuTLS larger chunks if corked to minimize overhead + if (corked && ((ptr - start) < 1024)) + return; + + sentUpTo = start; while (sentUpTo < ptr) { size_t n = writeTLS(sentUpTo, ptr - sentUpTo); sentUpTo += n; @@ -93,12 +99,22 @@ void TLSOutStream::flush() out->flush(); } +void TLSOutStream::cork(bool enable) +{ + OutStream::cork(enable); + + out->cork(enable); +} + void TLSOutStream::overrun(size_t needed) { if (needed > bufSize) throw Exception("TLSOutStream overrun: buffer size exceeded"); + // A cork might prevent the flush, so disable it temporarily + corked = false; flush(); + corked = true; } size_t TLSOutStream::writeTLS(const U8* data, size_t length) diff --git a/common/rdr/TLSOutStream.h b/common/rdr/TLSOutStream.h index 9ce6eb6d..c9750463 100644 --- a/common/rdr/TLSOutStream.h +++ b/common/rdr/TLSOutStream.h @@ -37,6 +37,7 @@ namespace rdr { void flush(); size_t length(); + virtual void cork(bool enable); protected: virtual void overrun(size_t needed); diff --git a/common/rdr/ZlibOutStream.cxx b/common/rdr/ZlibOutStream.cxx index 8f7170da..0eb89222 100644 --- a/common/rdr/ZlibOutStream.cxx +++ b/common/rdr/ZlibOutStream.cxx @@ -92,10 +92,25 @@ void ZlibOutStream::flush() #endif // Force out everything from the zlib encoder - deflate(Z_SYNC_FLUSH); + deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH); + + if (zs->avail_in == 0) { + offset += ptr - start; + ptr = start; + } else { + // didn't consume all the data? try shifting what's left to the + // start of the buffer. + memmove(start, zs->next_in, ptr - zs->next_in); + offset += zs->next_in - start; + ptr -= zs->next_in - start; + } +} + +void ZlibOutStream::cork(bool enable) +{ + OutStream::cork(enable); - offset += ptr - start; - ptr = start; + underlying->cork(enable); } void ZlibOutStream::overrun(size_t needed) @@ -110,24 +125,11 @@ void ZlibOutStream::overrun(size_t needed) checkCompressionLevel(); while (avail() < needed) { - zs->next_in = start; - zs->avail_in = ptr - start; - - deflate(Z_NO_FLUSH); - - // output buffer not full - - if (zs->avail_in == 0) { - offset += ptr - start; - ptr = start; - } else { - // but didn't consume all the data? try shifting what's left to the - // start of the buffer. - vlog.info("z out buf not full, but in data not consumed"); - memmove(start, zs->next_in, ptr - zs->next_in); - offset += zs->next_in - start; - ptr -= zs->next_in - start; - } + // use corked to make zlib a bit more efficient since we're not trying + // to end the stream here, just make some room + corked = true; + flush(); + corked = false; } } diff --git a/common/rdr/ZlibOutStream.h b/common/rdr/ZlibOutStream.h index 2b08f8d5..af917d89 100644 --- a/common/rdr/ZlibOutStream.h +++ b/common/rdr/ZlibOutStream.h @@ -42,6 +42,7 @@ namespace rdr { void setCompressionLevel(int level=-1); void flush(); size_t length(); + virtual void cork(bool enable); private: diff --git a/common/rfb/TightEncoder.cxx b/common/rfb/TightEncoder.cxx index 1b0792c4..5cefa5c5 100644 --- a/common/rfb/TightEncoder.cxx +++ b/common/rfb/TightEncoder.cxx @@ -244,6 +244,7 @@ rdr::OutStream* TightEncoder::getZlibOutStream(int streamId, int level, size_t l zlibStreams[streamId].setUnderlying(&memStream); zlibStreams[streamId].setCompressionLevel(level); + zlibStreams[streamId].cork(true); return &zlibStreams[streamId]; } @@ -257,6 +258,7 @@ void TightEncoder::flushZlibOutStream(rdr::OutStream* os_) if (zos == NULL) return; + zos->cork(false); zos->flush(); zos->setUnderlying(NULL); diff --git a/common/rfb/VNCSConnectionST.cxx b/common/rfb/VNCSConnectionST.cxx index a00ae435..e23e9b87 100644 --- a/common/rfb/VNCSConnectionST.cxx +++ b/common/rfb/VNCSConnectionST.cxx @@ -116,7 +116,7 @@ void VNCSConnectionST::close(const char* reason) try { if (sock->outStream().bufferUsage() > 0) { - sock->cork(false); + sock->outStream().cork(false); sock->outStream().flush(); if (sock->outStream().bufferUsage() > 0) vlog.error("Failed to flush remaining socket data on close"); @@ -157,9 +157,9 @@ void VNCSConnectionST::processMessages() inProcessMessages = true; - // Get the underlying TCP layer to build large packets if we send + // Get the underlying transport to build large packets if we send // multiple small responses. - sock->cork(true); + getOutStream()->cork(true); while (getInStream()->checkNoWait(1)) { if (pendingSyncFence) { @@ -176,7 +176,7 @@ void VNCSConnectionST::processMessages() } // Flush out everything in case we go idle after this. - sock->cork(false); + getOutStream()->cork(false); inProcessMessages = false; @@ -880,7 +880,7 @@ void VNCSConnectionST::writeFramebufferUpdate() // mode, we will also have small fence messages around the update. We // need to aggregate these in order to not clog up TCP's congestion // window. - sock->cork(true); + getOutStream()->cork(true); // First take care of any updates that cannot contain framebuffer data // changes. @@ -889,7 +889,7 @@ void VNCSConnectionST::writeFramebufferUpdate() // Then real data (if possible) writeDataUpdate(); - sock->cork(false); + getOutStream()->cork(false); congestion.updatePosition(sock->outStream().length()); } -- 2.39.5