/* Copyright 2015 Pierre Ossman for Cendio AB * * This is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this software; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, * USA. */ #include #include #include #include #include #include #include #include #include #include using namespace rfb; static LogWriter vlog("DecodeManager"); DecodeManager::DecodeManager(CConnection *conn) : conn(conn), threadException(NULL) { size_t cpuCount; memset(decoders, 0, sizeof(decoders)); queueMutex = new os::Mutex(); producerCond = new os::Condition(queueMutex); consumerCond = new os::Condition(queueMutex); cpuCount = os::Thread::getSystemCPUCount(); if (cpuCount == 0) { vlog.error("Unable to determine the number of CPU cores on this system"); cpuCount = 1; } else { vlog.info("Detected %d CPU core(s)", (int)cpuCount); // No point creating more threads than this, they'll just end up // wasting CPU fighting for locks if (cpuCount > 4) cpuCount = 4; // The overhead of threading is small, but not small enough to // ignore on single CPU systems if (cpuCount == 1) vlog.info("Decoding data on main thread"); else vlog.info("Creating %d decoder thread(s)", (int)cpuCount); } if (cpuCount == 1) { // Threads are not used on single CPU machines freeBuffers.push_back(new rdr::MemOutStream()); return; } while (cpuCount--) { // Twice as many possible entries in the queue as there // are worker threads to make sure they don't stall freeBuffers.push_back(new rdr::MemOutStream()); freeBuffers.push_back(new rdr::MemOutStream()); threads.push_back(new DecodeThread(this)); } } DecodeManager::~DecodeManager() { while (!threads.empty()) { delete threads.back(); threads.pop_back(); } delete threadException; while (!freeBuffers.empty()) { delete freeBuffers.back(); freeBuffers.pop_back(); } delete consumerCond; delete producerCond; delete queueMutex; for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++) delete decoders[i]; } void DecodeManager::decodeRect(const Rect& r, int encoding, ModifiablePixelBuffer* pb) { Decoder *decoder; rdr::MemOutStream *bufferStream; QueueEntry *entry; assert(pb != NULL); if (!Decoder::supported(encoding)) { vlog.error("Unknown encoding %d", encoding); throw rdr::Exception("Unknown encoding"); } if (!decoders[encoding]) { decoders[encoding] = Decoder::createDecoder(encoding); if (!decoders[encoding]) { vlog.error("Unknown encoding %d", encoding); throw rdr::Exception("Unknown encoding"); } } decoder = decoders[encoding]; // Fast path for single CPU machines to avoid the context // switching overhead if (threads.empty()) { bufferStream = freeBuffers.front(); bufferStream->clear(); decoder->readRect(r, conn->getInStream(), conn->server, bufferStream); decoder->decodeRect(r, bufferStream->data(), bufferStream->length(), conn->server, pb); return; } // Wait for an available memory buffer queueMutex->lock(); while (freeBuffers.empty()) producerCond->wait(); // Don't pop the buffer in case we throw an exception // whilst reading bufferStream = freeBuffers.front(); queueMutex->unlock(); // First check if any thread has encountered a problem throwThreadException(); // Read the rect bufferStream->clear(); decoder->readRect(r, conn->getInStream(), conn->server, bufferStream); // Then try to put it on the queue entry = new QueueEntry; entry->active = false; entry->rect = r; entry->encoding = encoding; entry->decoder = decoder; entry->server = &conn->server; entry->pb = pb; entry->bufferStream = bufferStream; decoder->getAffectedRegion(r, bufferStream->data(), bufferStream->length(), conn->server, &entry->affectedRegion); queueMutex->lock(); // The workers add buffers to the end so it's safe to assume // the front is still the same buffer freeBuffers.pop_front(); workQueue.push_back(entry); // We only put a single entry on the queue so waking a single // thread is sufficient consumerCond->signal(); queueMutex->unlock(); } void DecodeManager::flush() { queueMutex->lock(); while (!workQueue.empty()) producerCond->wait(); queueMutex->unlock(); throwThreadException(); } void DecodeManager::setThreadException(const rdr::Exception& e) { os::AutoMutex a(queueMutex); if (threadException != NULL) return; threadException = new rdr::Exception("Exception on worker thread: %s", e.str()); } void DecodeManager::throwThreadException() { os::AutoMutex a(queueMutex); if (threadException == NULL) return; rdr::Exception e(*threadException); delete threadException; threadException = NULL; throw e; } DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager) { this->manager = manager; stopRequested = false; start(); } DecodeManager::DecodeThread::~DecodeThread() { stop(); wait(); } void DecodeManager::DecodeThread::stop() { os::AutoMutex a(manager->queueMutex); if (!isRunning()) return; stopRequested = true; // We can't wake just this thread, so wake everyone manager->consumerCond->broadcast(); } void DecodeManager::DecodeThread::worker() { manager->queueMutex->lock(); while (!stopRequested) { DecodeManager::QueueEntry *entry; // Look for an available entry in the work queue entry = findEntry(); if (entry == NULL) { // Wait and try again manager->consumerCond->wait(); continue; } // This is ours now entry->active = true; manager->queueMutex->unlock(); // Do the actual decoding try { entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(), entry->bufferStream->length(), *entry->server, entry->pb); } catch (rdr::Exception& e) { manager->setThreadException(e); } catch(...) { assert(false); } manager->queueMutex->lock(); // Remove the entry from the queue and give back the memory buffer manager->freeBuffers.push_back(entry->bufferStream); manager->workQueue.remove(entry); delete entry; // Wake the main thread in case it is waiting for a memory buffer manager->producerCond->signal(); // This rect might have been blocking multiple other rects, so // wake up every worker thread if (manager->workQueue.size() > 1) manager->consumerCond->broadcast(); } manager->queueMutex->unlock(); } DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry() { std::list::iterator iter; Region lockedRegion; if (manager->workQueue.empty()) return NULL; if (!manager->workQueue.front()->active) return manager->workQueue.front(); for (iter = manager->workQueue.begin(); iter != manager->workQueue.end(); ++iter) { DecodeManager::QueueEntry* entry; std::list::iterator iter2; entry = *iter; // Another thread working on this? if (entry->active) goto next; // If this is an ordered decoder then make sure this is the first // rectangle in the queue for that decoder if (entry->decoder->flags & DecoderOrdered) { for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) { if (entry->encoding == (*iter2)->encoding) goto next; } } // For a partially ordered decoder we must ask the decoder for each // pair of rectangles. if (entry->decoder->flags & DecoderPartiallyOrdered) { for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) { if (entry->encoding != (*iter2)->encoding) continue; if (entry->decoder->doRectsConflict(entry->rect, entry->bufferStream->data(), entry->bufferStream->length(), (*iter2)->rect, (*iter2)->bufferStream->data(), (*iter2)->bufferStream->length(), *entry->server)) goto next; } } // Check overlap with earlier rectangles if (!lockedRegion.intersect(entry->affectedRegion).is_empty()) goto next; return entry; next: lockedRegion.assign_union(entry->affectedRegion); } return NULL; }