diff options
author | Pierre Ossman <ossman@cendio.se> | 2015-12-13 15:43:46 +0100 |
---|---|---|
committer | Pierre Ossman <ossman@cendio.se> | 2017-11-17 08:23:07 +0100 |
commit | a99d14d1939cb2338b6268d9aebe3850df66daed (patch) | |
tree | fe6882d055923a03c5bc010b13045cb556a6c8e4 /common | |
parent | c09e5580d1058718301fe83fdb8750cc579e7ea1 (diff) | |
download | tigervnc-a99d14d1939cb2338b6268d9aebe3850df66daed.tar.gz tigervnc-a99d14d1939cb2338b6268d9aebe3850df66daed.zip |
Improved congestion control handling
Refine the previous method by interpolating the values we need.
This reduces the effect of the problem that we cannot send enough
ping packets.
Diffstat (limited to 'common')
-rw-r--r-- | common/rfb/Congestion.cxx | 357 | ||||
-rw-r--r-- | common/rfb/Congestion.h | 57 | ||||
-rw-r--r-- | common/rfb/VNCSConnectionST.cxx | 27 | ||||
-rw-r--r-- | common/rfb/VNCSConnectionST.h | 1 | ||||
-rw-r--r-- | common/rfb/util.cxx | 31 | ||||
-rw-r--r-- | common/rfb/util.h | 8 |
6 files changed, 363 insertions, 118 deletions
diff --git a/common/rfb/Congestion.cxx b/common/rfb/Congestion.cxx index c4c4d96d..94d78e32 100644 --- a/common/rfb/Congestion.cxx +++ b/common/rfb/Congestion.cxx @@ -24,12 +24,16 @@ * The basic principle is TCP Congestion Control (RFC 5618), with the * addition of using the TCP Vegas algorithm. The reason we use Vegas * is that we run on top of a reliable transport so we need a latency - * based algorithm rather than a loss based one. + * based algorithm rather than a loss based one. There is also a lot of + * interpolation of values. This is because we have rather horrible + * granularity in our measurements. */ +#include <assert.h> #include <sys/time.h> #include <rfb/Congestion.h> +#include <rfb/LogWriter.h> #include <rfb/util.h> // Debug output on what the congestion control is up to @@ -49,158 +53,332 @@ static const unsigned MINIMUM_WINDOW = 4096; // limit for now... static const unsigned MAXIMUM_WINDOW = 4194304; -struct Congestion::RTTInfo { - struct timeval tv; - int offset; - unsigned inFlight; -}; +static LogWriter vlog("Congestion"); Congestion::Congestion() : + lastPosition(0), extraBuffer(0), baseRTT(-1), congWindow(INITIAL_WINDOW), - ackedOffset(0), sentOffset(0), - minRTT(-1), seenCongestion(false), - congestionTimer(this) + measurements(0), minRTT(-1), minCongestedRTT(-1) { + gettimeofday(&lastUpdate, NULL); + gettimeofday(&lastSent, NULL); + memset(&lastPong, 0, sizeof(lastPong)); + gettimeofday(&lastPongArrival, NULL); + gettimeofday(&lastAdjustment, NULL); } Congestion::~Congestion() { } +void Congestion::updatePosition(unsigned pos) +{ + struct timeval now; + unsigned delta, consumed; + + gettimeofday(&now, NULL); + + delta = pos - lastPosition; + if ((delta > 0) || (extraBuffer > 0)) + lastSent = now; + + // Idle for too long? + // We use a very crude RTO calculation in order to keep things simple + // FIXME: should implement RFC 2861 + if (msBetween(&lastSent, &now) > __rfbmax(baseRTT*2, 100)) { + +#ifdef CONGESTION_DEBUG + vlog.debug("Connection idle for %d ms, resetting congestion control", + msBetween(&lastSent, &now)); +#endif + + // Close congestion window and redo wire latency measurement + congWindow = __rfbmin(INITIAL_WINDOW, congWindow); + baseRTT = -1; + measurements = 0; + gettimeofday(&lastAdjustment, NULL); + minRTT = minCongestedRTT = -1; + } + + // Commonly we will be in a state of overbuffering. We need to + // estimate the extra delay that causes so we can separate it from + // the delay caused by an incorrect congestion window. + // (we cannot do this until we have a RTT measurement though) + if (baseRTT != (unsigned)-1) { + extraBuffer += delta; + consumed = msBetween(&lastUpdate, &now) * congWindow / baseRTT; + if (extraBuffer < consumed) + extraBuffer = 0; + else + extraBuffer -= consumed; + } + + lastPosition = pos; + lastUpdate = now; +} -void Congestion::sentPing(int offset) +void Congestion::sentPing() { struct RTTInfo rttInfo; - if (ackedOffset == 0) - ackedOffset = offset; - memset(&rttInfo, 0, sizeof(struct RTTInfo)); gettimeofday(&rttInfo.tv, NULL); - rttInfo.offset = offset; - rttInfo.inFlight = rttInfo.offset - ackedOffset; + rttInfo.pos = lastPosition; + rttInfo.extra = getExtraBuffer(); + rttInfo.congested = isCongested(); pings.push_back(rttInfo); - - sentOffset = offset; - - // Let some data flow before we adjust the settings - if (!congestionTimer.isStarted()) - congestionTimer.start(__rfbmin(baseRTT * 2, 100)); } void Congestion::gotPong() { + struct timeval now; struct RTTInfo rttInfo; unsigned rtt, delay; if (pings.empty()) return; + gettimeofday(&now, NULL); + rttInfo = pings.front(); pings.pop_front(); - rtt = msSince(&rttInfo.tv); + lastPong = rttInfo; + lastPongArrival = now; + + rtt = msBetween(&rttInfo.tv, &now); if (rtt < 1) rtt = 1; - ackedOffset = rttInfo.offset; - // Try to estimate wire latency by tracking lowest seen latency if (rtt < baseRTT) baseRTT = rtt; - if (rttInfo.inFlight > congWindow) { - seenCongestion = true; - - // Estimate added delay because of overtaxed buffers - delay = (rttInfo.inFlight - congWindow) * baseRTT / congWindow; + // Pings sent before the last adjustment aren't interesting as they + // aren't a measurement of the current congestion window + if (isBefore(&rttInfo.tv, &lastAdjustment)) + return; - if (delay < rtt) - rtt -= delay; - else - rtt = 1; + // Estimate added delay because of overtaxed buffers (see above) + delay = rttInfo.extra * baseRTT / congWindow; + if (delay < rtt) + rtt -= delay; + else + rtt = 1; - // If we underestimate the congestion window, then we'll get a latency - // that's less than the wire latency, which will confuse other portions - // of the code. - if (rtt < baseRTT) - rtt = baseRTT; - } + // A latency less than the wire latency means that we've + // understimated the congestion window. We can't really determine + // how much, so pretend that we got no buffer latency at all. + if (rtt < baseRTT) + rtt = baseRTT; - // We only keep track of the minimum latency seen (for a given interval) - // on the basis that we want to avoid continuous buffer issue, but don't - // mind (or even approve of) bursts. + // Record the minimum seen delay (hopefully ignores jitter) and let + // the congestion control do its thing. + // + // Note: We are delay based rather than loss based, which means we + // need to look at pongs even if they weren't limited by the + // current window ("congested"). Otherwise we will fail to + // detect increasing congestion until the application exceeds + // the congestion window. if (rtt < minRTT) minRTT = rtt; + if (rttInfo.congested) { + if (rtt < minCongestedRTT) + minCongestedRTT = rtt; + } + + measurements++; + updateCongestion(); } -bool Congestion::isCongested(int offset, unsigned idleTime) +bool Congestion::isCongested() { - // Idle for too long? (and no data on the wire) - // - // FIXME: This should really just be one baseRTT, but we're getting - // problems with triggering the idle timeout on each update. - // Maybe we need to use a moving average for the wire latency - // instead of baseRTT. - if ((sentOffset == ackedOffset) && (idleTime > 2 * baseRTT)) { + if (getInFlight() < congWindow) + return false; -#ifdef CONGESTION_DEBUG - if (congWindow > INITIAL_WINDOW) - fprintf(stderr, "Reverting to initial window (%d KiB) after %d ms\n", - INITIAL_WINDOW / 1024, sock->outStream().getIdleTime()); -#endif + return true; +} - // Close congestion window and allow a transfer - // FIXME: Reset baseRTT like Linux Vegas? - congWindow = __rfbmin(INITIAL_WINDOW, congWindow); +int Congestion::getUncongestedETA() +{ + unsigned targetAcked; + + const struct RTTInfo* prevPing; + unsigned eta, elapsed; + unsigned etaNext, delay; + + std::list<struct RTTInfo>::const_iterator iter; + + targetAcked = lastPosition - congWindow; + + // Simple case? + if (lastPong.pos > targetAcked) + return 0; + + // No measurements yet? + if (baseRTT == (unsigned)-1) + return -1; + + prevPing = &lastPong; + eta = 0; + elapsed = msSince(&lastPongArrival); + + // Walk the ping queue and figure out which one we are waiting for to + // get to an uncongested state + + for (iter = pings.begin(); ;++iter) { + struct RTTInfo curPing; + + // If we aren't waiting for a pong that will clear the congested + // state then we have to estimate the final bit by pretending that + // we had a ping just after the last position update. + if (iter == pings.end()) { + curPing.tv = lastUpdate; + curPing.pos = lastPosition; + curPing.extra = extraBuffer; + } else { + curPing = *iter; + } + + etaNext = msBetween(&prevPing->tv, &curPing.tv); + // Compensate for buffering delays + delay = curPing.extra * baseRTT / congWindow; + etaNext += delay; + delay = prevPing->extra * baseRTT / congWindow; + if (delay >= etaNext) + etaNext = 0; + else + etaNext -= delay; - return false; + // Found it? + if (curPing.pos > targetAcked) { + eta += etaNext * (curPing.pos - targetAcked) / (curPing.pos - prevPing->pos); + if (elapsed > eta) + return 0; + else + return eta - elapsed; + } + + assert(iter != pings.end()); + + eta += etaNext; + prevPing = &*iter; } +} - // FIXME: Should we compensate for non-update data? - // (i.e. use sentOffset instead of offset) - if ((offset - ackedOffset) < congWindow) - return false; +unsigned Congestion::getExtraBuffer() +{ + unsigned elapsed; + unsigned consumed; - // If we just have one outstanding "ping", that means the client has - // started receiving our update. In order to not regress compared to - // before we had congestion avoidance, we allow another update here. - // This could further clog up the tubes, but congestion control isn't - // really working properly right now anyway as the wire would otherwise - // be idle for at least RTT/2. - if (pings.size() == 1) - return false; + if (baseRTT == (unsigned)-1) + return 0; - return true; + elapsed = msSince(&lastUpdate); + consumed = elapsed * congWindow / baseRTT; + + if (consumed >= extraBuffer) + return 0; + else + return extraBuffer - consumed; } -bool Congestion::handleTimeout(Timer* t) +unsigned Congestion::getInFlight() +{ + struct RTTInfo nextPong; + unsigned etaNext, delay, elapsed, acked; + + // Simple case? + if (lastPosition == lastPong.pos) + return 0; + + // No measurements yet? + if (baseRTT == (unsigned)-1) { + if (!pings.empty()) + return lastPosition - pings.front().pos; + return 0; + } + + // If we aren't waiting for any pong then we have to estimate things + // by pretending that we had a ping just after the last position + // update. + if (pings.empty()) { + nextPong.tv = lastUpdate; + nextPong.pos = lastPosition; + nextPong.extra = extraBuffer; + } else { + nextPong = pings.front(); + } + + // First we need to estimate how many bytes have made it through + // completely. Look at the next ping that should arrive and figure + // out how far behind it should be and interpolate the positions. + + etaNext = msBetween(&lastPong.tv, &nextPong.tv); + // Compensate for buffering delays + delay = nextPong.extra * baseRTT / congWindow; + etaNext += delay; + delay = lastPong.extra * baseRTT / congWindow; + if (delay >= etaNext) + etaNext = 0; + else + etaNext -= delay; + + elapsed = msSince(&lastPongArrival); + + // The pong should be here any second. Be optimistic and assume + // we can already use its value. + if (etaNext <= elapsed) + acked = nextPong.pos; + else { + acked = lastPong.pos; + acked += (nextPong.pos - lastPong.pos) * elapsed / etaNext; + } + + return lastPosition - acked; +} + +void Congestion::updateCongestion() { unsigned diff; - if (!seenCongestion) - return false; + // We want at least three measurements to avoid noise + if (measurements < 3) + return; + + assert(minRTT >= baseRTT); + assert(minCongestedRTT >= baseRTT); // The goal is to have a slightly too large congestion window since // a "perfect" one cannot be distinguished from a too small one. This // translates to a goal of a few extra milliseconds of delay. + // First we check all pongs to make sure we're not having a too large + // congestion window. diff = minRTT - baseRTT; - if (diff > __rfbmin(100, baseRTT)) { + // FIXME: Should we do slow start? + if (diff > 100) { // Way too fast congWindow = congWindow * baseRTT / minRTT; - } else if (diff > __rfbmin(50, baseRTT/2)) { + } else if (diff > 50) { // Slightly too fast congWindow -= 4096; - } else if (diff < 5) { - // Way too slow - congWindow += 8192; - } else if (diff < 25) { - // Too slow - congWindow += 4096; + } else { + // Secondly only the "congested" pongs are checked to see if the + // window is too small. + + diff = minCongestedRTT - baseRTT; + + if (diff < 5) { + // Way too slow + congWindow += 8192; + } else if (diff < 25) { + // Too slow + congWindow += 4096; + } } if (congWindow < MINIMUM_WINDOW) @@ -209,14 +387,13 @@ bool Congestion::handleTimeout(Timer* t) congWindow = MAXIMUM_WINDOW; #ifdef CONGESTION_DEBUG - fprintf(stderr, "RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps\n", - minRTT, baseRTT, congWindow / 1024, - congWindow * 8.0 / baseRTT / 1000.0); + vlog.debug("RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps", + minRTT, baseRTT, congWindow / 1024, + congWindow * 8.0 / baseRTT / 1000.0); #endif - minRTT = -1; - seenCongestion = false; - - return false; + measurements = 0; + gettimeofday(&lastAdjustment, NULL); + minRTT = minCongestedRTT = -1; } diff --git a/common/rfb/Congestion.h b/common/rfb/Congestion.h index e8548f93..2bea5dac 100644 --- a/common/rfb/Congestion.h +++ b/common/rfb/Congestion.h @@ -21,41 +21,62 @@ #include <list> -#include <rfb/Timer.h> - namespace rfb { - class Congestion : public Timer::Callback { + class Congestion { public: Congestion(); ~Congestion(); + // updatePosition() registers the current stream position and can + // and should be called often. + void updatePosition(unsigned pos); + // sentPing() must be called when a marker is placed on the - // outgoing stream, along with the current stream position. - // gotPong() must be called when the response for such a marker - // is received. - void sentPing(int offset); + // outgoing stream. gotPong() must be called when the response for + // such a marker is received. + void sentPing(); void gotPong(); // isCongested() determines if the transport is currently congested - // or if more data can be sent. The curren stream position and how - // long the transport has been idle must be specified. - bool isCongested(int offset, unsigned idleTime); + // or if more data can be sent. + bool isCongested(); - private: - // Timer callbacks - virtual bool handleTimeout(Timer* t); + // getUncongestedETA() returns the number of milliseconds until the + // transport is no longer congested. Returns 0 if there is no + // congestion, and -1 if it is unknown when the transport will no + // longer be congested. + int getUncongestedETA(); + + protected: + unsigned getExtraBuffer(); + unsigned getInFlight(); + + void updateCongestion(); private: + unsigned lastPosition; + unsigned extraBuffer; + struct timeval lastUpdate; + struct timeval lastSent; + unsigned baseRTT; unsigned congWindow; - unsigned ackedOffset, sentOffset; - unsigned minRTT; - bool seenCongestion; - Timer congestionTimer; + struct RTTInfo { + struct timeval tv; + unsigned pos; + unsigned extra; + bool congested; + }; - struct RTTInfo; std::list<struct RTTInfo> pings; + + struct RTTInfo lastPong; + struct timeval lastPongArrival; + + int measurements; + struct timeval lastAdjustment; + unsigned minRTT, minCongestedRTT; }; } diff --git a/common/rfb/VNCSConnectionST.cxx b/common/rfb/VNCSConnectionST.cxx index 43eb8256..b2ceb7d2 100644 --- a/common/rfb/VNCSConnectionST.cxx +++ b/common/rfb/VNCSConnectionST.cxx @@ -43,7 +43,7 @@ VNCSConnectionST::VNCSConnectionST(VNCServerST* server_, network::Socket *s, : sock(s), reverseConnection(reverse), queryConnectTimer(this), inProcessMessages(false), pendingSyncFence(false), syncFence(false), fenceFlags(0), - fenceDataLen(0), fenceData(NULL), + fenceDataLen(0), fenceData(NULL), congestionTimer(this), server(server_), updates(false), updateRenderedCursor(false), removeRenderedCursor(false), continuousUpdates(false), encodeManager(this), pointerEventTime(0), @@ -726,6 +726,8 @@ bool VNCSConnectionST::handleTimeout(Timer* t) if (t == &queryConnectTimer) { if (state() == RFBSTATE_QUERYING) approveConnection(false, "The attempt to prompt the user to accept the connection failed"); + } else if (t == &congestionTimer) { + writeFramebufferUpdate(); } } catch (rdr::Exception& e) { close(e.str()); @@ -742,6 +744,8 @@ void VNCSConnectionST::writeRTTPing() if (!cp.supportsFence) return; + congestion.updatePosition(sock->outStream().length()); + // We need to make sure any old update are already processed by the // time we get the response back. This allows us to reliably throttle // back on client overload, as well as network overload. @@ -749,11 +753,15 @@ void VNCSConnectionST::writeRTTPing() writer()->writeFence(fenceFlagRequest | fenceFlagBlockBefore, sizeof(type), &type); - congestion.sentPing(sock->outStream().length()); + congestion.sentPing(); } bool VNCSConnectionST::isCongested() { + unsigned eta; + + congestionTimer.stop(); + // Stuff still waiting in the send buffer? sock->outStream().flush(); if (sock->outStream().bufferUsage() > 0) @@ -762,13 +770,22 @@ bool VNCSConnectionST::isCongested() if (!cp.supportsFence) return false; - return congestion.isCongested(sock->outStream().length(), - sock->outStream().getIdleTime()); + congestion.updatePosition(sock->outStream().length()); + if (!congestion.isCongested()) + return false; + + eta = congestion.getUncongestedETA(); + if (eta >= 0) + congestionTimer.start(eta); + + return true; } void VNCSConnectionST::writeFramebufferUpdate() { + congestion.updatePosition(sock->outStream().length()); + // We're in the middle of processing a command that's supposed to be // synchronised. Allowing an update to slip out right now might violate // that synchronisation. @@ -805,6 +822,8 @@ void VNCSConnectionST::writeFramebufferUpdate() writeDataUpdate(); network::TcpSocket::cork(sock->getFd(), false); + + congestion.updatePosition(sock->outStream().length()); } void VNCSConnectionST::writeNoDataUpdate() diff --git a/common/rfb/VNCSConnectionST.h b/common/rfb/VNCSConnectionST.h index 96d7e4c9..dde0b1ec 100644 --- a/common/rfb/VNCSConnectionST.h +++ b/common/rfb/VNCSConnectionST.h @@ -187,6 +187,7 @@ namespace rfb { char *fenceData; Congestion congestion; + Timer congestionTimer; VNCServerST* server; SimpleUpdateTracker updates; diff --git a/common/rfb/util.cxx b/common/rfb/util.cxx index 22e00ffc..755c91fd 100644 --- a/common/rfb/util.cxx +++ b/common/rfb/util.cxx @@ -121,19 +121,38 @@ namespace rfb { dest[src ? destlen-1 : 0] = 0; } + unsigned msBetween(const struct timeval *first, + const struct timeval *second) + { + unsigned diff; + + diff = (second->tv_sec - first->tv_sec) * 1000; + + diff += second->tv_usec / 1000; + diff -= first->tv_usec / 1000; + + return diff; + } + unsigned msSince(const struct timeval *then) { struct timeval now; - unsigned diff; gettimeofday(&now, NULL); - diff = (now.tv_sec - then->tv_sec) * 1000; - - diff += now.tv_usec / 1000; - diff -= then->tv_usec / 1000; + return msBetween(then, &now); + } - return diff; + bool isBefore(const struct timeval *first, + const struct timeval *second) + { + if (first->tv_sec < second->tv_sec) + return true; + if (first->tv_sec > second->tv_sec) + return false; + if (first->tv_usec < second->tv_usec) + return true; + return false; } static size_t doPrefix(long long value, const char *unit, diff --git a/common/rfb/util.h b/common/rfb/util.h index e9114c3d..3ca92f9d 100644 --- a/common/rfb/util.h +++ b/common/rfb/util.h @@ -95,9 +95,17 @@ namespace rfb { return (secs < 0 || secs > (INT_MAX/1000) ? INT_MAX : secs * 1000); } + // Returns time elapsed between two moments in milliseconds. + unsigned msBetween(const struct timeval *first, + const struct timeval *second); + // Returns time elapsed since given moment in milliseconds. unsigned msSince(const struct timeval *then); + // Returns true if first happened before seconds + bool isBefore(const struct timeval *first, + const struct timeval *second); + size_t siPrefix(long long value, const char *unit, char *buffer, size_t maxlen, int precision=6); size_t iecPrefix(long long value, const char *unit, |