]> source.dussan.org Git - jgit.git/commitdiff
Honor pack.threads and perform delta search in parallel 15/1115/1
authorShawn O. Pearce <spearce@spearce.org>
Sat, 10 Jul 2010 01:15:50 +0000 (18:15 -0700)
committerShawn O. Pearce <spearce@spearce.org>
Sat, 10 Jul 2010 02:17:30 +0000 (19:17 -0700)
If we have multiple CPUs available, packing usually goes faster
when each CPU is assigned a slice of the available search space.
The number of threads to use is guessed from the runtime if it
wasn't set by the caller, or wasn't set in the configuration.

Change-Id: If554fd8973db77632a52a0f45377dd6ec13fc220
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java
org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java [new file with mode: 0644]
org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java [new file with mode: 0644]

index d0351f867092f845cb8d3b88b5b2d0d78024fc79..ae70638672cd6916f54706041584cc3e962c5f56 100644 (file)
@@ -59,6 +59,16 @@ public abstract class ObjectReader {
        /** Type hint indicating the caller doesn't know the type. */
        protected static final int OBJ_ANY = -1;
 
+       /**
+        * Construct a new reader from the same data.
+        * <p>
+        * Applications can use this method to build a new reader from the same data
+        * source, but for an different thread.
+        *
+        * @return a brand new reader, using the same data source.
+        */
+       public abstract ObjectReader newReader();
+
        /**
         * Does the requested object exist in this database?
         *
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
new file mode 100644 (file)
index 0000000..9708bb2
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.lib;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Wrapper around the general {@link ProgressMonitor} to make it thread safe.
+ */
+public class ThreadSafeProgressMonitor implements ProgressMonitor {
+       private final ProgressMonitor pm;
+
+       private final ReentrantLock lock;
+
+       /**
+        * Wrap a ProgressMonitor to be thread safe.
+        *
+        * @param pm
+        *            the underlying monitor to receive events.
+        */
+       public ThreadSafeProgressMonitor(ProgressMonitor pm) {
+               this.pm = pm;
+               this.lock = new ReentrantLock();
+       }
+
+       public void start(int totalTasks) {
+               lock.lock();
+               try {
+                       pm.start(totalTasks);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public void beginTask(String title, int totalWork) {
+               lock.lock();
+               try {
+                       pm.beginTask(title, totalWork);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public void update(int completed) {
+               lock.lock();
+               try {
+                       pm.update(completed);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public boolean isCancelled() {
+               lock.lock();
+               try {
+                       return pm.isCancelled();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public void endTask() {
+               lock.lock();
+               try {
+                       pm.endTask();
+               } finally {
+                       lock.unlock();
+               }
+       }
+}
index 04ee8b2c49431bcb012560b874d1aeccd85225a2..5376d077ad4333a815133872c7066c8e79928014 100644 (file)
@@ -77,6 +77,11 @@ final class WindowCursor extends ObjectReader implements ObjectReuseAsIs {
                this.db = db;
        }
 
+       @Override
+       public ObjectReader newReader() {
+               return new WindowCursor(db);
+       }
+
        public boolean has(AnyObjectId objectId) throws IOException {
                return db.has(objectId);
        }
index 7ad1c7f0331a6226ac2106441fb34edfb4b847cd..b6a7436f166e5309862f3c295db2b5ddfa90b85f 100644 (file)
@@ -95,11 +95,7 @@ class DeltaCache {
                // The caller may have had to allocate more space than is
                // required. If we are about to waste anything, shrink it.
                //
-               if (data.length != actLen) {
-                       byte[] nbuf = new byte[actLen];
-                       System.arraycopy(data, 0, nbuf, 0, actLen);
-                       data = nbuf;
-               }
+               data = resize(data, actLen);
 
                // When we reserved space for this item we did it for the
                // inflated size of the delta, but we were just given the
@@ -112,6 +108,15 @@ class DeltaCache {
                return new Ref(data, queue);
        }
 
+       byte[] resize(byte[] data, int actLen) {
+               if (data.length != actLen) {
+                       byte[] nbuf = new byte[actLen];
+                       System.arraycopy(data, 0, nbuf, 0, actLen);
+                       data = nbuf;
+               }
+               return data;
+       }
+
        private void checkForGarbageCollectedObjects() {
                Ref r;
                while ((r = (Ref) queue.poll()) != null)
index f1c17d76122c5fc4a69a0e28aa20c0bc75a3430e..814ab8f291bda45e08d50b4bc7abb467ff5620d5 100644 (file)
@@ -71,6 +71,8 @@ class PackConfig {
 
        final long bigFileThreshold;
 
+       final int threads;
+
        private PackConfig(Config rc) {
                deltaWindow = rc.getInt("pack", "window", PackWriter.DEFAULT_DELTA_SEARCH_WINDOW_SIZE);
                deltaWindowMemory = rc.getLong("pack", null, "windowmemory", 0);
@@ -80,6 +82,7 @@ class PackConfig {
                compression = compression(rc);
                indexVersion = rc.getInt("pack", "indexversion", 2);
                bigFileThreshold = rc.getLong("core", null, "bigfilethreshold", PackWriter.DEFAULT_BIG_FILE_THRESHOLD);
+               threads = rc.getInt("pack", "threads", 0);
        }
 
        private static int compression(Config rc) {
index bd9c1e312e6c74cb7da9a8c7000fac1d19c0b947..3769d6b4bfe449cd3a38c40a8eb4ae8c3e798afe 100644 (file)
@@ -58,6 +58,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 
@@ -77,6 +80,7 @@ import org.eclipse.jgit.lib.ObjectLoader;
 import org.eclipse.jgit.lib.ObjectReader;
 import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
 import org.eclipse.jgit.revwalk.ObjectWalk;
 import org.eclipse.jgit.revwalk.RevFlag;
 import org.eclipse.jgit.revwalk.RevObject;
@@ -233,6 +237,8 @@ public class PackWriter {
 
        private long bigFileThreshold = DEFAULT_BIG_FILE_THRESHOLD;
 
+       private int threads = 1;
+
        private boolean thin;
 
        private boolean ignoreMissingUninteresting = true;
@@ -289,6 +295,7 @@ public class PackWriter {
                compressionLevel = pc.compression;
                indexVersion = pc.indexVersion;
                bigFileThreshold = pc.bigFileThreshold;
+               threads = pc.threads;
        }
 
        private static Config configOf(final Repository repo) {
@@ -452,6 +459,9 @@ public class PackWriter {
 
        /**
         * Get the number of objects to try when looking for a delta base.
+        * <p>
+        * This limit is per thread, if 4 threads are used the actual memory
+        * used will be 4 times this value.
         *
         * @return the object count to be searched.
         */
@@ -477,6 +487,8 @@ public class PackWriter {
 
        /**
         * Get the size of the in-memory delta cache.
+        * <p>
+        * This limit is for the entire writer, even if multiple threads are used.
         *
         * @return maximum number of bytes worth of delta data to cache in memory.
         *         If 0 the cache is infinite in size (up to the JVM heap limit
@@ -570,6 +582,26 @@ public class PackWriter {
                compressionLevel = level;
        }
 
+       /** @return number of threads used for delta compression. */
+       public int getThreads() {
+               return threads;
+       }
+
+       /**
+        * Set the number of threads to use for delta compression.
+        * <p>
+        * During delta compression, if there are enough objects to be considered
+        * the writer will start up concurrent threads and allow them to compress
+        * different sections of the repository concurrently.
+        *
+        * @param threads
+        *            number of threads to use. If <= 0 the number of available
+        *            processors for this JVM is used.
+        */
+       public void setThread(int threads) {
+               this.threads = threads;
+       }
+
        /** @return true if this writer is producing a thin pack. */
        public boolean isThin() {
                return thin;
@@ -925,12 +957,107 @@ public class PackWriter {
                return true;
        }
 
-       private void searchForDeltas(ProgressMonitor monitor,
-                       ObjectToPack[] list, int cnt) throws MissingObjectException,
-                       IncorrectObjectTypeException, LargeObjectException, IOException {
-               DeltaCache dc = new DeltaCache(this);
-               DeltaWindow dw = new DeltaWindow(this, dc, reader);
-               dw.search(monitor, list, 0, cnt);
+       private void searchForDeltas(final ProgressMonitor monitor,
+                       final ObjectToPack[] list, final int cnt)
+                       throws MissingObjectException, IncorrectObjectTypeException,
+                       LargeObjectException, IOException {
+               if (threads == 0)
+                       threads = Runtime.getRuntime().availableProcessors();
+
+               if (threads <= 1 || cnt <= 2 * getDeltaSearchWindowSize()) {
+                       DeltaCache dc = new DeltaCache(this);
+                       DeltaWindow dw = new DeltaWindow(this, dc, reader);
+                       dw.search(monitor, list, 0, cnt);
+                       return;
+               }
+
+               final List<Throwable> errors = Collections
+                               .synchronizedList(new ArrayList<Throwable>());
+               final DeltaCache dc = new ThreadSafeDeltaCache(this);
+               final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
+               final ExecutorService pool = Executors.newFixedThreadPool(threads);
+
+               // Guess at the size of batch we want. Because we don't really
+               // have a way for a thread to steal work from another thread if
+               // it ends early, we over partition slightly so the work units
+               // are a bit smaller.
+               //
+               int estSize = cnt / (threads * 2);
+               if (estSize < 2 * getDeltaSearchWindowSize())
+                       estSize = 2 * getDeltaSearchWindowSize();
+
+               for (int i = 0; i < cnt;) {
+                       final int start = i;
+                       final int batchSize;
+
+                       if (cnt - i < estSize) {
+                               // If we don't have enough to fill the remaining block,
+                               // schedule what is left over as a single block.
+                               //
+                               batchSize = cnt - i;
+                       } else {
+                               // Try to split the block at the end of a path.
+                               //
+                               int end = start + estSize;
+                               while (end < cnt) {
+                                       ObjectToPack a = list[end - 1];
+                                       ObjectToPack b = list[end];
+                                       if (a.getPathHash() == b.getPathHash())
+                                               end++;
+                                       else
+                                               break;
+                               }
+                               batchSize = end - start;
+                       }
+                       i += batchSize;
+
+                       pool.submit(new Runnable() {
+                               public void run() {
+                                       try {
+                                               final ObjectReader or = reader.newReader();
+                                               try {
+                                                       DeltaWindow dw;
+                                                       dw = new DeltaWindow(PackWriter.this, dc, or);
+                                                       dw.search(pm, list, start, batchSize);
+                                               } finally {
+                                                       or.release();
+                                               }
+                                       } catch (Throwable err) {
+                                               errors.add(err);
+                                       }
+                               }
+                       });
+               }
+
+               // Tell the pool to stop.
+               //
+               pool.shutdown();
+               for (;;) {
+                       try {
+                               if (pool.awaitTermination(60, TimeUnit.SECONDS))
+                                       break;
+                       } catch (InterruptedException e) {
+                               throw new IOException(
+                                               JGitText.get().packingCancelledDuringObjectsWriting);
+                       }
+               }
+
+               // If any thread threw an error, try to report it back as
+               // though we weren't using a threaded search algorithm.
+               //
+               if (!errors.isEmpty()) {
+                       Throwable err = errors.get(0);
+                       if (err instanceof Error)
+                               throw (Error) err;
+                       if (err instanceof RuntimeException)
+                               throw (RuntimeException) err;
+                       if (err instanceof IOException)
+                               throw (IOException) err;
+
+                       IOException fail = new IOException(err.getMessage());
+                       fail.initCause(err);
+                       throw fail;
+               }
        }
 
        private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out)
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java
new file mode 100644 (file)
index 0000000..1412891
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.storage.pack;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+class ThreadSafeDeltaCache extends DeltaCache {
+       private final ReentrantLock lock;
+
+       ThreadSafeDeltaCache(PackWriter pw) {
+               super(pw);
+               lock = new ReentrantLock();
+       }
+
+       @Override
+       boolean canCache(int length, ObjectToPack src, ObjectToPack res) {
+               lock.lock();
+               try {
+                       return super.canCache(length, src, res);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       void credit(int reservedSize) {
+               lock.lock();
+               try {
+                       super.credit(reservedSize);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       Ref cache(byte[] data, int actLen, int reservedSize) {
+               data = resize(data, actLen);
+               lock.lock();
+               try {
+                       return super.cache(data, actLen, reservedSize);
+               } finally {
+                       lock.unlock();
+               }
+       }
+}