From: Pierre Ossman Date: Tue, 25 Oct 2011 15:13:13 +0000 (+0000) Subject: Make socket writes non-blockable. This allows the system to more quickly X-Git-Tag: v1.1.90~75 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=4ce51ffc4ec5fb060855dbe17579cbb3d9228e61;p=tigervnc.git Make socket writes non-blockable. This allows the system to more quickly return back to the Xorg main loop, meaning that things will be more responsive in the presence of slow VNC clients. git-svn-id: svn://svn.code.sf.net/p/tigervnc/code/trunk@4735 3789f03b-4d11-0410-bbf8-ca57d06f2519 --- diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx index c84c946f..a6b85e21 100644 --- a/common/rdr/FdOutStream.cxx +++ b/common/rdr/FdOutStream.cxx @@ -50,17 +50,18 @@ using namespace rdr; enum { DEFAULT_BUF_SIZE = 16384 }; -FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_) - : fd(fd_), timeoutms(timeoutms_), +FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_) + : fd(fd_), blocking(blocking_), timeoutms(timeoutms_), bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0) { - ptr = start = new U8[bufSize]; + ptr = start = sentUpTo = new U8[bufSize]; end = start + bufSize; } FdOutStream::~FdOutStream() { try { + blocking = true; flush(); } catch (Exception&) { } @@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) { timeoutms = timeoutms_; } +void FdOutStream::setBlocking(bool blocking_) { + blocking = blocking_; +} + int FdOutStream::length() { - return offset + ptr - start; + return offset + ptr - sentUpTo; +} + +int FdOutStream::bufferUsage() +{ + return ptr - sentUpTo; } void FdOutStream::flush() { - U8* sentUpTo = start; + int timeoutms_; + + if (blocking) + timeoutms_ = timeoutms; + else + timeoutms_ = 0; + while (sentUpTo < ptr) { - int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo); + int n = writeWithTimeout((const void*) sentUpTo, + ptr - sentUpTo, timeoutms_); + + // Timeout? + if (n == 0) { + // If non-blocking then we're done here + if (!blocking) + break; + + // Otherwise try blocking (with possible timeout) + if ((timeoutms_ == 0) && (timeoutms != 0)) { + timeoutms_ = timeoutms; + break; + } + + // Proper timeout + throw TimedOut(); + } + sentUpTo += n; offset += n; } - ptr = start; + // Managed to flush everything? + if (sentUpTo == ptr) + ptr = sentUpTo = start; } @@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems) if (itemSize > bufSize) throw Exception("FdOutStream overrun: max itemSize exceeded"); + // First try to get rid of the data we have flush(); + // Still not enough space? + if (itemSize > end - ptr) { + // Can we shuffle things around? + // (don't do this if it gains us less than 25%) + if ((sentUpTo - start > bufSize / 4) && + (itemSize < bufSize - (ptr - sentUpTo))) { + memmove(start, sentUpTo, ptr - sentUpTo); + ptr = start + (ptr - sentUpTo); + sentUpTo = start; + } else { + // Have to get rid of more data, so turn off non-blocking + // for a bit... + bool realBlocking; + + realBlocking = blocking; + blocking = true; + flush(); + blocking = realBlocking; + } + } + + // Can we fit all the items asked for? if (itemSize * nItems > end - ptr) nItems = (end - ptr) / itemSize; @@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems) // select() and write() returning EINTR. // -int FdOutStream::writeWithTimeout(const void* data, int length) +int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms) { int n; @@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length) if (n < 0) throw SystemException("select",errno); - if (n == 0) throw TimedOut(); + if (n == 0) return 0; do { n = ::write(fd, data, length); diff --git a/common/rdr/FdOutStream.h b/common/rdr/FdOutStream.h index a3e29127..daa88b24 100644 --- a/common/rdr/FdOutStream.h +++ b/common/rdr/FdOutStream.h @@ -31,23 +31,28 @@ namespace rdr { public: - FdOutStream(int fd, int timeoutms=-1, int bufSize=0); + FdOutStream(int fd, bool blocking=true, int timeoutms=-1, int bufSize=0); virtual ~FdOutStream(); void setTimeout(int timeoutms); + void setBlocking(bool blocking); int getFd() { return fd; } void flush(); int length(); + int bufferUsage(); + private: int overrun(int itemSize, int nItems); - int writeWithTimeout(const void* data, int length); + int writeWithTimeout(const void* data, int length, int timeoutms); int fd; + bool blocking; int timeoutms; int bufSize; int offset; U8* start; + U8* sentUpTo; }; } diff --git a/unix/xserver/hw/vnc/XserverDesktop.cc b/unix/xserver/hw/vnc/XserverDesktop.cc index 9afbb49b..9c5b20b5 100644 --- a/unix/xserver/hw/vnc/XserverDesktop.cc +++ b/unix/xserver/hw/vnc/XserverDesktop.cc @@ -581,6 +581,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds) if (FD_ISSET(listener->getFd(), fds)) { FD_CLR(listener->getFd(), fds); Socket* sock = listener->accept(); + sock->outStream().setBlocking(false); server->addSocket(sock); vlog.debug("new client, sock %d",sock->getFd()); } @@ -590,6 +591,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds) if (FD_ISSET(httpListener->getFd(), fds)) { FD_CLR(httpListener->getFd(), fds); Socket* sock = httpListener->accept(); + sock->outStream().setBlocking(false); httpServer->addSocket(sock); vlog.debug("new http client, sock %d",sock->getFd()); } @@ -632,6 +634,78 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds) } } +void XserverDesktop::writeBlockHandler(fd_set* fds) +{ + try { + std::list sockets; + std::list::iterator i; + + server->getSockets(&sockets); + for (i = sockets.begin(); i != sockets.end(); i++) { + int fd = (*i)->getFd(); + if ((*i)->isShutdown()) { + vlog.debug("client gone, sock %d",fd); + server->removeSocket(*i); + vncClientGone(fd); + delete (*i); + } else { + if ((*i)->outStream().bufferUsage() > 0) + FD_SET(fd, fds); + } + } + + if (httpServer) { + httpServer->getSockets(&sockets); + for (i = sockets.begin(); i != sockets.end(); i++) { + int fd = (*i)->getFd(); + if ((*i)->isShutdown()) { + vlog.debug("http client gone, sock %d",fd); + httpServer->removeSocket(*i); + delete (*i); + } else { + if ((*i)->outStream().bufferUsage() > 0) + FD_SET(fd, fds); + } + } + } + } catch (rdr::Exception& e) { + vlog.error("XserverDesktop::writeBlockHandler: %s",e.str()); + } +} + +void XserverDesktop::writeWakeupHandler(fd_set* fds, int nfds) +{ + if (nfds < 1) + return; + + try { + std::list sockets; + std::list::iterator i; + + server->getSockets(&sockets); + for (i = sockets.begin(); i != sockets.end(); i++) { + int fd = (*i)->getFd(); + if (FD_ISSET(fd, fds)) { + FD_CLR(fd, fds); + (*i)->outStream().flush(); + } + } + + if (httpServer) { + httpServer->getSockets(&sockets); + for (i = sockets.begin(); i != sockets.end(); i++) { + int fd = (*i)->getFd(); + if (FD_ISSET(fd, fds)) { + FD_CLR(fd, fds); + (*i)->outStream().flush(); + } + } + } + } catch (rdr::Exception& e) { + vlog.error("XserverDesktop::writeWakeupHandler: %s",e.str()); + } +} + void XserverDesktop::addClient(Socket* sock, bool reverse) { vlog.debug("new client, sock %d reverse %d",sock->getFd(),reverse); diff --git a/unix/xserver/hw/vnc/XserverDesktop.h b/unix/xserver/hw/vnc/XserverDesktop.h index 1c037053..af365117 100644 --- a/unix/xserver/hw/vnc/XserverDesktop.h +++ b/unix/xserver/hw/vnc/XserverDesktop.h @@ -72,6 +72,8 @@ public: void ignoreHooks(bool b) { ignoreHooks_ = b; } void blockHandler(fd_set* fds); void wakeupHandler(fd_set* fds, int nfds); + void writeBlockHandler(fd_set* fds); + void writeWakeupHandler(fd_set* fds, int nfds); void addClient(network::Socket* sock, bool reverse); void disconnectClients(); diff --git a/unix/xserver/hw/vnc/vncExtInit.cc b/unix/xserver/hw/vnc/vncExtInit.cc index d3cfbe26..baa8fa4e 100644 --- a/unix/xserver/hw/vnc/vncExtInit.cc +++ b/unix/xserver/hw/vnc/vncExtInit.cc @@ -21,6 +21,7 @@ #endif #include +#include extern "C" { #define class c_class @@ -28,6 +29,7 @@ extern "C" { #define NEED_EVENTS #include #include +#include #include "misc.h" #include "os.h" #include "dixstruct.h" @@ -63,6 +65,8 @@ extern "C" { static void vncResetProc(ExtensionEntry* extEntry); static void vncBlockHandler(pointer data, OSTimePtr t, pointer readmask); static void vncWakeupHandler(pointer data, int nfds, pointer readmask); + void vncWriteBlockHandler(fd_set *fds); + void vncWriteWakeupHandler(int nfds, fd_set *fds); static void vncClientStateChange(CallbackListPtr*, pointer, pointer); static void SendSelectionChangeEvent(Atom selection); static int ProcVncExtDispatch(ClientPtr client); @@ -287,6 +291,9 @@ static void vncSelectionCallback(CallbackListPtr *callbacks, pointer data, point SendSelectionChangeEvent(selection->selection); } +static void vncWriteBlockHandlerFallback(OSTimePtr timeout); +static void vncWriteWakeupHandlerFallback(); + // // vncBlockHandler - called just before the X server goes into select(). Call // on to the block handler for each desktop. Then check whether any of the @@ -297,6 +304,8 @@ static void vncBlockHandler(pointer data, OSTimePtr timeout, pointer readmask) { fd_set* fds = (fd_set*)readmask; + vncWriteBlockHandlerFallback(timeout); + for (int scr = 0; scr < screenInfo.numScreens; scr++) if (desktop[scr]) desktop[scr]->blockHandler(fds); @@ -311,6 +320,85 @@ static void vncWakeupHandler(pointer data, int nfds, pointer readmask) desktop[scr]->wakeupHandler(fds, nfds); } } + + vncWriteWakeupHandlerFallback(); +} + +// +// vncWriteBlockHandler - extra hack to be able to get the main select loop +// to monitor writeable fds and not just readable. This requirers a modified +// Xorg and might therefore not be called. When it is called though, it will +// do so before vncBlockHandler (and vncWriteWakeupHandler called after +// vncWakeupHandler). +// + +static bool needFallback = true; +static fd_set fallbackFds; +static struct timeval tw; + +void vncWriteBlockHandler(fd_set *fds) +{ + needFallback = false; + + for (int scr = 0; scr < screenInfo.numScreens; scr++) + if (desktop[scr]) + desktop[scr]->writeBlockHandler(fds); +} + +void vncWriteWakeupHandler(int nfds, fd_set *fds) +{ + for (int scr = 0; scr < screenInfo.numScreens; scr++) { + if (desktop[scr]) { + desktop[scr]->writeWakeupHandler(fds, nfds); + } + } +} + +static void vncWriteBlockHandlerFallback(OSTimePtr timeout) +{ + if (!needFallback) + return; + + FD_ZERO(&fallbackFds); + vncWriteBlockHandler(&fallbackFds); + needFallback = true; + + if (!XFD_ANYSET(&fallbackFds)) + return; + + if ((*timeout == NULL) || + ((*timeout)->tv_sec > 0) || ((*timeout)->tv_usec > 10000)) { + tw.tv_sec = 0; + tw.tv_usec = 10000; + *timeout = &tw; + } +} + +static void vncWriteWakeupHandlerFallback() +{ + int ret; + struct timeval timeout; + + if (!needFallback) + return; + + if (!XFD_ANYSET(&fallbackFds)) + return; + + timeout.tv_sec = 0; + timeout.tv_usec = 0; + + ret = select(XFD_SETSIZE, NULL, &fallbackFds, NULL, &timeout); + if (ret < 0) { + ErrorF("vncWriteWakeupHandlerFallback(): select: %s\n", + strerror(errno)); + return; + } + + if (ret == 0) + return; + + vncWriteWakeupHandler(ret, &fallbackFds); } static void vncClientStateChange(CallbackListPtr*, pointer, pointer p) diff --git a/unix/xserver15.patch b/unix/xserver15.patch index f9307770..7d8c94be 100644 --- a/unix/xserver15.patch +++ b/unix/xserver15.patch @@ -98,3 +98,61 @@ diff -up xserver/mi/miinitext.c.vnc xserver/mi/miinitext.c #ifdef XIDLE if (!noXIdleExtension) XIdleExtensionInit(); #endif +--- xserver/os/WaitFor.c.orig 2011-10-07 12:57:57.000000000 +0200 ++++ xserver/os/WaitFor.c 2011-10-07 13:21:11.000000000 +0200 +@@ -125,6 +125,9 @@ + static void CheckAllTimers(void); + static OsTimerPtr timers = NULL; + ++extern void vncWriteBlockHandler(fd_set *fds); ++extern void vncWriteWakeupHandler(int nfds, fd_set *fds); ++ + /***************** + * WaitForSomething: + * Make the server suspend until there is +@@ -150,6 +153,7 @@ + INT32 timeout = 0; + fd_set clientsReadable; + fd_set clientsWritable; ++ fd_set socketsWritable; + int curclient; + int selecterr; + int nready; +@@ -220,23 +224,29 @@ + SmartScheduleStopTimer (); + + #endif ++ FD_ZERO(&socketsWritable); ++ vncWriteBlockHandler(&socketsWritable); + BlockHandler((pointer)&wt, (pointer)&LastSelectMask); + if (NewOutputPending) + FlushAllOutput(); + /* keep this check close to select() call to minimize race */ + if (dispatchException) + i = -1; +- else if (AnyClientsWriteBlocked) +- { +- XFD_COPYSET(&ClientsWriteBlocked, &clientsWritable); +- i = Select (MaxClients, &LastSelectMask, &clientsWritable, NULL, wt); +- } +- else +- { +- i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt); ++ else { ++ if (AnyClientsWriteBlocked) ++ XFD_ORSET(&socketsWritable, &ClientsWriteBlocked, &socketsWritable); ++ ++ if (XFD_ANYSET(&socketsWritable)) { ++ i = Select (MaxClients, &LastSelectMask, &socketsWritable, NULL, wt); ++ if (AnyClientsWriteBlocked) ++ XFD_ANDSET(&clientsWritable, &socketsWritable, &ClientsWriteBlocked); ++ } else { ++ i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt); ++ } + } + selecterr = GetErrno(); + WakeupHandler(i, (pointer)&LastSelectMask); ++ vncWriteWakeupHandler(i, &socketsWritable); + #ifdef SMART_SCHEDULE + SmartScheduleStartTimer (); + #endif