diff options
Diffstat (limited to 'org.eclipse.jgit/src/org/eclipse/jgit/lib')
-rw-r--r-- | org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java | 120 |
1 files changed, 97 insertions, 23 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java index 9708bb2f92..9e8e256b01 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java @@ -43,16 +43,35 @@ package org.eclipse.jgit.lib; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; /** * Wrapper around the general {@link ProgressMonitor} to make it thread safe. + * + * Updates to the underlying ProgressMonitor are made only from the thread that + * allocated this wrapper. Callers are responsible for ensuring the allocating + * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to + * update the underlying ProgressMonitor. + * + * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()} + * may be invoked from a worker thread. All other methods of the ProgressMonitor + * interface can only be called from the thread that allocates this wrapper. */ public class ThreadSafeProgressMonitor implements ProgressMonitor { private final ProgressMonitor pm; private final ReentrantLock lock; + private final Thread mainThread; + + private final AtomicInteger workers; + + private final AtomicInteger pendingUpdates; + + private final Semaphore process; + /** * Wrap a ProgressMonitor to be thread safe. * @@ -62,33 +81,87 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor { public ThreadSafeProgressMonitor(ProgressMonitor pm) { this.pm = pm; this.lock = new ReentrantLock(); + this.mainThread = Thread.currentThread(); + this.workers = new AtomicInteger(0); + this.pendingUpdates = new AtomicInteger(0); + this.process = new Semaphore(0); } public void start(int totalTasks) { - lock.lock(); - try { - pm.start(totalTasks); - } finally { - lock.unlock(); - } + if (!isMainThread()) + throw new IllegalStateException(); + pm.start(totalTasks); } public void beginTask(String title, int totalWork) { - lock.lock(); - try { - pm.beginTask(title, totalWork); - } finally { - lock.unlock(); + if (!isMainThread()) + throw new IllegalStateException(); + pm.beginTask(title, totalWork); + } + + /** Notify the monitor a worker is starting. */ + public void startWorker() { + startWorkers(1); + } + + /** + * Notify the monitor of workers starting. + * + * @param count + * the number of worker threads that are starting. + */ + public void startWorkers(int count) { + workers.addAndGet(count); + } + + /** Notify the monitor a worker is finished. */ + public void endWorker() { + if (workers.decrementAndGet() == 0) + process.release(); + } + + /** + * Non-blocking poll for pending updates. + * + * This method can only be invoked by the same thread that allocated this + * ThreadSafeProgressMonior. + */ + public void pollForUpdates() { + assert isMainThread(); + doUpdates(); + } + + /** + * Process pending updates and wait for workers to finish. + * + * This method can only be invoked by the same thread that allocated this + * ThreadSafeProgressMonior. + * + * @throws InterruptedException + * if the main thread is interrupted while waiting for + * completion of workers. + */ + public void waitForCompletion() throws InterruptedException { + assert isMainThread(); + while (0 < workers.get()) { + doUpdates(); + process.acquire(); } + doUpdates(); + } + + private void doUpdates() { + int cnt = pendingUpdates.getAndSet(0); + if (0 < cnt) + pm.update(cnt); } public void update(int completed) { - lock.lock(); - try { - pm.update(completed); - } finally { - lock.unlock(); - } + int old = pendingUpdates.getAndAdd(completed); + if (isMainThread()) + doUpdates(); + else if (old == 0) + process.release(); } public boolean isCancelled() { @@ -101,11 +174,12 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor { } public void endTask() { - lock.lock(); - try { - pm.endTask(); - } finally { - lock.unlock(); - } + if (!isMainThread()) + throw new IllegalStateException(); + pm.endTask(); + } + + private boolean isMainThread() { + return Thread.currentThread() == mainThread; } } |