static const size_t DEFAULT_BUF_SIZE = 16384;
static const size_t MAX_BUF_SIZE = 32 * 1024 * 1024;
-BufferedOutStream::BufferedOutStream()
- : bufSize(DEFAULT_BUF_SIZE), offset(0)
+BufferedOutStream::BufferedOutStream(bool emulateCork)
+ : bufSize(DEFAULT_BUF_SIZE), offset(0), emulateCork(emulateCork)
{
ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
{
struct timeval now;
+ // Only give larger chunks if corked to minimize overhead
+ if (corked && emulateCork && ((ptr - sentUpTo) < 1024))
+ return;
+
while (sentUpTo < ptr) {
size_t len;
void BufferedOutStream::overrun(size_t needed)
{
+ bool oldCorked;
size_t totalNeeded, newSize;
U8* newBuffer;
// First try to get rid of the data we have
+ // (use corked to make things a bit more efficient since we're not
+ // trying to flush out everything, just make some room)
+ oldCorked = corked;
+ cork(true);
flush();
+ cork(oldCorked);
// Make note of the total needed space
totalNeeded = needed + (ptr - sentUpTo);
struct timeval lastSizeCheck;
size_t peakUsage;
+ bool emulateCork;
+
protected:
U8* sentUpTo;
protected:
- BufferedOutStream();
+ BufferedOutStream(bool emulateCork=true);
};
}
using namespace rdr;
FdOutStream::FdOutStream(int fd_)
- : fd(fd_)
+ : BufferedOutStream(false), fd(fd_)
{
gettimeofday(&lastWrite, NULL);
}
using namespace rdr;
-const int DEFAULT_BUF_LEN = 16384;
-
static inline size_t min(size_t a, size_t b) {return a<b ? a : b;}
HexOutStream::HexOutStream(OutStream& os)
- : out_stream(os), offset(0), bufSize(DEFAULT_BUF_LEN)
+ : out_stream(os)
{
- if (bufSize % 2)
- bufSize--;
- ptr = start = new U8[bufSize];
- end = start + bufSize;
}
-HexOutStream::~HexOutStream() {
- delete [] start;
+HexOutStream::~HexOutStream()
+{
}
char HexOutStream::intToHex(int i) {
return buffer;
}
-
-void
-HexOutStream::writeBuffer() {
- U8* pos = start;
- while (pos != ptr) {
+bool HexOutStream::flushBuffer()
+{
+ while (sentUpTo != ptr) {
U8* optr = out_stream.getptr(2);
- size_t length = min(ptr-pos, out_stream.avail()/2);
+ size_t length = min(ptr-sentUpTo, out_stream.avail()/2);
for (size_t i=0; i<length; i++) {
- optr[i*2] = intToHex((pos[i] >> 4) & 0xf);
- optr[i*2+1] = intToHex(pos[i] & 0xf);
+ optr[i*2] = intToHex((sentUpTo[i] >> 4) & 0xf);
+ optr[i*2+1] = intToHex(sentUpTo[i] & 0xf);
}
out_stream.setptr(length*2);
- pos += length;
+ sentUpTo += length;
}
- offset += ptr - start;
- ptr = start;
-}
-size_t HexOutStream::length()
-{
- return offset + ptr - start;
+ return true;
}
void
HexOutStream::flush() {
- writeBuffer();
+ BufferedOutStream::flush();
out_stream.flush();
}
void HexOutStream::cork(bool enable)
{
- OutStream::cork(enable);
-
+ BufferedOutStream::cork(enable);
out_stream.cork(enable);
}
-void HexOutStream::overrun(size_t needed) {
- if (needed > bufSize)
- throw Exception("HexOutStream overrun: buffer size exceeded");
-
- writeBuffer();
-}
-
#ifndef __RDR_HEX_OUTSTREAM_H__
#define __RDR_HEX_OUTSTREAM_H__
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
namespace rdr {
- class HexOutStream : public OutStream {
+ class HexOutStream : public BufferedOutStream {
public:
HexOutStream(OutStream& os);
virtual ~HexOutStream();
- void flush();
- size_t length();
+ virtual void flush();
virtual void cork(bool enable);
static char intToHex(int i);
static char* binToHexStr(const char* data, size_t length);
private:
+ virtual bool flushBuffer();
void writeBuffer();
- virtual void overrun(size_t needed);
OutStream& out_stream;
-
- U8* start;
- size_t offset;
- size_t bufSize;
};
}
static rfb::LogWriter vlog("TLSOutStream");
-enum { DEFAULT_BUF_SIZE = 16384 };
-
ssize_t TLSOutStream::push(gnutls_transport_ptr_t str, const void* data,
size_t size)
{
}
TLSOutStream::TLSOutStream(OutStream* _out, gnutls_session_t _session)
- : session(_session), out(_out), bufSize(DEFAULT_BUF_SIZE), offset(0),
- saved_exception(NULL)
+ : session(_session), out(_out), saved_exception(NULL)
{
gnutls_transport_ptr_t recv, send;
- ptr = start = new U8[bufSize];
- end = start + bufSize;
-
gnutls_transport_set_push_function(session, push);
gnutls_transport_get_ptr2(session, &recv, &send);
gnutls_transport_set_ptr2(session, recv, this);
#endif
gnutls_transport_set_push_function(session, NULL);
- delete [] start;
delete saved_exception;
}
-size_t TLSOutStream::length()
-{
- return offset + ptr - start;
-}
-
void TLSOutStream::flush()
{
- U8* sentUpTo;
-
- // Only give GnuTLS larger chunks if corked to minimize overhead
- if (corked && ((ptr - start) < 1024))
- return;
-
- sentUpTo = start;
- while (sentUpTo < ptr) {
- size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
- sentUpTo += n;
- offset += n;
- }
-
- ptr = start;
+ BufferedOutStream::flush();
out->flush();
}
void TLSOutStream::cork(bool enable)
{
- OutStream::cork(enable);
-
+ BufferedOutStream::cork(enable);
out->cork(enable);
}
-void TLSOutStream::overrun(size_t needed)
+bool TLSOutStream::flushBuffer()
{
- bool oldCorked;
-
- if (needed > bufSize)
- throw Exception("TLSOutStream overrun: buffer size exceeded");
-
- // A cork might prevent the flush, so disable it temporarily
- oldCorked = corked;
- corked = false;
-
- flush();
+ while (sentUpTo < ptr) {
+ size_t n = writeTLS(sentUpTo, ptr - sentUpTo);
+ sentUpTo += n;
+ }
- corked = oldCorked;
+ return true;
}
size_t TLSOutStream::writeTLS(const U8* data, size_t length)
#ifdef HAVE_GNUTLS
#include <gnutls/gnutls.h>
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
namespace rdr {
- class TLSOutStream : public OutStream {
+ class TLSOutStream : public BufferedOutStream {
public:
TLSOutStream(OutStream* out, gnutls_session_t session);
virtual ~TLSOutStream();
- void flush();
- size_t length();
+ virtual void flush();
virtual void cork(bool enable);
- protected:
- virtual void overrun(size_t needed);
-
private:
+ virtual bool flushBuffer();
size_t writeTLS(const U8* data, size_t length);
static ssize_t push(gnutls_transport_ptr_t str, const void* data, size_t size);
gnutls_session_t session;
OutStream* out;
- size_t bufSize;
- U8* start;
- size_t offset;
Exception* saved_exception;
};
using namespace rdr;
-enum { DEFAULT_BUF_SIZE = 16384 };
-
ZlibOutStream::ZlibOutStream(OutStream* os, int compressLevel)
- : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel),
- bufSize(DEFAULT_BUF_SIZE), offset(0)
+ : underlying(os), compressionLevel(compressLevel), newLevel(compressLevel)
{
zs = new z_stream;
zs->zalloc = Z_NULL;
delete zs;
throw Exception("ZlibOutStream: deflateInit failed");
}
- ptr = start = new U8[bufSize];
- end = start + bufSize;
}
ZlibOutStream::~ZlibOutStream()
flush();
} catch (Exception&) {
}
- delete [] start;
deflateEnd(zs);
delete zs;
}
void ZlibOutStream::setUnderlying(OutStream* os)
{
underlying = os;
+ if (underlying)
+ underlying->cork(corked);
}
void ZlibOutStream::setCompressionLevel(int level)
newLevel = level;
}
-size_t ZlibOutStream::length()
-{
- return offset + ptr - start;
-}
-
void ZlibOutStream::flush()
{
- checkCompressionLevel();
-
- zs->next_in = start;
- zs->avail_in = ptr - start;
-
-#ifdef ZLIBOUT_DEBUG
- vlog.debug("flush: avail_in %d",zs->avail_in);
-#endif
-
- // Force out everything from the zlib encoder
- deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
-
- if (zs->avail_in == 0) {
- offset += ptr - start;
- ptr = start;
- } else {
- // didn't consume all the data? try shifting what's left to the
- // start of the buffer.
- memmove(start, zs->next_in, ptr - zs->next_in);
- offset += zs->next_in - start;
- ptr -= zs->next_in - start;
- }
+ BufferedOutStream::flush();
+ if (underlying != NULL)
+ underlying->flush();
}
void ZlibOutStream::cork(bool enable)
{
- OutStream::cork(enable);
-
- underlying->cork(enable);
+ BufferedOutStream::cork(enable);
+ if (underlying != NULL)
+ underlying->cork(enable);
}
-void ZlibOutStream::overrun(size_t needed)
+bool ZlibOutStream::flushBuffer()
{
-#ifdef ZLIBOUT_DEBUG
- vlog.debug("overrun");
-#endif
-
- if (needed > bufSize)
- throw Exception("ZlibOutStream overrun: buffer size exceeded");
-
checkCompressionLevel();
- while (avail() < needed) {
- bool oldCorked;
+ zs->next_in = sentUpTo;
+ zs->avail_in = ptr - sentUpTo;
- // use corked to make zlib a bit more efficient since we're not trying
- // to end the stream here, just make some room
+#ifdef ZLIBOUT_DEBUG
+ vlog.debug("flush: avail_in %d",zs->avail_in);
+#endif
- oldCorked = corked;
- corked = true;
+ // Force out everything from the zlib encoder
+ deflate(corked ? Z_NO_FLUSH : Z_SYNC_FLUSH);
- flush();
+ sentUpTo = zs->next_in;
- corked = oldCorked;
- }
+ return true;
}
void ZlibOutStream::deflate(int flush)
#ifndef __RDR_ZLIBOUTSTREAM_H__
#define __RDR_ZLIBOUTSTREAM_H__
-#include <rdr/OutStream.h>
+#include <rdr/BufferedOutStream.h>
struct z_stream_s;
namespace rdr {
- class ZlibOutStream : public OutStream {
+ class ZlibOutStream : public BufferedOutStream {
public:
void setUnderlying(OutStream* os);
void setCompressionLevel(int level=-1);
- void flush();
- size_t length();
+ virtual void flush();
virtual void cork(bool enable);
private:
-
- virtual void overrun(size_t needed);
+ virtual bool flushBuffer();
void deflate(int flush);
void checkCompressionLevel();
OutStream* underlying;
int compressionLevel;
int newLevel;
- size_t bufSize;
- size_t offset;
z_stream_s* zs;
- U8* start;
};
} // end of namespace rdr