aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/rdr/FdOutStream.cxx77
-rw-r--r--common/rdr/FdOutStream.h9
-rw-r--r--unix/xserver/hw/vnc/XserverDesktop.cc74
-rw-r--r--unix/xserver/hw/vnc/XserverDesktop.h2
-rw-r--r--unix/xserver/hw/vnc/vncExtInit.cc88
-rw-r--r--unix/xserver15.patch58
6 files changed, 297 insertions, 11 deletions
diff --git a/common/rdr/FdOutStream.cxx b/common/rdr/FdOutStream.cxx
index c84c946f..a6b85e21 100644
--- a/common/rdr/FdOutStream.cxx
+++ b/common/rdr/FdOutStream.cxx
@@ -50,17 +50,18 @@ using namespace rdr;
enum { DEFAULT_BUF_SIZE = 16384 };
-FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_)
- : fd(fd_), timeoutms(timeoutms_),
+FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_)
+ : fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
{
- ptr = start = new U8[bufSize];
+ ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
}
FdOutStream::~FdOutStream()
{
try {
+ blocking = true;
flush();
} catch (Exception&) {
}
@@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) {
timeoutms = timeoutms_;
}
+void FdOutStream::setBlocking(bool blocking_) {
+ blocking = blocking_;
+}
+
int FdOutStream::length()
{
- return offset + ptr - start;
+ return offset + ptr - sentUpTo;
+}
+
+int FdOutStream::bufferUsage()
+{
+ return ptr - sentUpTo;
}
void FdOutStream::flush()
{
- U8* sentUpTo = start;
+ int timeoutms_;
+
+ if (blocking)
+ timeoutms_ = timeoutms;
+ else
+ timeoutms_ = 0;
+
while (sentUpTo < ptr) {
- int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo);
+ int n = writeWithTimeout((const void*) sentUpTo,
+ ptr - sentUpTo, timeoutms_);
+
+ // Timeout?
+ if (n == 0) {
+ // If non-blocking then we're done here
+ if (!blocking)
+ break;
+
+ // Otherwise try blocking (with possible timeout)
+ if ((timeoutms_ == 0) && (timeoutms != 0)) {
+ timeoutms_ = timeoutms;
+ break;
+ }
+
+ // Proper timeout
+ throw TimedOut();
+ }
+
sentUpTo += n;
offset += n;
}
- ptr = start;
+ // Managed to flush everything?
+ if (sentUpTo == ptr)
+ ptr = sentUpTo = start;
}
@@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems)
if (itemSize > bufSize)
throw Exception("FdOutStream overrun: max itemSize exceeded");
+ // First try to get rid of the data we have
flush();
+ // Still not enough space?
+ if (itemSize > end - ptr) {
+ // Can we shuffle things around?
+ // (don't do this if it gains us less than 25%)
+ if ((sentUpTo - start > bufSize / 4) &&
+ (itemSize < bufSize - (ptr - sentUpTo))) {
+ memmove(start, sentUpTo, ptr - sentUpTo);
+ ptr = start + (ptr - sentUpTo);
+ sentUpTo = start;
+ } else {
+ // Have to get rid of more data, so turn off non-blocking
+ // for a bit...
+ bool realBlocking;
+
+ realBlocking = blocking;
+ blocking = true;
+ flush();
+ blocking = realBlocking;
+ }
+ }
+
+ // Can we fit all the items asked for?
if (itemSize * nItems > end - ptr)
nItems = (end - ptr) / itemSize;
@@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems)
// select() and write() returning EINTR.
//
-int FdOutStream::writeWithTimeout(const void* data, int length)
+int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms)
{
int n;
@@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length)
if (n < 0) throw SystemException("select",errno);
- if (n == 0) throw TimedOut();
+ if (n == 0) return 0;
do {
n = ::write(fd, data, length);
diff --git a/common/rdr/FdOutStream.h b/common/rdr/FdOutStream.h
index a3e29127..daa88b24 100644
--- a/common/rdr/FdOutStream.h
+++ b/common/rdr/FdOutStream.h
@@ -31,23 +31,28 @@ namespace rdr {
public:
- FdOutStream(int fd, int timeoutms=-1, int bufSize=0);
+ FdOutStream(int fd, bool blocking=true, int timeoutms=-1, int bufSize=0);
virtual ~FdOutStream();
void setTimeout(int timeoutms);
+ void setBlocking(bool blocking);
int getFd() { return fd; }
void flush();
int length();
+ int bufferUsage();
+
private:
int overrun(int itemSize, int nItems);
- int writeWithTimeout(const void* data, int length);
+ int writeWithTimeout(const void* data, int length, int timeoutms);
int fd;
+ bool blocking;
int timeoutms;
int bufSize;
int offset;
U8* start;
+ U8* sentUpTo;
};
}
diff --git a/unix/xserver/hw/vnc/XserverDesktop.cc b/unix/xserver/hw/vnc/XserverDesktop.cc
index 9afbb49b..9c5b20b5 100644
--- a/unix/xserver/hw/vnc/XserverDesktop.cc
+++ b/unix/xserver/hw/vnc/XserverDesktop.cc
@@ -581,6 +581,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
if (FD_ISSET(listener->getFd(), fds)) {
FD_CLR(listener->getFd(), fds);
Socket* sock = listener->accept();
+ sock->outStream().setBlocking(false);
server->addSocket(sock);
vlog.debug("new client, sock %d",sock->getFd());
}
@@ -590,6 +591,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
if (FD_ISSET(httpListener->getFd(), fds)) {
FD_CLR(httpListener->getFd(), fds);
Socket* sock = httpListener->accept();
+ sock->outStream().setBlocking(false);
httpServer->addSocket(sock);
vlog.debug("new http client, sock %d",sock->getFd());
}
@@ -632,6 +634,78 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
}
}
+void XserverDesktop::writeBlockHandler(fd_set* fds)
+{
+ try {
+ std::list<Socket*> sockets;
+ std::list<Socket*>::iterator i;
+
+ server->getSockets(&sockets);
+ for (i = sockets.begin(); i != sockets.end(); i++) {
+ int fd = (*i)->getFd();
+ if ((*i)->isShutdown()) {
+ vlog.debug("client gone, sock %d",fd);
+ server->removeSocket(*i);
+ vncClientGone(fd);
+ delete (*i);
+ } else {
+ if ((*i)->outStream().bufferUsage() > 0)
+ FD_SET(fd, fds);
+ }
+ }
+
+ if (httpServer) {
+ httpServer->getSockets(&sockets);
+ for (i = sockets.begin(); i != sockets.end(); i++) {
+ int fd = (*i)->getFd();
+ if ((*i)->isShutdown()) {
+ vlog.debug("http client gone, sock %d",fd);
+ httpServer->removeSocket(*i);
+ delete (*i);
+ } else {
+ if ((*i)->outStream().bufferUsage() > 0)
+ FD_SET(fd, fds);
+ }
+ }
+ }
+ } catch (rdr::Exception& e) {
+ vlog.error("XserverDesktop::writeBlockHandler: %s",e.str());
+ }
+}
+
+void XserverDesktop::writeWakeupHandler(fd_set* fds, int nfds)
+{
+ if (nfds < 1)
+ return;
+
+ try {
+ std::list<Socket*> sockets;
+ std::list<Socket*>::iterator i;
+
+ server->getSockets(&sockets);
+ for (i = sockets.begin(); i != sockets.end(); i++) {
+ int fd = (*i)->getFd();
+ if (FD_ISSET(fd, fds)) {
+ FD_CLR(fd, fds);
+ (*i)->outStream().flush();
+ }
+ }
+
+ if (httpServer) {
+ httpServer->getSockets(&sockets);
+ for (i = sockets.begin(); i != sockets.end(); i++) {
+ int fd = (*i)->getFd();
+ if (FD_ISSET(fd, fds)) {
+ FD_CLR(fd, fds);
+ (*i)->outStream().flush();
+ }
+ }
+ }
+ } catch (rdr::Exception& e) {
+ vlog.error("XserverDesktop::writeWakeupHandler: %s",e.str());
+ }
+}
+
void XserverDesktop::addClient(Socket* sock, bool reverse)
{
vlog.debug("new client, sock %d reverse %d",sock->getFd(),reverse);
diff --git a/unix/xserver/hw/vnc/XserverDesktop.h b/unix/xserver/hw/vnc/XserverDesktop.h
index 1c037053..af365117 100644
--- a/unix/xserver/hw/vnc/XserverDesktop.h
+++ b/unix/xserver/hw/vnc/XserverDesktop.h
@@ -72,6 +72,8 @@ public:
void ignoreHooks(bool b) { ignoreHooks_ = b; }
void blockHandler(fd_set* fds);
void wakeupHandler(fd_set* fds, int nfds);
+ void writeBlockHandler(fd_set* fds);
+ void writeWakeupHandler(fd_set* fds, int nfds);
void addClient(network::Socket* sock, bool reverse);
void disconnectClients();
diff --git a/unix/xserver/hw/vnc/vncExtInit.cc b/unix/xserver/hw/vnc/vncExtInit.cc
index d3cfbe26..baa8fa4e 100644
--- a/unix/xserver/hw/vnc/vncExtInit.cc
+++ b/unix/xserver/hw/vnc/vncExtInit.cc
@@ -21,6 +21,7 @@
#endif
#include <stdio.h>
+#include <errno.h>
extern "C" {
#define class c_class
@@ -28,6 +29,7 @@ extern "C" {
#define NEED_EVENTS
#include <X11/X.h>
#include <X11/Xproto.h>
+#include <X11/Xpoll.h>
#include "misc.h"
#include "os.h"
#include "dixstruct.h"
@@ -63,6 +65,8 @@ extern "C" {
static void vncResetProc(ExtensionEntry* extEntry);
static void vncBlockHandler(pointer data, OSTimePtr t, pointer readmask);
static void vncWakeupHandler(pointer data, int nfds, pointer readmask);
+ void vncWriteBlockHandler(fd_set *fds);
+ void vncWriteWakeupHandler(int nfds, fd_set *fds);
static void vncClientStateChange(CallbackListPtr*, pointer, pointer);
static void SendSelectionChangeEvent(Atom selection);
static int ProcVncExtDispatch(ClientPtr client);
@@ -287,6 +291,9 @@ static void vncSelectionCallback(CallbackListPtr *callbacks, pointer data, point
SendSelectionChangeEvent(selection->selection);
}
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout);
+static void vncWriteWakeupHandlerFallback();
+
//
// vncBlockHandler - called just before the X server goes into select(). Call
// on to the block handler for each desktop. Then check whether any of the
@@ -297,6 +304,8 @@ static void vncBlockHandler(pointer data, OSTimePtr timeout, pointer readmask)
{
fd_set* fds = (fd_set*)readmask;
+ vncWriteBlockHandlerFallback(timeout);
+
for (int scr = 0; scr < screenInfo.numScreens; scr++)
if (desktop[scr])
desktop[scr]->blockHandler(fds);
@@ -311,6 +320,85 @@ static void vncWakeupHandler(pointer data, int nfds, pointer readmask)
desktop[scr]->wakeupHandler(fds, nfds);
}
}
+
+ vncWriteWakeupHandlerFallback();
+}
+
+//
+// vncWriteBlockHandler - extra hack to be able to get the main select loop
+// to monitor writeable fds and not just readable. This requirers a modified
+// Xorg and might therefore not be called. When it is called though, it will
+// do so before vncBlockHandler (and vncWriteWakeupHandler called after
+// vncWakeupHandler).
+//
+
+static bool needFallback = true;
+static fd_set fallbackFds;
+static struct timeval tw;
+
+void vncWriteBlockHandler(fd_set *fds)
+{
+ needFallback = false;
+
+ for (int scr = 0; scr < screenInfo.numScreens; scr++)
+ if (desktop[scr])
+ desktop[scr]->writeBlockHandler(fds);
+}
+
+void vncWriteWakeupHandler(int nfds, fd_set *fds)
+{
+ for (int scr = 0; scr < screenInfo.numScreens; scr++) {
+ if (desktop[scr]) {
+ desktop[scr]->writeWakeupHandler(fds, nfds);
+ }
+ }
+}
+
+static void vncWriteBlockHandlerFallback(OSTimePtr timeout)
+{
+ if (!needFallback)
+ return;
+
+ FD_ZERO(&fallbackFds);
+ vncWriteBlockHandler(&fallbackFds);
+ needFallback = true;
+
+ if (!XFD_ANYSET(&fallbackFds))
+ return;
+
+ if ((*timeout == NULL) ||
+ ((*timeout)->tv_sec > 0) || ((*timeout)->tv_usec > 10000)) {
+ tw.tv_sec = 0;
+ tw.tv_usec = 10000;
+ *timeout = &tw;
+ }
+}
+
+static void vncWriteWakeupHandlerFallback()
+{
+ int ret;
+ struct timeval timeout;
+
+ if (!needFallback)
+ return;
+
+ if (!XFD_ANYSET(&fallbackFds))
+ return;
+
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 0;
+
+ ret = select(XFD_SETSIZE, NULL, &fallbackFds, NULL, &timeout);
+ if (ret < 0) {
+ ErrorF("vncWriteWakeupHandlerFallback(): select: %s\n",
+ strerror(errno));
+ return;
+ }
+
+ if (ret == 0)
+ return;
+
+ vncWriteWakeupHandler(ret, &fallbackFds);
}
static void vncClientStateChange(CallbackListPtr*, pointer, pointer p)
diff --git a/unix/xserver15.patch b/unix/xserver15.patch
index f9307770..7d8c94be 100644
--- a/unix/xserver15.patch
+++ b/unix/xserver15.patch
@@ -98,3 +98,61 @@ diff -up xserver/mi/miinitext.c.vnc xserver/mi/miinitext.c
#ifdef XIDLE
if (!noXIdleExtension) XIdleExtensionInit();
#endif
+--- xserver/os/WaitFor.c.orig 2011-10-07 12:57:57.000000000 +0200
++++ xserver/os/WaitFor.c 2011-10-07 13:21:11.000000000 +0200
+@@ -125,6 +125,9 @@
+ static void CheckAllTimers(void);
+ static OsTimerPtr timers = NULL;
+
++extern void vncWriteBlockHandler(fd_set *fds);
++extern void vncWriteWakeupHandler(int nfds, fd_set *fds);
++
+ /*****************
+ * WaitForSomething:
+ * Make the server suspend until there is
+@@ -150,6 +153,7 @@
+ INT32 timeout = 0;
+ fd_set clientsReadable;
+ fd_set clientsWritable;
++ fd_set socketsWritable;
+ int curclient;
+ int selecterr;
+ int nready;
+@@ -220,23 +224,29 @@
+ SmartScheduleStopTimer ();
+
+ #endif
++ FD_ZERO(&socketsWritable);
++ vncWriteBlockHandler(&socketsWritable);
+ BlockHandler((pointer)&wt, (pointer)&LastSelectMask);
+ if (NewOutputPending)
+ FlushAllOutput();
+ /* keep this check close to select() call to minimize race */
+ if (dispatchException)
+ i = -1;
+- else if (AnyClientsWriteBlocked)
+- {
+- XFD_COPYSET(&ClientsWriteBlocked, &clientsWritable);
+- i = Select (MaxClients, &LastSelectMask, &clientsWritable, NULL, wt);
+- }
+- else
+- {
+- i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++ else {
++ if (AnyClientsWriteBlocked)
++ XFD_ORSET(&socketsWritable, &ClientsWriteBlocked, &socketsWritable);
++
++ if (XFD_ANYSET(&socketsWritable)) {
++ i = Select (MaxClients, &LastSelectMask, &socketsWritable, NULL, wt);
++ if (AnyClientsWriteBlocked)
++ XFD_ANDSET(&clientsWritable, &socketsWritable, &ClientsWriteBlocked);
++ } else {
++ i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
++ }
+ }
+ selecterr = GetErrno();
+ WakeupHandler(i, (pointer)&LastSelectMask);
++ vncWriteWakeupHandler(i, &socketsWritable);
+ #ifdef SMART_SCHEDULE
+ SmartScheduleStartTimer ();
+ #endif