You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DecodeManager.cxx 9.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /* Copyright 2015 Pierre Ossman for Cendio AB
  2. *
  3. * This is free software; you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation; either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This software is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this software; if not, write to the Free Software
  15. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
  16. * USA.
  17. */
  18. #include <assert.h>
  19. #include <string.h>
  20. #include <rfb/CConnection.h>
  21. #include <rfb/DecodeManager.h>
  22. #include <rfb/Decoder.h>
  23. #include <rfb/Region.h>
  24. #include <rfb/LogWriter.h>
  25. #include <rdr/Exception.h>
  26. #include <rdr/MemOutStream.h>
  27. #include <os/Mutex.h>
  // Make the rfb namespace visible to the definitions in this file.
  28. using namespace rfb;
  // File-local LogWriter, constructed with the name "DecodeManager"; used for
  // the error/info output emitted by the code below.
  29. static LogWriter vlog("DecodeManager");
  30. DecodeManager::DecodeManager(CConnection *conn) :
  31. conn(conn), threadException(NULL)
  32. {
  33. size_t cpuCount;
  34. memset(decoders, 0, sizeof(decoders));
  35. queueMutex = new os::Mutex();
  36. producerCond = new os::Condition(queueMutex);
  37. consumerCond = new os::Condition(queueMutex);
  38. cpuCount = os::Thread::getSystemCPUCount();
  39. if (cpuCount == 0) {
  40. vlog.error("Unable to determine the number of CPU cores on this system");
  41. cpuCount = 1;
  42. } else {
  43. vlog.info("Detected %d CPU core(s)", (int)cpuCount);
  44. // No point creating more threads than this, they'll just end up
  45. // wasting CPU fighting for locks
  46. if (cpuCount > 4)
  47. cpuCount = 4;
  48. // The overhead of threading is small, but not small enough to
  49. // ignore on single CPU systems
  50. if (cpuCount == 1)
  51. vlog.info("Decoding data on main thread");
  52. else
  53. vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
  54. }
  55. if (cpuCount == 1) {
  56. // Threads are not used on single CPU machines
  57. freeBuffers.push_back(new rdr::MemOutStream());
  58. return;
  59. }
  60. while (cpuCount--) {
  61. // Twice as many possible entries in the queue as there
  62. // are worker threads to make sure they don't stall
  63. freeBuffers.push_back(new rdr::MemOutStream());
  64. freeBuffers.push_back(new rdr::MemOutStream());
  65. threads.push_back(new DecodeThread(this));
  66. }
  67. }
  68. DecodeManager::~DecodeManager()
  69. {
  70. while (!threads.empty()) {
  71. delete threads.back();
  72. threads.pop_back();
  73. }
  74. delete threadException;
  75. while (!freeBuffers.empty()) {
  76. delete freeBuffers.back();
  77. freeBuffers.pop_back();
  78. }
  79. delete consumerCond;
  80. delete producerCond;
  81. delete queueMutex;
  82. for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
  83. delete decoders[i];
  84. }
  85. void DecodeManager::decodeRect(const Rect& r, int encoding,
  86. ModifiablePixelBuffer* pb)
  87. {
  88. Decoder *decoder;
  89. rdr::MemOutStream *bufferStream;
  90. QueueEntry *entry;
  91. assert(pb != NULL);
  92. if (!Decoder::supported(encoding)) {
  93. vlog.error("Unknown encoding %d", encoding);
  94. throw rdr::Exception("Unknown encoding");
  95. }
  96. if (!decoders[encoding]) {
  97. decoders[encoding] = Decoder::createDecoder(encoding);
  98. if (!decoders[encoding]) {
  99. vlog.error("Unknown encoding %d", encoding);
  100. throw rdr::Exception("Unknown encoding");
  101. }
  102. }
  103. decoder = decoders[encoding];
  104. // Fast path for single CPU machines to avoid the context
  105. // switching overhead
  106. if (threads.empty()) {
  107. bufferStream = freeBuffers.front();
  108. bufferStream->clear();
  109. decoder->readRect(r, conn->getInStream(), conn->server, bufferStream);
  110. decoder->decodeRect(r, bufferStream->data(), bufferStream->length(),
  111. conn->server, pb);
  112. return;
  113. }
  114. // Wait for an available memory buffer
  115. queueMutex->lock();
  116. while (freeBuffers.empty())
  117. producerCond->wait();
  118. // Don't pop the buffer in case we throw an exception
  119. // whilst reading
  120. bufferStream = freeBuffers.front();
  121. queueMutex->unlock();
  122. // First check if any thread has encountered a problem
  123. throwThreadException();
  124. // Read the rect
  125. bufferStream->clear();
  126. decoder->readRect(r, conn->getInStream(), conn->server, bufferStream);
  127. // Then try to put it on the queue
  128. entry = new QueueEntry;
  129. entry->active = false;
  130. entry->rect = r;
  131. entry->encoding = encoding;
  132. entry->decoder = decoder;
  133. entry->server = &conn->server;
  134. entry->pb = pb;
  135. entry->bufferStream = bufferStream;
  136. decoder->getAffectedRegion(r, bufferStream->data(),
  137. bufferStream->length(), conn->server,
  138. &entry->affectedRegion);
  139. queueMutex->lock();
  140. // The workers add buffers to the end so it's safe to assume
  141. // the front is still the same buffer
  142. freeBuffers.pop_front();
  143. workQueue.push_back(entry);
  144. // We only put a single entry on the queue so waking a single
  145. // thread is sufficient
  146. consumerCond->signal();
  147. queueMutex->unlock();
  148. }
  149. void DecodeManager::flush()
  150. {
  151. queueMutex->lock();
  152. while (!workQueue.empty())
  153. producerCond->wait();
  154. queueMutex->unlock();
  155. throwThreadException();
  156. }
  157. void DecodeManager::setThreadException(const rdr::Exception& e)
  158. {
  159. os::AutoMutex a(queueMutex);
  160. if (threadException != NULL)
  161. return;
  162. threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
  163. }
  164. void DecodeManager::throwThreadException()
  165. {
  166. os::AutoMutex a(queueMutex);
  167. if (threadException == NULL)
  168. return;
  169. rdr::Exception e(*threadException);
  170. delete threadException;
  171. threadException = NULL;
  172. throw e;
  173. }
  174. DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
  175. {
  176. this->manager = manager;
  177. stopRequested = false;
  178. start();
  179. }
  180. DecodeManager::DecodeThread::~DecodeThread()
  181. {
  182. stop();
  183. wait();
  184. }
  185. void DecodeManager::DecodeThread::stop()
  186. {
  187. os::AutoMutex a(manager->queueMutex);
  188. if (!isRunning())
  189. return;
  190. stopRequested = true;
  191. // We can't wake just this thread, so wake everyone
  192. manager->consumerCond->broadcast();
  193. }
  194. void DecodeManager::DecodeThread::worker()
  195. {
  196. manager->queueMutex->lock();
  197. while (!stopRequested) {
  198. DecodeManager::QueueEntry *entry;
  199. // Look for an available entry in the work queue
  200. entry = findEntry();
  201. if (entry == NULL) {
  202. // Wait and try again
  203. manager->consumerCond->wait();
  204. continue;
  205. }
  206. // This is ours now
  207. entry->active = true;
  208. manager->queueMutex->unlock();
  209. // Do the actual decoding
  210. try {
  211. entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
  212. entry->bufferStream->length(),
  213. *entry->server, entry->pb);
  214. } catch (rdr::Exception& e) {
  215. manager->setThreadException(e);
  216. } catch(...) {
  217. assert(false);
  218. }
  219. manager->queueMutex->lock();
  220. // Remove the entry from the queue and give back the memory buffer
  221. manager->freeBuffers.push_back(entry->bufferStream);
  222. manager->workQueue.remove(entry);
  223. delete entry;
  224. // Wake the main thread in case it is waiting for a memory buffer
  225. manager->producerCond->signal();
  226. // This rect might have been blocking multiple other rects, so
  227. // wake up every worker thread
  228. if (manager->workQueue.size() > 1)
  229. manager->consumerCond->broadcast();
  230. }
  231. manager->queueMutex->unlock();
  232. }
  233. DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
  234. {
  235. std::list<DecodeManager::QueueEntry*>::iterator iter;
  236. Region lockedRegion;
  237. if (manager->workQueue.empty())
  238. return NULL;
  239. if (!manager->workQueue.front()->active)
  240. return manager->workQueue.front();
  241. for (iter = manager->workQueue.begin();
  242. iter != manager->workQueue.end();
  243. ++iter) {
  244. DecodeManager::QueueEntry* entry;
  245. std::list<DecodeManager::QueueEntry*>::iterator iter2;
  246. entry = *iter;
  247. // Another thread working on this?
  248. if (entry->active)
  249. goto next;
  250. // If this is an ordered decoder then make sure this is the first
  251. // rectangle in the queue for that decoder
  252. if (entry->decoder->flags & DecoderOrdered) {
  253. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  254. if (entry->encoding == (*iter2)->encoding)
  255. goto next;
  256. }
  257. }
  258. // For a partially ordered decoder we must ask the decoder for each
  259. // pair of rectangles.
  260. if (entry->decoder->flags & DecoderPartiallyOrdered) {
  261. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  262. if (entry->encoding != (*iter2)->encoding)
  263. continue;
  264. if (entry->decoder->doRectsConflict(entry->rect,
  265. entry->bufferStream->data(),
  266. entry->bufferStream->length(),
  267. (*iter2)->rect,
  268. (*iter2)->bufferStream->data(),
  269. (*iter2)->bufferStream->length(),
  270. *entry->server))
  271. goto next;
  272. }
  273. }
  274. // Check overlap with earlier rectangles
  275. if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
  276. goto next;
  277. return entry;
  278. next:
  279. lockedRegion.assign_union(entry->affectedRegion);
  280. }
  281. return NULL;
  282. }