diff options
Diffstat (limited to 'common/rfb/DecodeManager.cxx')
-rw-r--r-- | common/rfb/DecodeManager.cxx | 75 |
1 files changed, 35 insertions, 40 deletions
diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx index 94908f86..48181f94 100644 --- a/common/rfb/DecodeManager.cxx +++ b/common/rfb/DecodeManager.cxx @@ -24,7 +24,6 @@ #include <string.h> #include <core/LogWriter.h> -#include <core/Mutex.h> #include <core/Region.h> #include <core/string.h> @@ -48,11 +47,7 @@ DecodeManager::DecodeManager(CConnection *conn_) : memset(stats, 0, sizeof(stats)); - queueMutex = new core::Mutex(); - producerCond = new core::Condition(queueMutex); - consumerCond = new core::Condition(queueMutex); - - cpuCount = core::Thread::getSystemCPUCount(); + cpuCount = std::thread::hardware_concurrency(); if (cpuCount == 0) { vlog.error("Unable to determine the number of CPU cores on this system"); cpuCount = 1; @@ -90,10 +85,6 @@ DecodeManager::~DecodeManager() freeBuffers.pop_back(); } - delete consumerCond; - delete producerCond; - delete queueMutex; - for (Decoder* decoder : decoders) delete decoder; } @@ -125,29 +116,25 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding, decoder = decoders[encoding]; // Wait for an available memory buffer - queueMutex->lock(); + std::unique_lock<std::mutex> lock(queueMutex); // FIXME: Should we return and let other things run here? while (freeBuffers.empty()) - producerCond->wait(); + producerCond.wait(lock); // Don't pop the buffer in case we throw an exception // whilst reading bufferStream = freeBuffers.front(); - queueMutex->unlock(); + lock.unlock(); // First check if any thread has encountered a problem throwThreadException(); // Read the rect bufferStream->clear(); - try { - if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream)) - return false; - } catch (std::exception& e) { - throw std::runtime_error(core::format("Error reading rect: %s", e.what())); - } + if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream)) + return false; stats[encoding].rects++; stats[encoding].bytes += 12 + bufferStream->length(); @@ -170,7 +157,7 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding, bufferStream->length(), conn->server, &entry->affectedRegion); - queueMutex->lock(); + lock.lock(); // The workers add buffers to the end so it's safe to assume // the front is still the same buffer @@ -180,21 +167,21 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding, // We only put a single entry on the queue so waking a single // thread is sufficient - consumerCond->signal(); + consumerCond.notify_one(); - queueMutex->unlock(); + lock.unlock(); return true; } void DecodeManager::flush() { - queueMutex->lock(); + std::unique_lock<std::mutex> lock(queueMutex); while (!workQueue.empty()) - producerCond->wait(); + producerCond.wait(lock); - queueMutex->unlock(); + lock.unlock(); throwThreadException(); } @@ -242,7 +229,7 @@ void DecodeManager::logStats() void DecodeManager::setThreadException() { - core::AutoMutex a(queueMutex); + const std::lock_guard<std::mutex> lock(queueMutex); if (threadException) return; @@ -252,7 +239,7 @@ void DecodeManager::setThreadException() void DecodeManager::throwThreadException() { - core::AutoMutex a(queueMutex); + const std::lock_guard<std::mutex> lock(queueMutex); if (!threadException) return; @@ -266,7 +253,7 @@ void DecodeManager::throwThreadException() } DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager_) - : manager(manager_), stopRequested(false) + : manager(manager_), thread(nullptr), stopRequested(false) { start(); } @@ -274,25 +261,35 @@ DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager_) DecodeManager::DecodeThread::~DecodeThread() { stop(); - wait(); + if (thread != nullptr) { + thread->join(); + delete thread; + } +} + +void DecodeManager::DecodeThread::start() +{ + assert(thread == nullptr); + + thread = new std::thread(&DecodeThread::worker, this); } void DecodeManager::DecodeThread::stop() { - core::AutoMutex a(manager->queueMutex); + const std::lock_guard<std::mutex> lock(manager->queueMutex); - if (!isRunning()) + if (thread == nullptr) return; stopRequested = true; // We can't wake just this thread, so wake everyone - manager->consumerCond->broadcast(); + manager->consumerCond.notify_all(); } void DecodeManager::DecodeThread::worker() { - manager->queueMutex->lock(); + std::unique_lock<std::mutex> lock(manager->queueMutex); while (!stopRequested) { DecodeManager::QueueEntry *entry; @@ -301,14 +298,14 @@ void DecodeManager::DecodeThread::worker() entry = findEntry(); if (entry == nullptr) { // Wait and try again - manager->consumerCond->wait(); + manager->consumerCond.wait(lock); continue; } // This is ours now entry->active = true; - manager->queueMutex->unlock(); + lock.unlock(); // Do the actual decoding try { @@ -321,7 +318,7 @@ void DecodeManager::DecodeThread::worker() assert(false); } - manager->queueMutex->lock(); + lock.lock(); // Remove the entry from the queue and give back the memory buffer manager->freeBuffers.push_back(entry->bufferStream); @@ -329,14 +326,12 @@ void DecodeManager::DecodeThread::worker() delete entry; // Wake the main thread in case it is waiting for a memory buffer - manager->producerCond->signal(); + manager->producerCond.notify_one(); // This rect might have been blocking multiple other rects, so // wake up every worker thread if (manager->workQueue.size() > 1) - manager->consumerCond->broadcast(); + manager->consumerCond.notify_all(); } - - manager->queueMutex->unlock(); } DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry() |