return recentInfo;
}
+ RecentChunks getRecentChunks() {
+ return recentChunks;
+ }
+
DeltaBaseCache getDeltaBaseCache() {
return deltaBaseCache;
}
// configured as push might invoke our own methods that may
// try to call back into the active prefetcher.
//
- Prefetcher p = new Prefetcher(this, OBJ_COMMIT);
+ Prefetcher p = prefetch(OBJ_COMMIT, readerOptions.getWalkCommitsPrefetchRatio());
p.push(this, roots);
prefetcher = p;
}
// configured as push might invoke our own methods that may
// try to call back into the active prefetcher.
//
- Prefetcher p = new Prefetcher(this, OBJ_TREE);
+ Prefetcher p = prefetch(OBJ_TREE, readerOptions.getWalkTreesPrefetchRatio());
p.push(this, min.getTree(), max.getTree());
prefetcher = p;
}
new RepresentationSelector(packer, this, monitor).select(itr);
}
+ private Prefetcher prefetch(final int type, final int ratio) {
+ int limit = readerOptions.getChunkLimit();
+ int prefetchLimit = (int) (limit * (ratio / 100.0));
+ recentChunks.setMaxBytes(limit - prefetchLimit);
+ return new Prefetcher(this, type, prefetchLimit);
+ }
+
private void endPrefetch() {
+ recentChunks.setMaxBytes(getOptions().getChunkLimit());
prefetcher = null;
}
@SuppressWarnings("unchecked")
public void writeObjects(PackOutputStream out, List<ObjectToPack> objects)
throws IOException {
- prefetcher = new Prefetcher(this, 0);
+ prefetcher = prefetch(0, readerOptions.getWriteObjectsPrefetchRatio());
try {
List itr = objects;
new ObjectWriter(this, prefetcher).plan(itr);
private boolean prefetchFollowEdgeHints;
- private int prefetchLimit;
+ private int chunkLimit;
+
+ private int openQueuePrefetchRatio;
+
+ private int walkCommitsPrefetchRatio;
+
+ private int walkTreesPrefetchRatio;
+
+ private int writeObjectsPrefetchRatio;
private int objectIndexConcurrentBatches;
private int recentInfoCacheSize;
- private int recentChunkCacheSize;
-
private boolean trackFirstChunkLoad;
/** Create a default reader configuration. */
public DhtReaderOptions() {
setTimeout(Timeout.seconds(5));
setPrefetchFollowEdgeHints(true);
- setPrefetchLimit(5 * MiB);
+
+ setChunkLimit(5 * MiB);
+ setOpenQueuePrefetchRatio(20 /* percent */);
+ setWalkCommitsPrefetchRatio(20 /* percent */);
+ setWalkTreesPrefetchRatio(20 /* percent */);
+ setWriteObjectsPrefetchRatio(90 /* percent */);
setObjectIndexConcurrentBatches(2);
setObjectIndexBatchSize(512);
setDeltaBaseCacheLimit(10 * MiB);
setRecentInfoCacheSize(4096);
- setRecentChunkCacheSize(4);
}
/** @return default timeout to wait on long operations before aborting. */
return this;
}
- /** @return number of bytes to load during prefetching. */
- public int getPrefetchLimit() {
- return prefetchLimit;
+ /** @return number of bytes to hold within a DhtReader. */
+ public int getChunkLimit() {
+ return chunkLimit;
}
/**
- * Set the number of bytes the prefetcher should hold onto.
+ * Set the number of bytes hold within a DhtReader.
*
* @param maxBytes
* @return {@code this}
*/
- public DhtReaderOptions setPrefetchLimit(int maxBytes) {
- prefetchLimit = Math.max(1024, maxBytes);
+ public DhtReaderOptions setChunkLimit(int maxBytes) {
+ chunkLimit = Math.max(1024, maxBytes);
+ return this;
+ }
+
+ /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+ public int getOpenQueuePrefetchRatio() {
+ return openQueuePrefetchRatio;
+ }
+
+ /**
+ * Set the prefetch ratio used by the open object queue.
+ *
+ * @param ratio 0..100.
+ * @return {@code this}
+ */
+ public DhtReaderOptions setOpenQueuePrefetchRatio(int ratio) {
+ openQueuePrefetchRatio = Math.max(0, Math.min(ratio, 100));
+ return this;
+ }
+
+ /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+ public int getWalkCommitsPrefetchRatio() {
+ return walkCommitsPrefetchRatio;
+ }
+
+ /**
+ * Set the prefetch ratio used by the open object queue.
+ *
+ * @param ratio 0..100.
+ * @return {@code this}
+ */
+ public DhtReaderOptions setWalkCommitsPrefetchRatio(int ratio) {
+ walkCommitsPrefetchRatio = Math.max(0, Math.min(ratio, 100));
+ return this;
+ }
+
+ /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+ public int getWalkTreesPrefetchRatio() {
+ return walkTreesPrefetchRatio;
+ }
+
+ /**
+ * Set the prefetch ratio used by the open object queue.
+ *
+ * @param ratio 0..100.
+ * @return {@code this}
+ */
+ public DhtReaderOptions setWalkTreesPrefetchRatio(int ratio) {
+ walkTreesPrefetchRatio = Math.max(0, Math.min(ratio, 100));
+ return this;
+ }
+
+ /** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */
+ public int getWriteObjectsPrefetchRatio() {
+ return writeObjectsPrefetchRatio;
+ }
+
+ /**
+ * Set the prefetch ratio used by the open object queue.
+ *
+ * @param ratio 0..100.
+ * @return {@code this}
+ */
+ public DhtReaderOptions setWriteObjectsPrefetchRatio(int ratio) {
+ writeObjectsPrefetchRatio = Math.max(0, Math.min(ratio, 100));
return this;
}
return this;
}
- /** @return number of recent chunks to hold onto per-reader. */
- public int getRecentChunkCacheSize() {
- return recentChunkCacheSize;
- }
-
- /**
- * Set the number of chunks each reader holds onto for recently used access.
- *
- * @param chunkCnt
- * the number of chunks each reader retains of recently used
- * chunks to smooth out access.
- * @return {@code this}
- */
- public DhtReaderOptions setRecentChunkCacheSize(int chunkCnt) {
- recentChunkCacheSize = Math.max(1, chunkCnt);
- return this;
- }
-
/**
* @return true if {@link DhtReader.Statistics} includes the stack trace for
* the first time a chunk is loaded. Supports debugging DHT code.
public DhtReaderOptions fromConfig(Config rc) {
setTimeout(Timeout.getTimeout(rc, "core", "dht", "timeout", getTimeout()));
setPrefetchFollowEdgeHints(rc.getBoolean("core", "dht", "prefetchFollowEdgeHints", isPrefetchFollowEdgeHints()));
- setPrefetchLimit(rc.getInt("core", "dht", "prefetchLimit", getPrefetchLimit()));
+ setChunkLimit(rc.getInt("core", "dht", "chunkLimit", getChunkLimit()));
+ setOpenQueuePrefetchRatio(rc.getInt("core", "dht", "openQueuePrefetchRatio", getOpenQueuePrefetchRatio()));
+ setWalkCommitsPrefetchRatio(rc.getInt("core", "dht", "walkCommitsPrefetchRatio", getWalkCommitsPrefetchRatio()));
+ setWalkTreesPrefetchRatio(rc.getInt("core", "dht", "walkTreesPrefetchRatio", getWalkTreesPrefetchRatio()));
+ setWriteObjectsPrefetchRatio(rc.getInt("core", "dht", "writeObjectsPrefetchRatio", getWriteObjectsPrefetchRatio()));
setObjectIndexConcurrentBatches(rc.getInt("core", "dht", "objectIndexConcurrentBatches", getObjectIndexConcurrentBatches()));
setObjectIndexBatchSize(rc.getInt("core", "dht", "objectIndexBatchSize", getObjectIndexBatchSize()));
setDeltaBaseCacheLimit(rc.getInt("core", "dht", "deltaBaseCacheLimit", getDeltaBaseCacheLimit()));
setRecentInfoCacheSize(rc.getInt("core", "dht", "recentInfoCacheSize", getRecentInfoCacheSize()));
- setRecentChunkCacheSize(rc.getInt("core", "dht", "recentChunkCacheSize", getRecentChunkCacheSize()));
setTrackFirstChunkLoad(rc.getBoolean("core", "dht", "debugTrackFirstChunkLoad", isTrackFirstChunkLoad()));
return this;