diff options
Diffstat (limited to 'java/com/tigervnc/rfb/DecodeManager.java')
-rw-r--r-- | java/com/tigervnc/rfb/DecodeManager.java | 386 |
1 files changed, 386 insertions, 0 deletions
diff --git a/java/com/tigervnc/rfb/DecodeManager.java b/java/com/tigervnc/rfb/DecodeManager.java new file mode 100644 index 00000000..9e254ad2 --- /dev/null +++ b/java/com/tigervnc/rfb/DecodeManager.java @@ -0,0 +1,386 @@ +/* Copyright 2015 Pierre Ossman for Cendio AB + * Copyright 2016 Brian P. Hinz + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, + * USA. + */ + +package com.tigervnc.rfb; + +import java.lang.Runtime; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import com.tigervnc.rdr.*; +import com.tigervnc.rdr.Exception; + +import static com.tigervnc.rfb.Decoder.DecoderFlags.*; + +public class DecodeManager { + + static LogWriter vlog = new LogWriter("DecodeManager"); + + public DecodeManager(CConnection conn) { + int cpuCount; + + this.conn = conn; threadException = null; + decoders = new Decoder[Encodings.encodingMax+1]; + + queueMutex = new ReentrantLock(); + producerCond = queueMutex.newCondition(); + consumerCond = queueMutex.newCondition(); + + //cpuCount = 1; + cpuCount = Runtime.getRuntime().availableProcessors(); + if (cpuCount == 0) { + vlog.error("Unable to determine the number of CPU cores on this system"); + cpuCount = 1; + } else { + vlog.info("Detected "+cpuCount+" CPU core(s)"); + // No point creating more threads than this, they'll just end up + // wasting CPU fighting for locks + if (cpuCount > 4) + cpuCount = 4; + // The overhead of threading is small, but not small enough to + // ignore on single CPU systems + if (cpuCount == 1) + vlog.info("Decoding data on main thread"); + else + vlog.info("Creating "+cpuCount+" decoder thread(s)"); + } + + freeBuffers = new ArrayDeque<MemOutStream>(cpuCount*2); + workQueue = new ArrayDeque<QueueEntry>(cpuCount); + threads = new ArrayList<DecodeThread>(cpuCount); + while (cpuCount-- > 0) { + // Twice as many possible entries in the queue as there + // are worker threads to make sure they don't stall + try { + freeBuffers.addLast(new MemOutStream()); + freeBuffers.addLast(new MemOutStream()); + + threads.add(new DecodeThread(this)); + } catch (IllegalStateException e) { } + } + + } + + public void decodeRect(Rect r, int encoding, + ModifiablePixelBuffer pb) + { + Decoder decoder; + MemOutStream bufferStream; + + QueueEntry entry; + + assert(pb != null); + + if (!Decoder.supported(encoding)) { + vlog.error("Unknown encoding " + encoding); + throw new Exception("Unknown encoding"); + } + + if (decoders[encoding] == null) { + decoders[encoding] = Decoder.createDecoder(encoding); + if (decoders[encoding] == null) { + vlog.error("Unknown encoding " + encoding); + throw new Exception("Unknown encoding"); + } + } + + decoder = decoders[encoding]; + + // Fast path for single CPU machines to avoid the context + // switching overhead + if (threads.size() == 1) { + bufferStream = freeBuffers.getFirst(); + bufferStream.clear(); + decoder.readRect(r, conn.getInStream(), conn.cp, bufferStream); + decoder.decodeRect(r, (Object)bufferStream.data(), bufferStream.length(), + conn.cp, pb); + return; + } + + // Wait for an available memory buffer + queueMutex.lock(); + + while (freeBuffers.isEmpty()) + try { + producerCond.await(); + } catch (InterruptedException e) { } + + // Don't pop the buffer in case we throw an exception + // whilst reading + bufferStream = freeBuffers.getFirst(); + + queueMutex.unlock(); + + // First check if any thread has encountered a problem + throwThreadException(); + + // Read the rect + bufferStream.clear(); + decoder.readRect(r, conn.getInStream(), conn.cp, bufferStream); + + // Then try to put it on the queue + entry = new QueueEntry(); + + entry.active = false; + entry.rect = r; + entry.encoding = encoding; + entry.decoder = decoder; + entry.cp = conn.cp; + entry.pb = pb; + entry.bufferStream = bufferStream; + entry.affectedRegion = new Region(r); + + decoder.getAffectedRegion(r, bufferStream.data(), + bufferStream.length(), conn.cp, + entry.affectedRegion); + + // The workers add buffers to the end so it's safe to assume + // the front is still the same buffer + freeBuffers.removeFirst(); + + queueMutex.lock(); + + workQueue.addLast(entry); + + // We only put a single entry on the queue so waking a single + // thread is sufficient + consumerCond.signal(); + + queueMutex.unlock(); + } + + public void flush() + { + queueMutex.lock(); + + while (!workQueue.isEmpty()) + try { + producerCond.await(); + } catch (InterruptedException e) { } + + queueMutex.unlock(); + + throwThreadException(); + } + + private void setThreadException(Exception e) + { + //os::AutoMutex a(queueMutex); + queueMutex.lock(); + + if (threadException == null) + return; + + threadException = + new Exception("Exception on worker thread: "+e.getMessage()); + } + + private void throwThreadException() + { + //os::AutoMutex a(queueMutex); + queueMutex.lock(); + + if (threadException == null) + return; + + Exception e = new Exception(threadException.getMessage()); + + threadException = null; + + throw e; + } + + private class QueueEntry { + + public QueueEntry() { + } + public boolean active; + public Rect rect; + public int encoding; + public Decoder decoder; + public ConnParams cp; + public ModifiablePixelBuffer pb; + public MemOutStream bufferStream; + public Region affectedRegion; + } + + private class DecodeThread implements Runnable { + + public DecodeThread(DecodeManager manager) + { + this.manager = manager; + + stopRequested = false; + + (thread = new Thread(this)).start(); + } + + public void stop() + { + //os::AutoMutex a(manager.queueMutex); + manager.queueMutex.lock(); + + if (!thread.isAlive()) + return; + + stopRequested = true; + + // We can't wake just this thread, so wake everyone + manager.consumerCond.signalAll(); + } + + public void run() + { + manager.queueMutex.lock(); + while (!stopRequested) { + QueueEntry entry; + + // Look for an available entry in the work queue + entry = findEntry(); + if (entry == null) { + // Wait and try again + try { + manager.consumerCond.await(); + } catch (InterruptedException e) { } + continue; + } + + // This is ours now + entry.active = true; + + manager.queueMutex.unlock(); + + // Do the actual decoding + try { + entry.decoder.decodeRect(entry.rect, entry.bufferStream.data(), + entry.bufferStream.length(), + entry.cp, entry.pb); + } catch (com.tigervnc.rdr.Exception e) { + manager.setThreadException(e); + } catch(java.lang.Exception e) { + assert(false); + } + + manager.queueMutex.lock(); + + // Remove the entry from the queue and give back the memory buffer + manager.freeBuffers.add(entry.bufferStream); + manager.workQueue.remove(entry); + entry = null; + + // Wake the main thread in case it is waiting for a memory buffer + manager.producerCond.signal(); + // This rect might have been blocking multiple other rects, so + // wake up every worker thread + if (manager.workQueue.size() > 1) + manager.consumerCond.signalAll(); + } + + manager.queueMutex.unlock(); + } + + protected QueueEntry findEntry() + { + Iterator<QueueEntry> iter; + Region lockedRegion = new Region(); + + if (manager.workQueue.isEmpty()) + return null; + + if (!manager.workQueue.peek().active) + return manager.workQueue.peek(); + + for (iter = manager.workQueue.iterator(); iter.hasNext();) { + QueueEntry entry; + + Iterator<QueueEntry> iter2; + + entry = iter.next(); + + // Another thread working on this? + if (entry.active) { + lockedRegion.assign_union(entry.affectedRegion); + continue; + } + + // If this is an ordered decoder then make sure this is the first + // rectangle in the queue for that decoder + if ((entry.decoder.flags & DecoderOrdered) != 0) { + for (iter2 = manager.workQueue.iterator(); iter2.hasNext() && iter2 != iter;) { + if (entry.encoding == (iter2.next()).encoding) { + lockedRegion.assign_union(entry.affectedRegion); + continue; + } + } + } + + // For a partially ordered decoder we must ask the decoder for each + // pair of rectangles. + if ((entry.decoder.flags & DecoderPartiallyOrdered) != 0) { + for (iter2 = manager.workQueue.iterator(); iter2.hasNext() && iter2 != iter;) { + QueueEntry entry2 = iter2.next(); + if (entry.encoding != entry2.encoding) + continue; + if (entry.decoder.doRectsConflict(entry.rect, + entry.bufferStream.data(), + entry.bufferStream.length(), + entry2.rect, + entry2.bufferStream.data(), + entry2.bufferStream.length(), + entry.cp)) + lockedRegion.assign_union(entry.affectedRegion); + continue; + } + } + + // Check overlap with earlier rectangles + if (!lockedRegion.intersect(entry.affectedRegion).is_empty()) { + lockedRegion.assign_union(entry.affectedRegion); + continue; + } + + return entry; + + } + + return null; + } + + private DecodeManager manager; + private boolean stopRequested; + + private Thread thread; + + } + + private CConnection conn; + private Decoder[] decoders; + + private ArrayDeque<MemOutStream> freeBuffers; + private ArrayDeque<QueueEntry> workQueue; + + private ReentrantLock queueMutex; + private Condition producerCond; + private Condition consumerCond; + + private List<DecodeThread> threads; + private com.tigervnc.rdr.Exception threadException; + +} |