--- /dev/null
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.storage.pack;
+
+import java.util.concurrent.Callable;
+
+import org.eclipse.jgit.lib.ObjectReader;
+import org.eclipse.jgit.lib.ProgressMonitor;
+
+final class DeltaTask implements Callable<Object> {
+ private final PackWriter writer;
+
+ private final ObjectReader templateReader;
+
+ private final DeltaCache dc;
+
+ private final ProgressMonitor pm;
+
+ private final int batchSize;
+
+ private final int start;
+
+ private final ObjectToPack[] list;
+
+ DeltaTask(PackWriter writer, ObjectReader reader, DeltaCache dc,
+ ProgressMonitor pm, int batchSize, int start, ObjectToPack[] list) {
+ this.writer = writer;
+ this.templateReader = reader;
+ this.dc = dc;
+ this.pm = pm;
+ this.batchSize = batchSize;
+ this.start = start;
+ this.list = list;
+ }
+
+ public Object call() throws Exception {
+ final ObjectReader or = templateReader.newReader();
+ try {
+ DeltaWindow dw;
+ dw = new DeltaWindow(writer, dc, or);
+ dw.search(pm, list, start, batchSize);
+ } finally {
+ or.release();
+ }
+ return null;
+ }
+}
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
private int threads = 1;
+ private Executor executor;
+
private boolean thin;
private boolean ignoreMissingUninteresting = true;
* During delta compression, if there are enough objects to be considered
* the writer will start up concurrent threads and allow them to compress
* different sections of the repository concurrently.
+ * <p>
+ * An application thread pool can be set by {@link #setExecutor(Executor)}.
+ * If not set a temporary pool will be created by the writer, and torn down
+ * automatically when compression is over.
*
* @param threads
* number of threads to use. If <= 0 the number of available
this.threads = threads;
}
+ /**
+ * Set the executor to use when using threads.
+ * <p>
+ * During delta compression if the executor is non-null jobs will be queued
+ * up on it to perform delta compression in parallel. Aside from setting the
+ * executor, the caller must set {@link #setThread(int)} to enable threaded
+ * delta search.
+ *
+ * @param executor
+ * executor to use for threads. Set to null to create a temporary
+ * executor just for this writer.
+ */
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
/** @return true if this writer is producing a thin pack. */
public boolean isThin() {
return thin;
return;
}
- final List<Throwable> errors = Collections
- .synchronizedList(new ArrayList<Throwable>());
final DeltaCache dc = new ThreadSafeDeltaCache(this);
final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
- final ExecutorService pool = Executors.newFixedThreadPool(threads);
// Guess at the size of batch we want. Because we don't really
// have a way for a thread to steal work from another thread if
if (estSize < 2 * getDeltaSearchWindowSize())
estSize = 2 * getDeltaSearchWindowSize();
+ final List<DeltaTask> myTasks = new ArrayList<DeltaTask>(threads * 2);
for (int i = 0; i < cnt;) {
final int start = i;
final int batchSize;
batchSize = end - start;
}
i += batchSize;
+ myTasks.add(new DeltaTask(this, reader, dc, pm, batchSize, start, list));
+ }
- pool.submit(new Runnable() {
- public void run() {
+ final List<Throwable> errors = Collections
+ .synchronizedList(new ArrayList<Throwable>());
+ if (executor instanceof ExecutorService) {
+ // Caller supplied us a service, use it directly.
+ //
+ runTasks((ExecutorService) executor, myTasks, errors);
+
+ } else if (executor == null) {
+ // Caller didn't give us a way to run the tasks, spawn up a
+ // temporary thread pool and make sure it tears down cleanly.
+ //
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ try {
+ runTasks(pool, myTasks, errors);
+ } finally {
+ pool.shutdown();
+ for (;;) {
try {
- final ObjectReader or = reader.newReader();
+ if (pool.awaitTermination(60, TimeUnit.SECONDS))
+ break;
+ } catch (InterruptedException e) {
+ throw new IOException(
+ JGitText.get().packingCancelledDuringObjectsWriting);
+ }
+ }
+ }
+ } else {
+ // The caller gave us an executor, but it might not do
+ // asynchronous execution. Wrap everything and hope it
+ // can schedule these for us.
+ //
+ final CountDownLatch done = new CountDownLatch(myTasks.size());
+ for (final DeltaTask task : myTasks) {
+ executor.execute(new Runnable() {
+ public void run() {
try {
- DeltaWindow dw;
- dw = new DeltaWindow(PackWriter.this, dc, or);
- dw.search(pm, list, start, batchSize);
+ task.call();
+ } catch (Throwable failure) {
+ errors.add(failure);
} finally {
- or.release();
+ done.countDown();
}
- } catch (Throwable err) {
- errors.add(err);
}
- }
- });
- }
-
- // Tell the pool to stop.
- //
- pool.shutdown();
- for (;;) {
+ });
+ }
try {
- if (pool.awaitTermination(60, TimeUnit.SECONDS))
- break;
- } catch (InterruptedException e) {
+ done.await();
+ } catch (InterruptedException ie) {
+ // We can't abort the other tasks as we have no handle.
+ // Cross our fingers and just break out anyway.
+ //
throw new IOException(
JGitText.get().packingCancelledDuringObjectsWriting);
}
}
- // If any thread threw an error, try to report it back as
+ // If any task threw an error, try to report it back as
// though we weren't using a threaded search algorithm.
//
if (!errors.isEmpty()) {
}
}
+ private void runTasks(ExecutorService pool, List<DeltaTask> tasks,
+ List<Throwable> errors) throws IOException {
+ List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size());
+ for (DeltaTask task : tasks)
+ futures.add(pool.submit(task));
+
+ try {
+ for (Future<?> f : futures) {
+ try {
+ f.get();
+ } catch (ExecutionException failed) {
+ errors.add(failed.getCause());
+ }
+ }
+ } catch (InterruptedException ie) {
+ for (Future<?> f : futures)
+ f.cancel(true);
+ throw new IOException(
+ JGitText.get().packingCancelledDuringObjectsWriting);
+ }
+ }
+
private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out)
throws IOException {
for (List<ObjectToPack> list : objectsLists) {