summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java189
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java120
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java8
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java18
4 files changed, 299 insertions, 36 deletions
diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java
new file mode 100644
index 0000000000..6839f8d3c1
--- /dev/null
+++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.lib;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+public class ThreadSafeProgressMonitorTest extends TestCase {
+ public void testFailsMethodsOnBackgroundThread()
+ throws InterruptedException {
+ final MockProgressMonitor mock = new MockProgressMonitor();
+ final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
+
+ runOnThread(new Runnable() {
+ public void run() {
+ try {
+ pm.start(1);
+ fail("start did not fail on background thread");
+ } catch (IllegalStateException notMainThread) {
+ // Expected result
+ }
+
+ try {
+ pm.beginTask("title", 1);
+ fail("beginTask did not fail on background thread");
+ } catch (IllegalStateException notMainThread) {
+ // Expected result
+ }
+
+ try {
+ pm.endTask();
+ fail("endTask did not fail on background thread");
+ } catch (IllegalStateException notMainThread) {
+ // Expected result
+ }
+ }
+ });
+
+ // Ensure we didn't alter the mock above when checking threads.
+ assertNull(mock.taskTitle);
+ assertEquals(0, mock.value);
+ }
+
+ public void testMethodsOkOnMainThread() {
+ final MockProgressMonitor mock = new MockProgressMonitor();
+ final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
+
+ pm.start(1);
+ assertEquals(1, mock.value);
+
+ pm.beginTask("title", 42);
+ assertEquals("title", mock.taskTitle);
+ assertEquals(42, mock.value);
+
+ pm.update(1);
+ assertEquals(43, mock.value);
+
+ pm.update(2);
+ assertEquals(45, mock.value);
+
+ pm.endTask();
+ assertNull(mock.taskTitle);
+ assertEquals(0, mock.value);
+ }
+
+ public void testUpdateOnBackgroundThreads() throws InterruptedException {
+ final MockProgressMonitor mock = new MockProgressMonitor();
+ final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
+
+ pm.startWorker();
+
+ final CountDownLatch doUpdate = new CountDownLatch(1);
+ final CountDownLatch didUpdate = new CountDownLatch(1);
+ final CountDownLatch doEndWorker = new CountDownLatch(1);
+
+ final Thread bg = new Thread() {
+ public void run() {
+ assertFalse(pm.isCancelled());
+
+ await(doUpdate);
+ pm.update(2);
+ didUpdate.countDown();
+
+ await(doEndWorker);
+ pm.update(1);
+ pm.endWorker();
+ }
+ };
+ bg.start();
+
+ pm.pollForUpdates();
+ assertEquals(0, mock.value);
+ doUpdate.countDown();
+
+ await(didUpdate);
+ pm.pollForUpdates();
+ assertEquals(2, mock.value);
+
+ doEndWorker.countDown();
+ pm.waitForCompletion();
+ assertEquals(3, mock.value);
+ }
+
+ private static void await(CountDownLatch cdl) {
+ try {
+ assertTrue("latch released", cdl.await(1000, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException ie) {
+ fail("Did not expect to be interrupted");
+ }
+ }
+
+ private static void runOnThread(Runnable task) throws InterruptedException {
+ Thread t = new Thread(task);
+ t.start();
+ t.join(1000);
+ assertFalse("thread has stopped", t.isAlive());
+ }
+
+ private static class MockProgressMonitor implements ProgressMonitor {
+ String taskTitle;
+
+ int value;
+
+ public void update(int completed) {
+ value += completed;
+ }
+
+ public void start(int totalTasks) {
+ value = totalTasks;
+ }
+
+ public void beginTask(String title, int totalWork) {
+ taskTitle = title;
+ value = totalWork;
+ }
+
+ public void endTask() {
+ taskTitle = null;
+ value = 0;
+ }
+
+ public boolean isCancelled() {
+ return false;
+ }
+ }
+}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
index 9708bb2f92..9e8e256b01 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
@@ -43,16 +43,35 @@
package org.eclipse.jgit.lib;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* Wrapper around the general {@link ProgressMonitor} to make it thread safe.
+ *
+ * Updates to the underlying ProgressMonitor are made only from the thread that
+ * allocated this wrapper. Callers are responsible for ensuring the allocating
+ * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
+ * update the underlying ProgressMonitor.
+ *
+ * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
+ * may be invoked from a worker thread. All other methods of the ProgressMonitor
+ * interface can only be called from the thread that allocates this wrapper.
*/
public class ThreadSafeProgressMonitor implements ProgressMonitor {
private final ProgressMonitor pm;
private final ReentrantLock lock;
+ private final Thread mainThread;
+
+ private final AtomicInteger workers;
+
+ private final AtomicInteger pendingUpdates;
+
+ private final Semaphore process;
+
/**
* Wrap a ProgressMonitor to be thread safe.
*
@@ -62,33 +81,87 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
public ThreadSafeProgressMonitor(ProgressMonitor pm) {
this.pm = pm;
this.lock = new ReentrantLock();
+ this.mainThread = Thread.currentThread();
+ this.workers = new AtomicInteger(0);
+ this.pendingUpdates = new AtomicInteger(0);
+ this.process = new Semaphore(0);
}
public void start(int totalTasks) {
- lock.lock();
- try {
- pm.start(totalTasks);
- } finally {
- lock.unlock();
- }
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.start(totalTasks);
}
public void beginTask(String title, int totalWork) {
- lock.lock();
- try {
- pm.beginTask(title, totalWork);
- } finally {
- lock.unlock();
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.beginTask(title, totalWork);
+ }
+
+ /** Notify the monitor a worker is starting. */
+ public void startWorker() {
+ startWorkers(1);
+ }
+
+ /**
+ * Notify the monitor of workers starting.
+ *
+ * @param count
+ * the number of worker threads that are starting.
+ */
+ public void startWorkers(int count) {
+ workers.addAndGet(count);
+ }
+
+ /** Notify the monitor a worker is finished. */
+ public void endWorker() {
+ if (workers.decrementAndGet() == 0)
+ process.release();
+ }
+
+ /**
+ * Non-blocking poll for pending updates.
+ *
+ * This method can only be invoked by the same thread that allocated this
+ * ThreadSafeProgressMonior.
+ */
+ public void pollForUpdates() {
+ assert isMainThread();
+ doUpdates();
+ }
+
+ /**
+ * Process pending updates and wait for workers to finish.
+ *
+ * This method can only be invoked by the same thread that allocated this
+ * ThreadSafeProgressMonior.
+ *
+ * @throws InterruptedException
+ * if the main thread is interrupted while waiting for
+ * completion of workers.
+ */
+ public void waitForCompletion() throws InterruptedException {
+ assert isMainThread();
+ while (0 < workers.get()) {
+ doUpdates();
+ process.acquire();
}
+ doUpdates();
+ }
+
+ private void doUpdates() {
+ int cnt = pendingUpdates.getAndSet(0);
+ if (0 < cnt)
+ pm.update(cnt);
}
public void update(int completed) {
- lock.lock();
- try {
- pm.update(completed);
- } finally {
- lock.unlock();
- }
+ int old = pendingUpdates.getAndAdd(completed);
+ if (isMainThread())
+ doUpdates();
+ else if (old == 0)
+ process.release();
}
public boolean isCancelled() {
@@ -101,11 +174,12 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
}
public void endTask() {
- lock.lock();
- try {
- pm.endTask();
- } finally {
- lock.unlock();
- }
+ if (!isMainThread())
+ throw new IllegalStateException();
+ pm.endTask();
+ }
+
+ private boolean isMainThread() {
+ return Thread.currentThread() == mainThread;
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java
index 5e551e9d49..aa0374618a 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java
@@ -46,7 +46,7 @@ package org.eclipse.jgit.storage.pack;
import java.util.concurrent.Callable;
import org.eclipse.jgit.lib.ObjectReader;
-import org.eclipse.jgit.lib.ProgressMonitor;
+import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
final class DeltaTask implements Callable<Object> {
private final PackConfig config;
@@ -55,7 +55,7 @@ final class DeltaTask implements Callable<Object> {
private final DeltaCache dc;
- private final ProgressMonitor pm;
+ private final ThreadSafeProgressMonitor pm;
private final int batchSize;
@@ -64,7 +64,8 @@ final class DeltaTask implements Callable<Object> {
private final ObjectToPack[] list;
DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc,
- ProgressMonitor pm, int batchSize, int start, ObjectToPack[] list) {
+ ThreadSafeProgressMonitor pm, int batchSize, int start,
+ ObjectToPack[] list) {
this.config = config;
this.templateReader = reader;
this.dc = dc;
@@ -82,6 +83,7 @@ final class DeltaTask implements Callable<Object> {
dw.search(pm, list, start, batchSize);
} finally {
or.release();
+ pm.endWorker();
}
return null;
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java
index 20c4bb0f97..5986aca4e3 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java
@@ -59,7 +59,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -675,7 +674,7 @@ public class PackWriter {
}
final DeltaCache dc = new ThreadSafeDeltaCache(config);
- final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
+ final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
// Guess at the size of batch we want. Because we don't really
// have a way for a thread to steal work from another thread if
@@ -713,6 +712,7 @@ public class PackWriter {
i += batchSize;
myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list));
}
+ pm.startWorkers(myTasks.size());
final Executor executor = config.getExecutor();
final List<Throwable> errors = Collections
@@ -720,7 +720,7 @@ public class PackWriter {
if (executor instanceof ExecutorService) {
// Caller supplied us a service, use it directly.
//
- runTasks((ExecutorService) executor, myTasks, errors);
+ runTasks((ExecutorService) executor, pm, myTasks, errors);
} else if (executor == null) {
// Caller didn't give us a way to run the tasks, spawn up a
@@ -728,7 +728,7 @@ public class PackWriter {
//
ExecutorService pool = Executors.newFixedThreadPool(threads);
try {
- runTasks(pool, myTasks, errors);
+ runTasks(pool, pm, myTasks, errors);
} finally {
pool.shutdown();
for (;;) {
@@ -746,7 +746,6 @@ public class PackWriter {
// asynchronous execution. Wrap everything and hope it
// can schedule these for us.
//
- final CountDownLatch done = new CountDownLatch(myTasks.size());
for (final DeltaTask task : myTasks) {
executor.execute(new Runnable() {
public void run() {
@@ -754,14 +753,12 @@ public class PackWriter {
task.call();
} catch (Throwable failure) {
errors.add(failure);
- } finally {
- done.countDown();
}
}
});
}
try {
- done.await();
+ pm.waitForCompletion();
} catch (InterruptedException ie) {
// We can't abort the other tasks as we have no handle.
// Cross our fingers and just break out anyway.
@@ -789,13 +786,14 @@ public class PackWriter {
}
}
- private void runTasks(ExecutorService pool, List<DeltaTask> tasks,
- List<Throwable> errors) throws IOException {
+ private void runTasks(ExecutorService pool, ThreadSafeProgressMonitor pm,
+ List<DeltaTask> tasks, List<Throwable> errors) throws IOException {
List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size());
for (DeltaTask task : tasks)
futures.add(pool.submit(task));
try {
+ pm.waitForCompletion();
for (Future<?> f : futures) {
try {
f.get();