You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

DecodeManager.cxx 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. /* Copyright 2015 Pierre Ossman for Cendio AB
  2. *
  3. * This is free software; you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation; either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This software is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this software; if not, write to the Free Software
  15. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
  16. * USA.
  17. */
  18. #ifdef HAVE_CONFIG_H
  19. #include <config.h>
  20. #endif
  21. #include <assert.h>
  22. #include <string.h>
  23. #include <rfb/CConnection.h>
  24. #include <rfb/DecodeManager.h>
  25. #include <rfb/Decoder.h>
  26. #include <rfb/Exception.h>
  27. #include <rfb/Region.h>
  28. #include <rfb/LogWriter.h>
  29. #include <rdr/Exception.h>
  30. #include <rdr/MemOutStream.h>
  31. #include <os/Mutex.h>
  32. using namespace rfb;
  33. static LogWriter vlog("DecodeManager");
  34. DecodeManager::DecodeManager(CConnection *conn) :
  35. conn(conn), threadException(NULL)
  36. {
  37. size_t cpuCount;
  38. memset(decoders, 0, sizeof(decoders));
  39. memset(stats, 0, sizeof(stats));
  40. queueMutex = new os::Mutex();
  41. producerCond = new os::Condition(queueMutex);
  42. consumerCond = new os::Condition(queueMutex);
  43. cpuCount = os::Thread::getSystemCPUCount();
  44. if (cpuCount == 0) {
  45. vlog.error("Unable to determine the number of CPU cores on this system");
  46. cpuCount = 1;
  47. } else {
  48. vlog.info("Detected %d CPU core(s)", (int)cpuCount);
  49. // No point creating more threads than this, they'll just end up
  50. // wasting CPU fighting for locks
  51. if (cpuCount > 4)
  52. cpuCount = 4;
  53. }
  54. vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
  55. while (cpuCount--) {
  56. // Twice as many possible entries in the queue as there
  57. // are worker threads to make sure they don't stall
  58. freeBuffers.push_back(new rdr::MemOutStream());
  59. freeBuffers.push_back(new rdr::MemOutStream());
  60. threads.push_back(new DecodeThread(this));
  61. }
  62. }
  63. DecodeManager::~DecodeManager()
  64. {
  65. logStats();
  66. while (!threads.empty()) {
  67. delete threads.back();
  68. threads.pop_back();
  69. }
  70. delete threadException;
  71. while (!freeBuffers.empty()) {
  72. delete freeBuffers.back();
  73. freeBuffers.pop_back();
  74. }
  75. delete consumerCond;
  76. delete producerCond;
  77. delete queueMutex;
  78. for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
  79. delete decoders[i];
  80. }
  81. bool DecodeManager::decodeRect(const Rect& r, int encoding,
  82. ModifiablePixelBuffer* pb)
  83. {
  84. Decoder *decoder;
  85. rdr::MemOutStream *bufferStream;
  86. int equiv;
  87. QueueEntry *entry;
  88. assert(pb != NULL);
  89. if (!Decoder::supported(encoding)) {
  90. vlog.error("Unknown encoding %d", encoding);
  91. throw rdr::Exception("Unknown encoding");
  92. }
  93. if (!decoders[encoding]) {
  94. decoders[encoding] = Decoder::createDecoder(encoding);
  95. if (!decoders[encoding]) {
  96. vlog.error("Unknown encoding %d", encoding);
  97. throw rdr::Exception("Unknown encoding");
  98. }
  99. }
  100. decoder = decoders[encoding];
  101. // Wait for an available memory buffer
  102. queueMutex->lock();
  103. // FIXME: Should we return and let other things run here?
  104. while (freeBuffers.empty())
  105. producerCond->wait();
  106. // Don't pop the buffer in case we throw an exception
  107. // whilst reading
  108. bufferStream = freeBuffers.front();
  109. queueMutex->unlock();
  110. // First check if any thread has encountered a problem
  111. throwThreadException();
  112. // Read the rect
  113. bufferStream->clear();
  114. try {
  115. if (!decoder->readRect(r, conn->getInStream(), conn->server, bufferStream))
  116. return false;
  117. } catch (rdr::Exception& e) {
  118. throw Exception("Error reading rect: %s", e.str());
  119. }
  120. stats[encoding].rects++;
  121. stats[encoding].bytes += 12 + bufferStream->length();
  122. stats[encoding].pixels += r.area();
  123. equiv = 12 + r.area() * (conn->server.pf().bpp/8);
  124. stats[encoding].equivalent += equiv;
  125. // Then try to put it on the queue
  126. entry = new QueueEntry;
  127. entry->active = false;
  128. entry->rect = r;
  129. entry->encoding = encoding;
  130. entry->decoder = decoder;
  131. entry->server = &conn->server;
  132. entry->pb = pb;
  133. entry->bufferStream = bufferStream;
  134. decoder->getAffectedRegion(r, bufferStream->data(),
  135. bufferStream->length(), conn->server,
  136. &entry->affectedRegion);
  137. queueMutex->lock();
  138. // The workers add buffers to the end so it's safe to assume
  139. // the front is still the same buffer
  140. freeBuffers.pop_front();
  141. workQueue.push_back(entry);
  142. // We only put a single entry on the queue so waking a single
  143. // thread is sufficient
  144. consumerCond->signal();
  145. queueMutex->unlock();
  146. return true;
  147. }
  148. void DecodeManager::flush()
  149. {
  150. queueMutex->lock();
  151. while (!workQueue.empty())
  152. producerCond->wait();
  153. queueMutex->unlock();
  154. throwThreadException();
  155. }
  156. void DecodeManager::logStats()
  157. {
  158. size_t i;
  159. unsigned rects;
  160. unsigned long long pixels, bytes, equivalent;
  161. double ratio;
  162. char a[1024], b[1024];
  163. rects = 0;
  164. pixels = bytes = equivalent = 0;
  165. for (i = 0;i < (sizeof(stats)/sizeof(stats[0]));i++) {
  166. // Did this class do anything at all?
  167. if (stats[i].rects == 0)
  168. continue;
  169. rects += stats[i].rects;
  170. pixels += stats[i].pixels;
  171. bytes += stats[i].bytes;
  172. equivalent += stats[i].equivalent;
  173. ratio = (double)stats[i].equivalent / stats[i].bytes;
  174. siPrefix(stats[i].rects, "rects", a, sizeof(a));
  175. siPrefix(stats[i].pixels, "pixels", b, sizeof(b));
  176. vlog.info(" %s: %s, %s", encodingName(i), a, b);
  177. iecPrefix(stats[i].bytes, "B", a, sizeof(a));
  178. vlog.info(" %*s %s (1:%g ratio)",
  179. (int)strlen(encodingName(i)), "",
  180. a, ratio);
  181. }
  182. ratio = (double)equivalent / bytes;
  183. siPrefix(rects, "rects", a, sizeof(a));
  184. siPrefix(pixels, "pixels", b, sizeof(b));
  185. vlog.info(" Total: %s, %s", a, b);
  186. iecPrefix(bytes, "B", a, sizeof(a));
  187. vlog.info(" %s (1:%g ratio)", a, ratio);
  188. }
  189. void DecodeManager::setThreadException(const rdr::Exception& e)
  190. {
  191. os::AutoMutex a(queueMutex);
  192. if (threadException != NULL)
  193. return;
  194. threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
  195. }
  196. void DecodeManager::throwThreadException()
  197. {
  198. os::AutoMutex a(queueMutex);
  199. if (threadException == NULL)
  200. return;
  201. rdr::Exception e(*threadException);
  202. delete threadException;
  203. threadException = NULL;
  204. throw e;
  205. }
  206. DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
  207. {
  208. this->manager = manager;
  209. stopRequested = false;
  210. start();
  211. }
  212. DecodeManager::DecodeThread::~DecodeThread()
  213. {
  214. stop();
  215. wait();
  216. }
  217. void DecodeManager::DecodeThread::stop()
  218. {
  219. os::AutoMutex a(manager->queueMutex);
  220. if (!isRunning())
  221. return;
  222. stopRequested = true;
  223. // We can't wake just this thread, so wake everyone
  224. manager->consumerCond->broadcast();
  225. }
  226. void DecodeManager::DecodeThread::worker()
  227. {
  228. manager->queueMutex->lock();
  229. while (!stopRequested) {
  230. DecodeManager::QueueEntry *entry;
  231. // Look for an available entry in the work queue
  232. entry = findEntry();
  233. if (entry == NULL) {
  234. // Wait and try again
  235. manager->consumerCond->wait();
  236. continue;
  237. }
  238. // This is ours now
  239. entry->active = true;
  240. manager->queueMutex->unlock();
  241. // Do the actual decoding
  242. try {
  243. entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
  244. entry->bufferStream->length(),
  245. *entry->server, entry->pb);
  246. } catch (rdr::Exception& e) {
  247. manager->setThreadException(e);
  248. } catch(...) {
  249. assert(false);
  250. }
  251. manager->queueMutex->lock();
  252. // Remove the entry from the queue and give back the memory buffer
  253. manager->freeBuffers.push_back(entry->bufferStream);
  254. manager->workQueue.remove(entry);
  255. delete entry;
  256. // Wake the main thread in case it is waiting for a memory buffer
  257. manager->producerCond->signal();
  258. // This rect might have been blocking multiple other rects, so
  259. // wake up every worker thread
  260. if (manager->workQueue.size() > 1)
  261. manager->consumerCond->broadcast();
  262. }
  263. manager->queueMutex->unlock();
  264. }
  265. DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
  266. {
  267. std::list<DecodeManager::QueueEntry*>::iterator iter;
  268. Region lockedRegion;
  269. if (manager->workQueue.empty())
  270. return NULL;
  271. if (!manager->workQueue.front()->active)
  272. return manager->workQueue.front();
  273. for (iter = manager->workQueue.begin();
  274. iter != manager->workQueue.end();
  275. ++iter) {
  276. DecodeManager::QueueEntry* entry;
  277. std::list<DecodeManager::QueueEntry*>::iterator iter2;
  278. entry = *iter;
  279. // Another thread working on this?
  280. if (entry->active)
  281. goto next;
  282. // If this is an ordered decoder then make sure this is the first
  283. // rectangle in the queue for that decoder
  284. if (entry->decoder->flags & DecoderOrdered) {
  285. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  286. if (entry->encoding == (*iter2)->encoding)
  287. goto next;
  288. }
  289. }
  290. // For a partially ordered decoder we must ask the decoder for each
  291. // pair of rectangles.
  292. if (entry->decoder->flags & DecoderPartiallyOrdered) {
  293. for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
  294. if (entry->encoding != (*iter2)->encoding)
  295. continue;
  296. if (entry->decoder->doRectsConflict(entry->rect,
  297. entry->bufferStream->data(),
  298. entry->bufferStream->length(),
  299. (*iter2)->rect,
  300. (*iter2)->bufferStream->data(),
  301. (*iter2)->bufferStream->length(),
  302. *entry->server))
  303. goto next;
  304. }
  305. }
  306. // Check overlap with earlier rectangles
  307. if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
  308. goto next;
  309. return entry;
  310. next:
  311. lockedRegion.assign_union(entry->affectedRegion);
  312. }
  313. return NULL;
  314. }