From 1b478e517749ba0a426fa4dd5a3b5ec8c502c7d8 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Tue, 15 Nov 2011 12:08:30 +0000 Subject: Server implementation of continuous updates, including advanced flow control. git-svn-id: svn://svn.code.sf.net/p/tigervnc/code/trunk@4803 3789f03b-4d11-0410-bbf8-ca57d06f2519 --- common/rfb/VNCSConnectionST.cxx | 282 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 272 insertions(+), 10 deletions(-) (limited to 'common/rfb/VNCSConnectionST.cxx') diff --git a/common/rfb/VNCSConnectionST.cxx b/common/rfb/VNCSConnectionST.cxx index 12dcf047..bd93d633 100644 --- a/common/rfb/VNCSConnectionST.cxx +++ b/common/rfb/VNCSConnectionST.cxx @@ -17,6 +17,17 @@ * USA. */ +// Debug output on what the congestion control is up to +#undef CONGESTION_DEBUG + +#include + +#ifdef CONGESTION_DEBUG +#include +#include +#include +#endif + #include #include #include @@ -34,13 +45,34 @@ using namespace rfb; static LogWriter vlog("VNCSConnST"); +// This window should get us going fairly fast on a decent bandwidth network. +// If it's too high, it will rapidly be reduced and stay low. +static const unsigned INITIAL_WINDOW = 16384; + +// TCP's minimal window is 3*MSS. But since we don't know the MSS, we +// make a guess at 4 KiB (it's probaly a bit higher). +static const unsigned MINIMUM_WINDOW = 4096; + +// The current default maximum window for Linux (4 MiB). Should be a good +// limit for now... +static const unsigned MAXIMUM_WINDOW = 4194304; + +struct RTTInfo { + struct timeval tv; + int offset; + unsigned inFlight; +}; + VNCSConnectionST::VNCSConnectionST(VNCServerST* server_, network::Socket *s, bool reverse) : SConnection(reverse), sock(s), inProcessMessages(false), syncFence(false), fenceFlags(0), fenceDataLen(0), fenceData(NULL), + baseRTT(-1), minRTT(-1), seenCongestion(false), pingCounter(0), + ackedOffset(0), sentOffset(0), congWindow(0), congestionTimer(this), server(server_), updates(false), image_getter(server->useEconomicTranslate), drawRenderedCursor(false), removeRenderedCursor(false), + continuousUpdates(false), updateTimer(this), pointerEventTime(0), accessRights(AccessDefault), startTime(time(0)) { @@ -361,6 +393,10 @@ void VNCSConnectionST::authSuccess() // - Mark the entire display as "dirty" updates.add_changed(server->pb->getRect()); startTime = time(0); + + // - Bootstrap the congestion control + ackedOffset = sock->outStream().length(); + congWindow = INITIAL_WINDOW; } void VNCSConnectionST::queryConnection(const char* userName) @@ -518,7 +554,8 @@ void VNCSConnectionST::framebufferUpdateRequest(const Rect& r,bool incremental) // Just update the requested region. // Framebuffer update will be sent a bit later, see processMessages(). Region reqRgn(r); - requested.assign_union(reqRgn); + if (!incremental || !continuousUpdates) + requested.assign_union(reqRgn); if (!incremental) { // Non-incremental update - treat as if area requested has changed @@ -601,15 +638,42 @@ void VNCSConnectionST::fence(rdr::U32 flags, unsigned len, const char data[]) return; } + struct RTTInfo rttInfo; + switch (len) { case 0: // Initial dummy fence; break; + case sizeof(struct RTTInfo): + memcpy(&rttInfo, data, sizeof(struct RTTInfo)); + handleRTTPong(rttInfo); + break; default: vlog.error("Fence response of unexpected size received"); } } +void VNCSConnectionST::enableContinuousUpdates(bool enable, + int x, int y, int w, int h) +{ + Rect rect; + + if (!cp.supportsFence || !cp.supportsContinuousUpdates) + throw Exception("Client tried to enable continuous updates when not allowed"); + + continuousUpdates = enable; + + rect.setXYWH(x, y, w, h); + cuRegion.reset(rect); + + if (enable) { + requested.clear(); + writeFramebufferUpdate(); + } else { + writer()->writeEndOfContinuousUpdates(); + } +} + // supportsLocalCursor() is called whenever the status of // cp.supportsLocalCursor has changed. If the client does now support local // cursor, we make sure that the old server-side rendered cursor is cleaned up @@ -630,6 +694,16 @@ void VNCSConnectionST::supportsFence() writer()->writeFence(fenceFlagRequest, 0, NULL); } +void VNCSConnectionST::supportsContinuousUpdates() +{ + // We refuse to use continuous updates if we cannot monitor the buffer + // usage using fences. + if (!cp.supportsFence) + return; + + writer()->writeEndOfContinuousUpdates(); +} + void VNCSConnectionST::writeSetCursorCallback() { if (cp.supportsLocalXCursor) { @@ -670,6 +744,8 @@ bool VNCSConnectionST::handleTimeout(Timer* t) try { if (t == &updateTimer) writeFramebufferUpdate(); + else if (t == &congestionTimer) + updateCongestion(); } catch (rdr::Exception& e) { close(e.str()); } @@ -678,17 +754,188 @@ bool VNCSConnectionST::handleTimeout(Timer* t) } +void VNCSConnectionST::writeRTTPing() +{ + struct RTTInfo rttInfo; + + if (!cp.supportsFence) + return; + + memset(&rttInfo, 0, sizeof(struct RTTInfo)); + + gettimeofday(&rttInfo.tv, NULL); + rttInfo.offset = sock->outStream().length(); + rttInfo.inFlight = rttInfo.offset - ackedOffset; + + // We need to make sure any old update are already processed by the + // time we get the response back. This allows us to reliably throttle + // back on client overload, as well as network overload. + writer()->writeFence(fenceFlagRequest | fenceFlagBlockBefore, + sizeof(struct RTTInfo), (const char*)&rttInfo); + + pingCounter++; + + sentOffset = rttInfo.offset; + + // Let some data flow before we adjust the settings + if (!congestionTimer.isStarted()) + congestionTimer.start(__rfbmin(baseRTT * 2, 100)); +} + +void VNCSConnectionST::handleRTTPong(const struct RTTInfo &rttInfo) +{ + unsigned rtt, delay; + int bdp; + + pingCounter--; + + rtt = msSince(&rttInfo.tv); + if (rtt < 1) + rtt = 1; + + ackedOffset = rttInfo.offset; + + // Try to estimate wire latency by tracking lowest seen latency + if (rtt < baseRTT) + baseRTT = rtt; + + if (rttInfo.inFlight > congWindow) { + seenCongestion = true; + + // Estimate added delay because of overtaxed buffers + delay = (rttInfo.inFlight - congWindow) * baseRTT / congWindow; + + if (delay < rtt) + rtt -= delay; + else + rtt = 1; + + // If we underestimate the congestion window, then we'll get a latency + // that's less than the wire latency, which will confuse other portions + // of the code. + if (rtt < baseRTT) + rtt = baseRTT; + } + + // We only keep track of the minimum latency seen (for a given interval) + // on the basis that we want to avoid continous buffer issue, but don't + // mind (or even approve of) bursts. + if (rtt < minRTT) + minRTT = rtt; +} + bool VNCSConnectionST::isCongested() { + int offset; + + // Stuff still waiting in the send buffer? if (sock->outStream().bufferUsage() > 0) return true; - return false; + if (!cp.supportsFence) + return false; + + // Idle for too long? (and no data on the wire) + // + // FIXME: This should really just be one baseRTT, but we're getting + // problems with triggering the idle timeout on each update. + // Maybe we need to use a moving average for the wire latency + // instead of baseRTT. + if ((sentOffset == ackedOffset) && + (sock->outStream().getIdleTime() > 2 * baseRTT)) { + +#ifdef CONGESTION_DEBUG + if (congWindow > INITIAL_WINDOW) + fprintf(stderr, "Reverting to initial window (%d KiB) after %d ms\n", + INITIAL_WINDOW / 1024, sock->outStream().getIdleTime()); +#endif + + // Close congestion window and allow a transfer + // FIXME: Reset baseRTT like Linux Vegas? + congWindow = __rfbmin(INITIAL_WINDOW, congWindow); + + return false; + } + + offset = sock->outStream().length(); + + // FIXME: Should we compensate for non-update data? + // (i.e. use sentOffset instead of offset) + if ((offset - ackedOffset) < congWindow) + return false; + + // If we just have one outstanding "ping", that means the client has + // started receiving our update. In order to not regress compared to + // before we had congestion avoidance, we allow another update here. + // This could further clog up the tubes, but congestion control isn't + // really working properly right now anyway as the wire would otherwise + // be idle for at least RTT/2. + if (pingCounter == 1) + return false; + + return true; +} + + +void VNCSConnectionST::updateCongestion() +{ + unsigned diff; + + if (!seenCongestion) + return; + + diff = minRTT - baseRTT; + + if (diff > __rfbmin(100, baseRTT)) { + // Way too fast + congWindow = congWindow * baseRTT / minRTT; + } else if (diff > __rfbmin(50, baseRTT/2)) { + // Slightly too fast + congWindow -= 4096; + } else if (diff < 5) { + // Way too slow + congWindow += 8192; + } else if (diff < 25) { + // Too slow + congWindow += 4096; + } + + if (congWindow < MINIMUM_WINDOW) + congWindow = MINIMUM_WINDOW; + if (congWindow > MAXIMUM_WINDOW) + congWindow = MAXIMUM_WINDOW; + +#ifdef CONGESTION_DEBUG + fprintf(stderr, "RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps\n", + minRTT, baseRTT, congWindow / 1024, + congWindow * 8.0 / baseRTT / 1000.0); + +#ifdef TCP_INFO + struct tcp_info tcp_info; + socklen_t tcp_info_length; + + tcp_info_length = sizeof(tcp_info); + if (getsockopt(sock->getFd(), SOL_TCP, TCP_INFO, + (void *)&tcp_info, &tcp_info_length) == 0) { + fprintf(stderr, "Socket: RTT: %d ms (+/- %d ms) Window %d KiB\n", + tcp_info.tcpi_rtt / 1000, tcp_info.tcpi_rttvar / 1000, + tcp_info.tcpi_snd_mss * tcp_info.tcpi_snd_cwnd / 1024); + } +#endif + +#endif + + minRTT = -1; + seenCongestion = false; } void VNCSConnectionST::writeFramebufferUpdate() { + Region req; + UpdateInfo ui; + bool needNewUpdateInfo; + updateTimer.stop(); // We're in the middle of processing a command that's supposed to be @@ -703,7 +950,9 @@ void VNCSConnectionST::writeFramebufferUpdate() if (inProcessMessages) return; - if (state() != RFBSTATE_NORMAL || requested.is_empty()) + if (state() != RFBSTATE_NORMAL) + return; + if (requested.is_empty() && !continuousUpdates) return; // Check that we actually have some space on the link and retry in a @@ -718,7 +967,8 @@ void VNCSConnectionST::writeFramebufferUpdate() if (writer()->needNoDataUpdate()) { writer()->writeNoDataUpdate(); requested.clear(); - return; + if (!continuousUpdates) + return; } updates.enable_copyrect(cp.useCopyRect); @@ -733,9 +983,13 @@ void VNCSConnectionST::writeFramebufferUpdate() // getUpdateInfo() will normalize the `updates' object such way that its // `changed' and `copied' regions would not intersect. - UpdateInfo ui; - updates.getUpdateInfo(&ui, requested); - bool needNewUpdateInfo = false; + if (continuousUpdates) + req = cuRegion.union_(requested); + else + req = requested; + + updates.getUpdateInfo(&ui, req); + needNewUpdateInfo = false; // If the previous position of the rendered cursor overlaps the source of the // copy, then when the copy happens the corresponding rectangle in the @@ -768,7 +1022,7 @@ void VNCSConnectionST::writeFramebufferUpdate() // The `updates' object could change, make sure we have valid update info. if (needNewUpdateInfo) - updates.getUpdateInfo(&ui, requested); + updates.getUpdateInfo(&ui, req); // If the client needs a server-side rendered cursor, work out the cursor // rectangle. If it's empty then don't bother drawing it, but if it overlaps @@ -778,7 +1032,7 @@ void VNCSConnectionST::writeFramebufferUpdate() if (needRenderedCursor()) { renderedCursorRect = (server->renderedCursor.getRect(server->renderedCursorTL) - .intersect(requested.get_bounding_rect())); + .intersect(req.get_bounding_rect())); if (renderedCursorRect.is_empty()) { drawRenderedCursor = false; @@ -794,7 +1048,7 @@ void VNCSConnectionST::writeFramebufferUpdate() //if (drawRenderedCursor) { // updates.subtract(renderedCursorRect); - // updates.getUpdateInfo(&ui, requested); + // updates.getUpdateInfo(&ui, req); //} } @@ -818,14 +1072,22 @@ void VNCSConnectionST::writeFramebufferUpdate() nRects += nUpdateRects; } } + + writeRTTPing(); writer()->writeFramebufferUpdateStart(nRects); + Region updatedRegion; writer()->writeRects(ui, &image_getter, &updatedRegion); updates.subtract(updatedRegion); + if (drawRenderedCursor) writeRenderedCursorRect(); + writer()->writeFramebufferUpdateEnd(); + + writeRTTPing(); + requested.clear(); } } -- cgit v1.2.3