From d34ec1201950c01f9b6a104540b31dbd614eea65 Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Thu, 9 Jun 2011 19:10:11 -0700 Subject: [PATCH] DHT: Change DhtReadher caches to be dynamic by workload Instead of fixing the prefetch queue and recent chunk queue as different sizes, allow these to share the same limit but be scaled based on the work being performed. During walks about 20% of the space will be given to the prefetcher, and the other 80% will be used by the recent chunks cache. This should improve cases where there is bad locality between chunks. During writing of a pack stream, 90-100% of the space should be made available to the prefetcher, as the prefetch plan is usually very accurate about the order chunks will be needed in. Change-Id: I1ca7acb4518e66eb9d4138fb753df38e7254704d Signed-off-by: Shawn O. Pearce --- .../jgit/storage/dht/DhtCachedPack.java | 15 ++- .../eclipse/jgit/storage/dht/DhtReader.java | 18 ++- .../jgit/storage/dht/DhtReaderOptions.java | 121 +++++++++++++----- .../eclipse/jgit/storage/dht/OpenQueue.java | 10 +- .../eclipse/jgit/storage/dht/Prefetcher.java | 4 +- .../jgit/storage/dht/RecentChunks.java | 44 +++++-- 6 files changed, 158 insertions(+), 54 deletions(-) diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java index 39a76463fb..0fd253bfbf 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java @@ -125,9 +125,18 @@ public class DhtCachedPack extends CachedPack { throws IOException { if (keyList == null) init(); - Prefetcher p = new Prefetcher(ctx, 0); - p.push(Arrays.asList(keyList)); - copyPack(out, p, validate); + + // Clear the recent chunks because all of the reader's + // chunk limit should be made available for prefetch. + int cacheLimit = ctx.getOptions().getChunkLimit(); + ctx.getRecentChunks().setMaxBytes(0); + try { + Prefetcher p = new Prefetcher(ctx, 0, cacheLimit); + p.push(Arrays.asList(keyList)); + copyPack(out, p, validate); + } finally { + ctx.getRecentChunks().setMaxBytes(cacheLimit); + } } private void copyPack(PackOutputStream out, Prefetcher prefetcher, diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java index f9288b9e2e..330b5c0734 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java @@ -156,6 +156,10 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { return recentInfo; } + RecentChunks getRecentChunks() { + return recentChunks; + } + DeltaBaseCache getDeltaBaseCache() { return deltaBaseCache; } @@ -242,7 +246,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { // configured as push might invoke our own methods that may // try to call back into the active prefetcher. // - Prefetcher p = new Prefetcher(this, OBJ_COMMIT); + Prefetcher p = prefetch(OBJ_COMMIT, readerOptions.getWalkCommitsPrefetchRatio()); p.push(this, roots); prefetcher = p; } @@ -256,7 +260,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { // configured as push might invoke our own methods that may // try to call back into the active prefetcher. // - Prefetcher p = new Prefetcher(this, OBJ_TREE); + Prefetcher p = prefetch(OBJ_TREE, readerOptions.getWalkTreesPrefetchRatio()); p.push(this, min.getTree(), max.getTree()); prefetcher = p; } @@ -391,14 +395,22 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { new RepresentationSelector(packer, this, monitor).select(itr); } + private Prefetcher prefetch(final int type, final int ratio) { + int limit = readerOptions.getChunkLimit(); + int prefetchLimit = (int) (limit * (ratio / 100.0)); + recentChunks.setMaxBytes(limit - prefetchLimit); + return new Prefetcher(this, type, prefetchLimit); + } + private void endPrefetch() { + recentChunks.setMaxBytes(getOptions().getChunkLimit()); prefetcher = null; } @SuppressWarnings("unchecked") public void writeObjects(PackOutputStream out, List objects) throws IOException { - prefetcher = new Prefetcher(this, 0); + prefetcher = prefetch(0, readerOptions.getWriteObjectsPrefetchRatio()); try { List itr = objects; new ObjectWriter(this, prefetcher).plan(itr); diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java index 8376e2b6ba..db3f51028f 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java @@ -57,7 +57,15 @@ public class DhtReaderOptions { private boolean prefetchFollowEdgeHints; - private int prefetchLimit; + private int chunkLimit; + + private int openQueuePrefetchRatio; + + private int walkCommitsPrefetchRatio; + + private int walkTreesPrefetchRatio; + + private int writeObjectsPrefetchRatio; private int objectIndexConcurrentBatches; @@ -69,15 +77,18 @@ public class DhtReaderOptions { private int recentInfoCacheSize; - private int recentChunkCacheSize; - private boolean trackFirstChunkLoad; /** Create a default reader configuration. */ public DhtReaderOptions() { setTimeout(Timeout.seconds(5)); setPrefetchFollowEdgeHints(true); - setPrefetchLimit(5 * MiB); + + setChunkLimit(5 * MiB); + setOpenQueuePrefetchRatio(20 /* percent */); + setWalkCommitsPrefetchRatio(20 /* percent */); + setWalkTreesPrefetchRatio(20 /* percent */); + setWriteObjectsPrefetchRatio(90 /* percent */); setObjectIndexConcurrentBatches(2); setObjectIndexBatchSize(512); @@ -86,7 +97,6 @@ public class DhtReaderOptions { setDeltaBaseCacheLimit(10 * MiB); setRecentInfoCacheSize(4096); - setRecentChunkCacheSize(4); } /** @return default timeout to wait on long operations before aborting. */ @@ -125,19 +135,83 @@ public class DhtReaderOptions { return this; } - /** @return number of bytes to load during prefetching. */ - public int getPrefetchLimit() { - return prefetchLimit; + /** @return number of bytes to hold within a DhtReader. */ + public int getChunkLimit() { + return chunkLimit; } /** - * Set the number of bytes the prefetcher should hold onto. + * Set the number of bytes hold within a DhtReader. * * @param maxBytes * @return {@code this} */ - public DhtReaderOptions setPrefetchLimit(int maxBytes) { - prefetchLimit = Math.max(1024, maxBytes); + public DhtReaderOptions setChunkLimit(int maxBytes) { + chunkLimit = Math.max(1024, maxBytes); + return this; + } + + /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ + public int getOpenQueuePrefetchRatio() { + return openQueuePrefetchRatio; + } + + /** + * Set the prefetch ratio used by the open object queue. + * + * @param ratio 0..100. + * @return {@code this} + */ + public DhtReaderOptions setOpenQueuePrefetchRatio(int ratio) { + openQueuePrefetchRatio = Math.max(0, Math.min(ratio, 100)); + return this; + } + + /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ + public int getWalkCommitsPrefetchRatio() { + return walkCommitsPrefetchRatio; + } + + /** + * Set the prefetch ratio used by the open object queue. + * + * @param ratio 0..100. + * @return {@code this} + */ + public DhtReaderOptions setWalkCommitsPrefetchRatio(int ratio) { + walkCommitsPrefetchRatio = Math.max(0, Math.min(ratio, 100)); + return this; + } + + /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ + public int getWalkTreesPrefetchRatio() { + return walkTreesPrefetchRatio; + } + + /** + * Set the prefetch ratio used by the open object queue. + * + * @param ratio 0..100. + * @return {@code this} + */ + public DhtReaderOptions setWalkTreesPrefetchRatio(int ratio) { + walkTreesPrefetchRatio = Math.max(0, Math.min(ratio, 100)); + return this; + } + + /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ + public int getWriteObjectsPrefetchRatio() { + return writeObjectsPrefetchRatio; + } + + /** + * Set the prefetch ratio used by the open object queue. + * + * @param ratio 0..100. + * @return {@code this} + */ + public DhtReaderOptions setWriteObjectsPrefetchRatio(int ratio) { + writeObjectsPrefetchRatio = Math.max(0, Math.min(ratio, 100)); return this; } @@ -226,24 +300,6 @@ public class DhtReaderOptions { return this; } - /** @return number of recent chunks to hold onto per-reader. */ - public int getRecentChunkCacheSize() { - return recentChunkCacheSize; - } - - /** - * Set the number of chunks each reader holds onto for recently used access. - * - * @param chunkCnt - * the number of chunks each reader retains of recently used - * chunks to smooth out access. - * @return {@code this} - */ - public DhtReaderOptions setRecentChunkCacheSize(int chunkCnt) { - recentChunkCacheSize = Math.max(1, chunkCnt); - return this; - } - /** * @return true if {@link DhtReader.Statistics} includes the stack trace for * the first time a chunk is loaded. Supports debugging DHT code. @@ -277,7 +333,11 @@ public class DhtReaderOptions { public DhtReaderOptions fromConfig(Config rc) { setTimeout(Timeout.getTimeout(rc, "core", "dht", "timeout", getTimeout())); setPrefetchFollowEdgeHints(rc.getBoolean("core", "dht", "prefetchFollowEdgeHints", isPrefetchFollowEdgeHints())); - setPrefetchLimit(rc.getInt("core", "dht", "prefetchLimit", getPrefetchLimit())); + setChunkLimit(rc.getInt("core", "dht", "chunkLimit", getChunkLimit())); + setOpenQueuePrefetchRatio(rc.getInt("core", "dht", "openQueuePrefetchRatio", getOpenQueuePrefetchRatio())); + setWalkCommitsPrefetchRatio(rc.getInt("core", "dht", "walkCommitsPrefetchRatio", getWalkCommitsPrefetchRatio())); + setWalkTreesPrefetchRatio(rc.getInt("core", "dht", "walkTreesPrefetchRatio", getWalkTreesPrefetchRatio())); + setWriteObjectsPrefetchRatio(rc.getInt("core", "dht", "writeObjectsPrefetchRatio", getWriteObjectsPrefetchRatio())); setObjectIndexConcurrentBatches(rc.getInt("core", "dht", "objectIndexConcurrentBatches", getObjectIndexConcurrentBatches())); setObjectIndexBatchSize(rc.getInt("core", "dht", "objectIndexBatchSize", getObjectIndexBatchSize())); @@ -286,7 +346,6 @@ public class DhtReaderOptions { setDeltaBaseCacheLimit(rc.getInt("core", "dht", "deltaBaseCacheLimit", getDeltaBaseCacheLimit())); setRecentInfoCacheSize(rc.getInt("core", "dht", "recentInfoCacheSize", getRecentInfoCacheSize())); - setRecentChunkCacheSize(rc.getInt("core", "dht", "recentChunkCacheSize", getRecentChunkCacheSize())); setTrackFirstChunkLoad(rc.getBoolean("core", "dht", "debugTrackFirstChunkLoad", isTrackFirstChunkLoad())); return this; diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java index e47f2b2cb4..32b22340d1 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java @@ -159,6 +159,7 @@ final class OpenQueue extends QueueObjectLookup @Override public void release() { + reader.getRecentChunks().setMaxBytes(reader.getOptions().getChunkLimit()); prefetcher = null; currChunk = null; } @@ -173,8 +174,13 @@ final class OpenQueue extends QueueObjectLookup list = new ArrayList>(); byChunk.put(chunkKey, list); - if (prefetcher == null) - prefetcher = new Prefetcher(reader, 0); + if (prefetcher == null) { + int limit = reader.getOptions().getChunkLimit(); + int ratio = reader.getOptions().getOpenQueuePrefetchRatio(); + int prefetchLimit = (int) (limit * (ratio / 100.0)); + reader.getRecentChunks().setMaxBytes(limit - prefetchLimit); + prefetcher = new Prefetcher(reader, 0, prefetchLimit); + } prefetcher.push(chunkKey); } list.add(c); diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java index 743f1f5944..fef2b4f29d 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java @@ -104,7 +104,7 @@ class Prefetcher implements StreamingCallback> { private DhtException error; - Prefetcher(DhtReader reader, int objectType) { + Prefetcher(DhtReader reader, int objectType, int prefetchLimitInBytes) { this.db = reader.getDatabase(); this.stats = reader.getStatistics(); this.objectType = objectType; @@ -113,7 +113,7 @@ class Prefetcher implements StreamingCallback> { this.queue = new LinkedList(); this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints(); this.averageChunkSize = reader.getInserterOptions().getChunkSize(); - this.highWaterMark = reader.getOptions().getPrefetchLimit(); + this.highWaterMark = prefetchLimitInBytes; int lwm = (highWaterMark / averageChunkSize) - 4; if (lwm <= 0) diff --git a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java index 095de43fe7..22608ee1b3 100644 --- a/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java +++ b/org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java @@ -56,11 +56,11 @@ final class RecentChunks { private final DhtReader.Statistics stats; - private final int maxSize; - private final HashMap byKey; - private int curSize; + private int maxBytes; + + private int curBytes; private Node lruHead; @@ -69,8 +69,16 @@ final class RecentChunks { RecentChunks(DhtReader reader) { this.reader = reader; this.stats = reader.getStatistics(); - this.maxSize = reader.getOptions().getRecentChunkCacheSize(); this.byKey = new HashMap(); + this.maxBytes = reader.getOptions().getChunkLimit(); + } + + void setMaxBytes(int newMax) { + maxBytes = Math.max(0, newMax); + if (0 < maxBytes) + prune(); + else + clear(); } PackChunk get(ChunkKey key) { @@ -91,16 +99,26 @@ final class RecentChunks { return; } - if (curSize < maxSize) { - n = new Node(); - curSize++; - } else { - n = lruTail; - byKey.remove(n.chunk.getChunkKey()); - } + curBytes += chunk.getTotalSize(); + prune(); + + n = new Node(); n.chunk = chunk; byKey.put(chunk.getChunkKey(), n); - hit(n); + first(n); + } + + private void prune() { + while (maxBytes < curBytes) { + Node n = lruTail; + if (n == null) + break; + + PackChunk c = n.chunk; + curBytes -= c.getTotalSize(); + byKey.remove(c.getChunkKey()); + remove(n); + } } ObjectLoader open(RepositoryKey repo, AnyObjectId objId, int typeHint) @@ -167,7 +185,7 @@ final class RecentChunks { } void clear() { - curSize = 0; + curBytes = 0; lruHead = null; lruTail = null; byKey.clear(); -- 2.39.5