]> source.dussan.org Git - tigervnc.git/commitdiff
Make socket writes non-blockable. This allows the system to more quickly
authorPierre Ossman <ossman@cendio.se>
Tue, 25 Oct 2011 15:13:13 +0000 (15:13 +0000)
committerPierre Ossman <ossman@cendio.se>
Tue, 25 Oct 2011 15:13:13 +0000 (15:13 +0000)
return back to the Xorg main loop, meaning that things will be more responsive
in the presence of slow VNC clients.

git-svn-id: svn://svn.code.sf.net/p/tigervnc/code/trunk@4735 3789f03b-4d11-0410-bbf8-ca57d06f2519

common/rdr/FdOutStream.cxx
common/rdr/FdOutStream.h
unix/xserver/hw/vnc/XserverDesktop.cc
unix/xserver/hw/vnc/XserverDesktop.h
unix/xserver/hw/vnc/vncExtInit.cc
unix/xserver15.patch

index c84c946fa8dcd9d24547ac488ee945325602a50d..a6b85e2165263ea3e840bc025d985ce7dd4bf166 100644 (file)
@@ -50,17 +50,18 @@ using namespace rdr;
 
 enum { DEFAULT_BUF_SIZE = 16384 };
 
-FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_)
-  : fd(fd_), timeoutms(timeoutms_),
+FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_)
+  : fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
     bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
 {
-  ptr = start = new U8[bufSize];
+  ptr = start = sentUpTo = new U8[bufSize];
   end = start + bufSize;
 }
 
 FdOutStream::~FdOutStream()
 {
   try {
+    blocking = true;
     flush();
   } catch (Exception&) {
   }
@@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) {
   timeoutms = timeoutms_;
 }
 
+void FdOutStream::setBlocking(bool blocking_) {
+  blocking = blocking_;
+}
+
 int FdOutStream::length()
 {
-  return offset + ptr - start;
+  return offset + ptr - sentUpTo;
+}
+
+int FdOutStream::bufferUsage()
+{
+  return ptr - sentUpTo;
 }
 
 void FdOutStream::flush()
 {
-  U8* sentUpTo = start;
+  int timeoutms_;
+
+  if (blocking)
+    timeoutms_ = timeoutms;
+  else
+    timeoutms_ = 0;
+
   while (sentUpTo < ptr) {
-    int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo);
+    int n = writeWithTimeout((const void*) sentUpTo,
+                             ptr - sentUpTo, timeoutms_);
+
+    // Timeout?
+    if (n == 0) {
+      // If non-blocking then we're done here
+      if (!blocking)
+        break;
+
+      // Otherwise try blocking (with possible timeout)
+      if ((timeoutms_ == 0) && (timeoutms != 0)) {
+        timeoutms_ = timeoutms;
+        break;
+      }
+
+      // Proper timeout
+      throw TimedOut();
+    }
+
     sentUpTo += n;
     offset += n;
   }
 
-  ptr = start;
+   // Managed to flush everything?
+  if (sentUpTo == ptr)
+    ptr = sentUpTo = start;
 }
 
 
@@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems)
   if (itemSize > bufSize)
     throw Exception("FdOutStream overrun: max itemSize exceeded");
 
+  // First try to get rid of the data we have
   flush();
 
+  // Still not enough space?
+  if (itemSize > end - ptr) {
+    // Can we shuffle things around?
+    // (don't do this if it gains us less than 25%)
+    if ((sentUpTo - start > bufSize / 4) &&
+        (itemSize < bufSize - (ptr - sentUpTo))) {
+      memmove(start, sentUpTo, ptr - sentUpTo);
+      ptr = start + (ptr - sentUpTo);
+      sentUpTo = start;
+    } else {
+      // Have to get rid of more data, so turn off non-blocking
+      // for a bit...
+      bool realBlocking;
+
+      realBlocking = blocking;
+      blocking = true;
+      flush();
+      blocking = realBlocking;
+    }
+  }
+
+  // Can we fit all the items asked for?
   if (itemSize * nItems > end - ptr)
     nItems = (end - ptr) / itemSize;
 
@@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems)
 // select() and write() returning EINTR.
 //
 
-int FdOutStream::writeWithTimeout(const void* data, int length)
+int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms)
 {
   int n;
 
@@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length)
 
     if (n < 0) throw SystemException("select",errno);
 
-    if (n == 0) throw TimedOut();
+    if (n == 0) return 0;
 
     do {
       n = ::write(fd, data, length);
index a3e29127921cf7320c05a3a503da82832428fb3f..daa88b24dbc8850787e05d55aeb6e1cb90bb5be3 100644 (file)
@@ -31,23 +31,28 @@ namespace rdr {
 
   public:
 
-    FdOutStream(int fd, int timeoutms=-1, int bufSize=0);
+    FdOutStream(int fd, bool blocking=true, int timeoutms=-1, int bufSize=0);
     virtual ~FdOutStream();
 
     void setTimeout(int timeoutms);
+    void setBlocking(bool blocking);
     int getFd() { return fd; }
 
     void flush();
     int length();
 
+    int bufferUsage();
+
   private:
     int overrun(int itemSize, int nItems);
-    int writeWithTimeout(const void* data, int length);
+    int writeWithTimeout(const void* data, int length, int timeoutms);
     int fd;
+    bool blocking;
     int timeoutms;
     int bufSize;
     int offset;
     U8* start;
+    U8* sentUpTo;
   };
 
 }
index 9afbb49b6cd64e80b07913109b7f331ab0f4eda1..9c5b20b53e4e0531c86ac937655ff15d46d9f250 100644 (file)
@@ -581,6 +581,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
         if (FD_ISSET(listener->getFd(), fds)) {
           FD_CLR(listener->getFd(), fds);
           Socket* sock = listener->accept();
+          sock->outStream().setBlocking(false);
           server->addSocket(sock);
           vlog.debug("new client, sock %d",sock->getFd());
         }
@@ -590,6 +591,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
         if (FD_ISSET(httpListener->getFd(), fds)) {
           FD_CLR(httpListener->getFd(), fds);
           Socket* sock = httpListener->accept();
+          sock->outStream().setBlocking(false);
           httpServer->addSocket(sock);
           vlog.debug("new http client, sock %d",sock->getFd());
         }
@@ -632,6 +634,78 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
   }
 }
 
+void XserverDesktop::writeBlockHandler(fd_set* fds)
+{
+  try {
+    std::list<Socket*> sockets;
+    std::list<Socket*>::iterator i;
+
+    server->getSockets(&sockets);
+    for (i = sockets.begin(); i != sockets.end(); i++) {
+      int fd = (*i)->getFd();
+      if ((*i)->isShutdown()) {
+        vlog.debug("client gone, sock %d",fd);
+        server->removeSocket(*i);
+        vncClientGone(fd);
+        delete (*i);
+      } else {
+        if ((*i)->outStream().bufferUsage() > 0)
+          FD_SET(fd, fds);
+      }
+    }
+
+    if (httpServer) {
+      httpServer->getSockets(&sockets);
+      for (i = sockets.begin(); i != sockets.end(); i++) {
+        int fd = (*i)->getFd();
+        if ((*i)->isShutdown()) {
+          vlog.debug("http client gone, sock %d",fd);
+          httpServer->removeSocket(*i);
+          delete (*i);
+        } else {
+          if ((*i)->outStream().bufferUsage() > 0)
+            FD_SET(fd, fds);
+        }
+      }
+    }
+  } catch (rdr::Exception& e) {
+    vlog.error("XserverDesktop::writeBlockHandler: %s",e.str());
+  }
+}
+
+void XserverDesktop::writeWakeupHandler(fd_set* fds, int nfds)
+{
+  if (nfds < 1)
+    return;
+
+  try {
+    std::list<Socket*> sockets;
+    std::list<Socket*>::iterator i;
+
+    server->getSockets(&sockets);
+    for (i = sockets.begin(); i != sockets.end(); i++) {
+      int fd = (*i)->getFd();
+      if (FD_ISSET(fd, fds)) {
+        FD_CLR(fd, fds);
+        (*i)->outStream().flush();
+      }
+    }
+
+    if (httpServer) {
+      httpServer->getSockets(&sockets);
+      for (i = sockets.begin(); i != sockets.end(); i++) {
+        int fd = (*i)->getFd();
+        if (FD_ISSET(fd, fds)) {
+          FD_CLR(fd, fds);
+          (*i)->outStream().flush();
+        }
+      }
+    }
+  } catch (rdr::Exception& e) {
+    vlog.error("XserverDesktop::writeWakeupHandler: %s",e.str());
+  }
+}
+
 void XserverDesktop::addClient(Socket* sock, bool reverse)
 {
   vlog.debug("new client, sock %d reverse %d",sock->getFd(),reverse);
index 1c037053642db58aa6b0ac203268a3483caa4989..af3651171feae4c0a822b2176571b6ac8160d47b 100644 (file)
@@ -72,6 +72,8 @@ public:
   void ignoreHooks(bool b) { ignoreHooks_ = b; }
   void blockHandler(fd_set* fds);
   void wakeupHandler(fd_set* fds, int nfds);
+  void writeBlockHandler(fd_set* fds);
+  void writeWakeupHandler(fd_set* fds, int nfds);
   void addClient(network::Socket* sock, bool reverse);
   void disconnectClients();
 
index d3cfbe2645ca76ae8716f6f153ad7d3af7a1dd9d..baa8fa4efa563edc544306539736e7425593193b 100644 (file)
@@ -21,6 +21,7 @@
 #endif
 
 #include <stdio.h>
+#include <errno.h>
 
 extern "C" {
 #define class c_class
@@ -28,6 +29,7 @@ extern "C" {
 #define NEED_EVENTS
 #include <X11/X.h>
 #include <X11/Xproto.h>
+#include <X11/Xpoll.h>
 #include "misc.h"
 #include "os.h"
 #include "dixstruct.h"
@@ -63,6 +65,8 @@ extern "C" {
   static void vncResetProc(ExtensionEntry* extEntry);
   static void vncBlockHandler(pointer data, OSTimePtr t, pointer readmask);
   static void vncWakeupHandler(pointer data, int nfds, pointer readmask);
+  void vncWriteBlockHandler(fd_set *fds);
+  void vncWriteWakeupHandler(int nfds, fd_set *fds);
   static void vncClientStateChange(CallbackListPtr*, pointer, pointer);
   static void SendSelectionChangeEvent(Atom selection);
   static int ProcVncExtDispatch(ClientPtr client);
@@ -287,6 +291,9 @@ static void vncSelectionCallback(CallbackListPtr *callbacks, pointer data, point
   SendSelectionChangeEvent(selection->selection);
 }
 
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout);
+static void vncWriteWakeupHandlerFallback();
+
 //
 // vncBlockHandler - called just before the X server goes into select().  Call
 // on to the block handler for each desktop.  Then check whether any of the
@@ -297,6 +304,8 @@ static void vncBlockHandler(pointer data, OSTimePtr timeout, pointer readmask)
 {
   fd_set* fds = (fd_set*)readmask;
 
+  vncWriteBlockHandlerFallback(timeout);
+
   for (int scr = 0; scr < screenInfo.numScreens; scr++)
     if (desktop[scr])
       desktop[scr]->blockHandler(fds);
@@ -311,6 +320,85 @@ static void vncWakeupHandler(pointer data, int nfds, pointer readmask)
       desktop[scr]->wakeupHandler(fds, nfds);
     }
   }
+
+  vncWriteWakeupHandlerFallback();
+}
+
+//
+// vncWriteBlockHandler - extra hack to be able to get the main select loop
+// to monitor writeable fds and not just readable. This requirers a modified
+// Xorg and might therefore not be called. When it is called though, it will
+// do so before vncBlockHandler (and vncWriteWakeupHandler called after
+// vncWakeupHandler).
+//
+
+static bool needFallback = true;
+static fd_set fallbackFds;
+static struct timeval tw;
+
+void vncWriteBlockHandler(fd_set *fds)
+{
+  needFallback = false;
+
+  for (int scr = 0; scr < screenInfo.numScreens; scr++)
+    if (desktop[scr])
+      desktop[scr]->writeBlockHandler(fds);
+}
+
+void vncWriteWakeupHandler(int nfds, fd_set *fds)
+{
+  for (int scr = 0; scr < screenInfo.numScreens; scr++) {
+    if (desktop[scr]) {
+      desktop[scr]->writeWakeupHandler(fds, nfds);
+    }
+  }
+}
+
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout)
+{
+  if (!needFallback)
+    return;
+
+  FD_ZERO(&fallbackFds);
+  vncWriteBlockHandler(&fallbackFds);
+  needFallback = true;
+
+  if (!XFD_ANYSET(&fallbackFds))
+    return;
+
+  if ((*timeout == NULL) ||
+      ((*timeout)->tv_sec > 0) || ((*timeout)->tv_usec > 10000)) {
+    tw.tv_sec = 0;
+    tw.tv_usec = 10000;
+    *timeout = &tw;
+  }
+}
+
+static void vncWriteWakeupHandlerFallback()
+{
+  int ret;
+  struct timeval timeout;
+
+  if (!needFallback)
+    return;
+
+  if (!XFD_ANYSET(&fallbackFds))
+    return;
+
+  timeout.tv_sec = 0;
+  timeout.tv_usec = 0;
+
+  ret = select(XFD_SETSIZE, NULL, &fallbackFds, NULL, &timeout);
+  if (ret < 0) {
+    ErrorF("vncWriteWakeupHandlerFallback(): select: %s\n",
+           strerror(errno));
+    return;
+  }
+
+  if (ret == 0)
+    return;
+
+  vncWriteWakeupHandler(ret, &fallbackFds);
 }
 
 static void vncClientStateChange(CallbackListPtr*, pointer, pointer p)
index f9307770ac3ad9bea4ef57e95052218d2170506f..7d8c94be86a69b2b006f256b0d380534d9be473a 100644 (file)
@@ -98,3 +98,61 @@ diff -up xserver/mi/miinitext.c.vnc xserver/mi/miinitext.c
  #ifdef XIDLE
      if (!noXIdleExtension) XIdleExtensionInit();
  #endif
+--- xserver/os/WaitFor.c.orig  2011-10-07 12:57:57.000000000 +0200
++++ xserver/os/WaitFor.c       2011-10-07 13:21:11.000000000 +0200
+@@ -125,6 +125,9 @@
+ static void CheckAllTimers(void);
+ static OsTimerPtr timers = NULL;
++extern void vncWriteBlockHandler(fd_set *fds);
++extern void vncWriteWakeupHandler(int nfds, fd_set *fds);
++
+ /*****************
+  * WaitForSomething:
+  *     Make the server suspend until there is
+@@ -150,6 +153,7 @@
+     INT32 timeout = 0;
+     fd_set clientsReadable;
+     fd_set clientsWritable;
++    fd_set socketsWritable;
+     int curclient;
+     int selecterr;
+     int nready;
+@@ -220,23 +224,29 @@
+       SmartScheduleStopTimer ();
+ #endif
++      FD_ZERO(&socketsWritable);
++      vncWriteBlockHandler(&socketsWritable);
+       BlockHandler((pointer)&wt, (pointer)&LastSelectMask);
+       if (NewOutputPending)
+           FlushAllOutput();
+       /* keep this check close to select() call to minimize race */
+       if (dispatchException)
+           i = -1;
+-      else if (AnyClientsWriteBlocked)
+-      {
+-          XFD_COPYSET(&ClientsWriteBlocked, &clientsWritable);
+-          i = Select (MaxClients, &LastSelectMask, &clientsWritable, NULL, wt);
+-      }
+-      else 
+-      {
+-          i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++      else {
++          if (AnyClientsWriteBlocked)
++              XFD_ORSET(&socketsWritable, &ClientsWriteBlocked, &socketsWritable);
++
++          if (XFD_ANYSET(&socketsWritable)) {
++              i = Select (MaxClients, &LastSelectMask, &socketsWritable, NULL, wt);
++              if (AnyClientsWriteBlocked)
++                  XFD_ANDSET(&clientsWritable, &socketsWritable, &ClientsWriteBlocked);
++          } else {
++              i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++          }
+       }
+       selecterr = GetErrno();
+       WakeupHandler(i, (pointer)&LastSelectMask);
++      vncWriteWakeupHandler(i, &socketsWritable);
+ #ifdef SMART_SCHEDULE
+       SmartScheduleStartTimer ();
+ #endif