aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/rfb/Congestion.cxx357
-rw-r--r--common/rfb/Congestion.h57
-rw-r--r--common/rfb/VNCSConnectionST.cxx27
-rw-r--r--common/rfb/VNCSConnectionST.h1
-rw-r--r--common/rfb/util.cxx31
-rw-r--r--common/rfb/util.h8
6 files changed, 363 insertions, 118 deletions
diff --git a/common/rfb/Congestion.cxx b/common/rfb/Congestion.cxx
index c4c4d96d..94d78e32 100644
--- a/common/rfb/Congestion.cxx
+++ b/common/rfb/Congestion.cxx
@@ -24,12 +24,16 @@
* The basic principle is TCP Congestion Control (RFC 5618), with the
* addition of using the TCP Vegas algorithm. The reason we use Vegas
* is that we run on top of a reliable transport so we need a latency
- * based algorithm rather than a loss based one.
+ * based algorithm rather than a loss based one. There is also a lot of
+ * interpolation of values. This is because we have rather horrible
+ * granularity in our measurements.
*/
+#include <assert.h>
#include <sys/time.h>
#include <rfb/Congestion.h>
+#include <rfb/LogWriter.h>
#include <rfb/util.h>
// Debug output on what the congestion control is up to
@@ -49,158 +53,332 @@ static const unsigned MINIMUM_WINDOW = 4096;
// limit for now...
static const unsigned MAXIMUM_WINDOW = 4194304;
-struct Congestion::RTTInfo {
- struct timeval tv;
- int offset;
- unsigned inFlight;
-};
+static LogWriter vlog("Congestion");
Congestion::Congestion() :
+ lastPosition(0), extraBuffer(0),
baseRTT(-1), congWindow(INITIAL_WINDOW),
- ackedOffset(0), sentOffset(0),
- minRTT(-1), seenCongestion(false),
- congestionTimer(this)
+ measurements(0), minRTT(-1), minCongestedRTT(-1)
{
+ gettimeofday(&lastUpdate, NULL);
+ gettimeofday(&lastSent, NULL);
+ memset(&lastPong, 0, sizeof(lastPong));
+ gettimeofday(&lastPongArrival, NULL);
+ gettimeofday(&lastAdjustment, NULL);
}
Congestion::~Congestion()
{
}
+void Congestion::updatePosition(unsigned pos)
+{
+ struct timeval now;
+ unsigned delta, consumed;
+
+ gettimeofday(&now, NULL);
+
+ delta = pos - lastPosition;
+ if ((delta > 0) || (extraBuffer > 0))
+ lastSent = now;
+
+ // Idle for too long?
+ // We use a very crude RTO calculation in order to keep things simple
+ // FIXME: should implement RFC 2861
+ if (msBetween(&lastSent, &now) > __rfbmax(baseRTT*2, 100)) {
+
+#ifdef CONGESTION_DEBUG
+ vlog.debug("Connection idle for %d ms, resetting congestion control",
+ msBetween(&lastSent, &now));
+#endif
+
+ // Close congestion window and redo wire latency measurement
+ congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
+ baseRTT = -1;
+ measurements = 0;
+ gettimeofday(&lastAdjustment, NULL);
+ minRTT = minCongestedRTT = -1;
+ }
+
+ // Commonly we will be in a state of overbuffering. We need to
+ // estimate the extra delay that causes so we can separate it from
+ // the delay caused by an incorrect congestion window.
+ // (we cannot do this until we have a RTT measurement though)
+ if (baseRTT != (unsigned)-1) {
+ extraBuffer += delta;
+ consumed = msBetween(&lastUpdate, &now) * congWindow / baseRTT;
+ if (extraBuffer < consumed)
+ extraBuffer = 0;
+ else
+ extraBuffer -= consumed;
+ }
+
+ lastPosition = pos;
+ lastUpdate = now;
+}
-void Congestion::sentPing(int offset)
+void Congestion::sentPing()
{
struct RTTInfo rttInfo;
- if (ackedOffset == 0)
- ackedOffset = offset;
-
memset(&rttInfo, 0, sizeof(struct RTTInfo));
gettimeofday(&rttInfo.tv, NULL);
- rttInfo.offset = offset;
- rttInfo.inFlight = rttInfo.offset - ackedOffset;
+ rttInfo.pos = lastPosition;
+ rttInfo.extra = getExtraBuffer();
+ rttInfo.congested = isCongested();
pings.push_back(rttInfo);
-
- sentOffset = offset;
-
- // Let some data flow before we adjust the settings
- if (!congestionTimer.isStarted())
- congestionTimer.start(__rfbmin(baseRTT * 2, 100));
}
void Congestion::gotPong()
{
+ struct timeval now;
struct RTTInfo rttInfo;
unsigned rtt, delay;
if (pings.empty())
return;
+ gettimeofday(&now, NULL);
+
rttInfo = pings.front();
pings.pop_front();
- rtt = msSince(&rttInfo.tv);
+ lastPong = rttInfo;
+ lastPongArrival = now;
+
+ rtt = msBetween(&rttInfo.tv, &now);
if (rtt < 1)
rtt = 1;
- ackedOffset = rttInfo.offset;
-
// Try to estimate wire latency by tracking lowest seen latency
if (rtt < baseRTT)
baseRTT = rtt;
- if (rttInfo.inFlight > congWindow) {
- seenCongestion = true;
-
- // Estimate added delay because of overtaxed buffers
- delay = (rttInfo.inFlight - congWindow) * baseRTT / congWindow;
+ // Pings sent before the last adjustment aren't interesting as they
+ // aren't a measurement of the current congestion window
+ if (isBefore(&rttInfo.tv, &lastAdjustment))
+ return;
- if (delay < rtt)
- rtt -= delay;
- else
- rtt = 1;
+ // Estimate added delay because of overtaxed buffers (see above)
+ delay = rttInfo.extra * baseRTT / congWindow;
+ if (delay < rtt)
+ rtt -= delay;
+ else
+ rtt = 1;
- // If we underestimate the congestion window, then we'll get a latency
- // that's less than the wire latency, which will confuse other portions
- // of the code.
- if (rtt < baseRTT)
- rtt = baseRTT;
- }
+ // A latency less than the wire latency means that we've
+ // understimated the congestion window. We can't really determine
+ // how much, so pretend that we got no buffer latency at all.
+ if (rtt < baseRTT)
+ rtt = baseRTT;
- // We only keep track of the minimum latency seen (for a given interval)
- // on the basis that we want to avoid continuous buffer issue, but don't
- // mind (or even approve of) bursts.
+ // Record the minimum seen delay (hopefully ignores jitter) and let
+ // the congestion control do its thing.
+ //
+ // Note: We are delay based rather than loss based, which means we
+ // need to look at pongs even if they weren't limited by the
+ // current window ("congested"). Otherwise we will fail to
+ // detect increasing congestion until the application exceeds
+ // the congestion window.
if (rtt < minRTT)
minRTT = rtt;
+ if (rttInfo.congested) {
+ if (rtt < minCongestedRTT)
+ minCongestedRTT = rtt;
+ }
+
+ measurements++;
+ updateCongestion();
}
-bool Congestion::isCongested(int offset, unsigned idleTime)
+bool Congestion::isCongested()
{
- // Idle for too long? (and no data on the wire)
- //
- // FIXME: This should really just be one baseRTT, but we're getting
- // problems with triggering the idle timeout on each update.
- // Maybe we need to use a moving average for the wire latency
- // instead of baseRTT.
- if ((sentOffset == ackedOffset) && (idleTime > 2 * baseRTT)) {
+ if (getInFlight() < congWindow)
+ return false;
-#ifdef CONGESTION_DEBUG
- if (congWindow > INITIAL_WINDOW)
- fprintf(stderr, "Reverting to initial window (%d KiB) after %d ms\n",
- INITIAL_WINDOW / 1024, sock->outStream().getIdleTime());
-#endif
+ return true;
+}
- // Close congestion window and allow a transfer
- // FIXME: Reset baseRTT like Linux Vegas?
- congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
+int Congestion::getUncongestedETA()
+{
+ unsigned targetAcked;
+
+ const struct RTTInfo* prevPing;
+ unsigned eta, elapsed;
+ unsigned etaNext, delay;
+
+ std::list<struct RTTInfo>::const_iterator iter;
+
+ targetAcked = lastPosition - congWindow;
+
+ // Simple case?
+ if (lastPong.pos > targetAcked)
+ return 0;
+
+ // No measurements yet?
+ if (baseRTT == (unsigned)-1)
+ return -1;
+
+ prevPing = &lastPong;
+ eta = 0;
+ elapsed = msSince(&lastPongArrival);
+
+ // Walk the ping queue and figure out which one we are waiting for to
+ // get to an uncongested state
+
+ for (iter = pings.begin(); ;++iter) {
+ struct RTTInfo curPing;
+
+ // If we aren't waiting for a pong that will clear the congested
+ // state then we have to estimate the final bit by pretending that
+ // we had a ping just after the last position update.
+ if (iter == pings.end()) {
+ curPing.tv = lastUpdate;
+ curPing.pos = lastPosition;
+ curPing.extra = extraBuffer;
+ } else {
+ curPing = *iter;
+ }
+
+ etaNext = msBetween(&prevPing->tv, &curPing.tv);
+ // Compensate for buffering delays
+ delay = curPing.extra * baseRTT / congWindow;
+ etaNext += delay;
+ delay = prevPing->extra * baseRTT / congWindow;
+ if (delay >= etaNext)
+ etaNext = 0;
+ else
+ etaNext -= delay;
- return false;
+ // Found it?
+ if (curPing.pos > targetAcked) {
+ eta += etaNext * (curPing.pos - targetAcked) / (curPing.pos - prevPing->pos);
+ if (elapsed > eta)
+ return 0;
+ else
+ return eta - elapsed;
+ }
+
+ assert(iter != pings.end());
+
+ eta += etaNext;
+ prevPing = &*iter;
}
+}
- // FIXME: Should we compensate for non-update data?
- // (i.e. use sentOffset instead of offset)
- if ((offset - ackedOffset) < congWindow)
- return false;
+unsigned Congestion::getExtraBuffer()
+{
+ unsigned elapsed;
+ unsigned consumed;
- // If we just have one outstanding "ping", that means the client has
- // started receiving our update. In order to not regress compared to
- // before we had congestion avoidance, we allow another update here.
- // This could further clog up the tubes, but congestion control isn't
- // really working properly right now anyway as the wire would otherwise
- // be idle for at least RTT/2.
- if (pings.size() == 1)
- return false;
+ if (baseRTT == (unsigned)-1)
+ return 0;
- return true;
+ elapsed = msSince(&lastUpdate);
+ consumed = elapsed * congWindow / baseRTT;
+
+ if (consumed >= extraBuffer)
+ return 0;
+ else
+ return extraBuffer - consumed;
}
-bool Congestion::handleTimeout(Timer* t)
+unsigned Congestion::getInFlight()
+{
+ struct RTTInfo nextPong;
+ unsigned etaNext, delay, elapsed, acked;
+
+ // Simple case?
+ if (lastPosition == lastPong.pos)
+ return 0;
+
+ // No measurements yet?
+ if (baseRTT == (unsigned)-1) {
+ if (!pings.empty())
+ return lastPosition - pings.front().pos;
+ return 0;
+ }
+
+ // If we aren't waiting for any pong then we have to estimate things
+ // by pretending that we had a ping just after the last position
+ // update.
+ if (pings.empty()) {
+ nextPong.tv = lastUpdate;
+ nextPong.pos = lastPosition;
+ nextPong.extra = extraBuffer;
+ } else {
+ nextPong = pings.front();
+ }
+
+ // First we need to estimate how many bytes have made it through
+ // completely. Look at the next ping that should arrive and figure
+ // out how far behind it should be and interpolate the positions.
+
+ etaNext = msBetween(&lastPong.tv, &nextPong.tv);
+ // Compensate for buffering delays
+ delay = nextPong.extra * baseRTT / congWindow;
+ etaNext += delay;
+ delay = lastPong.extra * baseRTT / congWindow;
+ if (delay >= etaNext)
+ etaNext = 0;
+ else
+ etaNext -= delay;
+
+ elapsed = msSince(&lastPongArrival);
+
+ // The pong should be here any second. Be optimistic and assume
+ // we can already use its value.
+ if (etaNext <= elapsed)
+ acked = nextPong.pos;
+ else {
+ acked = lastPong.pos;
+ acked += (nextPong.pos - lastPong.pos) * elapsed / etaNext;
+ }
+
+ return lastPosition - acked;
+}
+
+void Congestion::updateCongestion()
{
unsigned diff;
- if (!seenCongestion)
- return false;
+ // We want at least three measurements to avoid noise
+ if (measurements < 3)
+ return;
+
+ assert(minRTT >= baseRTT);
+ assert(minCongestedRTT >= baseRTT);
// The goal is to have a slightly too large congestion window since
// a "perfect" one cannot be distinguished from a too small one. This
// translates to a goal of a few extra milliseconds of delay.
+ // First we check all pongs to make sure we're not having a too large
+ // congestion window.
diff = minRTT - baseRTT;
- if (diff > __rfbmin(100, baseRTT)) {
+ // FIXME: Should we do slow start?
+ if (diff > 100) {
// Way too fast
congWindow = congWindow * baseRTT / minRTT;
- } else if (diff > __rfbmin(50, baseRTT/2)) {
+ } else if (diff > 50) {
// Slightly too fast
congWindow -= 4096;
- } else if (diff < 5) {
- // Way too slow
- congWindow += 8192;
- } else if (diff < 25) {
- // Too slow
- congWindow += 4096;
+ } else {
+ // Secondly only the "congested" pongs are checked to see if the
+ // window is too small.
+
+ diff = minCongestedRTT - baseRTT;
+
+ if (diff < 5) {
+ // Way too slow
+ congWindow += 8192;
+ } else if (diff < 25) {
+ // Too slow
+ congWindow += 4096;
+ }
}
if (congWindow < MINIMUM_WINDOW)
@@ -209,14 +387,13 @@ bool Congestion::handleTimeout(Timer* t)
congWindow = MAXIMUM_WINDOW;
#ifdef CONGESTION_DEBUG
- fprintf(stderr, "RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps\n",
- minRTT, baseRTT, congWindow / 1024,
- congWindow * 8.0 / baseRTT / 1000.0);
+ vlog.debug("RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps",
+ minRTT, baseRTT, congWindow / 1024,
+ congWindow * 8.0 / baseRTT / 1000.0);
#endif
- minRTT = -1;
- seenCongestion = false;
-
- return false;
+ measurements = 0;
+ gettimeofday(&lastAdjustment, NULL);
+ minRTT = minCongestedRTT = -1;
}
diff --git a/common/rfb/Congestion.h b/common/rfb/Congestion.h
index e8548f93..2bea5dac 100644
--- a/common/rfb/Congestion.h
+++ b/common/rfb/Congestion.h
@@ -21,41 +21,62 @@
#include <list>
-#include <rfb/Timer.h>
-
namespace rfb {
- class Congestion : public Timer::Callback {
+ class Congestion {
public:
Congestion();
~Congestion();
+ // updatePosition() registers the current stream position and can
+ // and should be called often.
+ void updatePosition(unsigned pos);
+
// sentPing() must be called when a marker is placed on the
- // outgoing stream, along with the current stream position.
- // gotPong() must be called when the response for such a marker
- // is received.
- void sentPing(int offset);
+ // outgoing stream. gotPong() must be called when the response for
+ // such a marker is received.
+ void sentPing();
void gotPong();
// isCongested() determines if the transport is currently congested
- // or if more data can be sent. The curren stream position and how
- // long the transport has been idle must be specified.
- bool isCongested(int offset, unsigned idleTime);
+ // or if more data can be sent.
+ bool isCongested();
- private:
- // Timer callbacks
- virtual bool handleTimeout(Timer* t);
+ // getUncongestedETA() returns the number of milliseconds until the
+ // transport is no longer congested. Returns 0 if there is no
+ // congestion, and -1 if it is unknown when the transport will no
+ // longer be congested.
+ int getUncongestedETA();
+
+ protected:
+ unsigned getExtraBuffer();
+ unsigned getInFlight();
+
+ void updateCongestion();
private:
+ unsigned lastPosition;
+ unsigned extraBuffer;
+ struct timeval lastUpdate;
+ struct timeval lastSent;
+
unsigned baseRTT;
unsigned congWindow;
- unsigned ackedOffset, sentOffset;
- unsigned minRTT;
- bool seenCongestion;
- Timer congestionTimer;
+ struct RTTInfo {
+ struct timeval tv;
+ unsigned pos;
+ unsigned extra;
+ bool congested;
+ };
- struct RTTInfo;
std::list<struct RTTInfo> pings;
+
+ struct RTTInfo lastPong;
+ struct timeval lastPongArrival;
+
+ int measurements;
+ struct timeval lastAdjustment;
+ unsigned minRTT, minCongestedRTT;
};
}
diff --git a/common/rfb/VNCSConnectionST.cxx b/common/rfb/VNCSConnectionST.cxx
index 43eb8256..b2ceb7d2 100644
--- a/common/rfb/VNCSConnectionST.cxx
+++ b/common/rfb/VNCSConnectionST.cxx
@@ -43,7 +43,7 @@ VNCSConnectionST::VNCSConnectionST(VNCServerST* server_, network::Socket *s,
: sock(s), reverseConnection(reverse),
queryConnectTimer(this), inProcessMessages(false),
pendingSyncFence(false), syncFence(false), fenceFlags(0),
- fenceDataLen(0), fenceData(NULL),
+ fenceDataLen(0), fenceData(NULL), congestionTimer(this),
server(server_), updates(false),
updateRenderedCursor(false), removeRenderedCursor(false),
continuousUpdates(false), encodeManager(this), pointerEventTime(0),
@@ -726,6 +726,8 @@ bool VNCSConnectionST::handleTimeout(Timer* t)
if (t == &queryConnectTimer) {
if (state() == RFBSTATE_QUERYING)
approveConnection(false, "The attempt to prompt the user to accept the connection failed");
+ } else if (t == &congestionTimer) {
+ writeFramebufferUpdate();
}
} catch (rdr::Exception& e) {
close(e.str());
@@ -742,6 +744,8 @@ void VNCSConnectionST::writeRTTPing()
if (!cp.supportsFence)
return;
+ congestion.updatePosition(sock->outStream().length());
+
// We need to make sure any old update are already processed by the
// time we get the response back. This allows us to reliably throttle
// back on client overload, as well as network overload.
@@ -749,11 +753,15 @@ void VNCSConnectionST::writeRTTPing()
writer()->writeFence(fenceFlagRequest | fenceFlagBlockBefore,
sizeof(type), &type);
- congestion.sentPing(sock->outStream().length());
+ congestion.sentPing();
}
bool VNCSConnectionST::isCongested()
{
+ unsigned eta;
+
+ congestionTimer.stop();
+
// Stuff still waiting in the send buffer?
sock->outStream().flush();
if (sock->outStream().bufferUsage() > 0)
@@ -762,13 +770,22 @@ bool VNCSConnectionST::isCongested()
if (!cp.supportsFence)
return false;
- return congestion.isCongested(sock->outStream().length(),
- sock->outStream().getIdleTime());
+ congestion.updatePosition(sock->outStream().length());
+ if (!congestion.isCongested())
+ return false;
+
+ eta = congestion.getUncongestedETA();
+ if (eta >= 0)
+ congestionTimer.start(eta);
+
+ return true;
}
void VNCSConnectionST::writeFramebufferUpdate()
{
+ congestion.updatePosition(sock->outStream().length());
+
// We're in the middle of processing a command that's supposed to be
// synchronised. Allowing an update to slip out right now might violate
// that synchronisation.
@@ -805,6 +822,8 @@ void VNCSConnectionST::writeFramebufferUpdate()
writeDataUpdate();
network::TcpSocket::cork(sock->getFd(), false);
+
+ congestion.updatePosition(sock->outStream().length());
}
void VNCSConnectionST::writeNoDataUpdate()
diff --git a/common/rfb/VNCSConnectionST.h b/common/rfb/VNCSConnectionST.h
index 96d7e4c9..dde0b1ec 100644
--- a/common/rfb/VNCSConnectionST.h
+++ b/common/rfb/VNCSConnectionST.h
@@ -187,6 +187,7 @@ namespace rfb {
char *fenceData;
Congestion congestion;
+ Timer congestionTimer;
VNCServerST* server;
SimpleUpdateTracker updates;
diff --git a/common/rfb/util.cxx b/common/rfb/util.cxx
index 22e00ffc..755c91fd 100644
--- a/common/rfb/util.cxx
+++ b/common/rfb/util.cxx
@@ -121,19 +121,38 @@ namespace rfb {
dest[src ? destlen-1 : 0] = 0;
}
+ unsigned msBetween(const struct timeval *first,
+ const struct timeval *second)
+ {
+ unsigned diff;
+
+ diff = (second->tv_sec - first->tv_sec) * 1000;
+
+ diff += second->tv_usec / 1000;
+ diff -= first->tv_usec / 1000;
+
+ return diff;
+ }
+
unsigned msSince(const struct timeval *then)
{
struct timeval now;
- unsigned diff;
gettimeofday(&now, NULL);
- diff = (now.tv_sec - then->tv_sec) * 1000;
-
- diff += now.tv_usec / 1000;
- diff -= then->tv_usec / 1000;
+ return msBetween(then, &now);
+ }
- return diff;
+ bool isBefore(const struct timeval *first,
+ const struct timeval *second)
+ {
+ if (first->tv_sec < second->tv_sec)
+ return true;
+ if (first->tv_sec > second->tv_sec)
+ return false;
+ if (first->tv_usec < second->tv_usec)
+ return true;
+ return false;
}
static size_t doPrefix(long long value, const char *unit,
diff --git a/common/rfb/util.h b/common/rfb/util.h
index e9114c3d..3ca92f9d 100644
--- a/common/rfb/util.h
+++ b/common/rfb/util.h
@@ -95,9 +95,17 @@ namespace rfb {
return (secs < 0 || secs > (INT_MAX/1000) ? INT_MAX : secs * 1000);
}
+ // Returns time elapsed between two moments in milliseconds.
+ unsigned msBetween(const struct timeval *first,
+ const struct timeval *second);
+
// Returns time elapsed since given moment in milliseconds.
unsigned msSince(const struct timeval *then);
+ // Returns true if first happened before seconds
+ bool isBefore(const struct timeval *first,
+ const struct timeval *second);
+
size_t siPrefix(long long value, const char *unit,
char *buffer, size_t maxlen, int precision=6);
size_t iecPrefix(long long value, const char *unit,