diff options
author | Pierre Ossman <ossman@cendio.se> | 2020-05-14 18:49:39 +0200 |
---|---|---|
committer | Pierre Ossman <ossman@cendio.se> | 2020-05-21 12:59:02 +0200 |
commit | ad0f0618fa2ca13d7b916f22eccc5ba3201482cb (patch) | |
tree | 55b84c52f8ab0e7ebe672458471e4577afddc1b9 /win/rfb_win32 | |
parent | c0dac220de0186a879f1f71966a2848000f69a48 (diff) | |
download | tigervnc-ad0f0618fa2ca13d7b916f22eccc5ba3201482cb.tar.gz tigervnc-ad0f0618fa2ca13d7b916f22eccc5ba3201482cb.zip |
Change streams to be asynchronous
Major restructuring of how streams work. Neither input nor output
streams are now blocking. This avoids stalling the rest of the client or
server when a peer is slow or unresponsive.
Note that this puts an extra burden on users of streams to make sure
they are allowed to do their work once the underlying transports are
ready (e.g. monitoring fds).
Diffstat (limited to 'win/rfb_win32')
-rw-r--r-- | win/rfb_win32/SocketManager.cxx | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/win/rfb_win32/SocketManager.cxx b/win/rfb_win32/SocketManager.cxx index 0092d94d..393e2191 100644 --- a/win/rfb_win32/SocketManager.cxx +++ b/win/rfb_win32/SocketManager.cxx @@ -170,6 +170,13 @@ int SocketManager::checkTimeouts() { j_next = j; j_next++; if (j->second.sock->isShutdown()) shutdownSocks.push_back(j->second.sock); + else { + long eventMask = FD_READ | FD_CLOSE; + if (j->second.sock->outStream().hasBufferedData()) + eventMask |= FD_WRITE; + if (WSAEventSelect(j->second.sock->getFd(), j->first, eventMask) == SOCKET_ERROR) + throw rdr::SystemException("unable to adjust WSAEventSelect:%u", WSAGetLastError()); + } } std::list<network::Socket*>::iterator k; @@ -213,6 +220,13 @@ void SocketManager::processEvent(HANDLE event) { try { // Process data from an active connection + WSANETWORKEVENTS events; + long eventMask; + + // Fetch why this event notification triggered + if (WSAEnumNetworkEvents(ci.sock->getFd(), event, &events) == SOCKET_ERROR) + throw rdr::SystemException("unable to get WSAEnumNetworkEvents:%u", WSAGetLastError()); + // Cancel event notification for this socket if (WSAEventSelect(ci.sock->getFd(), event, 0) == SOCKET_ERROR) throw rdr::SystemException("unable to disable WSAEventSelect:%u", WSAGetLastError()); @@ -220,16 +234,29 @@ void SocketManager::processEvent(HANDLE event) { // Reset the event object WSAResetEvent(event); + // Call the socket server to process the event - ci.server->processSocketReadEvent(ci.sock); - if (ci.sock->isShutdown()) { - remSocket(ci.sock); - return; + if (events.lNetworkEvents & FD_WRITE) { + ci.server->processSocketWriteEvent(ci.sock); + if (ci.sock->isShutdown()) { + remSocket(ci.sock); + return; + } + } + if (events.lNetworkEvents & (FD_READ | FD_CLOSE)) { + ci.server->processSocketReadEvent(ci.sock); + if (ci.sock->isShutdown()) { + remSocket(ci.sock); + return; + } } // Re-instate the required socket event // If the read event is still valid, the event object gets set here - if (WSAEventSelect(ci.sock->getFd(), event, FD_READ | FD_CLOSE) == SOCKET_ERROR) + eventMask = FD_READ | FD_CLOSE; + if (ci.sock->outStream().hasBufferedData()) + eventMask |= FD_WRITE; + if (WSAEventSelect(ci.sock->getFd(), event, eventMask) == SOCKET_ERROR) throw rdr::SystemException("unable to re-enable WSAEventSelect:%u", WSAGetLastError()); } catch (rdr::Exception& e) { vlog.error("%s", e.str()); |