Browse Source

Make socket writes non-blockable. This allows the system to more quickly

return back to the Xorg main loop, meaning that things will be more responsive
in the presence of slow VNC clients.


git-svn-id: svn://svn.code.sf.net/p/tigervnc/code/trunk@4735 3789f03b-4d11-0410-bbf8-ca57d06f2519
tags/v1.1.90
Pierre Ossman 12 years ago
parent
commit
4ce51ffc4e

+ 68
- 9
common/rdr/FdOutStream.cxx View File

@@ -50,17 +50,18 @@ using namespace rdr;

enum { DEFAULT_BUF_SIZE = 16384 };

FdOutStream::FdOutStream(int fd_, int timeoutms_, int bufSize_)
: fd(fd_), timeoutms(timeoutms_),
FdOutStream::FdOutStream(int fd_, bool blocking_, int timeoutms_, int bufSize_)
: fd(fd_), blocking(blocking_), timeoutms(timeoutms_),
bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0)
{
ptr = start = new U8[bufSize];
ptr = start = sentUpTo = new U8[bufSize];
end = start + bufSize;
}

FdOutStream::~FdOutStream()
{
try {
blocking = true;
flush();
} catch (Exception&) {
}
@@ -71,21 +72,56 @@ void FdOutStream::setTimeout(int timeoutms_) {
timeoutms = timeoutms_;
}

void FdOutStream::setBlocking(bool blocking_) {
blocking = blocking_;
}

int FdOutStream::length()
{
return offset + ptr - start;
return offset + ptr - sentUpTo;
}

int FdOutStream::bufferUsage()
{
return ptr - sentUpTo;
}

void FdOutStream::flush()
{
U8* sentUpTo = start;
int timeoutms_;

if (blocking)
timeoutms_ = timeoutms;
else
timeoutms_ = 0;

while (sentUpTo < ptr) {
int n = writeWithTimeout((const void*) sentUpTo, ptr - sentUpTo);
int n = writeWithTimeout((const void*) sentUpTo,
ptr - sentUpTo, timeoutms_);

// Timeout?
if (n == 0) {
// If non-blocking then we're done here
if (!blocking)
break;

// Otherwise try blocking (with possible timeout)
if ((timeoutms_ == 0) && (timeoutms != 0)) {
timeoutms_ = timeoutms;
break;
}

// Proper timeout
throw TimedOut();
}

sentUpTo += n;
offset += n;
}

ptr = start;
// Managed to flush everything?
if (sentUpTo == ptr)
ptr = sentUpTo = start;
}


@@ -94,8 +130,31 @@ int FdOutStream::overrun(int itemSize, int nItems)
if (itemSize > bufSize)
throw Exception("FdOutStream overrun: max itemSize exceeded");

// First try to get rid of the data we have
flush();

// Still not enough space?
if (itemSize > end - ptr) {
// Can we shuffle things around?
// (don't do this if it gains us less than 25%)
if ((sentUpTo - start > bufSize / 4) &&
(itemSize < bufSize - (ptr - sentUpTo))) {
memmove(start, sentUpTo, ptr - sentUpTo);
ptr = start + (ptr - sentUpTo);
sentUpTo = start;
} else {
// Have to get rid of more data, so turn off non-blocking
// for a bit...
bool realBlocking;

realBlocking = blocking;
blocking = true;
flush();
blocking = realBlocking;
}
}

// Can we fit all the items asked for?
if (itemSize * nItems > end - ptr)
nItems = (end - ptr) / itemSize;

@@ -112,7 +171,7 @@ int FdOutStream::overrun(int itemSize, int nItems)
// select() and write() returning EINTR.
//

int FdOutStream::writeWithTimeout(const void* data, int length)
int FdOutStream::writeWithTimeout(const void* data, int length, int timeoutms)
{
int n;

@@ -146,7 +205,7 @@ int FdOutStream::writeWithTimeout(const void* data, int length)

if (n < 0) throw SystemException("select",errno);

if (n == 0) throw TimedOut();
if (n == 0) return 0;

do {
n = ::write(fd, data, length);

+ 7
- 2
common/rdr/FdOutStream.h View File

@@ -31,23 +31,28 @@ namespace rdr {

public:

FdOutStream(int fd, int timeoutms=-1, int bufSize=0);
FdOutStream(int fd, bool blocking=true, int timeoutms=-1, int bufSize=0);
virtual ~FdOutStream();

void setTimeout(int timeoutms);
void setBlocking(bool blocking);
int getFd() { return fd; }

void flush();
int length();

int bufferUsage();

private:
int overrun(int itemSize, int nItems);
int writeWithTimeout(const void* data, int length);
int writeWithTimeout(const void* data, int length, int timeoutms);
int fd;
bool blocking;
int timeoutms;
int bufSize;
int offset;
U8* start;
U8* sentUpTo;
};

}

+ 74
- 0
unix/xserver/hw/vnc/XserverDesktop.cc View File

@@ -581,6 +581,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
if (FD_ISSET(listener->getFd(), fds)) {
FD_CLR(listener->getFd(), fds);
Socket* sock = listener->accept();
sock->outStream().setBlocking(false);
server->addSocket(sock);
vlog.debug("new client, sock %d",sock->getFd());
}
@@ -590,6 +591,7 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
if (FD_ISSET(httpListener->getFd(), fds)) {
FD_CLR(httpListener->getFd(), fds);
Socket* sock = httpListener->accept();
sock->outStream().setBlocking(false);
httpServer->addSocket(sock);
vlog.debug("new http client, sock %d",sock->getFd());
}
@@ -632,6 +634,78 @@ void XserverDesktop::wakeupHandler(fd_set* fds, int nfds)
}
}

void XserverDesktop::writeBlockHandler(fd_set* fds)
{
try {
std::list<Socket*> sockets;
std::list<Socket*>::iterator i;

server->getSockets(&sockets);
for (i = sockets.begin(); i != sockets.end(); i++) {
int fd = (*i)->getFd();
if ((*i)->isShutdown()) {
vlog.debug("client gone, sock %d",fd);
server->removeSocket(*i);
vncClientGone(fd);
delete (*i);
} else {
if ((*i)->outStream().bufferUsage() > 0)
FD_SET(fd, fds);
}
}

if (httpServer) {
httpServer->getSockets(&sockets);
for (i = sockets.begin(); i != sockets.end(); i++) {
int fd = (*i)->getFd();
if ((*i)->isShutdown()) {
vlog.debug("http client gone, sock %d",fd);
httpServer->removeSocket(*i);
delete (*i);
} else {
if ((*i)->outStream().bufferUsage() > 0)
FD_SET(fd, fds);
}
}
}
} catch (rdr::Exception& e) {
vlog.error("XserverDesktop::writeBlockHandler: %s",e.str());
}
}

void XserverDesktop::writeWakeupHandler(fd_set* fds, int nfds)
{
if (nfds < 1)
return;

try {
std::list<Socket*> sockets;
std::list<Socket*>::iterator i;

server->getSockets(&sockets);
for (i = sockets.begin(); i != sockets.end(); i++) {
int fd = (*i)->getFd();
if (FD_ISSET(fd, fds)) {
FD_CLR(fd, fds);
(*i)->outStream().flush();
}
}

if (httpServer) {
httpServer->getSockets(&sockets);
for (i = sockets.begin(); i != sockets.end(); i++) {
int fd = (*i)->getFd();
if (FD_ISSET(fd, fds)) {
FD_CLR(fd, fds);
(*i)->outStream().flush();
}
}
}
} catch (rdr::Exception& e) {
vlog.error("XserverDesktop::writeWakeupHandler: %s",e.str());
}
}

void XserverDesktop::addClient(Socket* sock, bool reverse)
{
vlog.debug("new client, sock %d reverse %d",sock->getFd(),reverse);

+ 2
- 0
unix/xserver/hw/vnc/XserverDesktop.h View File

@@ -72,6 +72,8 @@ public:
void ignoreHooks(bool b) { ignoreHooks_ = b; }
void blockHandler(fd_set* fds);
void wakeupHandler(fd_set* fds, int nfds);
void writeBlockHandler(fd_set* fds);
void writeWakeupHandler(fd_set* fds, int nfds);
void addClient(network::Socket* sock, bool reverse);
void disconnectClients();


+ 88
- 0
unix/xserver/hw/vnc/vncExtInit.cc View File

@@ -21,6 +21,7 @@
#endif

#include <stdio.h>
#include <errno.h>

extern "C" {
#define class c_class
@@ -28,6 +29,7 @@ extern "C" {
#define NEED_EVENTS
#include <X11/X.h>
#include <X11/Xproto.h>
#include <X11/Xpoll.h>
#include "misc.h"
#include "os.h"
#include "dixstruct.h"
@@ -63,6 +65,8 @@ extern "C" {
static void vncResetProc(ExtensionEntry* extEntry);
static void vncBlockHandler(pointer data, OSTimePtr t, pointer readmask);
static void vncWakeupHandler(pointer data, int nfds, pointer readmask);
void vncWriteBlockHandler(fd_set *fds);
void vncWriteWakeupHandler(int nfds, fd_set *fds);
static void vncClientStateChange(CallbackListPtr*, pointer, pointer);
static void SendSelectionChangeEvent(Atom selection);
static int ProcVncExtDispatch(ClientPtr client);
@@ -287,6 +291,9 @@ static void vncSelectionCallback(CallbackListPtr *callbacks, pointer data, point
SendSelectionChangeEvent(selection->selection);
}

static void vncWriteBlockHandlerFallback(OSTimePtr timeout);
static void vncWriteWakeupHandlerFallback();

//
// vncBlockHandler - called just before the X server goes into select(). Call
// on to the block handler for each desktop. Then check whether any of the
@@ -297,6 +304,8 @@ static void vncBlockHandler(pointer data, OSTimePtr timeout, pointer readmask)
{
fd_set* fds = (fd_set*)readmask;

vncWriteBlockHandlerFallback(timeout);

for (int scr = 0; scr < screenInfo.numScreens; scr++)
if (desktop[scr])
desktop[scr]->blockHandler(fds);
@@ -311,6 +320,85 @@ static void vncWakeupHandler(pointer data, int nfds, pointer readmask)
desktop[scr]->wakeupHandler(fds, nfds);
}
}

vncWriteWakeupHandlerFallback();
}

//
// vncWriteBlockHandler - extra hack to be able to get the main select loop
// to monitor writeable fds and not just readable. This requirers a modified
// Xorg and might therefore not be called. When it is called though, it will
// do so before vncBlockHandler (and vncWriteWakeupHandler called after
// vncWakeupHandler).
//

static bool needFallback = true;
static fd_set fallbackFds;
static struct timeval tw;

void vncWriteBlockHandler(fd_set *fds)
{
needFallback = false;

for (int scr = 0; scr < screenInfo.numScreens; scr++)
if (desktop[scr])
desktop[scr]->writeBlockHandler(fds);
}

void vncWriteWakeupHandler(int nfds, fd_set *fds)
{
for (int scr = 0; scr < screenInfo.numScreens; scr++) {
if (desktop[scr]) {
desktop[scr]->writeWakeupHandler(fds, nfds);
}
}
}

static void vncWriteBlockHandlerFallback(OSTimePtr timeout)
{
if (!needFallback)
return;

FD_ZERO(&fallbackFds);
vncWriteBlockHandler(&fallbackFds);
needFallback = true;

if (!XFD_ANYSET(&fallbackFds))
return;

if ((*timeout == NULL) ||
((*timeout)->tv_sec > 0) || ((*timeout)->tv_usec > 10000)) {
tw.tv_sec = 0;
tw.tv_usec = 10000;
*timeout = &tw;
}
}

static void vncWriteWakeupHandlerFallback()
{
int ret;
struct timeval timeout;

if (!needFallback)
return;

if (!XFD_ANYSET(&fallbackFds))
return;

timeout.tv_sec = 0;
timeout.tv_usec = 0;

ret = select(XFD_SETSIZE, NULL, &fallbackFds, NULL, &timeout);
if (ret < 0) {
ErrorF("vncWriteWakeupHandlerFallback(): select: %s\n",
strerror(errno));
return;
}

if (ret == 0)
return;

vncWriteWakeupHandler(ret, &fallbackFds);
}

static void vncClientStateChange(CallbackListPtr*, pointer, pointer p)

+ 58
- 0
unix/xserver15.patch View File

@@ -98,3 +98,61 @@ diff -up xserver/mi/miinitext.c.vnc xserver/mi/miinitext.c
#ifdef XIDLE
if (!noXIdleExtension) XIdleExtensionInit();
#endif
--- xserver/os/WaitFor.c.orig 2011-10-07 12:57:57.000000000 +0200
+++ xserver/os/WaitFor.c 2011-10-07 13:21:11.000000000 +0200
@@ -125,6 +125,9 @@
static void CheckAllTimers(void);
static OsTimerPtr timers = NULL;
+extern void vncWriteBlockHandler(fd_set *fds);
+extern void vncWriteWakeupHandler(int nfds, fd_set *fds);
+
/*****************
* WaitForSomething:
* Make the server suspend until there is
@@ -150,6 +153,7 @@
INT32 timeout = 0;
fd_set clientsReadable;
fd_set clientsWritable;
+ fd_set socketsWritable;
int curclient;
int selecterr;
int nready;
@@ -220,23 +224,29 @@
SmartScheduleStopTimer ();
#endif
+ FD_ZERO(&socketsWritable);
+ vncWriteBlockHandler(&socketsWritable);
BlockHandler((pointer)&wt, (pointer)&LastSelectMask);
if (NewOutputPending)
FlushAllOutput();
/* keep this check close to select() call to minimize race */
if (dispatchException)
i = -1;
- else if (AnyClientsWriteBlocked)
- {
- XFD_COPYSET(&ClientsWriteBlocked, &clientsWritable);
- i = Select (MaxClients, &LastSelectMask, &clientsWritable, NULL, wt);
- }
- else
- {
- i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
+ else {
+ if (AnyClientsWriteBlocked)
+ XFD_ORSET(&socketsWritable, &ClientsWriteBlocked, &socketsWritable);
+
+ if (XFD_ANYSET(&socketsWritable)) {
+ i = Select (MaxClients, &LastSelectMask, &socketsWritable, NULL, wt);
+ if (AnyClientsWriteBlocked)
+ XFD_ANDSET(&clientsWritable, &socketsWritable, &ClientsWriteBlocked);
+ } else {
+ i = Select (MaxClients, &LastSelectMask, NULL, NULL, wt);
+ }
}
selecterr = GetErrno();
WakeupHandler(i, (pointer)&LastSelectMask);
+ vncWriteWakeupHandler(i, &socketsWritable);
#ifdef SMART_SCHEDULE
SmartScheduleStartTimer ();
#endif

Loading…
Cancel
Save