summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorPierre Ossman <ossman@cendio.se>2015-11-12 12:21:58 +0100
committerPierre Ossman <ossman@cendio.se>2015-11-27 11:07:43 +0100
commit504afa27c1128de9287599e67cf01bd06df06908 (patch)
treed6ae146456270ece4f721b512e345249d3cdaf16 /common
parent3da238dee89906d952487fbe0d138dac1f5df3ea (diff)
downloadtigervnc-504afa27c1128de9287599e67cf01bd06df06908.tar.gz
tigervnc-504afa27c1128de9287599e67cf01bd06df06908.zip
Make the decoder multi-threaded
This implements the basic infrastructure for multi-threaded decoding of rects. However there is just one thread reading data and one thread decoding it. More logic is needed to safely decode multiple rects at the same time.
Diffstat (limited to 'common')
-rw-r--r--common/os/CMakeLists.txt4
-rw-r--r--common/rfb/CConnection.cxx8
-rw-r--r--common/rfb/DecodeManager.cxx197
-rw-r--r--common/rfb/DecodeManager.h49
-rw-r--r--common/rfb/Decoder.h9
5 files changed, 258 insertions, 9 deletions
diff --git a/common/os/CMakeLists.txt b/common/os/CMakeLists.txt
index b5749594..7644341a 100644
--- a/common/os/CMakeLists.txt
+++ b/common/os/CMakeLists.txt
@@ -7,5 +7,9 @@ add_library(os STATIC
os.cxx)
if(UNIX)
+ target_link_libraries(os pthread)
+endif()
+
+if(UNIX)
libtool_create_control_file(os)
endif()
diff --git a/common/rfb/CConnection.cxx b/common/rfb/CConnection.cxx
index 2ddfc330..7e9fd310 100644
--- a/common/rfb/CConnection.cxx
+++ b/common/rfb/CConnection.cxx
@@ -64,6 +64,8 @@ void CConnection::setStreams(rdr::InStream* is_, rdr::OutStream* os_)
void CConnection::setFramebuffer(ModifiablePixelBuffer* fb)
{
+ decoder.flush();
+
if ((framebuffer != NULL) && (fb != NULL)) {
Rect rect;
@@ -303,6 +305,8 @@ void CConnection::securityCompleted()
void CConnection::setDesktopSize(int w, int h)
{
+ decoder.flush();
+
CMsgHandler::setDesktopSize(w,h);
}
@@ -311,6 +315,8 @@ void CConnection::setExtendedDesktopSize(unsigned reason,
int w, int h,
const ScreenSet& layout)
{
+ decoder.flush();
+
CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout);
}
@@ -321,6 +327,8 @@ void CConnection::framebufferUpdateStart()
void CConnection::framebufferUpdateEnd()
{
+ decoder.flush();
+
CMsgHandler::framebufferUpdateEnd();
}
diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx
index ffae18b3..a444eb7b 100644
--- a/common/rfb/DecodeManager.cxx
+++ b/common/rfb/DecodeManager.cxx
@@ -28,6 +28,8 @@
#include <rdr/Exception.h>
#include <rdr/MemOutStream.h>
+#include <os/Mutex.h>
+
using namespace rfb;
static LogWriter vlog("DecodeManager");
@@ -35,20 +37,54 @@ static LogWriter vlog("DecodeManager");
DecodeManager::DecodeManager(CConnection *conn) :
conn(conn)
{
+ int i;
+
memset(decoders, 0, sizeof(decoders));
- bufferStream = new rdr::MemOutStream();
+
+ queueMutex = new os::Mutex();
+ producerCond = new os::Condition(queueMutex);
+ consumerCond = new os::Condition(queueMutex);
+
+ // Just a single thread for now as we haven't sorted out the
+ // dependencies between rects
+ for (i = 0;i < 1;i++) {
+ // Twice as many possible entries in the queue as there
+ // are worker threads to make sure they don't stall
+ freeBuffers.push_back(new rdr::MemOutStream());
+ freeBuffers.push_back(new rdr::MemOutStream());
+
+ threads.push_back(new DecodeThread(this));
+ }
}
DecodeManager::~DecodeManager()
{
+ while (!threads.empty()) {
+ delete threads.back();
+ threads.pop_back();
+ }
+
+ while (!freeBuffers.empty()) {
+ delete freeBuffers.back();
+ freeBuffers.pop_back();
+ }
+
+ delete consumerCond;
+ delete producerCond;
+ delete queueMutex;
+
for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
delete decoders[i];
- delete bufferStream;
}
void DecodeManager::decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb)
{
+ Decoder *decoder;
+ rdr::MemOutStream *bufferStream;
+
+ QueueEntry *entry;
+
assert(pb != NULL);
if (!Decoder::supported(encoding)) {
@@ -64,10 +100,157 @@ void DecodeManager::decodeRect(const Rect& r, int encoding,
}
}
+ decoder = decoders[encoding];
+
+ // Wait for an available memory buffer
+ queueMutex->lock();
+
+ while (freeBuffers.empty())
+ producerCond->wait();
+
+ // Don't pop the buffer in case we throw an exception
+ // whilst reading
+ bufferStream = freeBuffers.front();
+
+ queueMutex->unlock();
+
+ // Read the rect
bufferStream->clear();
- decoders[encoding]->readRect(r, conn->getInStream(),
- conn->cp, bufferStream);
- decoders[encoding]->decodeRect(r, bufferStream->data(),
- bufferStream->length(),
- conn->cp, pb);
+ decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
+
+ // Then try to put it on the queue
+ entry = new QueueEntry;
+
+ entry->active = false;
+ entry->rect = r;
+ entry->encoding = encoding;
+ entry->decoder = decoder;
+ entry->cp = &conn->cp;
+ entry->pb = pb;
+ entry->bufferStream = bufferStream;
+
+ queueMutex->lock();
+
+ // The workers add buffers to the end so it's safe to assume
+ // the front is still the same buffer
+ freeBuffers.pop_front();
+
+ workQueue.push_back(entry);
+
+ // We only put a single entry on the queue so waking a single
+ // thread is sufficient
+ consumerCond->signal();
+
+ queueMutex->unlock();
+}
+
+void DecodeManager::flush()
+{
+ queueMutex->lock();
+
+ while (!workQueue.empty())
+ producerCond->wait();
+
+ queueMutex->unlock();
+}
+
+DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
+{
+ this->manager = manager;
+
+ stopRequested = false;
+
+ start();
+}
+
+DecodeManager::DecodeThread::~DecodeThread()
+{
+ stop();
+ wait();
+}
+
+void DecodeManager::DecodeThread::stop()
+{
+ os::AutoMutex a(manager->queueMutex);
+
+ if (!isRunning())
+ return;
+
+ stopRequested = true;
+
+ // We can't wake just this thread, so wake everyone
+ manager->consumerCond->broadcast();
+}
+
+void DecodeManager::DecodeThread::worker()
+{
+ manager->queueMutex->lock();
+
+ while (!stopRequested) {
+ DecodeManager::QueueEntry *entry;
+
+ // Look for an available entry in the work queue
+ entry = findEntry();
+ if (entry == NULL) {
+ // Wait and try again
+ manager->consumerCond->wait();
+ continue;
+ }
+
+ // This is ours now
+ entry->active = true;
+
+ manager->queueMutex->unlock();
+
+ // Do the actual decoding
+ try {
+ entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
+ entry->bufferStream->length(),
+ *entry->cp, entry->pb);
+ } catch(...) {
+ // FIXME: Try to get the exception back to the main thread
+ assert(false);
+ }
+
+ manager->queueMutex->lock();
+
+ // Remove the entry from the queue and give back the memory buffer
+ manager->freeBuffers.push_back(entry->bufferStream);
+ manager->workQueue.remove(entry);
+ delete entry;
+
+ // Wake the main thread in case it is waiting for a memory buffer
+ manager->producerCond->signal();
+ // This rect might have been blocking multiple other rects, so
+ // wake up every worker thread
+ if (manager->workQueue.size() > 1)
+ manager->consumerCond->broadcast();
+ }
+
+ manager->queueMutex->unlock();
+}
+
+DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
+{
+ std::list<DecodeManager::QueueEntry*>::iterator iter;
+
+ if (manager->workQueue.empty())
+ return NULL;
+
+ if (!manager->workQueue.front()->active)
+ return manager->workQueue.front();
+
+ for (iter = manager->workQueue.begin();
+ iter != manager->workQueue.end();
+ ++iter) {
+ // Another thread working on this?
+ if ((*iter)->active)
+ continue;
+
+ // FIXME: check dependencies between rects
+
+ return *iter;
+ }
+
+ return NULL;
}
diff --git a/common/rfb/DecodeManager.h b/common/rfb/DecodeManager.h
index 63a41206..1a974ea4 100644
--- a/common/rfb/DecodeManager.h
+++ b/common/rfb/DecodeManager.h
@@ -19,8 +19,17 @@
#ifndef __RFB_DECODEMANAGER_H__
#define __RFB_DECODEMANAGER_H__
+#include <list>
+
+#include <os/Thread.h>
+
#include <rfb/encodings.h>
+namespace os {
+ class Condition;
+ class Mutex;
+}
+
namespace rdr { class MemOutStream; }
namespace rfb {
@@ -37,10 +46,48 @@ namespace rfb {
void decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb);
+ void flush();
+
private:
CConnection *conn;
Decoder *decoders[encodingMax+1];
- rdr::MemOutStream *bufferStream;
+
+ struct QueueEntry {
+ bool active;
+ Rect rect;
+ int encoding;
+ Decoder* decoder;
+ const ConnParams* cp;
+ ModifiablePixelBuffer* pb;
+ rdr::MemOutStream* bufferStream;
+ };
+
+ std::list<rdr::MemOutStream*> freeBuffers;
+ std::list<QueueEntry*> workQueue;
+
+ os::Mutex* queueMutex;
+ os::Condition* producerCond;
+ os::Condition* consumerCond;
+
+ private:
+ class DecodeThread : public os::Thread {
+ public:
+ DecodeThread(DecodeManager* manager);
+ ~DecodeThread();
+
+ void stop();
+
+ protected:
+ void worker();
+ DecodeManager::QueueEntry* findEntry();
+
+ private:
+ DecodeManager* manager;
+
+ bool stopRequested;
+ };
+
+ std::list<DecodeThread*> threads;
};
}
diff --git a/common/rfb/Decoder.h b/common/rfb/Decoder.h
index 4195a80b..7286b41d 100644
--- a/common/rfb/Decoder.h
+++ b/common/rfb/Decoder.h
@@ -38,9 +38,15 @@ namespace rfb {
// readRect() transfers data for the given rectangle from the
// InStream to the OutStream, possibly changing it along the way to
- // make it easier to decode.
+ // make it easier to decode. This function will always be called in
+ // a serial manner on the main thread.
virtual void readRect(const Rect& r, rdr::InStream* is,
const ConnParams& cp, rdr::OutStream* os)=0;
+
+ // These functions will be called from any of the worker threads.
+ // A lock will be held whilst these are called so it is safe to
+ // read and update internal state as necessary.
+
// decodeRect() decodes the given rectangle with data from the
// given buffer, onto the ModifiablePixelBuffer. The PixelFormat of
// the PixelBuffer might not match the ConnParams and it is up to
@@ -49,6 +55,7 @@ namespace rfb {
size_t buflen, const ConnParams& cp,
ModifiablePixelBuffer* pb)=0;
+ public:
static bool supported(int encoding);
static Decoder* createDecoder(int encoding);
};