Instead of counting objects processed, count number of bytes added into the window. This should rescale the progress meter so that 30% complete means 30% of the total uncompressed content size has been inflated and fed into the window. In theory the progress meter should be more accurate about its percentage complete/remaining fraction than with objects. When counting objects small objects move the progress meter more rapidly than large objects, but demand a smaller amount of work than large objects being compressed. Change-Id: Id2848c16a2148b5ca51e0ca1e29c5be97eefeb48tags/v3.0.0.201305080800-m7
import org.eclipse.jgit.storage.pack.PackConfig; | import org.eclipse.jgit.storage.pack.PackConfig; | ||||
final class DeltaTask implements Callable<Object> { | final class DeltaTask implements Callable<Object> { | ||||
static final long MAX_METER = 9 << 20; | |||||
static final class Block { | static final class Block { | ||||
private static final int MIN_TOP_PATH = 50 << 20; | private static final int MIN_TOP_PATH = 50 << 20; | ||||
final int endIndex; | final int endIndex; | ||||
private long totalWeight; | private long totalWeight; | ||||
private long bytesPerUnit; | |||||
Block(int threads, PackConfig config, ObjectReader reader, | Block(int threads, PackConfig config, ObjectReader reader, | ||||
DeltaCache dc, ThreadSafeProgressMonitor pm, | DeltaCache dc, ThreadSafeProgressMonitor pm, | ||||
this.endIndex = end; | this.endIndex = end; | ||||
} | } | ||||
int cost() { | |||||
int d = (int) (totalWeight / bytesPerUnit); | |||||
if (totalWeight % bytesPerUnit != 0) | |||||
d++; | |||||
return d; | |||||
} | |||||
synchronized DeltaWindow stealWork(DeltaTask forThread) { | synchronized DeltaWindow stealWork(DeltaTask forThread) { | ||||
for (;;) { | for (;;) { | ||||
DeltaTask maxTask = null; | DeltaTask maxTask = null; | ||||
return a.slice.beginIndex - b.slice.beginIndex; | return a.slice.beginIndex - b.slice.beginIndex; | ||||
} | } | ||||
}); | }); | ||||
bytesPerUnit = 1; | |||||
while (MAX_METER <= (totalWeight / bytesPerUnit)) | |||||
bytesPerUnit <<= 10; | |||||
return topPaths; | return topPaths; | ||||
} | } | ||||
} | } | ||||
DeltaWindow initWindow(Slice s) { | DeltaWindow initWindow(Slice s) { | ||||
DeltaWindow w = new DeltaWindow(block.config, block.dc, | DeltaWindow w = new DeltaWindow(block.config, block.dc, | ||||
or, block.pm, | |||||
or, block.pm, block.bytesPerUnit, | |||||
block.list, s.beginIndex, s.endIndex); | block.list, s.beginIndex, s.endIndex); | ||||
synchronized (this) { | synchronized (this) { | ||||
dw = w; | dw = w; |
private final DeltaCache deltaCache; | private final DeltaCache deltaCache; | ||||
private final ObjectReader reader; | private final ObjectReader reader; | ||||
private final ProgressMonitor monitor; | private final ProgressMonitor monitor; | ||||
private final long bytesPerUnit; | |||||
private long bytesProcessed; | |||||
/** Maximum number of bytes to admit to the window at once. */ | /** Maximum number of bytes to admit to the window at once. */ | ||||
private final long maxMemory; | private final long maxMemory; | ||||
private Deflater deflater; | private Deflater deflater; | ||||
DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or, | DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or, | ||||
ProgressMonitor pm, | |||||
ProgressMonitor pm, long bpu, | |||||
ObjectToPack[] in, int beginIndex, int endIndex) { | ObjectToPack[] in, int beginIndex, int endIndex) { | ||||
config = pc; | config = pc; | ||||
deltaCache = dc; | deltaCache = dc; | ||||
reader = or; | reader = or; | ||||
monitor = pm; | monitor = pm; | ||||
bytesPerUnit = bpu; | |||||
toSearch = in; | toSearch = in; | ||||
cur = beginIndex; | cur = beginIndex; | ||||
end = endIndex; | end = endIndex; | ||||
// We don't actually want to make a delta for | // We don't actually want to make a delta for | ||||
// them, just need to push them into the window | // them, just need to push them into the window | ||||
// so they can be read by other objects. | // so they can be read by other objects. | ||||
// | |||||
keepInWindow(); | keepInWindow(); | ||||
} else { | } else { | ||||
// Search for a delta for the current window slot. | // Search for a delta for the current window slot. | ||||
// | |||||
monitor.update(1); | |||||
if (bytesPerUnit <= (bytesProcessed += next.getWeight())) { | |||||
int d = (int) (bytesProcessed / bytesPerUnit); | |||||
monitor.update(d); | |||||
bytesProcessed -= d * bytesPerUnit; | |||||
} | |||||
searchInWindow(); | searchInWindow(); | ||||
} | } | ||||
} | } |
return; | return; | ||||
final long searchStart = System.currentTimeMillis(); | final long searchStart = System.currentTimeMillis(); | ||||
beginPhase(PackingPhase.COMPRESSING, monitor, nonEdgeCnt); | |||||
searchForDeltas(monitor, list, cnt); | searchForDeltas(monitor, list, cnt); | ||||
endPhase(monitor); | |||||
stats.deltaSearchNonEdgeObjects = nonEdgeCnt; | stats.deltaSearchNonEdgeObjects = nonEdgeCnt; | ||||
stats.timeCompressing = System.currentTimeMillis() - searchStart; | stats.timeCompressing = System.currentTimeMillis() - searchStart; | ||||
int threads = config.getThreads(); | int threads = config.getThreads(); | ||||
if (threads == 0) | if (threads == 0) | ||||
threads = Runtime.getRuntime().availableProcessors(); | threads = Runtime.getRuntime().availableProcessors(); | ||||
if (threads <= 1 || cnt <= config.getDeltaSearchWindowSize()) | |||||
singleThreadDeltaSearch(monitor, list, cnt); | |||||
else | |||||
parallelDeltaSearch(monitor, list, cnt, threads); | |||||
} | |||||
if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) { | |||||
new DeltaWindow(config, new DeltaCache(config), reader, monitor, | |||||
list, 0, cnt).search(); | |||||
return; | |||||
private void singleThreadDeltaSearch(ProgressMonitor monitor, | |||||
ObjectToPack[] list, int cnt) throws IOException { | |||||
long totalWeight = 0; | |||||
for (int i = 0; i < cnt; i++) { | |||||
ObjectToPack o = list[i]; | |||||
if (!o.isEdge() && !o.doNotAttemptDelta()) | |||||
totalWeight += o.getWeight(); | |||||
} | } | ||||
final DeltaCache dc = new ThreadSafeDeltaCache(config); | |||||
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); | |||||
long bytesPerUnit = 1; | |||||
while (DeltaTask.MAX_METER <= (totalWeight / bytesPerUnit)) | |||||
bytesPerUnit <<= 10; | |||||
int cost = (int) (totalWeight / bytesPerUnit); | |||||
if (totalWeight % bytesPerUnit != 0) | |||||
cost++; | |||||
beginPhase(PackingPhase.COMPRESSING, monitor, cost); | |||||
new DeltaWindow(config, new DeltaCache(config), reader, | |||||
monitor, bytesPerUnit, | |||||
list, 0, cnt).search(); | |||||
endPhase(monitor); | |||||
} | |||||
private void parallelDeltaSearch(ProgressMonitor monitor, | |||||
ObjectToPack[] list, int cnt, int threads) throws IOException { | |||||
DeltaCache dc = new ThreadSafeDeltaCache(config); | |||||
ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); | |||||
DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, | DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, | ||||
reader, dc, pm, | reader, dc, pm, | ||||
list, 0, cnt); | list, 0, cnt); | ||||
taskBlock.partitionTasks(); | taskBlock.partitionTasks(); | ||||
beginPhase(PackingPhase.COMPRESSING, monitor, taskBlock.cost()); | |||||
pm.startWorkers(taskBlock.tasks.size()); | pm.startWorkers(taskBlock.tasks.size()); | ||||
final Executor executor = config.getExecutor(); | |||||
final List<Throwable> errors = Collections | |||||
.synchronizedList(new ArrayList<Throwable>()); | |||||
Executor executor = config.getExecutor(); | |||||
final List<Throwable> errors = | |||||
Collections.synchronizedList(new ArrayList<Throwable>(threads)); | |||||
if (executor instanceof ExecutorService) { | if (executor instanceof ExecutorService) { | ||||
// Caller supplied us a service, use it directly. | // Caller supplied us a service, use it directly. | ||||
runTasks((ExecutorService) executor, pm, taskBlock, errors); | runTasks((ExecutorService) executor, pm, taskBlock, errors); | ||||
fail.initCause(err); | fail.initCause(err); | ||||
throw fail; | throw fail; | ||||
} | } | ||||
endPhase(monitor); | |||||
} | } | ||||
private static void runTasks(ExecutorService pool, | private static void runTasks(ExecutorService pool, |