From 74e08350129825c63c1b0c683b402241a844aabb Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Fri, 9 Jul 2010 18:15:50 -0700 Subject: Honor pack.threads and perform delta search in parallel If we have multiple CPUs available, packing usually goes faster when each CPU is assigned a slice of the available search space. The number of threads to use is guessed from the runtime if it wasn't set by the caller, or wasn't set in the configuration. Change-Id: If554fd8973db77632a52a0f45377dd6ec13fc220 Signed-off-by: Shawn O. Pearce --- .../src/org/eclipse/jgit/lib/ObjectReader.java | 10 ++ .../jgit/lib/ThreadSafeProgressMonitor.java | 111 ++++++++++++++++ .../eclipse/jgit/storage/file/WindowCursor.java | 5 + .../org/eclipse/jgit/storage/pack/DeltaCache.java | 15 ++- .../org/eclipse/jgit/storage/pack/PackConfig.java | 3 + .../org/eclipse/jgit/storage/pack/PackWriter.java | 139 ++++++++++++++++++++- .../jgit/storage/pack/ThreadSafeDeltaCache.java | 86 +++++++++++++ 7 files changed, 358 insertions(+), 11 deletions(-) create mode 100644 org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java create mode 100644 org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java index d0351f8670..ae70638672 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java @@ -59,6 +59,16 @@ public abstract class ObjectReader { /** Type hint indicating the caller doesn't know the type. */ protected static final int OBJ_ANY = -1; + /** + * Construct a new reader from the same data. + *

+ * Applications can use this method to build a new reader from the same data + * source, but for an different thread. + * + * @return a brand new reader, using the same data source. + */ + public abstract ObjectReader newReader(); + /** * Does the requested object exist in this database? * diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java new file mode 100644 index 0000000000..9708bb2f92 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.lib; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Wrapper around the general {@link ProgressMonitor} to make it thread safe. + */ +public class ThreadSafeProgressMonitor implements ProgressMonitor { + private final ProgressMonitor pm; + + private final ReentrantLock lock; + + /** + * Wrap a ProgressMonitor to be thread safe. + * + * @param pm + * the underlying monitor to receive events. + */ + public ThreadSafeProgressMonitor(ProgressMonitor pm) { + this.pm = pm; + this.lock = new ReentrantLock(); + } + + public void start(int totalTasks) { + lock.lock(); + try { + pm.start(totalTasks); + } finally { + lock.unlock(); + } + } + + public void beginTask(String title, int totalWork) { + lock.lock(); + try { + pm.beginTask(title, totalWork); + } finally { + lock.unlock(); + } + } + + public void update(int completed) { + lock.lock(); + try { + pm.update(completed); + } finally { + lock.unlock(); + } + } + + public boolean isCancelled() { + lock.lock(); + try { + return pm.isCancelled(); + } finally { + lock.unlock(); + } + } + + public void endTask() { + lock.lock(); + try { + pm.endTask(); + } finally { + lock.unlock(); + } + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java index 04ee8b2c49..5376d077ad 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java @@ -77,6 +77,11 @@ final class WindowCursor extends ObjectReader implements ObjectReuseAsIs { this.db = db; } + @Override + public ObjectReader newReader() { + return new WindowCursor(db); + } + public boolean has(AnyObjectId objectId) throws IOException { return db.has(objectId); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java index 7ad1c7f033..b6a7436f16 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java @@ -95,11 +95,7 @@ class DeltaCache { // The caller may have had to allocate more space than is // required. If we are about to waste anything, shrink it. // - if (data.length != actLen) { - byte[] nbuf = new byte[actLen]; - System.arraycopy(data, 0, nbuf, 0, actLen); - data = nbuf; - } + data = resize(data, actLen); // When we reserved space for this item we did it for the // inflated size of the delta, but we were just given the @@ -112,6 +108,15 @@ class DeltaCache { return new Ref(data, queue); } + byte[] resize(byte[] data, int actLen) { + if (data.length != actLen) { + byte[] nbuf = new byte[actLen]; + System.arraycopy(data, 0, nbuf, 0, actLen); + data = nbuf; + } + return data; + } + private void checkForGarbageCollectedObjects() { Ref r; while ((r = (Ref) queue.poll()) != null) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java index f1c17d7612..814ab8f291 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java @@ -71,6 +71,8 @@ class PackConfig { final long bigFileThreshold; + final int threads; + private PackConfig(Config rc) { deltaWindow = rc.getInt("pack", "window", PackWriter.DEFAULT_DELTA_SEARCH_WINDOW_SIZE); deltaWindowMemory = rc.getLong("pack", null, "windowmemory", 0); @@ -80,6 +82,7 @@ class PackConfig { compression = compression(rc); indexVersion = rc.getInt("pack", "indexversion", 2); bigFileThreshold = rc.getLong("core", null, "bigfilethreshold", PackWriter.DEFAULT_BIG_FILE_THRESHOLD); + threads = rc.getInt("pack", "threads", 0); } private static int compression(Config rc) { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java index bd9c1e312e..3769d6b4bf 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java @@ -58,6 +58,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -77,6 +80,7 @@ import org.eclipse.jgit.lib.ObjectLoader; import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.ThreadSafeProgressMonitor; import org.eclipse.jgit.revwalk.ObjectWalk; import org.eclipse.jgit.revwalk.RevFlag; import org.eclipse.jgit.revwalk.RevObject; @@ -233,6 +237,8 @@ public class PackWriter { private long bigFileThreshold = DEFAULT_BIG_FILE_THRESHOLD; + private int threads = 1; + private boolean thin; private boolean ignoreMissingUninteresting = true; @@ -289,6 +295,7 @@ public class PackWriter { compressionLevel = pc.compression; indexVersion = pc.indexVersion; bigFileThreshold = pc.bigFileThreshold; + threads = pc.threads; } private static Config configOf(final Repository repo) { @@ -452,6 +459,9 @@ public class PackWriter { /** * Get the number of objects to try when looking for a delta base. + *

+ * This limit is per thread, if 4 threads are used the actual memory + * used will be 4 times this value. * * @return the object count to be searched. */ @@ -477,6 +487,8 @@ public class PackWriter { /** * Get the size of the in-memory delta cache. + *

+ * This limit is for the entire writer, even if multiple threads are used. * * @return maximum number of bytes worth of delta data to cache in memory. * If 0 the cache is infinite in size (up to the JVM heap limit @@ -570,6 +582,26 @@ public class PackWriter { compressionLevel = level; } + /** @return number of threads used for delta compression. */ + public int getThreads() { + return threads; + } + + /** + * Set the number of threads to use for delta compression. + *

+ * During delta compression, if there are enough objects to be considered + * the writer will start up concurrent threads and allow them to compress + * different sections of the repository concurrently. + * + * @param threads + * number of threads to use. If <= 0 the number of available + * processors for this JVM is used. + */ + public void setThread(int threads) { + this.threads = threads; + } + /** @return true if this writer is producing a thin pack. */ public boolean isThin() { return thin; @@ -925,12 +957,107 @@ public class PackWriter { return true; } - private void searchForDeltas(ProgressMonitor monitor, - ObjectToPack[] list, int cnt) throws MissingObjectException, - IncorrectObjectTypeException, LargeObjectException, IOException { - DeltaCache dc = new DeltaCache(this); - DeltaWindow dw = new DeltaWindow(this, dc, reader); - dw.search(monitor, list, 0, cnt); + private void searchForDeltas(final ProgressMonitor monitor, + final ObjectToPack[] list, final int cnt) + throws MissingObjectException, IncorrectObjectTypeException, + LargeObjectException, IOException { + if (threads == 0) + threads = Runtime.getRuntime().availableProcessors(); + + if (threads <= 1 || cnt <= 2 * getDeltaSearchWindowSize()) { + DeltaCache dc = new DeltaCache(this); + DeltaWindow dw = new DeltaWindow(this, dc, reader); + dw.search(monitor, list, 0, cnt); + return; + } + + final List errors = Collections + .synchronizedList(new ArrayList()); + final DeltaCache dc = new ThreadSafeDeltaCache(this); + final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); + final ExecutorService pool = Executors.newFixedThreadPool(threads); + + // Guess at the size of batch we want. Because we don't really + // have a way for a thread to steal work from another thread if + // it ends early, we over partition slightly so the work units + // are a bit smaller. + // + int estSize = cnt / (threads * 2); + if (estSize < 2 * getDeltaSearchWindowSize()) + estSize = 2 * getDeltaSearchWindowSize(); + + for (int i = 0; i < cnt;) { + final int start = i; + final int batchSize; + + if (cnt - i < estSize) { + // If we don't have enough to fill the remaining block, + // schedule what is left over as a single block. + // + batchSize = cnt - i; + } else { + // Try to split the block at the end of a path. + // + int end = start + estSize; + while (end < cnt) { + ObjectToPack a = list[end - 1]; + ObjectToPack b = list[end]; + if (a.getPathHash() == b.getPathHash()) + end++; + else + break; + } + batchSize = end - start; + } + i += batchSize; + + pool.submit(new Runnable() { + public void run() { + try { + final ObjectReader or = reader.newReader(); + try { + DeltaWindow dw; + dw = new DeltaWindow(PackWriter.this, dc, or); + dw.search(pm, list, start, batchSize); + } finally { + or.release(); + } + } catch (Throwable err) { + errors.add(err); + } + } + }); + } + + // Tell the pool to stop. + // + pool.shutdown(); + for (;;) { + try { + if (pool.awaitTermination(60, TimeUnit.SECONDS)) + break; + } catch (InterruptedException e) { + throw new IOException( + JGitText.get().packingCancelledDuringObjectsWriting); + } + } + + // If any thread threw an error, try to report it back as + // though we weren't using a threaded search algorithm. + // + if (!errors.isEmpty()) { + Throwable err = errors.get(0); + if (err instanceof Error) + throw (Error) err; + if (err instanceof RuntimeException) + throw (RuntimeException) err; + if (err instanceof IOException) + throw (IOException) err; + + IOException fail = new IOException(err.getMessage()); + fail.initCause(err); + throw fail; + } } private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java new file mode 100644 index 0000000000..141289190a --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.storage.pack; + +import java.util.concurrent.locks.ReentrantLock; + +class ThreadSafeDeltaCache extends DeltaCache { + private final ReentrantLock lock; + + ThreadSafeDeltaCache(PackWriter pw) { + super(pw); + lock = new ReentrantLock(); + } + + @Override + boolean canCache(int length, ObjectToPack src, ObjectToPack res) { + lock.lock(); + try { + return super.canCache(length, src, res); + } finally { + lock.unlock(); + } + } + + @Override + void credit(int reservedSize) { + lock.lock(); + try { + super.credit(reservedSize); + } finally { + lock.unlock(); + } + } + + @Override + Ref cache(byte[] data, int actLen, int reservedSize) { + data = resize(data, actLen); + lock.lock(); + try { + return super.cache(data, actLen, reservedSize); + } finally { + lock.unlock(); + } + } +} -- cgit v1.2.3