aboutsummaryrefslogtreecommitdiffstats
path: root/common/rfb/DecodeManager.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'common/rfb/DecodeManager.cxx')
-rw-r--r--common/rfb/DecodeManager.cxx75
1 files changed, 35 insertions, 40 deletions
diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx
index 94908f86..48181f94 100644
--- a/common/rfb/DecodeManager.cxx
+++ b/common/rfb/DecodeManager.cxx
@@ -24,7 +24,6 @@
#include <string.h>
#include <core/LogWriter.h>
-#include <core/Mutex.h>
#include <core/Region.h>
#include <core/string.h>
@@ -48,11 +47,7 @@ DecodeManager::DecodeManager(CConnection *conn_) :
memset(stats, 0, sizeof(stats));
- queueMutex = new core::Mutex();
- producerCond = new core::Condition(queueMutex);
- consumerCond = new core::Condition(queueMutex);
-
- cpuCount = core::Thread::getSystemCPUCount();
+ cpuCount = std::thread::hardware_concurrency();
if (cpuCount == 0) {
vlog.error("Unable to determine the number of CPU cores on this system");
cpuCount = 1;
@@ -90,10 +85,6 @@ DecodeManager::~DecodeManager()
freeBuffers.pop_back();
}
- delete consumerCond;
- delete producerCond;
- delete queueMutex;
-
for (Decoder* decoder : decoders)
delete decoder;
}
@@ -125,29 +116,25 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding,
decoder = decoders[encoding];
// Wait for an available memory buffer
- queueMutex->lock();
+ std::unique_lock<std::mutex> lock(queueMutex);
// FIXME: Should we return and let other things run here?
while (freeBuffers.empty())
- producerCond->wait();
+ producerCond.wait(lock);
// Don't pop the buffer in case we throw an exception
// whilst reading
bufferStream = freeBuffers.front();
- queueMutex->unlock();
+ lock.unlock();
// First check if any thread has encountered a problem
throwThreadException();
// Read the rect
bufferStream->clear();
- try {
- if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream))
- return false;
- } catch (std::exception& e) {
- throw std::runtime_error(core::format("Error reading rect: %s", e.what()));
- }
+ if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream))
+ return false;
stats[encoding].rects++;
stats[encoding].bytes += 12 + bufferStream->length();
@@ -170,7 +157,7 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding,
bufferStream->length(), conn->server,
&entry->affectedRegion);
- queueMutex->lock();
+ lock.lock();
// The workers add buffers to the end so it's safe to assume
// the front is still the same buffer
@@ -180,21 +167,21 @@ bool DecodeManager::decodeRect(const core::Rect& r, int encoding,
// We only put a single entry on the queue so waking a single
// thread is sufficient
- consumerCond->signal();
+ consumerCond.notify_one();
- queueMutex->unlock();
+ lock.unlock();
return true;
}
void DecodeManager::flush()
{
- queueMutex->lock();
+ std::unique_lock<std::mutex> lock(queueMutex);
while (!workQueue.empty())
- producerCond->wait();
+ producerCond.wait(lock);
- queueMutex->unlock();
+ lock.unlock();
throwThreadException();
}
@@ -242,7 +229,7 @@ void DecodeManager::logStats()
void DecodeManager::setThreadException()
{
- core::AutoMutex a(queueMutex);
+ const std::lock_guard<std::mutex> lock(queueMutex);
if (threadException)
return;
@@ -252,7 +239,7 @@ void DecodeManager::setThreadException()
void DecodeManager::throwThreadException()
{
- core::AutoMutex a(queueMutex);
+ const std::lock_guard<std::mutex> lock(queueMutex);
if (!threadException)
return;
@@ -266,7 +253,7 @@ void DecodeManager::throwThreadException()
}
DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager_)
- : manager(manager_), stopRequested(false)
+ : manager(manager_), thread(nullptr), stopRequested(false)
{
start();
}
@@ -274,25 +261,35 @@ DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager_)
DecodeManager::DecodeThread::~DecodeThread()
{
stop();
- wait();
+ if (thread != nullptr) {
+ thread->join();
+ delete thread;
+ }
+}
+
+void DecodeManager::DecodeThread::start()
+{
+ assert(thread == nullptr);
+
+ thread = new std::thread(&DecodeThread::worker, this);
}
void DecodeManager::DecodeThread::stop()
{
- core::AutoMutex a(manager->queueMutex);
+ const std::lock_guard<std::mutex> lock(manager->queueMutex);
- if (!isRunning())
+ if (thread == nullptr)
return;
stopRequested = true;
// We can't wake just this thread, so wake everyone
- manager->consumerCond->broadcast();
+ manager->consumerCond.notify_all();
}
void DecodeManager::DecodeThread::worker()
{
- manager->queueMutex->lock();
+ std::unique_lock<std::mutex> lock(manager->queueMutex);
while (!stopRequested) {
DecodeManager::QueueEntry *entry;
@@ -301,14 +298,14 @@ void DecodeManager::DecodeThread::worker()
entry = findEntry();
if (entry == nullptr) {
// Wait and try again
- manager->consumerCond->wait();
+ manager->consumerCond.wait(lock);
continue;
}
// This is ours now
entry->active = true;
- manager->queueMutex->unlock();
+ lock.unlock();
// Do the actual decoding
try {
@@ -321,7 +318,7 @@ void DecodeManager::DecodeThread::worker()
assert(false);
}
- manager->queueMutex->lock();
+ lock.lock();
// Remove the entry from the queue and give back the memory buffer
manager->freeBuffers.push_back(entry->bufferStream);
@@ -329,14 +326,12 @@ void DecodeManager::DecodeThread::worker()
delete entry;
// Wake the main thread in case it is waiting for a memory buffer
- manager->producerCond->signal();
+ manager->producerCond.notify_one();
// This rect might have been blocking multiple other rects, so
// wake up every worker thread
if (manager->workQueue.size() > 1)
- manager->consumerCond->broadcast();
+ manager->consumerCond.notify_all();
}
-
- manager->queueMutex->unlock();
}
DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()