summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorShawn Pearce <spearce@spearce.org>2013-04-16 08:19:13 -0700
committerShawn Pearce <sop@google.com>2013-04-17 11:31:00 -0700
commit21e4aa2b9eaf392825e52ada6034cc3044c69c67 (patch)
tree32f7a6735c2d406bb8e5007d3299ad790e05c1f0
parent5f03dc61b46f1eddc09c1fc2963891be48dcc99c (diff)
downloadjgit-21e4aa2b9eaf392825e52ada6034cc3044c69c67.tar.gz
jgit-21e4aa2b9eaf392825e52ada6034cc3044c69c67.zip
Split delta search buckets by byte weight
Instead of assuming all objects cost the same amount of time to delta compress, aggregate the byte size of objects in the list and partition threads with roughly equal total bytes. Before splitting the list select the N largest paths and assign each one to its own thread. This allows threads to get through the worst cases in parallel before attempting smaller paths that are more likely to be splittable. By running the largest path buckets first on each thread the likely slowest part of compression is done early, while progress is still reporting a low percentage. This gives users a better impression of how fast the phase will run. On very complex inputs the slow part is more likely to happen first, making a user realize its time to go grab lunch, or even run it overnight. If the worst sections are earlier, memory overruns may show up earlier, giving the user a chance to correct the configuration and try again before wasting large amounts of time. It also makes it less likely the delta compression phase reaches 92% in 30 minutes and then crawls for 10 hours through the remaining 8%. Change-Id: I7621c4349b99e40098825c4966b8411079992e5f
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java200
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java2
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java27
3 files changed, 187 insertions, 42 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
index cc212fb818..ca2fff6882 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
@@ -43,7 +43,12 @@
package org.eclipse.jgit.internal.storage.pack;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -53,7 +58,10 @@ import org.eclipse.jgit.storage.pack.PackConfig;
final class DeltaTask implements Callable<Object> {
static final class Block {
+ private static final int MIN_TOP_PATH = 50 << 20;
+
final List<DeltaTask> tasks;
+ final int threads;
final PackConfig config;
final ObjectReader templateReader;
final DeltaCache dc;
@@ -62,10 +70,13 @@ final class DeltaTask implements Callable<Object> {
final int beginIndex;
final int endIndex;
+ private long totalWeight;
+
Block(int threads, PackConfig config, ObjectReader reader,
DeltaCache dc, ThreadSafeProgressMonitor pm,
ObjectToPack[] list, int begin, int end) {
this.tasks = new ArrayList<DeltaTask>(threads);
+ this.threads = threads;
this.config = config;
this.templateReader = reader;
this.dc = dc;
@@ -75,7 +86,7 @@ final class DeltaTask implements Callable<Object> {
this.endIndex = end;
}
- synchronized Slice stealWork() {
+ synchronized DeltaWindow stealWork(DeltaTask forThread) {
for (;;) {
DeltaTask maxTask = null;
Slice maxSlice = null;
@@ -92,9 +103,122 @@ final class DeltaTask implements Callable<Object> {
if (maxTask == null)
return null;
if (maxTask.tryStealWork(maxSlice))
- return maxSlice;
+ return forThread.initWindow(maxSlice);
}
}
+
+ void partitionTasks() {
+ ArrayList<WeightedPath> topPaths = computeTopPaths();
+ Iterator<WeightedPath> topPathItr = topPaths.iterator();
+ int nextTop = 0;
+ long weightPerThread = totalWeight / threads;
+ for (int i = beginIndex; i < endIndex;) {
+ DeltaTask task = new DeltaTask(this);
+ long w = 0;
+
+ // Assign the thread one top path.
+ if (topPathItr.hasNext()) {
+ WeightedPath p = topPathItr.next();
+ w += p.weight;
+ task.add(p.slice);
+ }
+
+ // Assign the task thread ~average weight.
+ int s = i;
+ for (; w < weightPerThread && i < endIndex;) {
+ if (nextTop < topPaths.size()
+ && i == topPaths.get(nextTop).slice.beginIndex) {
+ if (s < i)
+ task.add(new Slice(s, i));
+ s = i = topPaths.get(nextTop++).slice.endIndex;
+ } else
+ w += list[i++].getWeight();
+ }
+
+ // Round up the slice to the end of a path.
+ if (s < i) {
+ int h = list[i - 1].getPathHash();
+ while (i < endIndex) {
+ if (h == list[i].getPathHash())
+ i++;
+ else
+ break;
+ }
+ task.add(new Slice(s, i));
+ }
+ if (!task.slices.isEmpty())
+ tasks.add(task);
+ }
+ while (topPathItr.hasNext()) {
+ WeightedPath p = topPathItr.next();
+ DeltaTask task = new DeltaTask(this);
+ task.add(p.slice);
+ tasks.add(task);
+ }
+
+ topPaths = null;
+ }
+
+ private ArrayList<WeightedPath> computeTopPaths() {
+ ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
+ threads);
+ int cp = beginIndex;
+ int ch = list[cp].getPathHash();
+ long cw = list[cp].getWeight();
+ totalWeight = list[cp].getWeight();
+
+ for (int i = cp + 1; i < endIndex; i++) {
+ ObjectToPack o = list[i];
+ if (ch != o.getPathHash()) {
+ if (MIN_TOP_PATH < cw) {
+ if (topPaths.size() < threads) {
+ Slice s = new Slice(cp, i);
+ topPaths.add(new WeightedPath(cw, s));
+ if (topPaths.size() == threads)
+ Collections.sort(topPaths);
+ } else if (topPaths.get(0).weight < cw) {
+ Slice s = new Slice(cp, i);
+ WeightedPath p = new WeightedPath(cw, s);
+ topPaths.set(0, p);
+ if (p.compareTo(topPaths.get(1)) > 0)
+ Collections.sort(topPaths);
+ }
+ }
+ cp = i;
+ ch = o.getPathHash();
+ cw = 0;
+ }
+ if (o.isEdge() || o.doNotAttemptDelta())
+ continue;
+ cw += o.getWeight();
+ totalWeight += o.getWeight();
+ }
+
+ // Sort by starting index to identify gaps later.
+ Collections.sort(topPaths, new Comparator<WeightedPath>() {
+ public int compare(WeightedPath a, WeightedPath b) {
+ return a.slice.beginIndex - b.slice.beginIndex;
+ }
+ });
+ return topPaths;
+ }
+ }
+
+ static final class WeightedPath implements Comparable<WeightedPath> {
+ final long weight;
+ final Slice slice;
+
+ WeightedPath(long weight, Slice s) {
+ this.weight = weight;
+ this.slice = s;
+ }
+
+ public int compareTo(WeightedPath o) {
+ int cmp = Long.signum(weight - o.weight);
+ if (cmp != 0)
+ return cmp;
+ return slice.beginIndex - o.slice.beginIndex;
+ }
}
static final class Slice {
@@ -112,36 +236,82 @@ final class DeltaTask implements Callable<Object> {
}
private final Block block;
- private final Slice firstSlice;
- private volatile DeltaWindow dw;
+ private final LinkedList<Slice> slices;
- DeltaTask(Block b, int beginIndex, int endIndex) {
+ private ObjectReader or;
+ private DeltaWindow dw;
+
+ DeltaTask(Block b) {
this.block = b;
- this.firstSlice = new Slice(beginIndex, endIndex);
+ this.slices = new LinkedList<Slice>();
+ }
+
+ void add(Slice s) {
+ if (!slices.isEmpty()) {
+ Slice last = slices.getLast();
+ if (last.endIndex == s.beginIndex) {
+ slices.removeLast();
+ slices.add(new Slice(last.beginIndex, s.endIndex));
+ return;
+ }
+ }
+ slices.add(s);
}
public Object call() throws Exception {
- ObjectReader or = block.templateReader.newReader();
+ or = block.templateReader.newReader();
try {
- for (Slice s = firstSlice; s != null; s = block.stealWork()) {
- dw = new DeltaWindow(block.config, block.dc, or, block.pm,
- block.list, s.beginIndex, s.endIndex);
- dw.search();
- dw = null;
+ DeltaWindow w;
+ for (;;) {
+ synchronized (this) {
+ if (slices.isEmpty())
+ break;
+ w = initWindow(slices.removeFirst());
+ }
+ runWindow(w);
}
+ while ((w = block.stealWork(this)) != null)
+ runWindow(w);
} finally {
- or.release();
block.pm.endWorker();
+ or.release();
+ or = null;
}
return null;
}
- Slice remaining() {
+ DeltaWindow initWindow(Slice s) {
+ DeltaWindow w = new DeltaWindow(block.config, block.dc,
+ or, block.pm,
+ block.list, s.beginIndex, s.endIndex);
+ synchronized (this) {
+ dw = w;
+ }
+ return w;
+ }
+
+ private void runWindow(DeltaWindow w) throws IOException {
+ try {
+ w.search();
+ } finally {
+ synchronized (this) {
+ dw = null;
+ }
+ }
+ }
+
+ synchronized Slice remaining() {
+ if (!slices.isEmpty())
+ return slices.getLast();
DeltaWindow d = dw;
return d != null ? d.remaining() : null;
}
- boolean tryStealWork(Slice s) {
+ synchronized boolean tryStealWork(Slice s) {
+ if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
+ slices.removeLast();
+ return true;
+ }
DeltaWindow d = dw;
return d != null ? d.tryStealWork(s) : false;
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
index 66871bb145..cc7fac8007 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
@@ -134,7 +134,7 @@ final class DeltaWindow {
}
synchronized boolean tryStealWork(DeltaTask.Slice s) {
- if (s.beginIndex <= cur)
+ if (s.beginIndex <= cur || end <= s.beginIndex)
return false;
end = s.beginIndex;
return true;
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
index 54a5826c01..a3ef27c217 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
@@ -1337,35 +1337,10 @@ public class PackWriter {
final DeltaCache dc = new ThreadSafeDeltaCache(config);
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
- int estSize = cnt / threads;
- if (estSize < config.getDeltaSearchWindowSize())
- estSize = config.getDeltaSearchWindowSize();
-
DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
reader, dc, pm,
list, 0, cnt);
- for (int i = 0; i < cnt;) {
- final int start = i;
- int end;
-
- if (cnt - i < estSize) {
- // If we don't have enough to fill the remaining block,
- // schedule what is left over as a single block.
- end = cnt;
- } else {
- // Try to split the block at the end of a path.
- end = start + estSize;
- int h = list[end - 1].getPathHash();
- while (end < cnt) {
- if (h == list[end].getPathHash())
- end++;
- else
- break;
- }
- }
- i = end;
- taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
- }
+ taskBlock.partitionTasks();
pm.startWorkers(taskBlock.tasks.size());
final Executor executor = config.getExecutor();