]> source.dussan.org Git - tigervnc.git/commitdiff
Make the decoder multi-threaded
authorPierre Ossman <ossman@cendio.se>
Thu, 12 Nov 2015 11:21:58 +0000 (12:21 +0100)
committerPierre Ossman <ossman@cendio.se>
Fri, 27 Nov 2015 10:07:43 +0000 (11:07 +0100)
This implements the basic infrastructure for multi-threaded
decoding of rects. However there is just one thread reading data
and one thread decoding it. More logic is needed to safely decode
multiple rects at the same time.

common/os/CMakeLists.txt
common/rfb/CConnection.cxx
common/rfb/DecodeManager.cxx
common/rfb/DecodeManager.h
common/rfb/Decoder.h

index b574959468b44f84d1af4426534f6142f4b80467..7644341a1305501fac61665e84a07dc7abfab259 100644 (file)
@@ -6,6 +6,10 @@ add_library(os STATIC
   w32tiger.c
   os.cxx)
 
+if(UNIX)
+  target_link_libraries(os pthread)
+endif()
+
 if(UNIX)
   libtool_create_control_file(os)
 endif()
index 2ddfc3304f6468da82969608bd5d7b450eea005d..7e9fd3104ee4ea43b9588541efd53d4b5772d417 100644 (file)
@@ -64,6 +64,8 @@ void CConnection::setStreams(rdr::InStream* is_, rdr::OutStream* os_)
 
 void CConnection::setFramebuffer(ModifiablePixelBuffer* fb)
 {
+  decoder.flush();
+
   if ((framebuffer != NULL) && (fb != NULL)) {
     Rect rect;
 
@@ -303,6 +305,8 @@ void CConnection::securityCompleted()
 
 void CConnection::setDesktopSize(int w, int h)
 {
+  decoder.flush();
+
   CMsgHandler::setDesktopSize(w,h);
 }
 
@@ -311,6 +315,8 @@ void CConnection::setExtendedDesktopSize(unsigned reason,
                                          int w, int h,
                                          const ScreenSet& layout)
 {
+  decoder.flush();
+
   CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout);
 }
 
@@ -321,6 +327,8 @@ void CConnection::framebufferUpdateStart()
 
 void CConnection::framebufferUpdateEnd()
 {
+  decoder.flush();
+
   CMsgHandler::framebufferUpdateEnd();
 }
 
index ffae18b31ccb0892add19107ee13088c8655c427..a444eb7b1500d15719c51e34abc7fb6b1d31bc10 100644 (file)
@@ -28,6 +28,8 @@
 #include <rdr/Exception.h>
 #include <rdr/MemOutStream.h>
 
+#include <os/Mutex.h>
+
 using namespace rfb;
 
 static LogWriter vlog("DecodeManager");
@@ -35,20 +37,54 @@ static LogWriter vlog("DecodeManager");
 DecodeManager::DecodeManager(CConnection *conn) :
   conn(conn)
 {
+  int i;
+
   memset(decoders, 0, sizeof(decoders));
-  bufferStream = new rdr::MemOutStream();
+
+  queueMutex = new os::Mutex();
+  producerCond = new os::Condition(queueMutex);
+  consumerCond = new os::Condition(queueMutex);
+
+  // Just a single thread for now as we haven't sorted out the
+  // dependencies between rects
+  for (i = 0;i < 1;i++) {
+    // Twice as many possible entries in the queue as there
+    // are worker threads to make sure they don't stall
+    freeBuffers.push_back(new rdr::MemOutStream());
+    freeBuffers.push_back(new rdr::MemOutStream());
+
+    threads.push_back(new DecodeThread(this));
+  }
 }
 
 DecodeManager::~DecodeManager()
 {
+  while (!threads.empty()) {
+    delete threads.back();
+    threads.pop_back();
+  }
+
+  while (!freeBuffers.empty()) {
+    delete freeBuffers.back();
+    freeBuffers.pop_back();
+  }
+
+  delete consumerCond;
+  delete producerCond;
+  delete queueMutex;
+
   for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
     delete decoders[i];
-  delete bufferStream;
 }
 
 void DecodeManager::decodeRect(const Rect& r, int encoding,
                                ModifiablePixelBuffer* pb)
 {
+  Decoder *decoder;
+  rdr::MemOutStream *bufferStream;
+
+  QueueEntry *entry;
+
   assert(pb != NULL);
 
   if (!Decoder::supported(encoding)) {
@@ -64,10 +100,157 @@ void DecodeManager::decodeRect(const Rect& r, int encoding,
     }
   }
 
+  decoder = decoders[encoding];
+
+  // Wait for an available memory buffer
+  queueMutex->lock();
+
+  while (freeBuffers.empty())
+    producerCond->wait();
+
+  // Don't pop the buffer in case we throw an exception
+  // whilst reading
+  bufferStream = freeBuffers.front();
+
+  queueMutex->unlock();
+
+  // Read the rect
   bufferStream->clear();
-  decoders[encoding]->readRect(r, conn->getInStream(),
-                               conn->cp, bufferStream);
-  decoders[encoding]->decodeRect(r, bufferStream->data(),
-                                 bufferStream->length(),
-                                 conn->cp, pb);
+  decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
+
+  // Then try to put it on the queue
+  entry = new QueueEntry;
+
+  entry->active = false;
+  entry->rect = r;
+  entry->encoding = encoding;
+  entry->decoder = decoder;
+  entry->cp = &conn->cp;
+  entry->pb = pb;
+  entry->bufferStream = bufferStream;
+
+  queueMutex->lock();
+
+  // The workers add buffers to the end so it's safe to assume
+  // the front is still the same buffer
+  freeBuffers.pop_front();
+
+  workQueue.push_back(entry);
+
+  // We only put a single entry on the queue so waking a single
+  // thread is sufficient
+  consumerCond->signal();
+
+  queueMutex->unlock();
+}
+
+void DecodeManager::flush()
+{
+  queueMutex->lock();
+
+  while (!workQueue.empty())
+    producerCond->wait();
+
+  queueMutex->unlock();
+}
+
+DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
+{
+  this->manager = manager;
+
+  stopRequested = false;
+
+  start();
+}
+
+DecodeManager::DecodeThread::~DecodeThread()
+{
+  stop();
+  wait();
+}
+
+void DecodeManager::DecodeThread::stop()
+{
+  os::AutoMutex a(manager->queueMutex);
+
+  if (!isRunning())
+    return;
+
+  stopRequested = true;
+
+  // We can't wake just this thread, so wake everyone
+  manager->consumerCond->broadcast();
+}
+
+void DecodeManager::DecodeThread::worker()
+{
+  manager->queueMutex->lock();
+
+  while (!stopRequested) {
+    DecodeManager::QueueEntry *entry;
+
+    // Look for an available entry in the work queue
+    entry = findEntry();
+    if (entry == NULL) {
+      // Wait and try again
+      manager->consumerCond->wait();
+      continue;
+    }
+
+    // This is ours now
+    entry->active = true;
+
+    manager->queueMutex->unlock();
+
+    // Do the actual decoding
+    try {
+      entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
+                                 entry->bufferStream->length(),
+                                 *entry->cp, entry->pb);
+    } catch(...) {
+      // FIXME: Try to get the exception back to the main thread
+      assert(false);
+    }
+
+    manager->queueMutex->lock();
+
+    // Remove the entry from the queue and give back the memory buffer
+    manager->freeBuffers.push_back(entry->bufferStream);
+    manager->workQueue.remove(entry);
+    delete entry;
+
+    // Wake the main thread in case it is waiting for a memory buffer
+    manager->producerCond->signal();
+    // This rect might have been blocking multiple other rects, so
+    // wake up every worker thread
+    if (manager->workQueue.size() > 1)
+      manager->consumerCond->broadcast();
+  }
+
+  manager->queueMutex->unlock();
+}
+
+DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
+{
+  std::list<DecodeManager::QueueEntry*>::iterator iter;
+
+  if (manager->workQueue.empty())
+    return NULL;
+
+  if (!manager->workQueue.front()->active)
+    return manager->workQueue.front();
+
+  for (iter = manager->workQueue.begin();
+       iter != manager->workQueue.end();
+       ++iter) {
+    // Another thread working on this?
+    if ((*iter)->active)
+      continue;
+
+    // FIXME: check dependencies between rects
+
+    return *iter;
+  }
+
+  return NULL;
 }
index 63a41206843c1194c68d2d9295d35f3c40bfb90b..1a974ea4add55b85bf8be1dd67e06028ed220f5a 100644 (file)
 #ifndef __RFB_DECODEMANAGER_H__
 #define __RFB_DECODEMANAGER_H__
 
+#include <list>
+
+#include <os/Thread.h>
+
 #include <rfb/encodings.h>
 
+namespace os {
+  class Condition;
+  class Mutex;
+}
+
 namespace rdr { class MemOutStream; }
 
 namespace rfb {
@@ -37,10 +46,48 @@ namespace rfb {
     void decodeRect(const Rect& r, int encoding,
                     ModifiablePixelBuffer* pb);
 
+    void flush();
+
   private:
     CConnection *conn;
     Decoder *decoders[encodingMax+1];
-    rdr::MemOutStream *bufferStream;
+
+    struct QueueEntry {
+      bool active;
+      Rect rect;
+      int encoding;
+      Decoder* decoder;
+      const ConnParams* cp;
+      ModifiablePixelBuffer* pb;
+      rdr::MemOutStream* bufferStream;
+    };
+
+    std::list<rdr::MemOutStream*> freeBuffers;
+    std::list<QueueEntry*> workQueue;
+
+    os::Mutex* queueMutex;
+    os::Condition* producerCond;
+    os::Condition* consumerCond;
+
+  private:
+    class DecodeThread : public os::Thread {
+    public:
+      DecodeThread(DecodeManager* manager);
+      ~DecodeThread();
+
+      void stop();
+
+    protected:
+      void worker();
+      DecodeManager::QueueEntry* findEntry();
+
+    private:
+      DecodeManager* manager;
+
+      bool stopRequested;
+    };
+
+    std::list<DecodeThread*> threads;
   };
 }
 
index 4195a80b612b2499d09598962cca6272a43d6299..7286b41d5f8adf2a1ed881b27bb0d54dbb572c90 100644 (file)
@@ -38,9 +38,15 @@ namespace rfb {
 
     // readRect() transfers data for the given rectangle from the
     // InStream to the OutStream, possibly changing it along the way to
-    // make it easier to decode.
+    // make it easier to decode. This function will always be called in
+    // a serial manner on the main thread.
     virtual void readRect(const Rect& r, rdr::InStream* is,
                           const ConnParams& cp, rdr::OutStream* os)=0;
+
+    // These functions will be called from any of the worker threads.
+    // A lock will be held whilst these are called so it is safe to
+    // read and update internal state as necessary.
+
     // decodeRect() decodes the given rectangle with data from the
     // given buffer, onto the ModifiablePixelBuffer. The PixelFormat of
     // the PixelBuffer might not match the ConnParams and it is up to
@@ -49,6 +55,7 @@ namespace rfb {
                             size_t buflen, const ConnParams& cp,
                             ModifiablePixelBuffer* pb)=0;
 
+  public:
     static bool supported(int encoding);
     static Decoder* createDecoder(int encoding);
   };