Browse Source

Make the decoder multi-threaded

This implements the basic infrastructure for multi-threaded
decoding of rects. However there is just one thread reading data
and one thread decoding it. More logic is needed to safely decode
multiple rects at the same time.
tags/v1.6.90
Pierre Ossman 8 years ago
parent
commit
504afa27c1

+ 4
- 0
common/os/CMakeLists.txt View File

@@ -6,6 +6,10 @@ add_library(os STATIC
w32tiger.c
os.cxx)

if(UNIX)
target_link_libraries(os pthread)
endif()

if(UNIX)
libtool_create_control_file(os)
endif()

+ 8
- 0
common/rfb/CConnection.cxx View File

@@ -64,6 +64,8 @@ void CConnection::setStreams(rdr::InStream* is_, rdr::OutStream* os_)

void CConnection::setFramebuffer(ModifiablePixelBuffer* fb)
{
decoder.flush();

if ((framebuffer != NULL) && (fb != NULL)) {
Rect rect;

@@ -303,6 +305,8 @@ void CConnection::securityCompleted()

void CConnection::setDesktopSize(int w, int h)
{
decoder.flush();

CMsgHandler::setDesktopSize(w,h);
}

@@ -311,6 +315,8 @@ void CConnection::setExtendedDesktopSize(unsigned reason,
int w, int h,
const ScreenSet& layout)
{
decoder.flush();

CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout);
}

@@ -321,6 +327,8 @@ void CConnection::framebufferUpdateStart()

void CConnection::framebufferUpdateEnd()
{
decoder.flush();

CMsgHandler::framebufferUpdateEnd();
}


+ 190
- 7
common/rfb/DecodeManager.cxx View File

@@ -28,6 +28,8 @@
#include <rdr/Exception.h>
#include <rdr/MemOutStream.h>

#include <os/Mutex.h>

using namespace rfb;

static LogWriter vlog("DecodeManager");
@@ -35,20 +37,54 @@ static LogWriter vlog("DecodeManager");
DecodeManager::DecodeManager(CConnection *conn) :
conn(conn)
{
int i;

memset(decoders, 0, sizeof(decoders));
bufferStream = new rdr::MemOutStream();

queueMutex = new os::Mutex();
producerCond = new os::Condition(queueMutex);
consumerCond = new os::Condition(queueMutex);

// Just a single thread for now as we haven't sorted out the
// dependencies between rects
for (i = 0;i < 1;i++) {
// Twice as many possible entries in the queue as there
// are worker threads to make sure they don't stall
freeBuffers.push_back(new rdr::MemOutStream());
freeBuffers.push_back(new rdr::MemOutStream());

threads.push_back(new DecodeThread(this));
}
}

DecodeManager::~DecodeManager()
{
while (!threads.empty()) {
delete threads.back();
threads.pop_back();
}

while (!freeBuffers.empty()) {
delete freeBuffers.back();
freeBuffers.pop_back();
}

delete consumerCond;
delete producerCond;
delete queueMutex;

for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
delete decoders[i];
delete bufferStream;
}

void DecodeManager::decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb)
{
Decoder *decoder;
rdr::MemOutStream *bufferStream;

QueueEntry *entry;

assert(pb != NULL);

if (!Decoder::supported(encoding)) {
@@ -64,10 +100,157 @@ void DecodeManager::decodeRect(const Rect& r, int encoding,
}
}

decoder = decoders[encoding];

// Wait for an available memory buffer
queueMutex->lock();

while (freeBuffers.empty())
producerCond->wait();

// Don't pop the buffer in case we throw an exception
// whilst reading
bufferStream = freeBuffers.front();

queueMutex->unlock();

// Read the rect
bufferStream->clear();
decoders[encoding]->readRect(r, conn->getInStream(),
conn->cp, bufferStream);
decoders[encoding]->decodeRect(r, bufferStream->data(),
bufferStream->length(),
conn->cp, pb);
decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);

// Then try to put it on the queue
entry = new QueueEntry;

entry->active = false;
entry->rect = r;
entry->encoding = encoding;
entry->decoder = decoder;
entry->cp = &conn->cp;
entry->pb = pb;
entry->bufferStream = bufferStream;

queueMutex->lock();

// The workers add buffers to the end so it's safe to assume
// the front is still the same buffer
freeBuffers.pop_front();

workQueue.push_back(entry);

// We only put a single entry on the queue so waking a single
// thread is sufficient
consumerCond->signal();

queueMutex->unlock();
}

void DecodeManager::flush()
{
queueMutex->lock();

while (!workQueue.empty())
producerCond->wait();

queueMutex->unlock();
}

DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
{
this->manager = manager;

stopRequested = false;

start();
}

DecodeManager::DecodeThread::~DecodeThread()
{
stop();
wait();
}

void DecodeManager::DecodeThread::stop()
{
os::AutoMutex a(manager->queueMutex);

if (!isRunning())
return;

stopRequested = true;

// We can't wake just this thread, so wake everyone
manager->consumerCond->broadcast();
}

void DecodeManager::DecodeThread::worker()
{
manager->queueMutex->lock();

while (!stopRequested) {
DecodeManager::QueueEntry *entry;

// Look for an available entry in the work queue
entry = findEntry();
if (entry == NULL) {
// Wait and try again
manager->consumerCond->wait();
continue;
}

// This is ours now
entry->active = true;

manager->queueMutex->unlock();

// Do the actual decoding
try {
entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
entry->bufferStream->length(),
*entry->cp, entry->pb);
} catch(...) {
// FIXME: Try to get the exception back to the main thread
assert(false);
}

manager->queueMutex->lock();

// Remove the entry from the queue and give back the memory buffer
manager->freeBuffers.push_back(entry->bufferStream);
manager->workQueue.remove(entry);
delete entry;

// Wake the main thread in case it is waiting for a memory buffer
manager->producerCond->signal();
// This rect might have been blocking multiple other rects, so
// wake up every worker thread
if (manager->workQueue.size() > 1)
manager->consumerCond->broadcast();
}

manager->queueMutex->unlock();
}

DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
{
std::list<DecodeManager::QueueEntry*>::iterator iter;

if (manager->workQueue.empty())
return NULL;

if (!manager->workQueue.front()->active)
return manager->workQueue.front();

for (iter = manager->workQueue.begin();
iter != manager->workQueue.end();
++iter) {
// Another thread working on this?
if ((*iter)->active)
continue;

// FIXME: check dependencies between rects

return *iter;
}

return NULL;
}

+ 48
- 1
common/rfb/DecodeManager.h View File

@@ -19,8 +19,17 @@
#ifndef __RFB_DECODEMANAGER_H__
#define __RFB_DECODEMANAGER_H__

#include <list>

#include <os/Thread.h>

#include <rfb/encodings.h>

namespace os {
class Condition;
class Mutex;
}

namespace rdr { class MemOutStream; }

namespace rfb {
@@ -37,10 +46,48 @@ namespace rfb {
void decodeRect(const Rect& r, int encoding,
ModifiablePixelBuffer* pb);

void flush();

private:
CConnection *conn;
Decoder *decoders[encodingMax+1];
rdr::MemOutStream *bufferStream;

struct QueueEntry {
bool active;
Rect rect;
int encoding;
Decoder* decoder;
const ConnParams* cp;
ModifiablePixelBuffer* pb;
rdr::MemOutStream* bufferStream;
};

std::list<rdr::MemOutStream*> freeBuffers;
std::list<QueueEntry*> workQueue;

os::Mutex* queueMutex;
os::Condition* producerCond;
os::Condition* consumerCond;

private:
class DecodeThread : public os::Thread {
public:
DecodeThread(DecodeManager* manager);
~DecodeThread();

void stop();

protected:
void worker();
DecodeManager::QueueEntry* findEntry();

private:
DecodeManager* manager;

bool stopRequested;
};

std::list<DecodeThread*> threads;
};
}


+ 8
- 1
common/rfb/Decoder.h View File

@@ -38,9 +38,15 @@ namespace rfb {

// readRect() transfers data for the given rectangle from the
// InStream to the OutStream, possibly changing it along the way to
// make it easier to decode.
// make it easier to decode. This function will always be called in
// a serial manner on the main thread.
virtual void readRect(const Rect& r, rdr::InStream* is,
const ConnParams& cp, rdr::OutStream* os)=0;

// These functions will be called from any of the worker threads.
// A lock will be held whilst these are called so it is safe to
// read and update internal state as necessary.

// decodeRect() decodes the given rectangle with data from the
// given buffer, onto the ModifiablePixelBuffer. The PixelFormat of
// the PixelBuffer might not match the ConnParams and it is up to
@@ -49,6 +55,7 @@ namespace rfb {
size_t buflen, const ConnParams& cp,
ModifiablePixelBuffer* pb)=0;

public:
static bool supported(int encoding);
static Decoder* createDecoder(int encoding);
};

Loading…
Cancel
Save