aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/os/CMakeLists.txt4
-rw-r--r--common/rfb/CConnection.cxx8
-rw-r--r--common/rfb/DecodeManager.cxx197
-rw-r--r--common/rfb/DecodeManager.h49
-rw-r--r--common/rfb/Decoder.h9
5 files changed, 258 insertions, 9 deletions
diff --git a/common/os/CMakeLists.txt b/common/os/CMakeLists.txt
index b5749594..7644341a 100644
--- a/common/os/CMakeLists.txt
+++ b/common/os/CMakeLists.txt
@@ -7,5 +7,9 @@ add_library(os STATIC
os.cxx)
if(UNIX)
+ target_link_libraries(os pthread)
+endif()
+
+if(UNIX)
libtool_create_control_file(os)
endif()
diff --git a/common/rfb/CConnection.cxx b/common/rfb/CConnection.cxx
index 2ddfc330..7e9fd310 100644
--- a/common/rfb/CConnection.cxx
+++ b/common/rfb/CConnection.cxx
@@ -64,6 +64,8 @@ void CConnection::setStreams(rdr::InStream* is_, rdr::OutStream* os_)
void CConnection::setFramebuffer(ModifiablePixelBuffer* fb)
{
+ decoder.flush();
+
if ((framebuffer != NULL) && (fb != NULL)) {
Rect rect;
@@ -303,6 +305,8 @@ void CConnection::securityCompleted()
void CConnection::setDesktopSize(int w, int h)
{
+ decoder.flush();
+
CMsgHandler::setDesktopSize(w,h);
}
@@ -311,6 +315,8 @@ void CConnection::setExtendedDesktopSize(unsigned reason,
int w, int h,
const ScreenSet& layout)
{
+ decoder.flush();
+
CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout);
}
@@ -321,6 +327,8 @@ void CConnection::framebufferUpdateStart()
void CConnection::framebufferUpdateEnd()
{
+ decoder.flush();
+
CMsgHandler::framebufferUpdateEnd();
}
diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx
index ffae18b3..a444eb7b 100644
--- a/common/rfb/DecodeManager.cxx
+++ b/common/rfb/DecodeManager.cxx
@@ -28,6 +28,8 @@
#include <rdr/Exception.h>
#include <rdr/MemOutStream.h>
+#include <os/Mutex.h>
+
using namespace rfb;
static LogWriter vlog("DecodeManager");
@@ -35,20 +37,54 @@ static LogWriter vlog("DecodeManager");
DecodeManager::DecodeManager(CConnection *conn) :
conn(conn)
{
+ int i;
+
memset(decoders, 0, sizeof(decoders));
- bufferStream = new rdr::MemOutStream();
+
+ queueMutex = new os::Mutex();
+ producerCond = new os::Condition(queueMutex);
+ consumerCond = new os::Condition(queueMutex);
+
+ // Just a single thread for now as we haven't sorted out the
+ // dependencies between rects
+ for (i = 0;i < 1;i++) {
+ // Twice as many possible entries in the queue as there
+ // are worker threads to make sure they don't stall
+ freeBuffers.push_back(new rdr::MemOutStream());
+ freeBuffers.push_back(new rdr::MemOutStream());
+
+ threads.push_back(new DecodeThread(this));
+ }
}
DecodeManager::~DecodeManager()
{
+ while (!threads.empty()) {
+ delete threads.back();
+ threads.pop_back();
+ }
+
+ while (!freeBuffers.empty()) {
+ delete freeBuffers.back();
+ freeBuffers.pop_back();
+ }
+
+ delete consumerCond;
+ delete producerCond;
+ delete queueMutex;
+
for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
delete decoders[i];
- delete bufferStream;
}
void DecodeManager::decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb)
{
+ Decoder *decoder;
+ rdr::MemOutStream *bufferStream;
+
+ QueueEntry *entry;
+
assert(pb != NULL);
if (!Decoder::supported(encoding)) {
@@ -64,10 +100,157 @@ void DecodeManager::decodeRect(const Rect& r, int encoding,
}
}
+ decoder = decoders[encoding];
+
+ // Wait for an available memory buffer
+ queueMutex->lock();
+
+ while (freeBuffers.empty())
+ producerCond->wait();
+
+ // Don't pop the buffer in case we throw an exception
+ // whilst reading
+ bufferStream = freeBuffers.front();
+
+ queueMutex->unlock();
+
+ // Read the rect
bufferStream->clear();
- decoders[encoding]->readRect(r, conn->getInStream(),
- conn->cp, bufferStream);
- decoders[encoding]->decodeRect(r, bufferStream->data(),
- bufferStream->length(),
- conn->cp, pb);
+ decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
+
+ // Then try to put it on the queue
+ entry = new QueueEntry;
+
+ entry->active = false;
+ entry->rect = r;
+ entry->encoding = encoding;
+ entry->decoder = decoder;
+ entry->cp = &conn->cp;
+ entry->pb = pb;
+ entry->bufferStream = bufferStream;
+
+ queueMutex->lock();
+
+ // The workers add buffers to the end so it's safe to assume
+ // the front is still the same buffer
+ freeBuffers.pop_front();
+
+ workQueue.push_back(entry);
+
+ // We only put a single entry on the queue so waking a single
+ // thread is sufficient
+ consumerCond->signal();
+
+ queueMutex->unlock();
+}
+
+void DecodeManager::flush()
+{
+ queueMutex->lock();
+
+ while (!workQueue.empty())
+ producerCond->wait();
+
+ queueMutex->unlock();
+}
+
+DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
+{
+ this->manager = manager;
+
+ stopRequested = false;
+
+ start();
+}
+
+DecodeManager::DecodeThread::~DecodeThread()
+{
+ stop();
+ wait();
+}
+
+void DecodeManager::DecodeThread::stop()
+{
+ os::AutoMutex a(manager->queueMutex);
+
+ if (!isRunning())
+ return;
+
+ stopRequested = true;
+
+ // We can't wake just this thread, so wake everyone
+ manager->consumerCond->broadcast();
+}
+
+void DecodeManager::DecodeThread::worker()
+{
+ manager->queueMutex->lock();
+
+ while (!stopRequested) {
+ DecodeManager::QueueEntry *entry;
+
+ // Look for an available entry in the work queue
+ entry = findEntry();
+ if (entry == NULL) {
+ // Wait and try again
+ manager->consumerCond->wait();
+ continue;
+ }
+
+ // This is ours now
+ entry->active = true;
+
+ manager->queueMutex->unlock();
+
+ // Do the actual decoding
+ try {
+ entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
+ entry->bufferStream->length(),
+ *entry->cp, entry->pb);
+ } catch(...) {
+ // FIXME: Try to get the exception back to the main thread
+ assert(false);
+ }
+
+ manager->queueMutex->lock();
+
+ // Remove the entry from the queue and give back the memory buffer
+ manager->freeBuffers.push_back(entry->bufferStream);
+ manager->workQueue.remove(entry);
+ delete entry;
+
+ // Wake the main thread in case it is waiting for a memory buffer
+ manager->producerCond->signal();
+ // This rect might have been blocking multiple other rects, so
+ // wake up every worker thread
+ if (manager->workQueue.size() > 1)
+ manager->consumerCond->broadcast();
+ }
+
+ manager->queueMutex->unlock();
+}
+
+DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
+{
+ std::list<DecodeManager::QueueEntry*>::iterator iter;
+
+ if (manager->workQueue.empty())
+ return NULL;
+
+ if (!manager->workQueue.front()->active)
+ return manager->workQueue.front();
+
+ for (iter = manager->workQueue.begin();
+ iter != manager->workQueue.end();
+ ++iter) {
+ // Another thread working on this?
+ if ((*iter)->active)
+ continue;
+
+ // FIXME: check dependencies between rects
+
+ return *iter;
+ }
+
+ return NULL;
}
diff --git a/common/rfb/DecodeManager.h b/common/rfb/DecodeManager.h
index 63a41206..1a974ea4 100644
--- a/common/rfb/DecodeManager.h
+++ b/common/rfb/DecodeManager.h
@@ -19,8 +19,17 @@
#ifndef __RFB_DECODEMANAGER_H__
#define __RFB_DECODEMANAGER_H__
+#include <list>
+
+#include <os/Thread.h>
+
#include <rfb/encodings.h>
+namespace os {
+ class Condition;
+ class Mutex;
+}
+
namespace rdr { class MemOutStream; }
namespace rfb {
@@ -37,10 +46,48 @@ namespace rfb {
void decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb);
+ void flush();
+
private:
CConnection *conn;
Decoder *decoders[encodingMax+1];
- rdr::MemOutStream *bufferStream;
+
+ struct QueueEntry {
+ bool active;
+ Rect rect;
+ int encoding;
+ Decoder* decoder;
+ const ConnParams* cp;
+ ModifiablePixelBuffer* pb;
+ rdr::MemOutStream* bufferStream;
+ };
+
+ std::list<rdr::MemOutStream*> freeBuffers;
+ std::list<QueueEntry*> workQueue;
+
+ os::Mutex* queueMutex;
+ os::Condition* producerCond;
+ os::Condition* consumerCond;
+
+ private:
+ class DecodeThread : public os::Thread {
+ public:
+ DecodeThread(DecodeManager* manager);
+ ~DecodeThread();
+
+ void stop();
+
+ protected:
+ void worker();
+ DecodeManager::QueueEntry* findEntry();
+
+ private:
+ DecodeManager* manager;
+
+ bool stopRequested;
+ };
+
+ std::list<DecodeThread*> threads;
};
}
diff --git a/common/rfb/Decoder.h b/common/rfb/Decoder.h
index 4195a80b..7286b41d 100644
--- a/common/rfb/Decoder.h
+++ b/common/rfb/Decoder.h
@@ -38,9 +38,15 @@ namespace rfb {
// readRect() transfers data for the given rectangle from the
// InStream to the OutStream, possibly changing it along the way to
- // make it easier to decode.
+ // make it easier to decode. This function will always be called in
+ // a serial manner on the main thread.
virtual void readRect(const Rect& r, rdr::InStream* is,
const ConnParams& cp, rdr::OutStream* os)=0;
+
+ // These functions will be called from any of the worker threads.
+ // A lock will be held whilst these are called so it is safe to
+ // read and update internal state as necessary.
+
// decodeRect() decodes the given rectangle with data from the
// given buffer, onto the ModifiablePixelBuffer. The PixelFormat of
// the PixelBuffer might not match the ConnParams and it is up to
@@ -49,6 +55,7 @@ namespace rfb {
size_t buflen, const ConnParams& cp,
ModifiablePixelBuffer* pb)=0;
+ public:
static bool supported(int encoding);
static Decoder* createDecoder(int encoding);
};