You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DecodeManager.java 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /* Copyright 2015 Pierre Ossman for Cendio AB
  2. * Copyright 2016-2019 Brian P. Hinz
  3. *
  4. * This is free software; you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation; either version 2 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * This software is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this software; if not, write to the Free Software
  16. * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
  17. * USA.
  18. */
  19. package com.tigervnc.rfb;
  20. import java.lang.Runtime;
  21. import java.util.*;
  22. import java.util.concurrent.*;
  23. import java.util.concurrent.locks.*;
  24. import com.tigervnc.rdr.*;
  25. import com.tigervnc.rdr.Exception;
  26. import static com.tigervnc.rfb.Decoder.DecoderFlags.*;
  27. public class DecodeManager {
  28. static LogWriter vlog = new LogWriter("DecodeManager");
  29. public DecodeManager(CConnection conn) {
  30. int cpuCount;
  31. this.conn = conn; threadException = null;
  32. decoders = new Decoder[Encodings.encodingMax+1];
  33. queueMutex = new ReentrantLock();
  34. producerCond = queueMutex.newCondition();
  35. consumerCond = queueMutex.newCondition();
  36. cpuCount = Runtime.getRuntime().availableProcessors();
  37. if (cpuCount == 0) {
  38. vlog.error("Unable to determine the number of CPU cores on this system");
  39. cpuCount = 1;
  40. } else {
  41. vlog.info("Detected "+cpuCount+" CPU core(s)");
  42. // No point creating more threads than this, they'll just end up
  43. // wasting CPU fighting for locks
  44. if (cpuCount > 4)
  45. cpuCount = 4;
  46. // The overhead of threading is small, but not small enough to
  47. // ignore on single CPU systems
  48. if (cpuCount == 1)
  49. vlog.info("Decoding data on main thread");
  50. else
  51. vlog.info("Creating "+cpuCount+" decoder thread(s)");
  52. }
  53. freeBuffers = new ArrayDeque<MemOutStream>(cpuCount*2);
  54. workQueue = new ArrayDeque<QueueEntry>(cpuCount);
  55. threads = new ArrayList<DecodeThread>(cpuCount);
  56. while (cpuCount-- > 0) {
  57. // Twice as many possible entries in the queue as there
  58. // are worker threads to make sure they don't stall
  59. try {
  60. freeBuffers.addLast(new MemOutStream());
  61. freeBuffers.addLast(new MemOutStream());
  62. threads.add(new DecodeThread(this));
  63. } catch (IllegalStateException e) { }
  64. }
  65. }
  66. public void decodeRect(Rect r, int encoding,
  67. ModifiablePixelBuffer pb)
  68. {
  69. Decoder decoder;
  70. MemOutStream bufferStream;
  71. QueueEntry entry;
  72. assert(pb != null);
  73. if (!Decoder.supported(encoding)) {
  74. vlog.error("Unknown encoding " + encoding);
  75. throw new Exception("Unknown encoding");
  76. }
  77. if (decoders[encoding] == null) {
  78. decoders[encoding] = Decoder.createDecoder(encoding);
  79. if (decoders[encoding] == null) {
  80. vlog.error("Unknown encoding " + encoding);
  81. throw new Exception("Unknown encoding");
  82. }
  83. }
  84. decoder = decoders[encoding];
  85. // Fast path for single CPU machines to avoid the context
  86. // switching overhead
  87. if (threads.size() == 1) {
  88. bufferStream = freeBuffers.getFirst();
  89. bufferStream.clear();
  90. decoder.readRect(r, conn.getInStream(), conn.server, bufferStream);
  91. decoder.decodeRect(r, (Object)bufferStream.data(), bufferStream.length(),
  92. conn.server, pb);
  93. return;
  94. }
  95. // Wait for an available memory buffer
  96. queueMutex.lock();
  97. try {
  98. while (freeBuffers.isEmpty())
  99. try {
  100. producerCond.await();
  101. } catch (InterruptedException e) { }
  102. // Don't pop the buffer in case we throw an exception
  103. // whilst reading
  104. bufferStream = freeBuffers.getFirst();
  105. } finally {
  106. queueMutex.unlock();
  107. }
  108. // First check if any thread has encountered a problem
  109. throwThreadException();
  110. // Read the rect
  111. bufferStream.clear();
  112. decoder.readRect(r, conn.getInStream(), conn.server, bufferStream);
  113. // Then try to put it on the queue
  114. entry = new QueueEntry();
  115. entry.active = false;
  116. entry.rect = r;
  117. entry.encoding = encoding;
  118. entry.decoder = decoder;
  119. entry.server = conn.server;
  120. entry.pb = pb;
  121. entry.bufferStream = bufferStream;
  122. decoder.getAffectedRegion(r, bufferStream.data(),
  123. bufferStream.length(), conn.server,
  124. entry.affectedRegion);
  125. queueMutex.lock();
  126. try {
  127. // The workers add buffers to the end so it's safe to assume
  128. // the front is still the same buffer
  129. freeBuffers.removeFirst();
  130. workQueue.addLast(entry);
  131. // We only put a single entry on the queue so waking a single
  132. // thread is sufficient
  133. consumerCond.signal();
  134. } finally {
  135. queueMutex.unlock();
  136. }
  137. }
  138. public void flush()
  139. {
  140. queueMutex.lock();
  141. try {
  142. while (!workQueue.isEmpty())
  143. try {
  144. producerCond.await();
  145. } catch (InterruptedException e) { }
  146. } finally {
  147. queueMutex.unlock();
  148. }
  149. throwThreadException();
  150. }
  151. private void setThreadException(Exception e)
  152. {
  153. //os::AutoMutex a(queueMutex);
  154. queueMutex.lock();
  155. try {
  156. if (threadException != null)
  157. return;
  158. threadException =
  159. new Exception("Exception on worker thread: "+e.getMessage());
  160. } finally {
  161. queueMutex.unlock();
  162. }
  163. }
  164. private void throwThreadException()
  165. {
  166. //os::AutoMutex a(queueMutex);
  167. queueMutex.lock();
  168. try {
  169. if (threadException == null)
  170. return;
  171. Exception e = new Exception(threadException.getMessage());
  172. threadException = null;
  173. throw e;
  174. } finally {
  175. queueMutex.unlock();
  176. }
  177. }
  178. private class QueueEntry {
  179. public QueueEntry() {
  180. affectedRegion = new Region();
  181. }
  182. public boolean active;
  183. public Rect rect;
  184. public int encoding;
  185. public Decoder decoder;
  186. public ServerParams server;
  187. public ModifiablePixelBuffer pb;
  188. public MemOutStream bufferStream;
  189. public Region affectedRegion;
  190. }
  191. private class DecodeThread implements Runnable {
  192. public DecodeThread(DecodeManager manager)
  193. {
  194. this.manager = manager;
  195. stopRequested = false;
  196. (thread = new Thread(this, "Decoder Thread")).start();
  197. }
  198. public void stop()
  199. {
  200. //os::AutoMutex a(manager.queueMutex);
  201. manager.queueMutex.lock();
  202. try {
  203. if (!thread.isAlive())
  204. return;
  205. stopRequested = true;
  206. // We can't wake just this thread, so wake everyone
  207. manager.consumerCond.signalAll();
  208. } finally {
  209. manager.queueMutex.unlock();
  210. }
  211. }
  212. public void run()
  213. {
  214. manager.queueMutex.lock();
  215. while (!stopRequested) {
  216. QueueEntry entry;
  217. // Look for an available entry in the work queue
  218. entry = findEntry();
  219. if (entry == null) {
  220. // Wait and try again
  221. try {
  222. manager.consumerCond.await();
  223. } catch (InterruptedException e) { }
  224. continue;
  225. }
  226. // This is ours now
  227. entry.active = true;
  228. manager.queueMutex.unlock();
  229. // Do the actual decoding
  230. try {
  231. entry.decoder.decodeRect(entry.rect, entry.bufferStream.data(),
  232. entry.bufferStream.length(),
  233. entry.server, entry.pb);
  234. } catch (com.tigervnc.rdr.Exception e) {
  235. manager.setThreadException(e);
  236. } catch(java.lang.Exception e) {
  237. assert(false);
  238. }
  239. manager.queueMutex.lock();
  240. // Remove the entry from the queue and give back the memory buffer
  241. manager.freeBuffers.addLast(entry.bufferStream);
  242. manager.workQueue.remove(entry);
  243. entry = null;
  244. // Wake the main thread in case it is waiting for a memory buffer
  245. manager.producerCond.signal();
  246. // This rect might have been blocking multiple other rects, so
  247. // wake up every worker thread
  248. if (manager.workQueue.size() > 1)
  249. manager.consumerCond.signalAll();
  250. }
  251. manager.queueMutex.unlock();
  252. }
  253. protected QueueEntry findEntry()
  254. {
  255. Iterator<QueueEntry> iter;
  256. Region lockedRegion = new Region();
  257. if (manager.workQueue.isEmpty())
  258. return null;
  259. if (!manager.workQueue.peek().active)
  260. return manager.workQueue.peek();
  261. next:for (iter = manager.workQueue.iterator(); iter.hasNext();) {
  262. QueueEntry entry, entry2;
  263. Iterator<QueueEntry> iter2;
  264. entry = iter.next();
  265. // Another thread working on this?
  266. if (entry.active) {
  267. lockedRegion.assign_union(entry.affectedRegion);
  268. continue next;
  269. }
  270. // If this is an ordered decoder then make sure this is the first
  271. // rectangle in the queue for that decoder
  272. if ((entry.decoder.flags & DecoderOrdered) != 0) {
  273. for (iter2 = manager.workQueue.iterator(); iter2.hasNext() &&
  274. !(entry2 = iter2.next()).equals(entry);) {
  275. if (entry.encoding == entry2.encoding) {
  276. lockedRegion.assign_union(entry.affectedRegion);
  277. continue next;
  278. }
  279. }
  280. }
  281. // For a partially ordered decoder we must ask the decoder for each
  282. // pair of rectangles.
  283. if ((entry.decoder.flags & DecoderPartiallyOrdered) != 0) {
  284. for (iter2 = manager.workQueue.iterator(); iter2.hasNext() &&
  285. !(entry2 = iter2.next()).equals(entry);) {
  286. if (entry.encoding != entry2.encoding)
  287. continue;
  288. if (entry.decoder.doRectsConflict(entry.rect,
  289. entry.bufferStream.data(),
  290. entry.bufferStream.length(),
  291. entry2.rect,
  292. entry2.bufferStream.data(),
  293. entry2.bufferStream.length(),
  294. entry.server))
  295. lockedRegion.assign_union(entry.affectedRegion);
  296. continue next;
  297. }
  298. }
  299. // Check overlap with earlier rectangles
  300. if (!lockedRegion.intersect(entry.affectedRegion).is_empty()) {
  301. lockedRegion.assign_union(entry.affectedRegion);
  302. continue next;
  303. }
  304. return entry;
  305. }
  306. return null;
  307. }
  308. private DecodeManager manager;
  309. private boolean stopRequested;
  310. private Thread thread;
  311. }
  312. private CConnection conn;
  313. private Decoder[] decoders;
  314. private ArrayDeque<MemOutStream> freeBuffers;
  315. private ArrayDeque<QueueEntry> workQueue;
  316. private ReentrantLock queueMutex;
  317. private Condition producerCond;
  318. private Condition consumerCond;
  319. private List<DecodeThread> threads;
  320. private com.tigervnc.rdr.Exception threadException;
  321. }