diff options
-rw-r--r-- | common/os/CMakeLists.txt | 4 | ||||
-rw-r--r-- | common/rfb/CConnection.cxx | 8 | ||||
-rw-r--r-- | common/rfb/DecodeManager.cxx | 197 | ||||
-rw-r--r-- | common/rfb/DecodeManager.h | 49 | ||||
-rw-r--r-- | common/rfb/Decoder.h | 9 |
5 files changed, 258 insertions, 9 deletions
diff --git a/common/os/CMakeLists.txt b/common/os/CMakeLists.txt index b5749594..7644341a 100644 --- a/common/os/CMakeLists.txt +++ b/common/os/CMakeLists.txt @@ -7,5 +7,9 @@ add_library(os STATIC os.cxx) if(UNIX) + target_link_libraries(os pthread) +endif() + +if(UNIX) libtool_create_control_file(os) endif() diff --git a/common/rfb/CConnection.cxx b/common/rfb/CConnection.cxx index 2ddfc330..7e9fd310 100644 --- a/common/rfb/CConnection.cxx +++ b/common/rfb/CConnection.cxx @@ -64,6 +64,8 @@ void CConnection::setStreams(rdr::InStream* is_, rdr::OutStream* os_) void CConnection::setFramebuffer(ModifiablePixelBuffer* fb) { + decoder.flush(); + if ((framebuffer != NULL) && (fb != NULL)) { Rect rect; @@ -303,6 +305,8 @@ void CConnection::securityCompleted() void CConnection::setDesktopSize(int w, int h) { + decoder.flush(); + CMsgHandler::setDesktopSize(w,h); } @@ -311,6 +315,8 @@ void CConnection::setExtendedDesktopSize(unsigned reason, int w, int h, const ScreenSet& layout) { + decoder.flush(); + CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout); } @@ -321,6 +327,8 @@ void CConnection::framebufferUpdateStart() void CConnection::framebufferUpdateEnd() { + decoder.flush(); + CMsgHandler::framebufferUpdateEnd(); } diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx index ffae18b3..a444eb7b 100644 --- a/common/rfb/DecodeManager.cxx +++ b/common/rfb/DecodeManager.cxx @@ -28,6 +28,8 @@ #include <rdr/Exception.h> #include <rdr/MemOutStream.h> +#include <os/Mutex.h> + using namespace rfb; static LogWriter vlog("DecodeManager"); @@ -35,20 +37,54 @@ static LogWriter vlog("DecodeManager"); DecodeManager::DecodeManager(CConnection *conn) : conn(conn) { + int i; + memset(decoders, 0, sizeof(decoders)); - bufferStream = new rdr::MemOutStream(); + + queueMutex = new os::Mutex(); + producerCond = new os::Condition(queueMutex); + consumerCond = new os::Condition(queueMutex); + + // Just a single thread for now as we haven't sorted out the + // dependencies between rects + for (i = 0;i < 1;i++) { + // Twice as many possible entries in the queue as there + // are worker threads to make sure they don't stall + freeBuffers.push_back(new rdr::MemOutStream()); + freeBuffers.push_back(new rdr::MemOutStream()); + + threads.push_back(new DecodeThread(this)); + } } DecodeManager::~DecodeManager() { + while (!threads.empty()) { + delete threads.back(); + threads.pop_back(); + } + + while (!freeBuffers.empty()) { + delete freeBuffers.back(); + freeBuffers.pop_back(); + } + + delete consumerCond; + delete producerCond; + delete queueMutex; + for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++) delete decoders[i]; - delete bufferStream; } void DecodeManager::decodeRect(const Rect& r, int encoding, ModifiablePixelBuffer* pb) { + Decoder *decoder; + rdr::MemOutStream *bufferStream; + + QueueEntry *entry; + assert(pb != NULL); if (!Decoder::supported(encoding)) { @@ -64,10 +100,157 @@ void DecodeManager::decodeRect(const Rect& r, int encoding, } } + decoder = decoders[encoding]; + + // Wait for an available memory buffer + queueMutex->lock(); + + while (freeBuffers.empty()) + producerCond->wait(); + + // Don't pop the buffer in case we throw an exception + // whilst reading + bufferStream = freeBuffers.front(); + + queueMutex->unlock(); + + // Read the rect bufferStream->clear(); - decoders[encoding]->readRect(r, conn->getInStream(), - conn->cp, bufferStream); - decoders[encoding]->decodeRect(r, bufferStream->data(), - bufferStream->length(), - conn->cp, pb); + decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream); + + // Then try to put it on the queue + entry = new QueueEntry; + + entry->active = false; + entry->rect = r; + entry->encoding = encoding; + entry->decoder = decoder; + entry->cp = &conn->cp; + entry->pb = pb; + entry->bufferStream = bufferStream; + + queueMutex->lock(); + + // The workers add buffers to the end so it's safe to assume + // the front is still the same buffer + freeBuffers.pop_front(); + + workQueue.push_back(entry); + + // We only put a single entry on the queue so waking a single + // thread is sufficient + consumerCond->signal(); + + queueMutex->unlock(); +} + +void DecodeManager::flush() +{ + queueMutex->lock(); + + while (!workQueue.empty()) + producerCond->wait(); + + queueMutex->unlock(); +} + +DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager) +{ + this->manager = manager; + + stopRequested = false; + + start(); +} + +DecodeManager::DecodeThread::~DecodeThread() +{ + stop(); + wait(); +} + +void DecodeManager::DecodeThread::stop() +{ + os::AutoMutex a(manager->queueMutex); + + if (!isRunning()) + return; + + stopRequested = true; + + // We can't wake just this thread, so wake everyone + manager->consumerCond->broadcast(); +} + +void DecodeManager::DecodeThread::worker() +{ + manager->queueMutex->lock(); + + while (!stopRequested) { + DecodeManager::QueueEntry *entry; + + // Look for an available entry in the work queue + entry = findEntry(); + if (entry == NULL) { + // Wait and try again + manager->consumerCond->wait(); + continue; + } + + // This is ours now + entry->active = true; + + manager->queueMutex->unlock(); + + // Do the actual decoding + try { + entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(), + entry->bufferStream->length(), + *entry->cp, entry->pb); + } catch(...) { + // FIXME: Try to get the exception back to the main thread + assert(false); + } + + manager->queueMutex->lock(); + + // Remove the entry from the queue and give back the memory buffer + manager->freeBuffers.push_back(entry->bufferStream); + manager->workQueue.remove(entry); + delete entry; + + // Wake the main thread in case it is waiting for a memory buffer + manager->producerCond->signal(); + // This rect might have been blocking multiple other rects, so + // wake up every worker thread + if (manager->workQueue.size() > 1) + manager->consumerCond->broadcast(); + } + + manager->queueMutex->unlock(); +} + +DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry() +{ + std::list<DecodeManager::QueueEntry*>::iterator iter; + + if (manager->workQueue.empty()) + return NULL; + + if (!manager->workQueue.front()->active) + return manager->workQueue.front(); + + for (iter = manager->workQueue.begin(); + iter != manager->workQueue.end(); + ++iter) { + // Another thread working on this? + if ((*iter)->active) + continue; + + // FIXME: check dependencies between rects + + return *iter; + } + + return NULL; } diff --git a/common/rfb/DecodeManager.h b/common/rfb/DecodeManager.h index 63a41206..1a974ea4 100644 --- a/common/rfb/DecodeManager.h +++ b/common/rfb/DecodeManager.h @@ -19,8 +19,17 @@ #ifndef __RFB_DECODEMANAGER_H__ #define __RFB_DECODEMANAGER_H__ +#include <list> + +#include <os/Thread.h> + #include <rfb/encodings.h> +namespace os { + class Condition; + class Mutex; +} + namespace rdr { class MemOutStream; } namespace rfb { @@ -37,10 +46,48 @@ namespace rfb { void decodeRect(const Rect& r, int encoding, ModifiablePixelBuffer* pb); + void flush(); + private: CConnection *conn; Decoder *decoders[encodingMax+1]; - rdr::MemOutStream *bufferStream; + + struct QueueEntry { + bool active; + Rect rect; + int encoding; + Decoder* decoder; + const ConnParams* cp; + ModifiablePixelBuffer* pb; + rdr::MemOutStream* bufferStream; + }; + + std::list<rdr::MemOutStream*> freeBuffers; + std::list<QueueEntry*> workQueue; + + os::Mutex* queueMutex; + os::Condition* producerCond; + os::Condition* consumerCond; + + private: + class DecodeThread : public os::Thread { + public: + DecodeThread(DecodeManager* manager); + ~DecodeThread(); + + void stop(); + + protected: + void worker(); + DecodeManager::QueueEntry* findEntry(); + + private: + DecodeManager* manager; + + bool stopRequested; + }; + + std::list<DecodeThread*> threads; }; } diff --git a/common/rfb/Decoder.h b/common/rfb/Decoder.h index 4195a80b..7286b41d 100644 --- a/common/rfb/Decoder.h +++ b/common/rfb/Decoder.h @@ -38,9 +38,15 @@ namespace rfb { // readRect() transfers data for the given rectangle from the // InStream to the OutStream, possibly changing it along the way to - // make it easier to decode. + // make it easier to decode. This function will always be called in + // a serial manner on the main thread. virtual void readRect(const Rect& r, rdr::InStream* is, const ConnParams& cp, rdr::OutStream* os)=0; + + // These functions will be called from any of the worker threads. + // A lock will be held whilst these are called so it is safe to + // read and update internal state as necessary. + // decodeRect() decodes the given rectangle with data from the // given buffer, onto the ModifiablePixelBuffer. The PixelFormat of // the PixelBuffer might not match the ConnParams and it is up to @@ -49,6 +55,7 @@ namespace rfb { size_t buflen, const ConnParams& cp, ModifiablePixelBuffer* pb)=0; + public: static bool supported(int encoding); static Decoder* createDecoder(int encoding); }; |