]> source.dussan.org Git - jgit.git/commitdiff
DHT: Change DhtReadher caches to be dynamic by workload 08/3708/1
authorShawn O. Pearce <spearce@spearce.org>
Fri, 10 Jun 2011 02:10:11 +0000 (19:10 -0700)
committerShawn O. Pearce <spearce@spearce.org>
Fri, 10 Jun 2011 02:10:15 +0000 (19:10 -0700)
Instead of fixing the prefetch queue and recent chunk queue as
different sizes, allow these to share the same limit but be scaled
based on the work being performed.

During walks about 20% of the space will be given to the prefetcher,
and the other 80% will be used by the recent chunks cache. This
should improve cases where there is bad locality between chunks.

During writing of a pack stream, 90-100% of the space should be
made available to the prefetcher, as the prefetch plan is usually
very accurate about the order chunks will be needed in.

Change-Id: I1ca7acb4518e66eb9d4138fb753df38e7254704d
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java

index 39a76463fbf93b2c1c3f2e93fcd86e9428dce450..0fd253bfbf10e5a42e81cccb9cacc1ea8d71fa47 100644 (file)
@@ -125,9 +125,18 @@ public class DhtCachedPack extends CachedPack {
                        throws IOException {
                if (keyList == null)
                        init();
-               Prefetcher p = new Prefetcher(ctx, 0);
-               p.push(Arrays.asList(keyList));
-               copyPack(out, p, validate);
+
+               // Clear the recent chunks because all of the reader's
+               // chunk limit should be made available for prefetch.
+               int cacheLimit = ctx.getOptions().getChunkLimit();
+               ctx.getRecentChunks().setMaxBytes(0);
+               try {
+                       Prefetcher p = new Prefetcher(ctx, 0, cacheLimit);
+                       p.push(Arrays.asList(keyList));
+                       copyPack(out, p, validate);
+               } finally {
+                       ctx.getRecentChunks().setMaxBytes(cacheLimit);
+               }
        }
 
        private void copyPack(PackOutputStream out, Prefetcher prefetcher,
index f9288b9e2eb5983cfe14629e8d07f4e35ae6eac8..330b5c0734ae206d4de7c5fae445ae505a79d05f 100644 (file)
@@ -156,6 +156,10 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
                return recentInfo;
        }
 
+       RecentChunks getRecentChunks() {
+               return recentChunks;
+       }
+
        DeltaBaseCache getDeltaBaseCache() {
                return deltaBaseCache;
        }
@@ -242,7 +246,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
                // configured as push might invoke our own methods that may
                // try to call back into the active prefetcher.
                //
-               Prefetcher p = new Prefetcher(this, OBJ_COMMIT);
+               Prefetcher p = prefetch(OBJ_COMMIT, readerOptions.getWalkCommitsPrefetchRatio());
                p.push(this, roots);
                prefetcher = p;
        }
@@ -256,7 +260,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
                // configured as push might invoke our own methods that may
                // try to call back into the active prefetcher.
                //
-               Prefetcher p = new Prefetcher(this, OBJ_TREE);
+               Prefetcher p = prefetch(OBJ_TREE, readerOptions.getWalkTreesPrefetchRatio());
                p.push(this, min.getTree(), max.getTree());
                prefetcher = p;
        }
@@ -391,14 +395,22 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
                new RepresentationSelector(packer, this, monitor).select(itr);
        }
 
+       private Prefetcher prefetch(final int type, final int ratio) {
+               int limit = readerOptions.getChunkLimit();
+               int prefetchLimit = (int) (limit * (ratio / 100.0));
+               recentChunks.setMaxBytes(limit - prefetchLimit);
+               return new Prefetcher(this, type, prefetchLimit);
+       }
+
        private void endPrefetch() {
+               recentChunks.setMaxBytes(getOptions().getChunkLimit());
                prefetcher = null;
        }
 
        @SuppressWarnings("unchecked")
        public void writeObjects(PackOutputStream out, List<ObjectToPack> objects)
                        throws IOException {
-               prefetcher = new Prefetcher(this, 0);
+               prefetcher = prefetch(0, readerOptions.getWriteObjectsPrefetchRatio());
                try {
                        List itr = objects;
                        new ObjectWriter(this, prefetcher).plan(itr);
index 8376e2b6bad6dbad986d6bf2eafb150bc2795ba1..db3f51028f5bd58fa11e4b4f63014f16d0b4dd3c 100644 (file)
@@ -57,7 +57,15 @@ public class DhtReaderOptions {
 
        private boolean prefetchFollowEdgeHints;
 
-       private int prefetchLimit;
+       private int chunkLimit;
+
+       private int openQueuePrefetchRatio;
+
+       private int walkCommitsPrefetchRatio;
+
+       private int walkTreesPrefetchRatio;
+
+       private int writeObjectsPrefetchRatio;
 
        private int objectIndexConcurrentBatches;
 
@@ -69,15 +77,18 @@ public class DhtReaderOptions {
 
        private int recentInfoCacheSize;
 
-       private int recentChunkCacheSize;
-
        private boolean trackFirstChunkLoad;
 
        /** Create a default reader configuration. */
        public DhtReaderOptions() {
                setTimeout(Timeout.seconds(5));
                setPrefetchFollowEdgeHints(true);
-               setPrefetchLimit(5 * MiB);
+
+               setChunkLimit(5 * MiB);
+               setOpenQueuePrefetchRatio(20 /* percent */);
+               setWalkCommitsPrefetchRatio(20 /* percent */);
+               setWalkTreesPrefetchRatio(20 /* percent */);
+               setWriteObjectsPrefetchRatio(90 /* percent */);
 
                setObjectIndexConcurrentBatches(2);
                setObjectIndexBatchSize(512);
@@ -86,7 +97,6 @@ public class DhtReaderOptions {
                setDeltaBaseCacheLimit(10 * MiB);
 
                setRecentInfoCacheSize(4096);
-               setRecentChunkCacheSize(4);
        }
 
        /** @return default timeout to wait on long operations before aborting. */
@@ -125,19 +135,83 @@ public class DhtReaderOptions {
                return this;
        }
 
-       /** @return number of bytes to load during prefetching. */
-       public int getPrefetchLimit() {
-               return prefetchLimit;
+       /** @return number of bytes to hold within a DhtReader. */
+       public int getChunkLimit() {
+               return chunkLimit;
        }
 
        /**
-        * Set the number of bytes the prefetcher should hold onto.
+        * Set the number of bytes hold within a DhtReader.
         *
         * @param maxBytes
         * @return {@code this}
         */
-       public DhtReaderOptions setPrefetchLimit(int maxBytes) {
-               prefetchLimit = Math.max(1024, maxBytes);
+       public DhtReaderOptions setChunkLimit(int maxBytes) {
+               chunkLimit = Math.max(1024, maxBytes);
+               return this;
+       }
+
+       /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+       public int getOpenQueuePrefetchRatio() {
+               return openQueuePrefetchRatio;
+       }
+
+       /**
+        * Set the prefetch ratio used by the open object queue.
+        *
+        * @param ratio 0..100.
+        * @return {@code this}
+        */
+       public DhtReaderOptions setOpenQueuePrefetchRatio(int ratio) {
+               openQueuePrefetchRatio = Math.max(0, Math.min(ratio, 100));
+               return this;
+       }
+
+       /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+       public int getWalkCommitsPrefetchRatio() {
+               return walkCommitsPrefetchRatio;
+       }
+
+       /**
+        * Set the prefetch ratio used by the open object queue.
+        *
+        * @param ratio 0..100.
+        * @return {@code this}
+        */
+       public DhtReaderOptions setWalkCommitsPrefetchRatio(int ratio) {
+               walkCommitsPrefetchRatio = Math.max(0, Math.min(ratio, 100));
+               return this;
+       }
+
+       /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+       public int getWalkTreesPrefetchRatio() {
+               return walkTreesPrefetchRatio;
+       }
+
+       /**
+        * Set the prefetch ratio used by the open object queue.
+        *
+        * @param ratio 0..100.
+        * @return {@code this}
+        */
+       public DhtReaderOptions setWalkTreesPrefetchRatio(int ratio) {
+               walkTreesPrefetchRatio = Math.max(0, Math.min(ratio, 100));
+               return this;
+       }
+
+       /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+       public int getWriteObjectsPrefetchRatio() {
+               return writeObjectsPrefetchRatio;
+       }
+
+       /**
+        * Set the prefetch ratio used by the open object queue.
+        *
+        * @param ratio 0..100.
+        * @return {@code this}
+        */
+       public DhtReaderOptions setWriteObjectsPrefetchRatio(int ratio) {
+               writeObjectsPrefetchRatio = Math.max(0, Math.min(ratio, 100));
                return this;
        }
 
@@ -226,24 +300,6 @@ public class DhtReaderOptions {
                return this;
        }
 
-       /** @return number of recent chunks to hold onto per-reader. */
-       public int getRecentChunkCacheSize() {
-               return recentChunkCacheSize;
-       }
-
-       /**
-        * Set the number of chunks each reader holds onto for recently used access.
-        *
-        * @param chunkCnt
-        *            the number of chunks each reader retains of recently used
-        *            chunks to smooth out access.
-        * @return {@code this}
-        */
-       public DhtReaderOptions setRecentChunkCacheSize(int chunkCnt) {
-               recentChunkCacheSize = Math.max(1, chunkCnt);
-               return this;
-       }
-
        /**
         * @return true if {@link DhtReader.Statistics} includes the stack trace for
         *         the first time a chunk is loaded. Supports debugging DHT code.
@@ -277,7 +333,11 @@ public class DhtReaderOptions {
        public DhtReaderOptions fromConfig(Config rc) {
                setTimeout(Timeout.getTimeout(rc, "core", "dht", "timeout", getTimeout()));
                setPrefetchFollowEdgeHints(rc.getBoolean("core", "dht", "prefetchFollowEdgeHints", isPrefetchFollowEdgeHints()));
-               setPrefetchLimit(rc.getInt("core", "dht", "prefetchLimit", getPrefetchLimit()));
+               setChunkLimit(rc.getInt("core", "dht", "chunkLimit", getChunkLimit()));
+               setOpenQueuePrefetchRatio(rc.getInt("core", "dht", "openQueuePrefetchRatio", getOpenQueuePrefetchRatio()));
+               setWalkCommitsPrefetchRatio(rc.getInt("core", "dht", "walkCommitsPrefetchRatio", getWalkCommitsPrefetchRatio()));
+               setWalkTreesPrefetchRatio(rc.getInt("core", "dht", "walkTreesPrefetchRatio", getWalkTreesPrefetchRatio()));
+               setWriteObjectsPrefetchRatio(rc.getInt("core", "dht", "writeObjectsPrefetchRatio", getWriteObjectsPrefetchRatio()));
 
                setObjectIndexConcurrentBatches(rc.getInt("core", "dht", "objectIndexConcurrentBatches", getObjectIndexConcurrentBatches()));
                setObjectIndexBatchSize(rc.getInt("core", "dht", "objectIndexBatchSize", getObjectIndexBatchSize()));
@@ -286,7 +346,6 @@ public class DhtReaderOptions {
                setDeltaBaseCacheLimit(rc.getInt("core", "dht", "deltaBaseCacheLimit", getDeltaBaseCacheLimit()));
 
                setRecentInfoCacheSize(rc.getInt("core", "dht", "recentInfoCacheSize", getRecentInfoCacheSize()));
-               setRecentChunkCacheSize(rc.getInt("core", "dht", "recentChunkCacheSize", getRecentChunkCacheSize()));
 
                setTrackFirstChunkLoad(rc.getBoolean("core", "dht", "debugTrackFirstChunkLoad", isTrackFirstChunkLoad()));
                return this;
index e47f2b2cb429a97882e7473d48ef6f7dce92fabd..32b22340d15ba9ffda12fc291c0245011ee58ebe 100644 (file)
@@ -159,6 +159,7 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T>
 
        @Override
        public void release() {
+               reader.getRecentChunks().setMaxBytes(reader.getOptions().getChunkLimit());
                prefetcher = null;
                currChunk = null;
        }
@@ -173,8 +174,13 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T>
                                list = new ArrayList<ObjectWithInfo<T>>();
                                byChunk.put(chunkKey, list);
 
-                               if (prefetcher == null)
-                                       prefetcher = new Prefetcher(reader, 0);
+                               if (prefetcher == null) {
+                                       int limit = reader.getOptions().getChunkLimit();
+                                       int ratio = reader.getOptions().getOpenQueuePrefetchRatio();
+                                       int prefetchLimit = (int) (limit * (ratio / 100.0));
+                                       reader.getRecentChunks().setMaxBytes(limit - prefetchLimit);
+                                       prefetcher = new Prefetcher(reader, 0, prefetchLimit);
+                               }
                                prefetcher.push(chunkKey);
                        }
                        list.add(c);
index 743f1f59445b053adbb5c604449098f03898afb2..fef2b4f29deacdc1548030fde4f8e330d7f6e723 100644 (file)
@@ -104,7 +104,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
 
        private DhtException error;
 
-       Prefetcher(DhtReader reader, int objectType) {
+       Prefetcher(DhtReader reader, int objectType, int prefetchLimitInBytes) {
                this.db = reader.getDatabase();
                this.stats = reader.getStatistics();
                this.objectType = objectType;
@@ -113,7 +113,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
                this.queue = new LinkedList<ChunkKey>();
                this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints();
                this.averageChunkSize = reader.getInserterOptions().getChunkSize();
-               this.highWaterMark = reader.getOptions().getPrefetchLimit();
+               this.highWaterMark = prefetchLimitInBytes;
 
                int lwm = (highWaterMark / averageChunkSize) - 4;
                if (lwm <= 0)
index 095de43fe7215f50d1d076452dd2f522c474541d..22608ee1b3617e8f2386bc4581dface74ee21caf 100644 (file)
@@ -56,11 +56,11 @@ final class RecentChunks {
 
        private final DhtReader.Statistics stats;
 
-       private final int maxSize;
-
        private final HashMap<ChunkKey, Node> byKey;
 
-       private int curSize;
+       private int maxBytes;
+
+       private int curBytes;
 
        private Node lruHead;
 
@@ -69,8 +69,16 @@ final class RecentChunks {
        RecentChunks(DhtReader reader) {
                this.reader = reader;
                this.stats = reader.getStatistics();
-               this.maxSize = reader.getOptions().getRecentChunkCacheSize();
                this.byKey = new HashMap<ChunkKey, Node>();
+               this.maxBytes = reader.getOptions().getChunkLimit();
+       }
+
+       void setMaxBytes(int newMax) {
+               maxBytes = Math.max(0, newMax);
+               if (0 < maxBytes)
+                       prune();
+               else
+                       clear();
        }
 
        PackChunk get(ChunkKey key) {
@@ -91,16 +99,26 @@ final class RecentChunks {
                        return;
                }
 
-               if (curSize < maxSize) {
-                       n = new Node();
-                       curSize++;
-               } else {
-                       n = lruTail;
-                       byKey.remove(n.chunk.getChunkKey());
-               }
+               curBytes += chunk.getTotalSize();
+               prune();
+
+               n = new Node();
                n.chunk = chunk;
                byKey.put(chunk.getChunkKey(), n);
-               hit(n);
+               first(n);
+       }
+
+       private void prune() {
+               while (maxBytes < curBytes) {
+                       Node n = lruTail;
+                       if (n == null)
+                               break;
+
+                       PackChunk c = n.chunk;
+                       curBytes -= c.getTotalSize();
+                       byKey.remove(c.getChunkKey());
+                       remove(n);
+               }
        }
 
        ObjectLoader open(RepositoryKey repo, AnyObjectId objId, int typeHint)
@@ -167,7 +185,7 @@ final class RecentChunks {
        }
 
        void clear() {
-               curSize = 0;
+               curBytes = 0;
                lruHead = null;
                lruTail = null;
                byKey.clear();