The principle can be used in a more general fashion than just TCP streams.tags/v1.11.90
void shutdown(); | void shutdown(); | ||||
bool isShutdown() const; | bool isShutdown() const; | ||||
virtual bool cork(bool enable) = 0; | |||||
void cork(bool enable) { outstream->cork(enable); } | |||||
// information about the remote end of the socket | // information about the remote end of the socket | ||||
virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1" | virtual char* getPeerAddress() = 0; // a string e.g. "192.168.0.1" |
return true; | return true; | ||||
} | } | ||||
bool TcpSocket::cork(bool enable) { | |||||
#ifndef TCP_CORK | |||||
return false; | |||||
#else | |||||
int one = enable ? 1 : 0; | |||||
if (setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)) < 0) | |||||
return false; | |||||
return true; | |||||
#endif | |||||
} | |||||
TcpListener::TcpListener(int sock) : SocketListener(sock) | TcpListener::TcpListener(int sock) : SocketListener(sock) | ||||
{ | { | ||||
} | } |
virtual char* getPeerAddress(); | virtual char* getPeerAddress(); | ||||
virtual char* getPeerEndpoint(); | virtual char* getPeerEndpoint(); | ||||
virtual bool cork(bool enable); | |||||
protected: | protected: | ||||
bool enableNagles(bool enable); | bool enableNagles(bool enable); | ||||
}; | }; |
return getPeerAddress(); | return getPeerAddress(); | ||||
} | } | ||||
bool UnixSocket::cork(bool enable) | |||||
{ | |||||
return true; | |||||
} | |||||
UnixListener::UnixListener(const char *path, int mode) | UnixListener::UnixListener(const char *path, int mode) | ||||
{ | { | ||||
struct sockaddr_un addr; | struct sockaddr_un addr; |
virtual char* getPeerAddress(); | virtual char* getPeerAddress(); | ||||
virtual char* getPeerEndpoint(); | virtual char* getPeerEndpoint(); | ||||
virtual bool cork(bool enable); | |||||
}; | }; | ||||
class UnixListener : public SocketListener { | class UnixListener : public SocketListener { |
return rfb::msSince(&lastWrite); | return rfb::msSince(&lastWrite); | ||||
} | } | ||||
void FdOutStream::cork(bool enable) | |||||
{ | |||||
BufferedOutStream::cork(enable); | |||||
#ifdef TCP_CORK | |||||
int one = enable ? 1 : 0; | |||||
setsockopt(fd, IPPROTO_TCP, TCP_CORK, (char *)&one, sizeof(one)); | |||||
#endif | |||||
} | |||||
bool FdOutStream::flushBuffer(bool wait) | bool FdOutStream::flushBuffer(bool wait) | ||||
{ | { | ||||
size_t n = writeWithTimeout((const void*) sentUpTo, | size_t n = writeWithTimeout((const void*) sentUpTo, |
unsigned getIdleTime(); | unsigned getIdleTime(); | ||||
virtual void cork(bool enable); | |||||
private: | private: | ||||
virtual bool flushBuffer(bool wait); | virtual bool flushBuffer(bool wait); | ||||
size_t writeWithTimeout(const void* data, size_t length, int timeoutms); | size_t writeWithTimeout(const void* data, size_t length, int timeoutms); |
delete [] start; | delete [] start; | ||||
} | } | ||||
char HexOutStream::intToHex(int i) { | char HexOutStream::intToHex(int i) { | ||||
if ((i>=0) && (i<=9)) | if ((i>=0) && (i<=9)) | ||||
return '0'+i; | return '0'+i; | ||||
out_stream.flush(); | out_stream.flush(); | ||||
} | } | ||||
void HexOutStream::cork(bool enable) | |||||
{ | |||||
OutStream::cork(enable); | |||||
out_stream.cork(enable); | |||||
} | |||||
void HexOutStream::overrun(size_t needed) { | void HexOutStream::overrun(size_t needed) { | ||||
if (needed > bufSize) | if (needed > bufSize) | ||||
throw Exception("HexOutStream overrun: buffer size exceeded"); | throw Exception("HexOutStream overrun: buffer size exceeded"); |
void flush(); | void flush(); | ||||
size_t length(); | size_t length(); | ||||
virtual void cork(bool enable); | |||||
static char intToHex(int i); | static char intToHex(int i); | ||||
static char* binToHexStr(const char* data, size_t length); | static char* binToHexStr(const char* data, size_t length); |
protected: | protected: | ||||
OutStream() {} | |||||
OutStream() : ptr(NULL), end(NULL), corked(false) {} | |||||
public: | public: | ||||
virtual void flush() {} | virtual void flush() {} | ||||
// cork() requests that the stream coalesces flushes in an efficient way | |||||
virtual void cork(bool enable) { corked = enable; flush(); } | |||||
// getptr(), getend() and setptr() are "dirty" methods which allow you to | // getptr(), getend() and setptr() are "dirty" methods which allow you to | ||||
// manipulate the buffer directly. This is useful for a stream which is a | // manipulate the buffer directly. This is useful for a stream which is a | ||||
// wrapper around an underlying stream. | // wrapper around an underlying stream. | ||||
U8* ptr; | U8* ptr; | ||||
U8* end; | U8* end; | ||||
bool corked; | |||||
}; | }; | ||||
} | } |
void TLSOutStream::flush() | void TLSOutStream::flush() | ||||
{ | { | ||||
U8* sentUpTo = start; | |||||
U8* sentUpTo; | |||||
// Only give GnuTLS larger chunks if corked to minimize overhead | |||||
if (corked && ((ptr - start) < 1024)) | |||||
return; | |||||
sentUpTo = start; | |||||
while (sentUpTo < ptr) { | while (sentUpTo < ptr) { | ||||
size_t n = writeTLS(sentUpTo, ptr - sentUpTo); | size_t n = writeTLS(sentUpTo, ptr - sentUpTo); | ||||
sentUpTo += n; | sentUpTo += n; | ||||
out->flush(); | out->flush(); | ||||
} | } | ||||
void TLSOutStream::cork(bool enable) | |||||
{ | |||||
OutStream::cork(enable); | |||||
out->cork(enable); | |||||
} | |||||
void TLSOutStream::overrun(size_t needed) | void TLSOutStream::overrun(size_t needed) | ||||
{ | { | ||||
if (needed > bufSize) | if (needed > bufSize) | ||||
throw Exception("TLSOutStream overrun: buffer size exceeded"); | throw Exception("TLSOutStream overrun: buffer size exceeded"); | ||||
// A cork might prevent the flush, so disable it temporarily | |||||
corked = false; | |||||
flush(); | flush(); | ||||
corked = true; | |||||
} | } | ||||
size_t TLSOutStream::writeTLS(const U8* data, size_t length) | size_t TLSOutStream::writeTLS(const U8* data, size_t length) |
void flush(); | void flush(); | ||||
size_t length(); | size_t length(); | ||||
virtual void cork(bool enable); | |||||
protected: | protected: | ||||
virtual void overrun(size_t needed); | virtual void overrun(size_t needed); |
#endif | #endif | ||||
// Force out everything from the zlib encoder | // Force out everything from the zlib encoder | ||||
deflate(Z_SYNC_FLUSH); | |||||
deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH); | |||||
if (zs->avail_in == 0) { | |||||
offset += ptr - start; | |||||
ptr = start; | |||||
} else { | |||||
// didn't consume all the data? try shifting what's left to the | |||||
// start of the buffer. | |||||
memmove(start, zs->next_in, ptr - zs->next_in); | |||||
offset += zs->next_in - start; | |||||
ptr -= zs->next_in - start; | |||||
} | |||||
} | |||||
void ZlibOutStream::cork(bool enable) | |||||
{ | |||||
OutStream::cork(enable); | |||||
offset += ptr - start; | |||||
ptr = start; | |||||
underlying->cork(enable); | |||||
} | } | ||||
void ZlibOutStream::overrun(size_t needed) | void ZlibOutStream::overrun(size_t needed) | ||||
checkCompressionLevel(); | checkCompressionLevel(); | ||||
while (avail() < needed) { | while (avail() < needed) { | ||||
zs->next_in = start; | |||||
zs->avail_in = ptr - start; | |||||
deflate(Z_NO_FLUSH); | |||||
// output buffer not full | |||||
if (zs->avail_in == 0) { | |||||
offset += ptr - start; | |||||
ptr = start; | |||||
} else { | |||||
// but didn't consume all the data? try shifting what's left to the | |||||
// start of the buffer. | |||||
vlog.info("z out buf not full, but in data not consumed"); | |||||
memmove(start, zs->next_in, ptr - zs->next_in); | |||||
offset += zs->next_in - start; | |||||
ptr -= zs->next_in - start; | |||||
} | |||||
// use corked to make zlib a bit more efficient since we're not trying | |||||
// to end the stream here, just make some room | |||||
corked = true; | |||||
flush(); | |||||
corked = false; | |||||
} | } | ||||
} | } | ||||
void setCompressionLevel(int level=-1); | void setCompressionLevel(int level=-1); | ||||
void flush(); | void flush(); | ||||
size_t length(); | size_t length(); | ||||
virtual void cork(bool enable); | |||||
private: | private: | ||||
zlibStreams[streamId].setUnderlying(&memStream); | zlibStreams[streamId].setUnderlying(&memStream); | ||||
zlibStreams[streamId].setCompressionLevel(level); | zlibStreams[streamId].setCompressionLevel(level); | ||||
zlibStreams[streamId].cork(true); | |||||
return &zlibStreams[streamId]; | return &zlibStreams[streamId]; | ||||
} | } | ||||
if (zos == NULL) | if (zos == NULL) | ||||
return; | return; | ||||
zos->cork(false); | |||||
zos->flush(); | zos->flush(); | ||||
zos->setUnderlying(NULL); | zos->setUnderlying(NULL); | ||||
try { | try { | ||||
if (sock->outStream().bufferUsage() > 0) { | if (sock->outStream().bufferUsage() > 0) { | ||||
sock->cork(false); | |||||
sock->outStream().cork(false); | |||||
sock->outStream().flush(); | sock->outStream().flush(); | ||||
if (sock->outStream().bufferUsage() > 0) | if (sock->outStream().bufferUsage() > 0) | ||||
vlog.error("Failed to flush remaining socket data on close"); | vlog.error("Failed to flush remaining socket data on close"); | ||||
inProcessMessages = true; | inProcessMessages = true; | ||||
// Get the underlying TCP layer to build large packets if we send | |||||
// Get the underlying transport to build large packets if we send | |||||
// multiple small responses. | // multiple small responses. | ||||
sock->cork(true); | |||||
getOutStream()->cork(true); | |||||
while (getInStream()->checkNoWait(1)) { | while (getInStream()->checkNoWait(1)) { | ||||
if (pendingSyncFence) { | if (pendingSyncFence) { | ||||
} | } | ||||
// Flush out everything in case we go idle after this. | // Flush out everything in case we go idle after this. | ||||
sock->cork(false); | |||||
getOutStream()->cork(false); | |||||
inProcessMessages = false; | inProcessMessages = false; | ||||
// mode, we will also have small fence messages around the update. We | // mode, we will also have small fence messages around the update. We | ||||
// need to aggregate these in order to not clog up TCP's congestion | // need to aggregate these in order to not clog up TCP's congestion | ||||
// window. | // window. | ||||
sock->cork(true); | |||||
getOutStream()->cork(true); | |||||
// First take care of any updates that cannot contain framebuffer data | // First take care of any updates that cannot contain framebuffer data | ||||
// changes. | // changes. | ||||
// Then real data (if possible) | // Then real data (if possible) | ||||
writeDataUpdate(); | writeDataUpdate(); | ||||
sock->cork(false); | |||||
getOutStream()->cork(false); | |||||
congestion.updatePosition(sock->outStream().length()); | congestion.updatePosition(sock->outStream().length()); | ||||
} | } |