]> source.dussan.org Git - tigervnc.git/commitdiff
Improved congestion control handling
authorPierre Ossman <ossman@cendio.se>
Sun, 13 Dec 2015 14:43:46 +0000 (15:43 +0100)
committerPierre Ossman <ossman@cendio.se>
Fri, 17 Nov 2017 07:23:07 +0000 (08:23 +0100)
Refine the previous method by interpolating the values we need.
This reduces the effect of the problem that we cannot send enough
ping packets.

common/rfb/Congestion.cxx
common/rfb/Congestion.h
common/rfb/VNCSConnectionST.cxx
common/rfb/VNCSConnectionST.h
common/rfb/util.cxx
common/rfb/util.h

index c4c4d96d9edb9939f12909ea2d34d7d8fdc0e30c..94d78e32f26f85a059e35702ff1fefae336c0ca7 100644 (file)
  * The basic principle is TCP Congestion Control (RFC 5618), with the
  * addition of using the TCP Vegas algorithm. The reason we use Vegas
  * is that we run on top of a reliable transport so we need a latency
- * based algorithm rather than a loss based one.
+ * based algorithm rather than a loss based one. There is also a lot of
+ * interpolation of values. This is because we have rather horrible
+ * granularity in our measurements.
  */
 
+#include <assert.h>
 #include <sys/time.h>
 
 #include <rfb/Congestion.h>
+#include <rfb/LogWriter.h>
 #include <rfb/util.h>
 
 // Debug output on what the congestion control is up to
@@ -49,158 +53,332 @@ static const unsigned MINIMUM_WINDOW = 4096;
 // limit for now...
 static const unsigned MAXIMUM_WINDOW = 4194304;
 
-struct Congestion::RTTInfo {
-  struct timeval tv;
-  int offset;
-  unsigned inFlight;
-};
+static LogWriter vlog("Congestion");
 
 Congestion::Congestion() :
+    lastPosition(0), extraBuffer(0),
     baseRTT(-1), congWindow(INITIAL_WINDOW),
-    ackedOffset(0), sentOffset(0),
-    minRTT(-1), seenCongestion(false),
-    congestionTimer(this)
+    measurements(0), minRTT(-1), minCongestedRTT(-1)
 {
+  gettimeofday(&lastUpdate, NULL);
+  gettimeofday(&lastSent, NULL);
+  memset(&lastPong, 0, sizeof(lastPong));
+  gettimeofday(&lastPongArrival, NULL);
+  gettimeofday(&lastAdjustment, NULL);
 }
 
 Congestion::~Congestion()
 {
 }
 
+void Congestion::updatePosition(unsigned pos)
+{
+  struct timeval now;
+  unsigned delta, consumed;
+
+  gettimeofday(&now, NULL);
+
+  delta = pos - lastPosition;
+  if ((delta > 0) || (extraBuffer > 0))
+    lastSent = now;
+
+  // Idle for too long?
+  // We use a very crude RTO calculation in order to keep things simple
+  // FIXME: should implement RFC 2861
+  if (msBetween(&lastSent, &now) > __rfbmax(baseRTT*2, 100)) {
+
+#ifdef CONGESTION_DEBUG
+    vlog.debug("Connection idle for %d ms, resetting congestion control",
+               msBetween(&lastSent, &now));
+#endif
+
+    // Close congestion window and redo wire latency measurement
+    congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
+    baseRTT = -1;
+    measurements = 0;
+    gettimeofday(&lastAdjustment, NULL);
+    minRTT = minCongestedRTT = -1;
+  }
+
+  // Commonly we will be in a state of overbuffering. We need to
+  // estimate the extra delay that causes so we can separate it from
+  // the delay caused by an incorrect congestion window.
+  // (we cannot do this until we have a RTT measurement though)
+  if (baseRTT != (unsigned)-1) {
+    extraBuffer += delta;
+    consumed = msBetween(&lastUpdate, &now) * congWindow / baseRTT;
+    if (extraBuffer < consumed)
+      extraBuffer = 0;
+    else
+      extraBuffer -= consumed;
+  }
+
+  lastPosition = pos;
+  lastUpdate = now;
+}
 
-void Congestion::sentPing(int offset)
+void Congestion::sentPing()
 {
   struct RTTInfo rttInfo;
 
-  if (ackedOffset == 0)
-    ackedOffset = offset;
-
   memset(&rttInfo, 0, sizeof(struct RTTInfo));
 
   gettimeofday(&rttInfo.tv, NULL);
-  rttInfo.offset = offset;
-  rttInfo.inFlight = rttInfo.offset - ackedOffset;
+  rttInfo.pos = lastPosition;
+  rttInfo.extra = getExtraBuffer();
+  rttInfo.congested = isCongested();
 
   pings.push_back(rttInfo);
-
-  sentOffset = offset;
-
-  // Let some data flow before we adjust the settings
-  if (!congestionTimer.isStarted())
-    congestionTimer.start(__rfbmin(baseRTT * 2, 100));
 }
 
 void Congestion::gotPong()
 {
+  struct timeval now;
   struct RTTInfo rttInfo;
   unsigned rtt, delay;
 
   if (pings.empty())
     return;
 
+  gettimeofday(&now, NULL);
+
   rttInfo = pings.front();
   pings.pop_front();
 
-  rtt = msSince(&rttInfo.tv);
+  lastPong = rttInfo;
+  lastPongArrival = now;
+
+  rtt = msBetween(&rttInfo.tv, &now);
   if (rtt < 1)
     rtt = 1;
 
-  ackedOffset = rttInfo.offset;
-
   // Try to estimate wire latency by tracking lowest seen latency
   if (rtt < baseRTT)
     baseRTT = rtt;
 
-  if (rttInfo.inFlight > congWindow) {
-    seenCongestion = true;
-
-    // Estimate added delay because of overtaxed buffers
-    delay = (rttInfo.inFlight - congWindow) * baseRTT / congWindow;
+  // Pings sent before the last adjustment aren't interesting as they
+  // aren't a measurement of the current congestion window
+  if (isBefore(&rttInfo.tv, &lastAdjustment))
+    return;
 
-    if (delay < rtt)
-      rtt -= delay;
-    else
-      rtt = 1;
+  // Estimate added delay because of overtaxed buffers (see above)
+  delay = rttInfo.extra * baseRTT / congWindow;
+  if (delay < rtt)
+    rtt -= delay;
+  else
+    rtt = 1;
 
-    // If we underestimate the congestion window, then we'll get a latency
-    // that's less than the wire latency, which will confuse other portions
-    // of the code.
-    if (rtt < baseRTT)
-      rtt = baseRTT;
-  }
+  // A latency less than the wire latency means that we've
+  // understimated the congestion window. We can't really determine
+  // how much, so pretend that we got no buffer latency at all.
+  if (rtt < baseRTT)
+    rtt = baseRTT;
 
-  // We only keep track of the minimum latency seen (for a given interval)
-  // on the basis that we want to avoid continuous buffer issue, but don't
-  // mind (or even approve of) bursts.
+  // Record the minimum seen delay (hopefully ignores jitter) and let
+  // the congestion control do its thing.
+  //
+  // Note: We are delay based rather than loss based, which means we
+  //       need to look at pongs even if they weren't limited by the
+  //       current window ("congested"). Otherwise we will fail to
+  //       detect increasing congestion until the application exceeds
+  //       the congestion window.
   if (rtt < minRTT)
     minRTT = rtt;
+  if (rttInfo.congested) {
+    if (rtt < minCongestedRTT)
+      minCongestedRTT = rtt;
+  }
+
+  measurements++;
+  updateCongestion();
 }
 
-bool Congestion::isCongested(int offset, unsigned idleTime)
+bool Congestion::isCongested()
 {
-  // Idle for too long? (and no data on the wire)
-  //
-  // FIXME: This should really just be one baseRTT, but we're getting
-  //        problems with triggering the idle timeout on each update.
-  //        Maybe we need to use a moving average for the wire latency
-  //        instead of baseRTT.
-  if ((sentOffset == ackedOffset) && (idleTime > 2 * baseRTT)) {
+  if (getInFlight() < congWindow)
+    return false;
 
-#ifdef CONGESTION_DEBUG
-    if (congWindow > INITIAL_WINDOW)
-      fprintf(stderr, "Reverting to initial window (%d KiB) after %d ms\n",
-              INITIAL_WINDOW / 1024, sock->outStream().getIdleTime());
-#endif
+  return true;
+}
 
-    // Close congestion window and allow a transfer
-    // FIXME: Reset baseRTT like Linux Vegas?
-    congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
+int Congestion::getUncongestedETA()
+{
+  unsigned targetAcked;
+
+  const struct RTTInfo* prevPing;
+  unsigned eta, elapsed;
+  unsigned etaNext, delay;
+
+  std::list<struct RTTInfo>::const_iterator iter;
+
+  targetAcked = lastPosition - congWindow;
+
+  // Simple case?
+  if (lastPong.pos > targetAcked)
+    return 0;
+
+  // No measurements yet?
+  if (baseRTT == (unsigned)-1)
+    return -1;
+
+  prevPing = &lastPong;
+  eta = 0;
+  elapsed = msSince(&lastPongArrival);
+
+  // Walk the ping queue and figure out which one we are waiting for to
+  // get to an uncongested state
+
+  for (iter = pings.begin(); ;++iter) {
+    struct RTTInfo curPing;
+
+    // If we aren't waiting for a pong that will clear the congested
+    // state then we have to estimate the final bit by pretending that
+    // we had a ping just after the last position update.
+    if (iter == pings.end()) {
+      curPing.tv = lastUpdate;
+      curPing.pos = lastPosition;
+      curPing.extra = extraBuffer;
+    } else {
+      curPing = *iter;
+    }
+
+    etaNext = msBetween(&prevPing->tv, &curPing.tv);
+    // Compensate for buffering delays
+    delay = curPing.extra * baseRTT / congWindow;
+    etaNext += delay;
+    delay = prevPing->extra * baseRTT / congWindow;
+    if (delay >= etaNext)
+      etaNext = 0;
+    else
+      etaNext -= delay;
 
-    return false;
+    // Found it?
+    if (curPing.pos > targetAcked) {
+      eta += etaNext * (curPing.pos - targetAcked) / (curPing.pos - prevPing->pos);
+      if (elapsed > eta)
+        return 0;
+      else
+        return eta - elapsed;
+    }
+
+    assert(iter != pings.end());
+
+    eta += etaNext;
+    prevPing = &*iter;
   }
+}
 
-  // FIXME: Should we compensate for non-update data?
-  //        (i.e. use sentOffset instead of offset)
-  if ((offset - ackedOffset) < congWindow)
-    return false;
+unsigned Congestion::getExtraBuffer()
+{
+  unsigned elapsed;
+  unsigned consumed;
 
-  // If we just have one outstanding "ping", that means the client has
-  // started receiving our update. In order to not regress compared to
-  // before we had congestion avoidance, we allow another update here.
-  // This could further clog up the tubes, but congestion control isn't
-  // really working properly right now anyway as the wire would otherwise
-  // be idle for at least RTT/2.
-  if (pings.size() == 1)
-    return false;
+  if (baseRTT == (unsigned)-1)
+    return 0;
 
-  return true;
+  elapsed = msSince(&lastUpdate);
+  consumed = elapsed * congWindow / baseRTT;
+
+  if (consumed >= extraBuffer)
+    return 0;
+  else
+    return extraBuffer - consumed;
 }
 
-bool Congestion::handleTimeout(Timer* t)
+unsigned Congestion::getInFlight()
+{
+  struct RTTInfo nextPong;
+  unsigned etaNext, delay, elapsed, acked;
+
+  // Simple case?
+  if (lastPosition == lastPong.pos)
+    return 0;
+
+  // No measurements yet?
+  if (baseRTT == (unsigned)-1) {
+    if (!pings.empty())
+      return lastPosition - pings.front().pos;
+    return 0;
+  }
+
+  // If we aren't waiting for any pong then we have to estimate things
+  // by pretending that we had a ping just after the last position
+  // update.
+  if (pings.empty()) {
+    nextPong.tv = lastUpdate;
+    nextPong.pos = lastPosition;
+    nextPong.extra = extraBuffer;
+  } else {
+    nextPong = pings.front();
+  }
+
+  // First we need to estimate how many bytes have made it through
+  // completely. Look at the next ping that should arrive and figure
+  // out how far behind it should be and interpolate the positions.
+
+  etaNext = msBetween(&lastPong.tv, &nextPong.tv);
+  // Compensate for buffering delays
+  delay = nextPong.extra * baseRTT / congWindow;
+  etaNext += delay;
+  delay = lastPong.extra * baseRTT / congWindow;
+  if (delay >= etaNext)
+    etaNext = 0;
+  else
+    etaNext -= delay;
+
+  elapsed = msSince(&lastPongArrival);
+
+  // The pong should be here any second. Be optimistic and assume
+  // we can already use its value.
+  if (etaNext <= elapsed)
+    acked = nextPong.pos;
+  else {
+    acked = lastPong.pos;
+    acked += (nextPong.pos - lastPong.pos) * elapsed / etaNext;
+  }
+
+  return lastPosition - acked;
+}
+
+void Congestion::updateCongestion()
 {
   unsigned diff;
 
-  if (!seenCongestion)
-    return false;
+  // We want at least three measurements to avoid noise
+  if (measurements < 3)
+    return;
+
+  assert(minRTT >= baseRTT);
+  assert(minCongestedRTT >= baseRTT);
 
   // The goal is to have a slightly too large congestion window since
   // a "perfect" one cannot be distinguished from a too small one. This
   // translates to a goal of a few extra milliseconds of delay.
 
+  // First we check all pongs to make sure we're not having a too large
+  // congestion window.
   diff = minRTT - baseRTT;
 
-  if (diff > __rfbmin(100, baseRTT)) {
+  // FIXME: Should we do slow start?
+  if (diff > 100) {
     // Way too fast
     congWindow = congWindow * baseRTT / minRTT;
-  } else if (diff > __rfbmin(50, baseRTT/2)) {
+  } else if (diff > 50) {
     // Slightly too fast
     congWindow -= 4096;
-  } else if (diff < 5) {
-    // Way too slow
-    congWindow += 8192;
-  } else if (diff < 25) {
-    // Too slow
-    congWindow += 4096;
+  } else {
+    // Secondly only the "congested" pongs are checked to see if the
+    // window is too small.
+
+    diff = minCongestedRTT - baseRTT;
+
+    if (diff < 5) {
+      // Way too slow
+      congWindow += 8192;
+    } else if (diff < 25) {
+      // Too slow
+      congWindow += 4096;
+    }
   }
 
   if (congWindow < MINIMUM_WINDOW)
@@ -209,14 +387,13 @@ bool Congestion::handleTimeout(Timer* t)
     congWindow = MAXIMUM_WINDOW;
 
 #ifdef CONGESTION_DEBUG
-  fprintf(stderr, "RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps\n",
-          minRTT, baseRTT, congWindow / 1024,
-          congWindow * 8.0 / baseRTT / 1000.0);
+  vlog.debug("RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps",
+             minRTT, baseRTT, congWindow / 1024,
+             congWindow * 8.0 / baseRTT / 1000.0);
 #endif
 
-  minRTT = -1;
-  seenCongestion = false;
-
-  return false;
+  measurements = 0;
+  gettimeofday(&lastAdjustment, NULL);
+  minRTT = minCongestedRTT = -1;
 }
 
index e8548f93e8a74ae671a259ff8beed715ed74796c..2bea5dacf945ee354461300d68bfc883c9f6e855 100644 (file)
 
 #include <list>
 
-#include <rfb/Timer.h>
-
 namespace rfb {
-  class Congestion : public Timer::Callback {
+  class Congestion {
   public:
     Congestion();
     ~Congestion();
 
+    // updatePosition() registers the current stream position and can
+    // and should be called often.
+    void updatePosition(unsigned pos);
+
     // sentPing() must be called when a marker is placed on the
-    // outgoing stream, along with the current stream position.
-    // gotPong() must be called when the response for such a marker
-    // is received.
-    void sentPing(int offset);
+    // outgoing stream. gotPong() must be called when the response for
+    // such a marker is received.
+    void sentPing();
     void gotPong();
 
     // isCongested() determines if the transport is currently congested
-    // or if more data can be sent. The curren stream position and how
-    // long the transport has been idle must be specified.
-    bool isCongested(int offset, unsigned idleTime);
+    // or if more data can be sent.
+    bool isCongested();
 
-  private:
-    // Timer callbacks
-    virtual bool handleTimeout(Timer* t);
+    // getUncongestedETA() returns the number of milliseconds until the
+    // transport is no longer congested. Returns 0 if there is no
+    // congestion, and -1 if it is unknown when the transport will no
+    // longer be congested.
+    int getUncongestedETA();
+
+  protected:
+    unsigned getExtraBuffer();
+    unsigned getInFlight();
+
+    void updateCongestion();
 
   private:
+    unsigned lastPosition;
+    unsigned extraBuffer;
+    struct timeval lastUpdate;
+    struct timeval lastSent;
+
     unsigned baseRTT;
     unsigned congWindow;
-    unsigned ackedOffset, sentOffset;
 
-    unsigned minRTT;
-    bool seenCongestion;
-    Timer congestionTimer;
+    struct RTTInfo {
+      struct timeval tv;
+      unsigned pos;
+      unsigned extra;
+      bool congested;
+    };
 
-    struct RTTInfo;
     std::list<struct RTTInfo> pings;
+
+    struct RTTInfo lastPong;
+    struct timeval lastPongArrival;
+
+    int measurements;
+    struct timeval lastAdjustment;
+    unsigned minRTT, minCongestedRTT;
   };
 }
 
index 43eb8256f81bfbfc04bce73dfccb85776a718642..b2ceb7d29361fcbb8a02b5771fb930e6d7bcfca3 100644 (file)
@@ -43,7 +43,7 @@ VNCSConnectionST::VNCSConnectionST(VNCServerST* server_, network::Socket *s,
   : sock(s), reverseConnection(reverse),
     queryConnectTimer(this), inProcessMessages(false),
     pendingSyncFence(false), syncFence(false), fenceFlags(0),
-    fenceDataLen(0), fenceData(NULL),
+    fenceDataLen(0), fenceData(NULL), congestionTimer(this),
     server(server_), updates(false),
     updateRenderedCursor(false), removeRenderedCursor(false),
     continuousUpdates(false), encodeManager(this), pointerEventTime(0),
@@ -726,6 +726,8 @@ bool VNCSConnectionST::handleTimeout(Timer* t)
     if (t == &queryConnectTimer) {
       if (state() == RFBSTATE_QUERYING)
         approveConnection(false, "The attempt to prompt the user to accept the connection failed");
+    } else if (t == &congestionTimer) {
+      writeFramebufferUpdate();
     }
   } catch (rdr::Exception& e) {
     close(e.str());
@@ -742,6 +744,8 @@ void VNCSConnectionST::writeRTTPing()
   if (!cp.supportsFence)
     return;
 
+  congestion.updatePosition(sock->outStream().length());
+
   // We need to make sure any old update are already processed by the
   // time we get the response back. This allows us to reliably throttle
   // back on client overload, as well as network overload.
@@ -749,11 +753,15 @@ void VNCSConnectionST::writeRTTPing()
   writer()->writeFence(fenceFlagRequest | fenceFlagBlockBefore,
                        sizeof(type), &type);
 
-  congestion.sentPing(sock->outStream().length());
+  congestion.sentPing();
 }
 
 bool VNCSConnectionST::isCongested()
 {
+  unsigned eta;
+
+  congestionTimer.stop();
+
   // Stuff still waiting in the send buffer?
   sock->outStream().flush();
   if (sock->outStream().bufferUsage() > 0)
@@ -762,13 +770,22 @@ bool VNCSConnectionST::isCongested()
   if (!cp.supportsFence)
     return false;
 
-  return congestion.isCongested(sock->outStream().length(),
-                                sock->outStream().getIdleTime());
+  congestion.updatePosition(sock->outStream().length());
+  if (!congestion.isCongested())
+    return false;
+
+  eta = congestion.getUncongestedETA();
+  if (eta >= 0)
+    congestionTimer.start(eta);
+
+  return true;
 }
 
 
 void VNCSConnectionST::writeFramebufferUpdate()
 {
+  congestion.updatePosition(sock->outStream().length());
+
   // We're in the middle of processing a command that's supposed to be
   // synchronised. Allowing an update to slip out right now might violate
   // that synchronisation.
@@ -805,6 +822,8 @@ void VNCSConnectionST::writeFramebufferUpdate()
   writeDataUpdate();
 
   network::TcpSocket::cork(sock->getFd(), false);
+
+  congestion.updatePosition(sock->outStream().length());
 }
 
 void VNCSConnectionST::writeNoDataUpdate()
index 96d7e4c9e5df9fc89b22044c09efdd243b5d61d8..dde0b1ec5c8ca1d66f408613902814abe00bf701 100644 (file)
@@ -187,6 +187,7 @@ namespace rfb {
     char *fenceData;
 
     Congestion congestion;
+    Timer congestionTimer;
 
     VNCServerST* server;
     SimpleUpdateTracker updates;
index 22e00ffc37dd12e0b6457ef546ab412e8aa382f2..755c91fd6f8342d71290f6ba806c23f12cd7202a 100644 (file)
@@ -121,19 +121,38 @@ namespace rfb {
     dest[src ? destlen-1 : 0] = 0;
   }
 
+  unsigned msBetween(const struct timeval *first,
+                     const struct timeval *second)
+  {
+    unsigned diff;
+
+    diff = (second->tv_sec - first->tv_sec) * 1000;
+
+    diff += second->tv_usec / 1000;
+    diff -= first->tv_usec / 1000;
+
+    return diff;
+  }
+
   unsigned msSince(const struct timeval *then)
   {
     struct timeval now;
-    unsigned diff;
 
     gettimeofday(&now, NULL);
 
-    diff = (now.tv_sec - then->tv_sec) * 1000;
-
-    diff += now.tv_usec / 1000;
-    diff -= then->tv_usec / 1000;
+    return msBetween(then, &now);
+  }
 
-    return diff;
+  bool isBefore(const struct timeval *first,
+                const struct timeval *second)
+  {
+    if (first->tv_sec < second->tv_sec)
+      return true;
+    if (first->tv_sec > second->tv_sec)
+      return false;
+    if (first->tv_usec < second->tv_usec)
+      return true;
+    return false;
   }
 
   static size_t doPrefix(long long value, const char *unit,
index e9114c3d79c04795d672b349ff667ab6ac6ed27c..3ca92f9d94c0f82d6cf52259eaad6f31b9a838cb 100644 (file)
@@ -95,9 +95,17 @@ namespace rfb {
     return (secs < 0 || secs > (INT_MAX/1000) ? INT_MAX : secs * 1000);
   }
 
+  // Returns time elapsed between two moments in milliseconds.
+  unsigned msBetween(const struct timeval *first,
+                     const struct timeval *second);
+
   // Returns time elapsed since given moment in milliseconds.
   unsigned msSince(const struct timeval *then);
 
+  // Returns true if first happened before seconds
+  bool isBefore(const struct timeval *first,
+                const struct timeval *second);
+
   size_t siPrefix(long long value, const char *unit,
                   char *buffer, size_t maxlen, int precision=6);
   size_t iecPrefix(long long value, const char *unit,