aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/network/Socket.h2
-rw-r--r--common/network/TcpSocket.cxx11
-rw-r--r--common/network/TcpSocket.h2
-rw-r--r--common/network/UnixSocket.cxx5
-rw-r--r--common/network/UnixSocket.h2
-rw-r--r--common/rdr/FdOutStream.cxx10
-rw-r--r--common/rdr/FdOutStream.h2
-rw-r--r--common/rdr/HexOutStream.cxx8
-rw-r--r--common/rdr/HexOutStream.h1
-rw-r--r--common/rdr/OutStream.h8
-rw-r--r--common/rdr/TLSOutStream.cxx18
-rw-r--r--common/rdr/TLSOutStream.h1
-rw-r--r--common/rdr/ZlibOutStream.cxx44
-rw-r--r--common/rdr/ZlibOutStream.h1
-rw-r--r--common/rfb/TightEncoder.cxx2
-rw-r--r--common/rfb/VNCSConnectionST.cxx12
16 files changed, 78 insertions, 51 deletions
diff --git a/common/network/Socket.h b/common/network/Socket.h
index d38feba4..901bab13 100644
--- a/common/network/Socket.h
+++ b/common/network/Socket.h
@@ -46,7 +46,7 @@ namespace network {
void shutdown();
bool isShutdown() const;
- virtual bool cork(bool enable) = 0;
+ void cork(bool enable) { outstream->cork(enable); }
// information about the remote end of the socket
virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1"
diff --git a/common/network/TcpSocket.cxx b/common/network/TcpSocket.cxx
index 07c8d2cd..a1677c86 100644
--- a/common/network/TcpSocket.cxx
+++ b/common/network/TcpSocket.cxx
@@ -287,17 +287,6 @@ bool TcpSocket::enableNagles(bool enable) {
return true;
}
-bool TcpSocket::cork(bool enable) {
-#ifndef TCP_CORK
- return false;
-#else
- int one = enable ? 1 : 0;
- if (setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)) < 0)
- return false;
- return true;
-#endif
-}
-
TcpListener::TcpListener(int sock) : SocketListener(sock)
{
}
diff --git a/common/network/TcpSocket.h b/common/network/TcpSocket.h
index eb6c0958..787de629 100644
--- a/common/network/TcpSocket.h
+++ b/common/network/TcpSocket.h
@@ -58,8 +58,6 @@ namespace network {
virtual char* getPeerAddress();
virtual char* getPeerEndpoint();
- virtual bool cork(bool enable);
-
protected:
bool enableNagles(bool enable);
};
diff --git a/common/network/UnixSocket.cxx b/common/network/UnixSocket.cxx
index bfabc141..71f84950 100644
--- a/common/network/UnixSocket.cxx
+++ b/common/network/UnixSocket.cxx
@@ -109,11 +109,6 @@ char* UnixSocket::getPeerEndpoint() {
return getPeerAddress();
}
-bool UnixSocket::cork(bool enable)
-{
- return true;
-}
-
UnixListener::UnixListener(const char *path, int mode)
{
struct sockaddr_un addr;
diff --git a/common/network/UnixSocket.h b/common/network/UnixSocket.h
index 1ffca456..d7c70005 100644
--- a/common/network/UnixSocket.h
+++ b/common/network/UnixSocket.h
@@ -40,8 +40,6 @@ namespace network {
virtual char* getPeerAddress();
virtual char* getPeerEndpoint();
-
- virtual bool cork(bool enable);
};
class UnixListener : public SocketListener {
diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx
index d7da7103..3405838d 100644
--- a/common/rdr/FdOutStream.cxx
+++ b/common/rdr/FdOutStream.cxx
@@ -77,6 +77,16 @@ unsigned FdOutStream::getIdleTime()
return rfb::msSince(&lastWrite);
}
+void FdOutStream::cork(bool enable)
+{
+ BufferedOutStream::cork(enable);
+
+#ifdef TCP_CORK
+ int one = enable ? 1 : 0;
+ setsockopt(fd, IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one));
+#endif
+}
+
bool FdOutStream::flushBuffer(bool wait)
{
size_t n = writeWithTimeout((const void*) sentUpTo,
diff --git a/common/rdr/FdOutStream.h b/common/rdr/FdOutStream.h
index 24327972..b1fb74c0 100644
--- a/common/rdr/FdOutStream.h
+++ b/common/rdr/FdOutStream.h
@@ -43,6 +43,8 @@ namespace rdr {
unsigned getIdleTime();
+ virtual void cork(bool enable);
+
private:
virtual bool flushBuffer(bool wait);
size_t writeWithTimeout(const void* data, size_t length, int timeoutms);
diff --git a/common/rdr/HexOutStream.cxx b/common/rdr/HexOutStream.cxx
index 19c66851..88153c54 100644
--- a/common/rdr/HexOutStream.cxx
+++ b/common/rdr/HexOutStream.cxx
@@ -38,7 +38,6 @@ HexOutStream::~HexOutStream() {
delete [] start;
}
-
char HexOutStream::intToHex(int i) {
if ((i>=0) && (i<=9))
return '0'+i;
@@ -95,6 +94,13 @@ HexOutStream::flush() {
out_stream.flush();
}
+void HexOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
+
+ out_stream.cork(enable);
+}
+
void HexOutStream::overrun(size_t needed) {
if (needed > bufSize)
throw Exception("HexOutStream overrun: buffer size exceeded");
diff --git a/common/rdr/HexOutStream.h b/common/rdr/HexOutStream.h
index 02366478..e591dd88 100644
--- a/common/rdr/HexOutStream.h
+++ b/common/rdr/HexOutStream.h
@@ -31,6 +31,7 @@ namespace rdr {
void flush();
size_t length();
+ virtual void cork(bool enable);
static char intToHex(int i);
static char* binToHexStr(const char* data, size_t length);
diff --git a/common/rdr/OutStream.h b/common/rdr/OutStream.h
index bb88f325..63c43169 100644
--- a/common/rdr/OutStream.h
+++ b/common/rdr/OutStream.h
@@ -34,7 +34,7 @@ namespace rdr {
protected:
- OutStream() {}
+ OutStream() : ptr(NULL), end(NULL), corked(false) {}
public:
@@ -128,6 +128,10 @@ namespace rdr {
virtual void flush() {}
+ // cork() requests that the stream coalesces flushes in an efficient way
+
+ virtual void cork(bool enable) { corked = enable; flush(); }
+
// getptr(), getend() and setptr() are "dirty" methods which allow you to
// manipulate the buffer directly. This is useful for a stream which is a
// wrapper around an underlying stream.
@@ -147,6 +151,8 @@ namespace rdr {
U8* ptr;
U8* end;
+
+ bool corked;
};
}
diff --git a/common/rdr/TLSOutStream.cxx b/common/rdr/TLSOutStream.cxx
index 76f5dffe..7d59a9c7 100644
--- a/common/rdr/TLSOutStream.cxx
+++ b/common/rdr/TLSOutStream.cxx
@@ -82,7 +82,13 @@ size_t TLSOutStream::length()
void TLSOutStream::flush()
{
- U8* sentUpTo = start;
+ U8* sentUpTo;
+
+ // Only give GnuTLS larger chunks if corked to minimize overhead
+ if (corked && ((ptr - start) < 1024))
+ return;
+
+ sentUpTo = start;
while (sentUpTo < ptr) {
size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
sentUpTo += n;
@@ -93,12 +99,22 @@ void TLSOutStream::flush()
out->flush();
}
+void TLSOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
+
+ out->cork(enable);
+}
+
void TLSOutStream::overrun(size_t needed)
{
if (needed > bufSize)
throw Exception("TLSOutStream overrun: buffer size exceeded");
+ // A cork might prevent the flush, so disable it temporarily
+ corked = false;
flush();
+ corked = true;
}
size_t TLSOutStream::writeTLS(const U8* data, size_t length)
diff --git a/common/rdr/TLSOutStream.h b/common/rdr/TLSOutStream.h
index 9ce6eb6d..c9750463 100644
--- a/common/rdr/TLSOutStream.h
+++ b/common/rdr/TLSOutStream.h
@@ -37,6 +37,7 @@ namespace rdr {
void flush();
size_t length();
+ virtual void cork(bool enable);
protected:
virtual void overrun(size_t needed);
diff --git a/common/rdr/ZlibOutStream.cxx b/common/rdr/ZlibOutStream.cxx
index 8f7170da..0eb89222 100644
--- a/common/rdr/ZlibOutStream.cxx
+++ b/common/rdr/ZlibOutStream.cxx
@@ -92,10 +92,25 @@ void ZlibOutStream::flush()
#endif
// Force out everything from the zlib encoder
- deflate(Z_SYNC_FLUSH);
+ deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
+
+ if (zs->avail_in == 0) {
+ offset += ptr - start;
+ ptr = start;
+ } else {
+ // didn't consume all the data? try shifting what's left to the
+ // start of the buffer.
+ memmove(start, zs->next_in, ptr - zs->next_in);
+ offset += zs->next_in - start;
+ ptr -= zs->next_in - start;
+ }
+}
+
+void ZlibOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
- offset += ptr - start;
- ptr = start;
+ underlying->cork(enable);
}
void ZlibOutStream::overrun(size_t needed)
@@ -110,24 +125,11 @@ void ZlibOutStream::overrun(size_t needed)
checkCompressionLevel();
while (avail() < needed) {
- zs->next_in = start;
- zs->avail_in = ptr - start;
-
- deflate(Z_NO_FLUSH);
-
- // output buffer not full
-
- if (zs->avail_in == 0) {
- offset += ptr - start;
- ptr = start;
- } else {
- // but didn't consume all the data? try shifting what's left to the
- // start of the buffer.
- vlog.info("z out buf not full, but in data not consumed");
- memmove(start, zs->next_in, ptr - zs->next_in);
- offset += zs->next_in - start;
- ptr -= zs->next_in - start;
- }
+ // use corked to make zlib a bit more efficient since we're not trying
+ // to end the stream here, just make some room
+ corked = true;
+ flush();
+ corked = false;
}
}
diff --git a/common/rdr/ZlibOutStream.h b/common/rdr/ZlibOutStream.h
index 2b08f8d5..af917d89 100644
--- a/common/rdr/ZlibOutStream.h
+++ b/common/rdr/ZlibOutStream.h
@@ -42,6 +42,7 @@ namespace rdr {
void setCompressionLevel(int level=-1);
void flush();
size_t length();
+ virtual void cork(bool enable);
private:
diff --git a/common/rfb/TightEncoder.cxx b/common/rfb/TightEncoder.cxx
index 1b0792c4..5cefa5c5 100644
--- a/common/rfb/TightEncoder.cxx
+++ b/common/rfb/TightEncoder.cxx
@@ -244,6 +244,7 @@ rdr::OutStream* TightEncoder::getZlibOutStream(int streamId, int level, size_t l
zlibStreams[streamId].setUnderlying(&memStream);
zlibStreams[streamId].setCompressionLevel(level);
+ zlibStreams[streamId].cork(true);
return &zlibStreams[streamId];
}
@@ -257,6 +258,7 @@ void TightEncoder::flushZlibOutStream(rdr::OutStream* os_)
if (zos == NULL)
return;
+ zos->cork(false);
zos->flush();
zos->setUnderlying(NULL);
diff --git a/common/rfb/VNCSConnectionST.cxx b/common/rfb/VNCSConnectionST.cxx
index a00ae435..e23e9b87 100644
--- a/common/rfb/VNCSConnectionST.cxx
+++ b/common/rfb/VNCSConnectionST.cxx
@@ -116,7 +116,7 @@ void VNCSConnectionST::close(const char* reason)
try {
if (sock->outStream().bufferUsage() > 0) {
- sock->cork(false);
+ sock->outStream().cork(false);
sock->outStream().flush();
if (sock->outStream().bufferUsage() > 0)
vlog.error("Failed to flush remaining socket data on close");
@@ -157,9 +157,9 @@ void VNCSConnectionST::processMessages()
inProcessMessages = true;
- // Get the underlying TCP layer to build large packets if we send
+ // Get the underlying transport to build large packets if we send
// multiple small responses.
- sock->cork(true);
+ getOutStream()->cork(true);
while (getInStream()->checkNoWait(1)) {
if (pendingSyncFence) {
@@ -176,7 +176,7 @@ void VNCSConnectionST::processMessages()
}
// Flush out everything in case we go idle after this.
- sock->cork(false);
+ getOutStream()->cork(false);
inProcessMessages = false;
@@ -880,7 +880,7 @@ void VNCSConnectionST::writeFramebufferUpdate()
// mode, we will also have small fence messages around the update. We
// need to aggregate these in order to not clog up TCP's congestion
// window.
- sock->cork(true);
+ getOutStream()->cork(true);
// First take care of any updates that cannot contain framebuffer data
// changes.
@@ -889,7 +889,7 @@ void VNCSConnectionST::writeFramebufferUpdate()
// Then real data (if possible)
writeDataUpdate();
- sock->cork(false);
+ getOutStream()->cork(false);
congestion.updatePosition(sock->outStream().length());
}