source.dussan.org Git - jgit.git/commitdiff
Split delta search buckets by byte weight 76/11876/6
author Shawn Pearce <spearce@spearce.org>
Tue, 16 Apr 2013 15:19:13 +0000 (08:19 -0700)
committer Shawn Pearce <sop@google.com>
Wed, 17 Apr 2013 18:31:00 +0000 (11:31 -0700)
Instead of assuming all objects cost the same amount of time to
delta compress, aggregate the byte size of objects in the list
and partition threads with roughly equal total bytes.

Before splitting the list, select the N largest paths and assign
each one to its own thread. This allows threads to get through the
worst cases in parallel before attempting smaller paths that are
more likely to be splittable.

By running the largest path buckets first on each thread, the likely
slowest part of compression is done early, while progress is still
reporting a low percentage. This gives users a better impression of
how fast the phase will run. On very complex inputs the slow part
is more likely to happen first, making a user realize it's time to
go grab lunch, or even run it overnight.

If the worst sections are earlier, memory overruns may show up
earlier, giving the user a chance to correct the configuration and
try again before wasting large amounts of time. It also makes it
less likely the delta compression phase reaches 92% in 30 minutes
and then crawls for 10 hours through the remaining 8%.

Change-Id: I7621c4349b99e40098825c4966b8411079992e5f

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java

index cc212fb81853c87a1f92a7599e828d462af53774..ca2fff68829fe7955c7689de8dd0bb83ed2c401c 100644 (file)
 
 package org.eclipse.jgit.internal.storage.pack;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -53,7 +58,10 @@ import org.eclipse.jgit.storage.pack.PackConfig;
 
 final class DeltaTask implements Callable<Object> {
        static final class Block {
+               private static final int MIN_TOP_PATH = 50 << 20;
+
                final List<DeltaTask> tasks;
+               final int threads;
                final PackConfig config;
                final ObjectReader templateReader;
                final DeltaCache dc;
@@ -62,10 +70,13 @@ final class DeltaTask implements Callable<Object> {
                final int beginIndex;
                final int endIndex;
 
+               private long totalWeight;
+
                Block(int threads, PackConfig config, ObjectReader reader,
                                DeltaCache dc, ThreadSafeProgressMonitor pm,
                                ObjectToPack[] list, int begin, int end) {
                        this.tasks = new ArrayList<DeltaTask>(threads);
+                       this.threads = threads;
                        this.config = config;
                        this.templateReader = reader;
                        this.dc = dc;
@@ -75,7 +86,7 @@ final class DeltaTask implements Callable<Object> {
                        this.endIndex = end;
                }
 
-               synchronized Slice stealWork() {
+               synchronized DeltaWindow stealWork(DeltaTask forThread) {
                        for (;;) {
                                DeltaTask maxTask = null;
                                Slice maxSlice = null;
@@ -92,9 +103,122 @@ final class DeltaTask implements Callable<Object> {
                                if (maxTask == null)
                                        return null;
                                if (maxTask.tryStealWork(maxSlice))
-                                       return maxSlice;
+                                       return forThread.initWindow(maxSlice);
                        }
                }
+
+               void partitionTasks() {
+                       ArrayList<WeightedPath> topPaths = computeTopPaths();
+                       Iterator<WeightedPath> topPathItr = topPaths.iterator();
+                       int nextTop = 0;
+                       long weightPerThread = totalWeight / threads;
+                       for (int i = beginIndex; i < endIndex;) {
+                               DeltaTask task = new DeltaTask(this);
+                               long w = 0;
+
+                               // Assign the thread one top path.
+                               if (topPathItr.hasNext()) {
+                                       WeightedPath p = topPathItr.next();
+                                       w += p.weight;
+                                       task.add(p.slice);
+                               }
+
+                               // Assign the task thread ~average weight.
+                               int s = i;
+                               for (; w < weightPerThread && i < endIndex;) {
+                                       if (nextTop < topPaths.size()
+                                                       && i == topPaths.get(nextTop).slice.beginIndex) {
+                                               if (s < i)
+                                                       task.add(new Slice(s, i));
+                                               s = i = topPaths.get(nextTop++).slice.endIndex;
+                                       } else
+                                               w += list[i++].getWeight();
+                               }
+
+                               // Round up the slice to the end of a path.
+                               if (s < i) {
+                                       int h = list[i - 1].getPathHash();
+                                       while (i < endIndex) {
+                                               if (h == list[i].getPathHash())
+                                                       i++;
+                                               else
+                                                       break;
+                                       }
+                                       task.add(new Slice(s, i));
+                               }
+                               if (!task.slices.isEmpty())
+                                       tasks.add(task);
+                       }
+                       while (topPathItr.hasNext()) {
+                               WeightedPath p = topPathItr.next();
+                               DeltaTask task = new DeltaTask(this);
+                               task.add(p.slice);
+                               tasks.add(task);
+                       }
+
+                       topPaths = null;
+               }
+
+               private ArrayList<WeightedPath> computeTopPaths() {
+                       ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
+                                       threads);
+                       int cp = beginIndex;
+                       int ch = list[cp].getPathHash();
+                       long cw = list[cp].getWeight();
+                       totalWeight = list[cp].getWeight();
+
+                       for (int i = cp + 1; i < endIndex; i++) {
+                               ObjectToPack o = list[i];
+                               if (ch != o.getPathHash()) {
+                                       if (MIN_TOP_PATH < cw) {
+                                               if (topPaths.size() < threads) {
+                                                       Slice s = new Slice(cp, i);
+                                                       topPaths.add(new WeightedPath(cw, s));
+                                                       if (topPaths.size() == threads)
+                                                               Collections.sort(topPaths);
+                                               } else if (topPaths.get(0).weight < cw) {
+                                                       Slice s = new Slice(cp, i);
+                                                       WeightedPath p = new WeightedPath(cw, s);
+                                                       topPaths.set(0, p);
+                                                       if (p.compareTo(topPaths.get(1)) > 0)
+                                                               Collections.sort(topPaths);
+                                               }
+                                       }
+                                       cp = i;
+                                       ch = o.getPathHash();
+                                       cw = 0;
+                               }
+                               if (o.isEdge() || o.doNotAttemptDelta())
+                                       continue;
+                               cw += o.getWeight();
+                               totalWeight += o.getWeight();
+                       }
+
+                       // Sort by starting index to identify gaps later.
+                       Collections.sort(topPaths, new Comparator<WeightedPath>() {
+                               public int compare(WeightedPath a, WeightedPath b) {
+                                       return a.slice.beginIndex - b.slice.beginIndex;
+                               }
+                       });
+                       return topPaths;
+               }
+       }
+
+       static final class WeightedPath implements Comparable<WeightedPath> {
+               final long weight;
+               final Slice slice;
+
+               WeightedPath(long weight, Slice s) {
+                       this.weight = weight;
+                       this.slice = s;
+               }
+
+               public int compareTo(WeightedPath o) {
+                       int cmp = Long.signum(weight - o.weight);
+                       if (cmp != 0)
+                               return cmp;
+                       return slice.beginIndex - o.slice.beginIndex;
+               }
        }
 
        static final class Slice {
@@ -112,36 +236,82 @@ final class DeltaTask implements Callable<Object> {
        }
 
        private final Block block;
-       private final Slice firstSlice;
-       private volatile DeltaWindow dw;
+       private final LinkedList<Slice> slices;
 
-       DeltaTask(Block b, int beginIndex, int endIndex) {
+       private ObjectReader or;
+       private DeltaWindow dw;
+
+       DeltaTask(Block b) {
                this.block = b;
-               this.firstSlice = new Slice(beginIndex, endIndex);
+               this.slices = new LinkedList<Slice>();
+       }
+
+       void add(Slice s) {
+               if (!slices.isEmpty()) {
+                       Slice last = slices.getLast();
+                       if (last.endIndex == s.beginIndex) {
+                               slices.removeLast();
+                               slices.add(new Slice(last.beginIndex, s.endIndex));
+                               return;
+                       }
+               }
+               slices.add(s);
        }
 
        public Object call() throws Exception {
-               ObjectReader or = block.templateReader.newReader();
+               or = block.templateReader.newReader();
                try {
-                       for (Slice s = firstSlice; s != null; s = block.stealWork()) {
-                               dw = new DeltaWindow(block.config, block.dc, or, block.pm,
-                                               block.list, s.beginIndex, s.endIndex);
-                               dw.search();
-                               dw = null;
+                       DeltaWindow w;
+                       for (;;) {
+                               synchronized (this) {
+                                       if (slices.isEmpty())
+                                               break;
+                                       w = initWindow(slices.removeFirst());
+                               }
+                               runWindow(w);
                        }
+                       while ((w = block.stealWork(this)) != null)
+                               runWindow(w);
                } finally {
-                       or.release();
                        block.pm.endWorker();
+                       or.release();
+                       or = null;
                }
                return null;
        }
 
-       Slice remaining() {
+       DeltaWindow initWindow(Slice s) {
+               DeltaWindow w = new DeltaWindow(block.config, block.dc,
+                               or, block.pm,
+                               block.list, s.beginIndex, s.endIndex);
+               synchronized (this) {
+                       dw = w;
+               }
+               return w;
+       }
+
+       private void runWindow(DeltaWindow w) throws IOException {
+               try {
+                       w.search();
+               } finally {
+                       synchronized (this) {
+                               dw = null;
+                       }
+               }
+       }
+
+       synchronized Slice remaining() {
+               if (!slices.isEmpty())
+                       return slices.getLast();
                DeltaWindow d = dw;
                return d != null ? d.remaining() : null;
        }
 
-       boolean tryStealWork(Slice s) {
+       synchronized boolean tryStealWork(Slice s) {
+               if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
+                       slices.removeLast();
+                       return true;
+               }
                DeltaWindow d = dw;
                return d != null ? d.tryStealWork(s) : false;
        }
index 66871bb145eecc10ae8d7c436ec813bb956c0317..cc7fac800799f70aebbd8822fc35e9ac5528d23c 100644 (file)
@@ -134,7 +134,7 @@ final class DeltaWindow {
        }
 
        synchronized boolean tryStealWork(DeltaTask.Slice s) {
-               if (s.beginIndex <= cur)
+               if (s.beginIndex <= cur || end <= s.beginIndex)
                        return false;
                end = s.beginIndex;
                return true;
index 54a5826c01096b9dbb428b3c028200bf074c3abf..a3ef27c217b4fe68c71e2e70e005cf491400721c 100644 (file)
@@ -1337,35 +1337,10 @@ public class PackWriter {
                final DeltaCache dc = new ThreadSafeDeltaCache(config);
                final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
 
-               int estSize = cnt / threads;
-               if (estSize < config.getDeltaSearchWindowSize())
-                       estSize = config.getDeltaSearchWindowSize();
-
                DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
                                reader, dc, pm,
                                list, 0, cnt);
-               for (int i = 0; i < cnt;) {
-                       final int start = i;
-                       int end;
-
-                       if (cnt - i < estSize) {
-                               // If we don't have enough to fill the remaining block,
-                               // schedule what is left over as a single block.
-                               end = cnt;
-                       } else {
-                               // Try to split the block at the end of a path.
-                               end = start + estSize;
-                               int h = list[end - 1].getPathHash();
-                               while (end < cnt) {
-                                       if (h == list[end].getPathHash())
-                                               end++;
-                                       else
-                                               break;
-                               }
-                       }
-                       i = end;
-                       taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
-               }
+               taskBlock.partitionTasks();
                pm.startWorkers(taskBlock.tasks.size());
 
                final Executor executor = config.getExecutor();