From 5d8a9f6f3f43ac43c6b1c48cdfad55e545171ea3 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Tue, 16 Apr 2013 09:25:29 -0700 Subject: Rescale "Compressing objects" progress meter by size Instead of counting objects processed, count number of bytes added into the window. This should rescale the progress meter so that 30% complete means 30% of the total uncompressed content size has been inflated and fed into the window. In theory the progress meter should be more accurate about its percentage complete/remaining fraction than with objects. When counting objects small objects move the progress meter more rapidly than large objects, but demand a smaller amount of work than large objects being compressed. Change-Id: Id2848c16a2148b5ca51e0ca1e29c5be97eefeb48 --- .../jgit/internal/storage/pack/DeltaTask.java | 16 +++++++- .../jgit/internal/storage/pack/DeltaWindow.java | 13 +++++-- .../jgit/internal/storage/pack/PackWriter.java | 45 ++++++++++++++++------ 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index ca2fff6882..c4b01949d1 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -57,6 +57,8 @@ import org.eclipse.jgit.lib.ThreadSafeProgressMonitor; import org.eclipse.jgit.storage.pack.PackConfig; final class DeltaTask implements Callable { + static final long MAX_METER = 9 << 20; + static final class Block { private static final int MIN_TOP_PATH = 50 << 20; @@ -71,6 +73,7 @@ final class DeltaTask implements Callable { final int endIndex; private long totalWeight; + private long bytesPerUnit; Block(int threads, PackConfig config, ObjectReader reader, DeltaCache dc, ThreadSafeProgressMonitor pm, @@ -86,6 +89,13 @@ final class DeltaTask implements Callable { this.endIndex = end; } + int cost() { + int d = (int) (totalWeight / bytesPerUnit); + if (totalWeight % bytesPerUnit != 0) + d++; + return d; + } + synchronized DeltaWindow stealWork(DeltaTask forThread) { for (;;) { DeltaTask maxTask = null; @@ -200,6 +210,10 @@ final class DeltaTask implements Callable { return a.slice.beginIndex - b.slice.beginIndex; } }); + + bytesPerUnit = 1; + while (MAX_METER <= (totalWeight / bytesPerUnit)) + bytesPerUnit <<= 10; return topPaths; } } @@ -282,7 +296,7 @@ final class DeltaTask implements Callable { DeltaWindow initWindow(Slice s) { DeltaWindow w = new DeltaWindow(block.config, block.dc, - or, block.pm, + or, block.pm, block.bytesPerUnit, block.list, s.beginIndex, s.endIndex); synchronized (this) { dw = w; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index cc7fac8007..19d06a23f8 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -64,6 +64,8 @@ final class DeltaWindow { private final DeltaCache deltaCache; private final ObjectReader reader; private final ProgressMonitor monitor; + private final long bytesPerUnit; + private long bytesProcessed; /** Maximum number of bytes to admit to the window at once. */ private final long maxMemory; @@ -92,12 +94,13 @@ final class DeltaWindow { private Deflater deflater; DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or, - ProgressMonitor pm, + ProgressMonitor pm, long bpu, ObjectToPack[] in, int beginIndex, int endIndex) { config = pc; deltaCache = dc; reader = or; monitor = pm; + bytesPerUnit = bpu; toSearch = in; cur = beginIndex; end = endIndex; @@ -162,12 +165,14 @@ final class DeltaWindow { // We don't actually want to make a delta for // them, just need to push them into the window // so they can be read by other objects. - // keepInWindow(); } else { // Search for a delta for the current window slot. - // - monitor.update(1); + if (bytesPerUnit <= (bytesProcessed += next.getWeight())) { + int d = (int) (bytesProcessed / bytesPerUnit); + monitor.update(d); + bytesProcessed -= d * bytesPerUnit; + } searchInWindow(); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index a3ef27c217..a7122592f3 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1286,9 +1286,7 @@ public class PackWriter { return; final long searchStart = System.currentTimeMillis(); - beginPhase(PackingPhase.COMPRESSING, monitor, nonEdgeCnt); searchForDeltas(monitor, list, cnt); - endPhase(monitor); stats.deltaSearchNonEdgeObjects = nonEdgeCnt; stats.timeCompressing = System.currentTimeMillis() - searchStart; @@ -1327,25 +1325,49 @@ public class PackWriter { int threads = config.getThreads(); if (threads == 0) threads = Runtime.getRuntime().availableProcessors(); + if (threads <= 1 || cnt <= config.getDeltaSearchWindowSize()) + singleThreadDeltaSearch(monitor, list, cnt); + else + parallelDeltaSearch(monitor, list, cnt, threads); + } - if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) { - new DeltaWindow(config, new DeltaCache(config), reader, monitor, - list, 0, cnt).search(); - return; + private void singleThreadDeltaSearch(ProgressMonitor monitor, + ObjectToPack[] list, int cnt) throws IOException { + long totalWeight = 0; + for (int i = 0; i < cnt; i++) { + ObjectToPack o = list[i]; + if (!o.isEdge() && !o.doNotAttemptDelta()) + totalWeight += o.getWeight(); } - final DeltaCache dc = new ThreadSafeDeltaCache(config); - final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); + long bytesPerUnit = 1; + while (DeltaTask.MAX_METER <= (totalWeight / bytesPerUnit)) + bytesPerUnit <<= 10; + int cost = (int) (totalWeight / bytesPerUnit); + if (totalWeight % bytesPerUnit != 0) + cost++; + beginPhase(PackingPhase.COMPRESSING, monitor, cost); + new DeltaWindow(config, new DeltaCache(config), reader, + monitor, bytesPerUnit, + list, 0, cnt).search(); + endPhase(monitor); + } + + private void parallelDeltaSearch(ProgressMonitor monitor, + ObjectToPack[] list, int cnt, int threads) throws IOException { + DeltaCache dc = new ThreadSafeDeltaCache(config); + ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, reader, dc, pm, list, 0, cnt); taskBlock.partitionTasks(); + beginPhase(PackingPhase.COMPRESSING, monitor, taskBlock.cost()); pm.startWorkers(taskBlock.tasks.size()); - final Executor executor = config.getExecutor(); - final List errors = Collections - .synchronizedList(new ArrayList()); + Executor executor = config.getExecutor(); + final List errors = + Collections.synchronizedList(new ArrayList(threads)); if (executor instanceof ExecutorService) { // Caller supplied us a service, use it directly. runTasks((ExecutorService) executor, pm, taskBlock, errors); @@ -1409,6 +1431,7 @@ public class PackWriter { fail.initCause(err); throw fail; } + endPhase(monitor); } private static void runTasks(ExecutorService pool, -- cgit v1.2.3