aboutsummaryrefslogtreecommitdiffstats
path: root/org.eclipse.jgit/src/org/eclipse/jgit/lib
diff options
context:
space:
mode:
Diffstat (limited to 'org.eclipse.jgit/src/org/eclipse/jgit/lib')
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java120
1 files changed, 97 insertions, 23 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
index 9708bb2f92..9e8e256b01 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
@@ -43,16 +43,35 @@
package org.eclipse.jgit.lib;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* Wrapper around the general {@link ProgressMonitor} to make it thread safe.
+ *
+ * Updates to the underlying ProgressMonitor are made only from the thread that
+ * allocated this wrapper. Callers are responsible for ensuring the allocating
+ * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
+ * update the underlying ProgressMonitor.
+ *
+ * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
+ * may be invoked from a worker thread. All other methods of the ProgressMonitor
+ * interface can only be called from the thread that allocates this wrapper.
*/
public class ThreadSafeProgressMonitor implements ProgressMonitor {
private final ProgressMonitor pm;
private final ReentrantLock lock;
+ private final Thread mainThread;
+
+ private final AtomicInteger workers;
+
+ private final AtomicInteger pendingUpdates;
+
+ private final Semaphore process;
+
/**
* Wrap a ProgressMonitor to be thread safe.
*
@@ -62,33 +81,87 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
public ThreadSafeProgressMonitor(ProgressMonitor pm) {
this.pm = pm;
this.lock = new ReentrantLock();
+ this.mainThread = Thread.currentThread();
+ this.workers = new AtomicInteger(0);
+ this.pendingUpdates = new AtomicInteger(0);
+ this.process = new Semaphore(0);
}
public void start(int totalTasks) {
- lock.lock();
- try {
- pm.start(totalTasks);
- } finally {
- lock.unlock();
- }
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.start(totalTasks);
}
public void beginTask(String title, int totalWork) {
- lock.lock();
- try {
- pm.beginTask(title, totalWork);
- } finally {
- lock.unlock();
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.beginTask(title, totalWork);
+ }
+
+ /** Notify the monitor a worker is starting. */
+ public void startWorker() {
+ startWorkers(1);
+ }
+
+ /**
+ * Notify the monitor of workers starting.
+ *
+ * @param count
+ * the number of worker threads that are starting.
+ */
+ public void startWorkers(int count) {
+ workers.addAndGet(count);
+ }
+
+ /** Notify the monitor a worker is finished. */
+ public void endWorker() {
+ if (workers.decrementAndGet() == 0)
+ process.release();
+ }
+
+ /**
+ * Non-blocking poll for pending updates.
+ *
+ * This method can only be invoked by the same thread that allocated this
+ * ThreadSafeProgressMonior.
+ */
+ public void pollForUpdates() {
+ assert isMainThread();
+ doUpdates();
+ }
+
+ /**
+ * Process pending updates and wait for workers to finish.
+ *
+ * This method can only be invoked by the same thread that allocated this
+ * ThreadSafeProgressMonior.
+ *
+ * @throws InterruptedException
+ * if the main thread is interrupted while waiting for
+ * completion of workers.
+ */
+ public void waitForCompletion() throws InterruptedException {
+ assert isMainThread();
+ while (0 < workers.get()) {
+ doUpdates();
+ process.acquire();
}
+ doUpdates();
+ }
+
+ private void doUpdates() {
+ int cnt = pendingUpdates.getAndSet(0);
+ if (0 < cnt)
+ pm.update(cnt);
}
public void update(int completed) {
- lock.lock();
- try {
- pm.update(completed);
- } finally {
- lock.unlock();
- }
+ int old = pendingUpdates.getAndAdd(completed);
+ if (isMainThread())
+ doUpdates();
+ else if (old == 0)
+ process.release();
}
public boolean isCancelled() {
@@ -101,11 +174,12 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
}
public void endTask() {
- lock.lock();
- try {
- pm.endTask();
- } finally {
- lock.unlock();
- }
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.endTask();
+ }
+
+ private boolean isMainThread() {
+ return Thread.currentThread() == mainThread;
}
}