You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

DecodeManager.cxx 9.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /* Copyright 2015 Pierre Ossman for Cendio AB
  2. *
  3. * This is free software; you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation; either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This software is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this software; if not, write to the Free Software
  15. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
  16. * USA.
  17. */
  18. #include <assert.h>
  19. #include <string.h>
  20. #include <rfb/CConnection.h>
  21. #include <rfb/DecodeManager.h>
  22. #include <rfb/Decoder.h>
  23. #include <rfb/Exception.h>
  24. #include <rfb/Region.h>
  25. #include <rfb/LogWriter.h>
  26. #include <rdr/Exception.h>
  27. #include <rdr/MemOutStream.h>
  28. #include <os/Mutex.h>
  29. using namespace rfb;
  30. static LogWriter vlog("DecodeManager");
  31. DecodeManager::DecodeManager(CConnection *conn) :
  32. conn(conn), threadException(NULL)
  33. {
  34. size_t cpuCount;
  35. memset(decoders, 0, sizeof(decoders));
  36. queueMutex = new os::Mutex();
  37. producerCond = new os::Condition(queueMutex);
  38. consumerCond = new os::Condition(queueMutex);
  39. cpuCount = os::Thread::getSystemCPUCount();
  40. if (cpuCount == 0) {
  41. vlog.error("Unable to determine the number of CPU cores on this system");
  42. cpuCount = 1;
  43. } else {
  44. vlog.info("Detected %d CPU core(s)", (int)cpuCount);
  45. // No point creating more threads than this, they'll just end up
  46. // wasting CPU fighting for locks
  47. if (cpuCount > 4)
  48. cpuCount = 4;
  49. // The overhead of threading is small, but not small enough to
  50. // ignore on single CPU systems
  51. if (cpuCount == 1)
  52. vlog.info("Decoding data on main thread");
  53. else
  54. vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
  55. }
  56. if (cpuCount == 1) {
  57. // Threads are not used on single CPU machines
  58. freeBuffers.push_back(new rdr::MemOutStream());
  59. return;
  60. }
  61. while (cpuCount--) {
  62. // Twice as many possible entries in the queue as there
  63. // are worker threads to make sure they don't stall
  64. freeBuffers.push_back(new rdr::MemOutStream());
  65. freeBuffers.push_back(new rdr::MemOutStream());
  66. threads.push_back(new DecodeThread(this));
  67. }
  68. }
  69. DecodeManager::~DecodeManager()
  70. {
  71. while (!threads.empty()) {
  72. delete threads.back();
  73. threads.pop_back();
  74. }
  75. delete threadException;
  76. while (!freeBuffers.empty()) {
  77. delete freeBuffers.back();
  78. freeBuffers.pop_back();
  79. }
  80. delete consumerCond;
  81. delete producerCond;
  82. delete queueMutex;
  83. for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
  84. delete decoders[i];
  85. }
  86. bool DecodeManager::decodeRect(const Rect& r, int encoding,
  87. ModifiablePixelBuffer* pb)
  88. {
  89. Decoder *decoder;
  90. rdr::MemOutStream *bufferStream;
  91. QueueEntry *entry;
  92. assert(pb != NULL);
  93. if (!Decoder::supported(encoding)) {
  94. vlog.error("Unknown encoding %d", encoding);
  95. throw rdr::Exception("Unknown encoding");
  96. }
  97. if (!decoders[encoding]) {
  98. decoders[encoding] = Decoder::createDecoder(encoding);
  99. if (!decoders[encoding]) {
  100. vlog.error("Unknown encoding %d", encoding);
  101. throw rdr::Exception("Unknown encoding");
  102. }
  103. }
  104. decoder = decoders[encoding];
  105. // Fast path for single CPU machines to avoid the context
  106. // switching overhead
  107. if (threads.empty()) {
  108. bufferStream = freeBuffers.front();
  109. bufferStream->clear();
  110. if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream))
  111. return false;
  112. try {
  113. decoder->decodeRect(r, bufferStream->data(), bufferStream->length(),
  114. conn->server, pb);
  115. } catch (rdr::Exception& e) {
  116. throw Exception("Error decoding rect: %s", e.str());
  117. }
  118. return true;
  119. }
  120. // Wait for an available memory buffer
  121. queueMutex->lock();
  122. // FIXME: Should we return and let other things run here?
  123. while (freeBuffers.empty())
  124. producerCond->wait();
  125. // Don't pop the buffer in case we throw an exception
  126. // whilst reading
  127. bufferStream = freeBuffers.front();
  128. queueMutex->unlock();
  129. // First check if any thread has encountered a problem
  130. throwThreadException();
  131. // Read the rect
  132. bufferStream->clear();
  133. if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream))
  134. return false;
  135. // Then try to put it on the queue
  136. entry = new QueueEntry;
  137. entry->active = false;
  138. entry->rect = r;
  139. entry->encoding = encoding;
  140. entry->decoder = decoder;
  141. entry->server = &conn->server;
  142. entry->pb = pb;
  143. entry->bufferStream = bufferStream;
  144. decoder->getAffectedRegion(r, bufferStream->data(),
  145. bufferStream->length(), conn->server,
  146. &entry->affectedRegion);
  147. queueMutex->lock();
  148. // The workers add buffers to the end so it's safe to assume
  149. // the front is still the same buffer
  150. freeBuffers.pop_front();
  151. workQueue.push_back(entry);
  152. // We only put a single entry on the queue so waking a single
  153. // thread is sufficient
  154. consumerCond->signal();
  155. queueMutex->unlock();
  156. return true;
  157. }
  158. void DecodeManager::flush()
  159. {
  160. queueMutex->lock();
  161. while (!workQueue.empty())
  162. producerCond->wait();
  163. queueMutex->unlock();
  164. throwThreadException();
  165. }
  166. void DecodeManager::setThreadException(const rdr::Exception& e)
  167. {
  168. os::AutoMutex a(queueMutex);
  169. if (threadException != NULL)
  170. return;
  171. threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
  172. }
  173. void DecodeManager::throwThreadException()
  174. {
  175. os::AutoMutex a(queueMutex);
  176. if (threadException == NULL)
  177. return;
  178. rdr::Exception e(*threadException);
  179. delete threadException;
  180. threadException = NULL;
  181. throw e;
  182. }
  183. DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
  184. {
  185. this->manager = manager;
  186. stopRequested = false;
  187. start();
  188. }
  189. DecodeManager::DecodeThread::~DecodeThread()
  190. {
  191. stop();
  192. wait();
  193. }
  194. void DecodeManager::DecodeThread::stop()
  195. {
  196. os::AutoMutex a(manager->queueMutex);
  197. if (!isRunning())
  198. return;
  199. stopRequested = true;
  200. // We can't wake just this thread, so wake everyone
  201. manager->consumerCond->broadcast();
  202. }
  203. void DecodeManager::DecodeThread::worker()
  204. {
  205. manager->queueMutex->lock();
  206. while (!stopRequested) {
  207. DecodeManager::QueueEntry *entry;
  208. // Look for an available entry in the work queue
  209. entry = findEntry();
  210. if (entry == NULL) {
  211. // Wait and try again
  212. manager->consumerCond->wait();
  213. continue;
  214. }
  215. // This is ours now
  216. entry->active = true;
  217. manager->queueMutex->unlock();
  218. // Do the actual decoding
  219. try {
  220. entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
  221. entry->bufferStream->length(),
  222. *entry->server, entry->pb);
  223. } catch (rdr::Exception& e) {
  224. manager->setThreadException(e);
  225. } catch(...) {
  226. assert(false);
  227. }
  228. manager->queueMutex->lock();
  229. // Remove the entry from the queue and give back the memory buffer
  230. manager->freeBuffers.push_back(entry->bufferStream);
  231. manager->workQueue.remove(entry);
  232. delete entry;
  233. // Wake the main thread in case it is waiting for a memory buffer
  234. manager->producerCond->signal();
  235. // This rect might have been blocking multiple other rects, so
  236. // wake up every worker thread
  237. if (manager->workQueue.size() > 1)
  238. manager->consumerCond->broadcast();
  239. }
  240. manager->queueMutex->unlock();
  241. }
  242. DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
  243. {
  244. std::list<DecodeManager::QueueEntry*>::iterator iter;
  245. Region lockedRegion;
  246. if (manager->workQueue.empty())
  247. return NULL;
  248. if (!manager->workQueue.front()->active)
  249. return manager->workQueue.front();
  250. for (iter = manager->workQueue.begin();
  251. iter != manager->workQueue.end();
  252. ++iter) {
  253. DecodeManager::QueueEntry* entry;
  254. std::list<DecodeManager::QueueEntry*>::iterator iter2;
  255. entry = *iter;
  256. // Another thread working on this?
  257. if (entry->active)
  258. goto next;
  259. // If this is an ordered decoder then make sure this is the first
  260. // rectangle in the queue for that decoder
  261. if (entry->decoder->flags & DecoderOrdered) {
  262. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  263. if (entry->encoding == (*iter2)->encoding)
  264. goto next;
  265. }
  266. }
  267. // For a partially ordered decoder we must ask the decoder for each
  268. // pair of rectangles.
  269. if (entry->decoder->flags & DecoderPartiallyOrdered) {
  270. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  271. if (entry->encoding != (*iter2)->encoding)
  272. continue;
  273. if (entry->decoder->doRectsConflict(entry->rect,
  274. entry->bufferStream->data(),
  275. entry->bufferStream->length(),
  276. (*iter2)->rect,
  277. (*iter2)->bufferStream->data(),
  278. (*iter2)->bufferStream->length(),
  279. *entry->server))
  280. goto next;
  281. }
  282. }
  283. // Check overlap with earlier rectangles
  284. if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
  285. goto next;
  286. return entry;
  287. next:
  288. lockedRegion.assign_union(entry->affectedRegion);
  289. }
  290. return NULL;
  291. }