void shutdown();
bool isShutdown() const;
- virtual bool cork(bool enable) = 0;
+ void cork(bool enable) { outstream->cork(enable); }
// information about the remote end of the socket
virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1"
return true;
}
-bool TcpSocket::cork(bool enable) {
-#ifndef TCP_CORK
- return false;
-#else
- int one = enable ? 1 : 0;
- if (setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)) < 0)
- return false;
- return true;
-#endif
-}
-
TcpListener::TcpListener(int sock) : SocketListener(sock)
{
}
virtual char* getPeerAddress();
virtual char* getPeerEndpoint();
- virtual bool cork(bool enable);
-
protected:
bool enableNagles(bool enable);
};
return getPeerAddress();
}
-bool UnixSocket::cork(bool enable)
-{
- return true;
-}
-
UnixListener::UnixListener(const char *path, int mode)
{
struct sockaddr_un addr;
virtual char* getPeerAddress();
virtual char* getPeerEndpoint();
-
- virtual bool cork(bool enable);
};
class UnixListener : public SocketListener {
return rfb::msSince(&lastWrite);
}
+void FdOutStream::cork(bool enable)
+{
+ BufferedOutStream::cork(enable);
+
+#ifdef TCP_CORK
+ int one = enable ? 1 : 0;
+ setsockopt(fd, IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one));
+#endif
+}
+
bool FdOutStream::flushBuffer(bool wait)
{
size_t n = writeWithTimeout((const void*) sentUpTo,
unsigned getIdleTime();
+ virtual void cork(bool enable);
+
private:
virtual bool flushBuffer(bool wait);
size_t writeWithTimeout(const void* data, size_t length, int timeoutms);
delete [] start;
}
-
char HexOutStream::intToHex(int i) {
if ((i>=0) && (i<=9))
return '0'+i;
out_stream.flush();
}
+void HexOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
+
+ out_stream.cork(enable);
+}
+
void HexOutStream::overrun(size_t needed) {
if (needed > bufSize)
throw Exception("HexOutStream overrun: buffer size exceeded");
void flush();
size_t length();
+ virtual void cork(bool enable);
static char intToHex(int i);
static char* binToHexStr(const char* data, size_t length);
protected:
- OutStream() {}
+ OutStream() : ptr(NULL), end(NULL), corked(false) {}
public:
virtual void flush() {}
+ // cork() requests that the stream coalesces flushes in an efficient way
+
+ virtual void cork(bool enable) { corked = enable; flush(); }
+
// getptr(), getend() and setptr() are "dirty" methods which allow you to
// manipulate the buffer directly. This is useful for a stream which is a
// wrapper around an underlying stream.
U8* ptr;
U8* end;
+
+ bool corked;
};
}
void TLSOutStream::flush()
{
- U8* sentUpTo = start;
+ U8* sentUpTo;
+
+ // Only give GnuTLS larger chunks if corked to minimize overhead
+ if (corked && ((ptr - start) < 1024))
+ return;
+
+ sentUpTo = start;
while (sentUpTo < ptr) {
size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
sentUpTo += n;
out->flush();
}
+void TLSOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
+
+ out->cork(enable);
+}
+
void TLSOutStream::overrun(size_t needed)
{
if (needed > bufSize)
throw Exception("TLSOutStream overrun: buffer size exceeded");
+ // A cork might prevent the flush, so disable it temporarily
+ corked = false;
flush();
+ corked = true;
}
size_t TLSOutStream::writeTLS(const U8* data, size_t length)
void flush();
size_t length();
+ virtual void cork(bool enable);
protected:
virtual void overrun(size_t needed);
#endif
// Force out everything from the zlib encoder
- deflate(Z_SYNC_FLUSH);
+ deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
+
+ if (zs->avail_in == 0) {
+ offset += ptr - start;
+ ptr = start;
+ } else {
+ // didn't consume all the data? try shifting what's left to the
+ // start of the buffer.
+ memmove(start, zs->next_in, ptr - zs->next_in);
+ offset += zs->next_in - start;
+ ptr -= zs->next_in - start;
+ }
+}
+
+void ZlibOutStream::cork(bool enable)
+{
+ OutStream::cork(enable);
- offset += ptr - start;
- ptr = start;
+ underlying->cork(enable);
}
void ZlibOutStream::overrun(size_t needed)
checkCompressionLevel();
while (avail() < needed) {
- zs->next_in = start;
- zs->avail_in = ptr - start;
-
- deflate(Z_NO_FLUSH);
-
- // output buffer not full
-
- if (zs->avail_in == 0) {
- offset += ptr - start;
- ptr = start;
- } else {
- // but didn't consume all the data? try shifting what's left to the
- // start of the buffer.
- vlog.info("z out buf not full, but in data not consumed");
- memmove(start, zs->next_in, ptr - zs->next_in);
- offset += zs->next_in - start;
- ptr -= zs->next_in - start;
- }
+ // use corked to make zlib a bit more efficient since we're not trying
+ // to end the stream here, just make some room
+ corked = true;
+ flush();
+ corked = false;
}
}
void setCompressionLevel(int level=-1);
void flush();
size_t length();
+ virtual void cork(bool enable);
private:
zlibStreams[streamId].setUnderlying(&memStream);
zlibStreams[streamId].setCompressionLevel(level);
+ zlibStreams[streamId].cork(true);
return &zlibStreams[streamId];
}
if (zos == NULL)
return;
+ zos->cork(false);
zos->flush();
zos->setUnderlying(NULL);
try {
if (sock->outStream().bufferUsage() > 0) {
- sock->cork(false);
+ sock->outStream().cork(false);
sock->outStream().flush();
if (sock->outStream().bufferUsage() > 0)
vlog.error("Failed to flush remaining socket data on close");
inProcessMessages = true;
- // Get the underlying TCP layer to build large packets if we send
+ // Get the underlying transport to build large packets if we send
// multiple small responses.
- sock->cork(true);
+ getOutStream()->cork(true);
while (getInStream()->checkNoWait(1)) {
if (pendingSyncFence) {
}
// Flush out everything in case we go idle after this.
- sock->cork(false);
+ getOutStream()->cork(false);
inProcessMessages = false;
// mode, we will also have small fence messages around the update. We
// need to aggregate these in order to not clog up TCP's congestion
// window.
- sock->cork(true);
+ getOutStream()->cork(true);
// First take care of any updates that cannot contain framebuffer data
// changes.
// Then real data (if possible)
writeDataUpdate();
- sock->cork(false);
+ getOutStream()->cork(false);
congestion.updatePosition(sock->outStream().length());
}