aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorShawn Pearce <spearce@spearce.org>2013-04-17 14:53:13 -0400
committerGerrit Code Review @ Eclipse.org <gerrit@eclipse.org>2013-04-17 14:53:13 -0400
commitfa1bc6abb76cdd0abfe536d838035f94c3dfaeeb (patch)
tree451cae0f38057c832079e069dff178aa5c56614e
parente74263e74398a5c8ca14bde47828b2eb311429f9 (diff)
parent5d8a9f6f3f43ac43c6b1c48cdfad55e545171ea3 (diff)
downloadjgit-fa1bc6abb76cdd0abfe536d838035f94c3dfaeeb.tar.gz
jgit-fa1bc6abb76cdd0abfe536d838035f94c3dfaeeb.zip
Merge changes Id2848c16,I7621c434
* changes: Rescale "Compressing objects" progress meter by size Split delta search buckets by byte weight
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java214
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java15
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java70
3 files changed, 243 insertions, 56 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
index cc212fb818..c4b01949d1 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
@@ -43,7 +43,12 @@
package org.eclipse.jgit.internal.storage.pack;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -52,8 +57,13 @@ import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
import org.eclipse.jgit.storage.pack.PackConfig;
final class DeltaTask implements Callable<Object> {
+ static final long MAX_METER = 9 << 20;
+
static final class Block {
+ private static final int MIN_TOP_PATH = 50 << 20;
+
final List<DeltaTask> tasks;
+ final int threads;
final PackConfig config;
final ObjectReader templateReader;
final DeltaCache dc;
@@ -62,10 +72,14 @@ final class DeltaTask implements Callable<Object> {
final int beginIndex;
final int endIndex;
+ private long totalWeight;
+ private long bytesPerUnit;
+
Block(int threads, PackConfig config, ObjectReader reader,
DeltaCache dc, ThreadSafeProgressMonitor pm,
ObjectToPack[] list, int begin, int end) {
this.tasks = new ArrayList<DeltaTask>(threads);
+ this.threads = threads;
this.config = config;
this.templateReader = reader;
this.dc = dc;
@@ -75,7 +89,14 @@ final class DeltaTask implements Callable<Object> {
this.endIndex = end;
}
- synchronized Slice stealWork() {
+ int cost() {
+ int d = (int) (totalWeight / bytesPerUnit);
+ if (totalWeight % bytesPerUnit != 0)
+ d++;
+ return d;
+ }
+
+ synchronized DeltaWindow stealWork(DeltaTask forThread) {
for (;;) {
DeltaTask maxTask = null;
Slice maxSlice = null;
@@ -92,9 +113,126 @@ final class DeltaTask implements Callable<Object> {
if (maxTask == null)
return null;
if (maxTask.tryStealWork(maxSlice))
- return maxSlice;
+ return forThread.initWindow(maxSlice);
}
}
+
+ void partitionTasks() {
+ ArrayList<WeightedPath> topPaths = computeTopPaths();
+ Iterator<WeightedPath> topPathItr = topPaths.iterator();
+ int nextTop = 0;
+ long weightPerThread = totalWeight / threads;
+ for (int i = beginIndex; i < endIndex;) {
+ DeltaTask task = new DeltaTask(this);
+ long w = 0;
+
+ // Assign the thread one top path.
+ if (topPathItr.hasNext()) {
+ WeightedPath p = topPathItr.next();
+ w += p.weight;
+ task.add(p.slice);
+ }
+
+ // Assign the task thread ~average weight.
+ int s = i;
+ for (; w < weightPerThread && i < endIndex;) {
+ if (nextTop < topPaths.size()
+ && i == topPaths.get(nextTop).slice.beginIndex) {
+ if (s < i)
+ task.add(new Slice(s, i));
+ s = i = topPaths.get(nextTop++).slice.endIndex;
+ } else
+ w += list[i++].getWeight();
+ }
+
+ // Round up the slice to the end of a path.
+ if (s < i) {
+ int h = list[i - 1].getPathHash();
+ while (i < endIndex) {
+ if (h == list[i].getPathHash())
+ i++;
+ else
+ break;
+ }
+ task.add(new Slice(s, i));
+ }
+ if (!task.slices.isEmpty())
+ tasks.add(task);
+ }
+ while (topPathItr.hasNext()) {
+ WeightedPath p = topPathItr.next();
+ DeltaTask task = new DeltaTask(this);
+ task.add(p.slice);
+ tasks.add(task);
+ }
+
+ topPaths = null;
+ }
+
+ private ArrayList<WeightedPath> computeTopPaths() {
+ ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
+ threads);
+ int cp = beginIndex;
+ int ch = list[cp].getPathHash();
+ long cw = list[cp].getWeight();
+ totalWeight = list[cp].getWeight();
+
+ for (int i = cp + 1; i < endIndex; i++) {
+ ObjectToPack o = list[i];
+ if (ch != o.getPathHash()) {
+ if (MIN_TOP_PATH < cw) {
+ if (topPaths.size() < threads) {
+ Slice s = new Slice(cp, i);
+ topPaths.add(new WeightedPath(cw, s));
+ if (topPaths.size() == threads)
+ Collections.sort(topPaths);
+ } else if (topPaths.get(0).weight < cw) {
+ Slice s = new Slice(cp, i);
+ WeightedPath p = new WeightedPath(cw, s);
+ topPaths.set(0, p);
+ if (p.compareTo(topPaths.get(1)) > 0)
+ Collections.sort(topPaths);
+ }
+ }
+ cp = i;
+ ch = o.getPathHash();
+ cw = 0;
+ }
+ if (o.isEdge() || o.doNotAttemptDelta())
+ continue;
+ cw += o.getWeight();
+ totalWeight += o.getWeight();
+ }
+
+ // Sort by starting index to identify gaps later.
+ Collections.sort(topPaths, new Comparator<WeightedPath>() {
+ public int compare(WeightedPath a, WeightedPath b) {
+ return a.slice.beginIndex - b.slice.beginIndex;
+ }
+ });
+
+ bytesPerUnit = 1;
+ while (MAX_METER <= (totalWeight / bytesPerUnit))
+ bytesPerUnit <<= 10;
+ return topPaths;
+ }
+ }
+
+ static final class WeightedPath implements Comparable<WeightedPath> {
+ final long weight;
+ final Slice slice;
+
+ WeightedPath(long weight, Slice s) {
+ this.weight = weight;
+ this.slice = s;
+ }
+
+ public int compareTo(WeightedPath o) {
+ int cmp = Long.signum(weight - o.weight);
+ if (cmp != 0)
+ return cmp;
+ return slice.beginIndex - o.slice.beginIndex;
+ }
}
static final class Slice {
@@ -112,36 +250,82 @@ final class DeltaTask implements Callable<Object> {
}
private final Block block;
- private final Slice firstSlice;
- private volatile DeltaWindow dw;
+ private final LinkedList<Slice> slices;
+
+ private ObjectReader or;
+ private DeltaWindow dw;
- DeltaTask(Block b, int beginIndex, int endIndex) {
+ DeltaTask(Block b) {
this.block = b;
- this.firstSlice = new Slice(beginIndex, endIndex);
+ this.slices = new LinkedList<Slice>();
+ }
+
+ void add(Slice s) {
+ if (!slices.isEmpty()) {
+ Slice last = slices.getLast();
+ if (last.endIndex == s.beginIndex) {
+ slices.removeLast();
+ slices.add(new Slice(last.beginIndex, s.endIndex));
+ return;
+ }
+ }
+ slices.add(s);
}
public Object call() throws Exception {
- ObjectReader or = block.templateReader.newReader();
+ or = block.templateReader.newReader();
try {
- for (Slice s = firstSlice; s != null; s = block.stealWork()) {
- dw = new DeltaWindow(block.config, block.dc, or, block.pm,
- block.list, s.beginIndex, s.endIndex);
- dw.search();
- dw = null;
+ DeltaWindow w;
+ for (;;) {
+ synchronized (this) {
+ if (slices.isEmpty())
+ break;
+ w = initWindow(slices.removeFirst());
+ }
+ runWindow(w);
}
+ while ((w = block.stealWork(this)) != null)
+ runWindow(w);
} finally {
- or.release();
block.pm.endWorker();
+ or.release();
+ or = null;
}
return null;
}
- Slice remaining() {
+ DeltaWindow initWindow(Slice s) {
+ DeltaWindow w = new DeltaWindow(block.config, block.dc,
+ or, block.pm, block.bytesPerUnit,
+ block.list, s.beginIndex, s.endIndex);
+ synchronized (this) {
+ dw = w;
+ }
+ return w;
+ }
+
+ private void runWindow(DeltaWindow w) throws IOException {
+ try {
+ w.search();
+ } finally {
+ synchronized (this) {
+ dw = null;
+ }
+ }
+ }
+
+ synchronized Slice remaining() {
+ if (!slices.isEmpty())
+ return slices.getLast();
DeltaWindow d = dw;
return d != null ? d.remaining() : null;
}
- boolean tryStealWork(Slice s) {
+ synchronized boolean tryStealWork(Slice s) {
+ if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
+ slices.removeLast();
+ return true;
+ }
DeltaWindow d = dw;
return d != null ? d.tryStealWork(s) : false;
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
index 66871bb145..19d06a23f8 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
@@ -64,6 +64,8 @@ final class DeltaWindow {
private final DeltaCache deltaCache;
private final ObjectReader reader;
private final ProgressMonitor monitor;
+ private final long bytesPerUnit;
+ private long bytesProcessed;
/** Maximum number of bytes to admit to the window at once. */
private final long maxMemory;
@@ -92,12 +94,13 @@ final class DeltaWindow {
private Deflater deflater;
DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or,
- ProgressMonitor pm,
+ ProgressMonitor pm, long bpu,
ObjectToPack[] in, int beginIndex, int endIndex) {
config = pc;
deltaCache = dc;
reader = or;
monitor = pm;
+ bytesPerUnit = bpu;
toSearch = in;
cur = beginIndex;
end = endIndex;
@@ -134,7 +137,7 @@ final class DeltaWindow {
}
synchronized boolean tryStealWork(DeltaTask.Slice s) {
- if (s.beginIndex <= cur)
+ if (s.beginIndex <= cur || end <= s.beginIndex)
return false;
end = s.beginIndex;
return true;
@@ -162,12 +165,14 @@ final class DeltaWindow {
// We don't actually want to make a delta for
// them, just need to push them into the window
// so they can be read by other objects.
- //
keepInWindow();
} else {
// Search for a delta for the current window slot.
- //
- monitor.update(1);
+ if (bytesPerUnit <= (bytesProcessed += next.getWeight())) {
+ int d = (int) (bytesProcessed / bytesPerUnit);
+ monitor.update(d);
+ bytesProcessed -= d * bytesPerUnit;
+ }
searchInWindow();
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
index 54a5826c01..a7122592f3 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java
@@ -1286,9 +1286,7 @@ public class PackWriter {
return;
final long searchStart = System.currentTimeMillis();
- beginPhase(PackingPhase.COMPRESSING, monitor, nonEdgeCnt);
searchForDeltas(monitor, list, cnt);
- endPhase(monitor);
stats.deltaSearchNonEdgeObjects = nonEdgeCnt;
stats.timeCompressing = System.currentTimeMillis() - searchStart;
@@ -1327,50 +1325,49 @@ public class PackWriter {
int threads = config.getThreads();
if (threads == 0)
threads = Runtime.getRuntime().availableProcessors();
+ if (threads <= 1 || cnt <= config.getDeltaSearchWindowSize())
+ singleThreadDeltaSearch(monitor, list, cnt);
+ else
+ parallelDeltaSearch(monitor, list, cnt, threads);
+ }
- if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) {
- new DeltaWindow(config, new DeltaCache(config), reader, monitor,
- list, 0, cnt).search();
- return;
+ private void singleThreadDeltaSearch(ProgressMonitor monitor,
+ ObjectToPack[] list, int cnt) throws IOException {
+ long totalWeight = 0;
+ for (int i = 0; i < cnt; i++) {
+ ObjectToPack o = list[i];
+ if (!o.isEdge() && !o.doNotAttemptDelta())
+ totalWeight += o.getWeight();
}
- final DeltaCache dc = new ThreadSafeDeltaCache(config);
- final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
+ long bytesPerUnit = 1;
+ while (DeltaTask.MAX_METER <= (totalWeight / bytesPerUnit))
+ bytesPerUnit <<= 10;
+ int cost = (int) (totalWeight / bytesPerUnit);
+ if (totalWeight % bytesPerUnit != 0)
+ cost++;
- int estSize = cnt / threads;
- if (estSize < config.getDeltaSearchWindowSize())
- estSize = config.getDeltaSearchWindowSize();
+ beginPhase(PackingPhase.COMPRESSING, monitor, cost);
+ new DeltaWindow(config, new DeltaCache(config), reader,
+ monitor, bytesPerUnit,
+ list, 0, cnt).search();
+ endPhase(monitor);
+ }
+ private void parallelDeltaSearch(ProgressMonitor monitor,
+ ObjectToPack[] list, int cnt, int threads) throws IOException {
+ DeltaCache dc = new ThreadSafeDeltaCache(config);
+ ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
reader, dc, pm,
list, 0, cnt);
- for (int i = 0; i < cnt;) {
- final int start = i;
- int end;
-
- if (cnt - i < estSize) {
- // If we don't have enough to fill the remaining block,
- // schedule what is left over as a single block.
- end = cnt;
- } else {
- // Try to split the block at the end of a path.
- end = start + estSize;
- int h = list[end - 1].getPathHash();
- while (end < cnt) {
- if (h == list[end].getPathHash())
- end++;
- else
- break;
- }
- }
- i = end;
- taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
- }
+ taskBlock.partitionTasks();
+ beginPhase(PackingPhase.COMPRESSING, monitor, taskBlock.cost());
pm.startWorkers(taskBlock.tasks.size());
- final Executor executor = config.getExecutor();
- final List<Throwable> errors = Collections
- .synchronizedList(new ArrayList<Throwable>());
+ Executor executor = config.getExecutor();
+ final List<Throwable> errors =
+ Collections.synchronizedList(new ArrayList<Throwable>(threads));
if (executor instanceof ExecutorService) {
// Caller supplied us a service, use it directly.
runTasks((ExecutorService) executor, pm, taskBlock, errors);
@@ -1434,6 +1431,7 @@ public class PackWriter {
fail.initCause(err);
throw fail;
}
+ endPhase(monitor);
}
private static void runTasks(ExecutorService pool,