diff options
3 files changed, 187 insertions, 42 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index cc212fb818..ca2fff6882 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -43,7 +43,12 @@ package org.eclipse.jgit.internal.storage.pack; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; @@ -53,7 +58,10 @@ import org.eclipse.jgit.storage.pack.PackConfig; final class DeltaTask implements Callable<Object> { static final class Block { + private static final int MIN_TOP_PATH = 50 << 20; + final List<DeltaTask> tasks; + final int threads; final PackConfig config; final ObjectReader templateReader; final DeltaCache dc; @@ -62,10 +70,13 @@ final class DeltaTask implements Callable<Object> { final int beginIndex; final int endIndex; + private long totalWeight; + Block(int threads, PackConfig config, ObjectReader reader, DeltaCache dc, ThreadSafeProgressMonitor pm, ObjectToPack[] list, int begin, int end) { this.tasks = new ArrayList<DeltaTask>(threads); + this.threads = threads; this.config = config; this.templateReader = reader; this.dc = dc; @@ -75,7 +86,7 @@ final class DeltaTask implements Callable<Object> { this.endIndex = end; } - synchronized Slice stealWork() { + synchronized DeltaWindow stealWork(DeltaTask forThread) { for (;;) { DeltaTask maxTask = null; Slice maxSlice = null; @@ -92,9 +103,122 @@ final class DeltaTask implements Callable<Object> { if (maxTask == null) return null; if (maxTask.tryStealWork(maxSlice)) - return maxSlice; + return forThread.initWindow(maxSlice); } } + + void partitionTasks() { + ArrayList<WeightedPath> topPaths = computeTopPaths(); + Iterator<WeightedPath> topPathItr = topPaths.iterator(); + int nextTop = 0; + long weightPerThread = totalWeight / threads; + for (int i = beginIndex; i < endIndex;) { + DeltaTask task = new DeltaTask(this); + long w = 0; + + // Assign the thread one top path. + if (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + w += p.weight; + task.add(p.slice); + } + + // Assign the task thread ~average weight. + int s = i; + for (; w < weightPerThread && i < endIndex;) { + if (nextTop < topPaths.size() + && i == topPaths.get(nextTop).slice.beginIndex) { + if (s < i) + task.add(new Slice(s, i)); + s = i = topPaths.get(nextTop++).slice.endIndex; + } else + w += list[i++].getWeight(); + } + + // Round up the slice to the end of a path. + if (s < i) { + int h = list[i - 1].getPathHash(); + while (i < endIndex) { + if (h == list[i].getPathHash()) + i++; + else + break; + } + task.add(new Slice(s, i)); + } + if (!task.slices.isEmpty()) + tasks.add(task); + } + while (topPathItr.hasNext()) { + WeightedPath p = topPathItr.next(); + DeltaTask task = new DeltaTask(this); + task.add(p.slice); + tasks.add(task); + } + + topPaths = null; + } + + private ArrayList<WeightedPath> computeTopPaths() { + ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>( + threads); + int cp = beginIndex; + int ch = list[cp].getPathHash(); + long cw = list[cp].getWeight(); + totalWeight = list[cp].getWeight(); + + for (int i = cp + 1; i < endIndex; i++) { + ObjectToPack o = list[i]; + if (ch != o.getPathHash()) { + if (MIN_TOP_PATH < cw) { + if (topPaths.size() < threads) { + Slice s = new Slice(cp, i); + topPaths.add(new WeightedPath(cw, s)); + if (topPaths.size() == threads) + Collections.sort(topPaths); + } else if (topPaths.get(0).weight < cw) { + Slice s = new Slice(cp, i); + WeightedPath p = new WeightedPath(cw, s); + topPaths.set(0, p); + if (p.compareTo(topPaths.get(1)) > 0) + Collections.sort(topPaths); + } + } + cp = i; + ch = o.getPathHash(); + cw = 0; + } + if (o.isEdge() || o.doNotAttemptDelta()) + continue; + cw += o.getWeight(); + totalWeight += o.getWeight(); + } + + // Sort by starting index to identify gaps later. + Collections.sort(topPaths, new Comparator<WeightedPath>() { + public int compare(WeightedPath a, WeightedPath b) { + return a.slice.beginIndex - b.slice.beginIndex; + } + }); + return topPaths; + } + } + + static final class WeightedPath implements Comparable<WeightedPath> { + final long weight; + final Slice slice; + + WeightedPath(long weight, Slice s) { + this.weight = weight; + this.slice = s; + } + + public int compareTo(WeightedPath o) { + int cmp = Long.signum(weight - o.weight); + if (cmp != 0) + return cmp; + return slice.beginIndex - o.slice.beginIndex; + } } static final class Slice { @@ -112,36 +236,82 @@ final class DeltaTask implements Callable<Object> { } private final Block block; - private final Slice firstSlice; - private volatile DeltaWindow dw; + private final LinkedList<Slice> slices; - DeltaTask(Block b, int beginIndex, int endIndex) { + private ObjectReader or; + private DeltaWindow dw; + + DeltaTask(Block b) { this.block = b; - this.firstSlice = new Slice(beginIndex, endIndex); + this.slices = new LinkedList<Slice>(); + } + + void add(Slice s) { + if (!slices.isEmpty()) { + Slice last = slices.getLast(); + if (last.endIndex == s.beginIndex) { + slices.removeLast(); + slices.add(new Slice(last.beginIndex, s.endIndex)); + return; + } + } + slices.add(s); } public Object call() throws Exception { - ObjectReader or = block.templateReader.newReader(); + or = block.templateReader.newReader(); try { - for (Slice s = firstSlice; s != null; s = block.stealWork()) { - dw = new DeltaWindow(block.config, block.dc, or, block.pm, - block.list, s.beginIndex, s.endIndex); - dw.search(); - dw = null; + DeltaWindow w; + for (;;) { + synchronized (this) { + if (slices.isEmpty()) + break; + w = initWindow(slices.removeFirst()); + } + runWindow(w); } + while ((w = block.stealWork(this)) != null) + runWindow(w); } finally { - or.release(); block.pm.endWorker(); + or.release(); + or = null; } return null; } - Slice remaining() { + DeltaWindow initWindow(Slice s) { + DeltaWindow w = new DeltaWindow(block.config, block.dc, + or, block.pm, + block.list, s.beginIndex, s.endIndex); + synchronized (this) { + dw = w; + } + return w; + } + + private void runWindow(DeltaWindow w) throws IOException { + try { + w.search(); + } finally { + synchronized (this) { + dw = null; + } + } + } + + synchronized Slice remaining() { + if (!slices.isEmpty()) + return slices.getLast(); DeltaWindow d = dw; return d != null ? d.remaining() : null; } - boolean tryStealWork(Slice s) { + synchronized boolean tryStealWork(Slice s) { + if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) { + slices.removeLast(); + return true; + } DeltaWindow d = dw; return d != null ? d.tryStealWork(s) : false; } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index 66871bb145..cc7fac8007 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -134,7 +134,7 @@ final class DeltaWindow { } synchronized boolean tryStealWork(DeltaTask.Slice s) { - if (s.beginIndex <= cur) + if (s.beginIndex <= cur || end <= s.beginIndex) return false; end = s.beginIndex; return true; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index 54a5826c01..a3ef27c217 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1337,35 +1337,10 @@ public class PackWriter { final DeltaCache dc = new ThreadSafeDeltaCache(config); final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); - int estSize = cnt / threads; - if (estSize < config.getDeltaSearchWindowSize()) - estSize = config.getDeltaSearchWindowSize(); - DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, reader, dc, pm, list, 0, cnt); - for (int i = 0; i < cnt;) { - final int start = i; - int end; - - if (cnt - i < estSize) { - // If we don't have enough to fill the remaining block, - // schedule what is left over as a single block. - end = cnt; - } else { - // Try to split the block at the end of a path. - end = start + estSize; - int h = list[end - 1].getPathHash(); - while (end < cnt) { - if (h == list[end].getPathHash()) - end++; - else - break; - } - } - i = end; - taskBlock.tasks.add(new DeltaTask(taskBlock, start, end)); - } + taskBlock.partitionTasks(); pm.startWorkers(taskBlock.tasks.size()); final Executor executor = config.getExecutor(); |