* changes: DHT: Change DhtReader caches to be dynamic by workload; DHT: Use a proper HashMap for RecentChunk lookups; DHT: Always have at least one recent chunk in DhtReader; DHT: Fix NPE during prefetch; DHT: Drop leading hash digits from row keys [tags/v1.1.0.201109011030-rc2]
@@ -43,7 +43,8 @@ | |||
package org.eclipse.jgit.storage.dht; | |||
import static org.junit.Assert.*; | |||
import static org.junit.Assert.assertEquals; | |||
import static org.junit.Assert.assertFalse; | |||
import org.eclipse.jgit.lib.ObjectId; | |||
import org.junit.Test; | |||
@@ -59,19 +60,19 @@ public class ChunkKeyTest { | |||
ChunkKey key1 = ChunkKey.create(repo1, id); | |||
assertEquals(repo1.asInt(), key1.getRepositoryId()); | |||
assertEquals(id, key1.getChunkHash()); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key1.asString()); | |||
ChunkKey key2 = ChunkKey.fromBytes(key1.asBytes()); | |||
assertEquals(repo1.asInt(), key2.getRepositoryId()); | |||
assertEquals(id, key2.getChunkHash()); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key2.asString()); | |||
ChunkKey key3 = ChunkKey.fromString(key1.asString()); | |||
assertEquals(repo1.asInt(), key3.getRepositoryId()); | |||
assertEquals(id, key3.getChunkHash()); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key3.asString()); | |||
assertEquals(key1, key2); |
@@ -58,19 +58,19 @@ public class ObjectIndexKeyTest { | |||
ObjectIndexKey key1 = ObjectIndexKey.create(repo, id); | |||
assertEquals(repo.asInt(), key1.getRepositoryId()); | |||
assertEquals(key1, id); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key1.asString()); | |||
ObjectIndexKey key2 = ObjectIndexKey.fromBytes(key1.asBytes()); | |||
assertEquals(repo.asInt(), key2.getRepositoryId()); | |||
assertEquals(key2, id); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key2.asString()); | |||
ObjectIndexKey key3 = ObjectIndexKey.fromString(key1.asString()); | |||
assertEquals(repo.asInt(), key3.getRepositoryId()); | |||
assertEquals(key3, id); | |||
assertEquals("3e.41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
assertEquals("41234567.3e64b928d51b3a28e89cfe2a3f0eeae35ef07839", | |||
key3.asString()); | |||
} | |||
} |
@@ -54,7 +54,7 @@ import org.eclipse.jgit.lib.ObjectId; | |||
/** Unique identifier of a {@link PackChunk} in the DHT. */ | |||
public final class ChunkKey implements RowKey { | |||
static final int KEYLEN = 52; | |||
static final int KEYLEN = 49; | |||
/** | |||
* @param repo | |||
@@ -84,8 +84,8 @@ public final class ChunkKey implements RowKey { | |||
throw new IllegalArgumentException(MessageFormat.format( | |||
DhtText.get().invalidChunkKey, decode(key, ptr, ptr + len))); | |||
int repo = parse32(key, ptr + 3); | |||
ObjectId chunk = ObjectId.fromString(key, ptr + 12); | |||
int repo = parse32(key, ptr); | |||
ObjectId chunk = ObjectId.fromString(key, ptr + 9); | |||
return new ChunkKey(repo, chunk); | |||
} | |||
@@ -122,13 +122,9 @@ public final class ChunkKey implements RowKey { | |||
public byte[] asBytes() { | |||
byte[] r = new byte[KEYLEN]; | |||
chunk.copyTo(r, 12); | |||
format32(r, 3, repo); | |||
// bucket is the leading 2 digits of the SHA-1. | |||
r[11] = '.'; | |||
r[2] = '.'; | |||
r[1] = r[12 + 1]; | |||
r[0] = r[12 + 0]; | |||
format32(r, 0, repo); | |||
r[8] = '.'; | |||
chunk.copyTo(r, 9); | |||
return r; | |||
} | |||
@@ -125,9 +125,18 @@ public class DhtCachedPack extends CachedPack { | |||
throws IOException { | |||
if (keyList == null) | |||
init(); | |||
Prefetcher p = new Prefetcher(ctx, 0); | |||
p.push(Arrays.asList(keyList)); | |||
copyPack(out, p, validate); | |||
// Clear the recent chunks because all of the reader's | |||
// chunk limit should be made available for prefetch. | |||
int cacheLimit = ctx.getOptions().getChunkLimit(); | |||
ctx.getRecentChunks().setMaxBytes(0); | |||
try { | |||
Prefetcher p = new Prefetcher(ctx, 0, cacheLimit); | |||
p.push(Arrays.asList(keyList)); | |||
copyPack(out, p, validate); | |||
} finally { | |||
ctx.getRecentChunks().setMaxBytes(cacheLimit); | |||
} | |||
} | |||
private void copyPack(PackOutputStream out, Prefetcher prefetcher, |
@@ -156,6 +156,10 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { | |||
return recentInfo; | |||
} | |||
RecentChunks getRecentChunks() { | |||
return recentChunks; | |||
} | |||
DeltaBaseCache getDeltaBaseCache() { | |||
return deltaBaseCache; | |||
} | |||
@@ -242,7 +246,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { | |||
// configured as push might invoke our own methods that may | |||
// try to call back into the active prefetcher. | |||
// | |||
Prefetcher p = new Prefetcher(this, OBJ_COMMIT); | |||
Prefetcher p = prefetch(OBJ_COMMIT, readerOptions.getWalkCommitsPrefetchRatio()); | |||
p.push(this, roots); | |||
prefetcher = p; | |||
} | |||
@@ -256,7 +260,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { | |||
// configured as push might invoke our own methods that may | |||
// try to call back into the active prefetcher. | |||
// | |||
Prefetcher p = new Prefetcher(this, OBJ_TREE); | |||
Prefetcher p = prefetch(OBJ_TREE, readerOptions.getWalkTreesPrefetchRatio()); | |||
p.push(this, min.getTree(), max.getTree()); | |||
prefetcher = p; | |||
} | |||
@@ -391,14 +395,22 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs { | |||
new RepresentationSelector(packer, this, monitor).select(itr); | |||
} | |||
private Prefetcher prefetch(final int type, final int ratio) { | |||
int limit = readerOptions.getChunkLimit(); | |||
int prefetchLimit = (int) (limit * (ratio / 100.0)); | |||
recentChunks.setMaxBytes(limit - prefetchLimit); | |||
return new Prefetcher(this, type, prefetchLimit); | |||
} | |||
private void endPrefetch() { | |||
recentChunks.setMaxBytes(getOptions().getChunkLimit()); | |||
prefetcher = null; | |||
} | |||
@SuppressWarnings("unchecked") | |||
public void writeObjects(PackOutputStream out, List<ObjectToPack> objects) | |||
throws IOException { | |||
prefetcher = new Prefetcher(this, 0); | |||
prefetcher = prefetch(0, readerOptions.getWriteObjectsPrefetchRatio()); | |||
try { | |||
List itr = objects; | |||
new ObjectWriter(this, prefetcher).plan(itr); |
@@ -57,7 +57,15 @@ public class DhtReaderOptions { | |||
private boolean prefetchFollowEdgeHints; | |||
private int prefetchLimit; | |||
private int chunkLimit; | |||
private int openQueuePrefetchRatio; | |||
private int walkCommitsPrefetchRatio; | |||
private int walkTreesPrefetchRatio; | |||
private int writeObjectsPrefetchRatio; | |||
private int objectIndexConcurrentBatches; | |||
@@ -69,15 +77,18 @@ public class DhtReaderOptions { | |||
private int recentInfoCacheSize; | |||
private int recentChunkCacheSize; | |||
private boolean trackFirstChunkLoad; | |||
/** Create a default reader configuration. */ | |||
public DhtReaderOptions() { | |||
setTimeout(Timeout.seconds(5)); | |||
setPrefetchFollowEdgeHints(true); | |||
setPrefetchLimit(5 * MiB); | |||
setChunkLimit(5 * MiB); | |||
setOpenQueuePrefetchRatio(20 /* percent */); | |||
setWalkCommitsPrefetchRatio(20 /* percent */); | |||
setWalkTreesPrefetchRatio(20 /* percent */); | |||
setWriteObjectsPrefetchRatio(90 /* percent */); | |||
setObjectIndexConcurrentBatches(2); | |||
setObjectIndexBatchSize(512); | |||
@@ -86,7 +97,6 @@ public class DhtReaderOptions { | |||
setDeltaBaseCacheLimit(10 * MiB); | |||
setRecentInfoCacheSize(4096); | |||
setRecentChunkCacheSize(4); | |||
} | |||
/** @return default timeout to wait on long operations before aborting. */ | |||
@@ -125,19 +135,83 @@ public class DhtReaderOptions { | |||
return this; | |||
} | |||
/** @return number of bytes to load during prefetching. */ | |||
public int getPrefetchLimit() { | |||
return prefetchLimit; | |||
/** @return number of bytes to hold within a DhtReader. */ | |||
public int getChunkLimit() { | |||
return chunkLimit; | |||
} | |||
/** | |||
* Set the number of bytes the prefetcher should hold onto. | |||
* Set the number of bytes held within a DhtReader. | |||
* | |||
* @param maxBytes | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setPrefetchLimit(int maxBytes) { | |||
prefetchLimit = Math.max(1024, maxBytes); | |||
public DhtReaderOptions setChunkLimit(int maxBytes) { | |||
chunkLimit = Math.max(1024, maxBytes); | |||
return this; | |||
} | |||
/** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ | |||
public int getOpenQueuePrefetchRatio() { | |||
return openQueuePrefetchRatio; | |||
} | |||
/** | |||
* Set the prefetch ratio used by the open object queue. | |||
* | |||
* @param ratio 0..100. | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setOpenQueuePrefetchRatio(int ratio) { | |||
openQueuePrefetchRatio = Math.max(0, Math.min(ratio, 100)); | |||
return this; | |||
} | |||
/** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ | |||
public int getWalkCommitsPrefetchRatio() { | |||
return walkCommitsPrefetchRatio; | |||
} | |||
/** | |||
* Set the prefetch ratio used while walking commits. | |||
* | |||
* @param ratio 0..100. | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setWalkCommitsPrefetchRatio(int ratio) { | |||
walkCommitsPrefetchRatio = Math.max(0, Math.min(ratio, 100)); | |||
return this; | |||
} | |||
/** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ | |||
public int getWalkTreesPrefetchRatio() { | |||
return walkTreesPrefetchRatio; | |||
} | |||
/** | |||
* Set the prefetch ratio used while walking trees. | |||
* | |||
* @param ratio 0..100. | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setWalkTreesPrefetchRatio(int ratio) { | |||
walkTreesPrefetchRatio = Math.max(0, Math.min(ratio, 100)); | |||
return this; | |||
} | |||
/** @return percentage of {@link #getChunkLimit()} used for prefetch, 0..100. */ | |||
public int getWriteObjectsPrefetchRatio() { | |||
return writeObjectsPrefetchRatio; | |||
} | |||
/** | |||
* Set the prefetch ratio used while writing objects. | |||
* | |||
* @param ratio 0..100. | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setWriteObjectsPrefetchRatio(int ratio) { | |||
writeObjectsPrefetchRatio = Math.max(0, Math.min(ratio, 100)); | |||
return this; | |||
} | |||
@@ -226,24 +300,6 @@ public class DhtReaderOptions { | |||
return this; | |||
} | |||
/** @return number of recent chunks to hold onto per-reader. */ | |||
public int getRecentChunkCacheSize() { | |||
return recentChunkCacheSize; | |||
} | |||
/** | |||
* Set the number of chunks each reader holds onto for recently used access. | |||
* | |||
* @param chunkCnt | |||
* the number of chunks each reader retains of recently used | |||
* chunks to smooth out access. | |||
* @return {@code this} | |||
*/ | |||
public DhtReaderOptions setRecentChunkCacheSize(int chunkCnt) { | |||
recentChunkCacheSize = Math.max(0, chunkCnt); | |||
return this; | |||
} | |||
/** | |||
* @return true if {@link DhtReader.Statistics} includes the stack trace for | |||
* the first time a chunk is loaded. Supports debugging DHT code. | |||
@@ -277,7 +333,11 @@ public class DhtReaderOptions { | |||
public DhtReaderOptions fromConfig(Config rc) { | |||
setTimeout(Timeout.getTimeout(rc, "core", "dht", "timeout", getTimeout())); | |||
setPrefetchFollowEdgeHints(rc.getBoolean("core", "dht", "prefetchFollowEdgeHints", isPrefetchFollowEdgeHints())); | |||
setPrefetchLimit(rc.getInt("core", "dht", "prefetchLimit", getPrefetchLimit())); | |||
setChunkLimit(rc.getInt("core", "dht", "chunkLimit", getChunkLimit())); | |||
setOpenQueuePrefetchRatio(rc.getInt("core", "dht", "openQueuePrefetchRatio", getOpenQueuePrefetchRatio())); | |||
setWalkCommitsPrefetchRatio(rc.getInt("core", "dht", "walkCommitsPrefetchRatio", getWalkCommitsPrefetchRatio())); | |||
setWalkTreesPrefetchRatio(rc.getInt("core", "dht", "walkTreesPrefetchRatio", getWalkTreesPrefetchRatio())); | |||
setWriteObjectsPrefetchRatio(rc.getInt("core", "dht", "writeObjectsPrefetchRatio", getWriteObjectsPrefetchRatio())); | |||
setObjectIndexConcurrentBatches(rc.getInt("core", "dht", "objectIndexConcurrentBatches", getObjectIndexConcurrentBatches())); | |||
setObjectIndexBatchSize(rc.getInt("core", "dht", "objectIndexBatchSize", getObjectIndexBatchSize())); | |||
@@ -286,7 +346,6 @@ public class DhtReaderOptions { | |||
setDeltaBaseCacheLimit(rc.getInt("core", "dht", "deltaBaseCacheLimit", getDeltaBaseCacheLimit())); | |||
setRecentInfoCacheSize(rc.getInt("core", "dht", "recentInfoCacheSize", getRecentInfoCacheSize())); | |||
setRecentChunkCacheSize(rc.getInt("core", "dht", "recentChunkCacheSize", getRecentChunkCacheSize())); | |||
setTrackFirstChunkLoad(rc.getBoolean("core", "dht", "debugTrackFirstChunkLoad", isTrackFirstChunkLoad())); | |||
return this; |
@@ -55,7 +55,7 @@ import org.eclipse.jgit.lib.ObjectId; | |||
/** Identifies an ObjectId in the DHT. */ | |||
public final class ObjectIndexKey extends ObjectId implements RowKey { | |||
private static final int KEYLEN = 52; | |||
private static final int KEYLEN = 49; | |||
/** | |||
* @param repo | |||
@@ -75,8 +75,8 @@ public final class ObjectIndexKey extends ObjectId implements RowKey { | |||
throw new IllegalArgumentException(MessageFormat.format( | |||
DhtText.get().invalidChunkKey, decode(key))); | |||
int repo = parse32(key, 3); | |||
ObjectId id = ObjectId.fromString(key, 12); | |||
int repo = parse32(key, 0); | |||
ObjectId id = ObjectId.fromString(key, 9); | |||
return new ObjectIndexKey(repo, id); | |||
} | |||
@@ -106,13 +106,9 @@ public final class ObjectIndexKey extends ObjectId implements RowKey { | |||
public byte[] asBytes() { | |||
byte[] r = new byte[KEYLEN]; | |||
copyTo(r, 12); | |||
format32(r, 3, repo); | |||
// bucket is the leading 2 digits of the SHA-1. | |||
r[11] = '.'; | |||
r[2] = '.'; | |||
r[1] = r[12 + 1]; | |||
r[0] = r[12 + 0]; | |||
format32(r, 0, repo); | |||
r[8] = '.'; | |||
copyTo(r, 9); | |||
return r; | |||
} | |||
@@ -159,6 +159,7 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T> | |||
@Override | |||
public void release() { | |||
reader.getRecentChunks().setMaxBytes(reader.getOptions().getChunkLimit()); | |||
prefetcher = null; | |||
currChunk = null; | |||
} | |||
@@ -173,8 +174,13 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T> | |||
list = new ArrayList<ObjectWithInfo<T>>(); | |||
byChunk.put(chunkKey, list); | |||
if (prefetcher == null) | |||
prefetcher = new Prefetcher(reader, 0); | |||
if (prefetcher == null) { | |||
int limit = reader.getOptions().getChunkLimit(); | |||
int ratio = reader.getOptions().getOpenQueuePrefetchRatio(); | |||
int prefetchLimit = (int) (limit * (ratio / 100.0)); | |||
reader.getRecentChunks().setMaxBytes(limit - prefetchLimit); | |||
prefetcher = new Prefetcher(reader, 0, prefetchLimit); | |||
} | |||
prefetcher.push(chunkKey); | |||
} | |||
list.add(c); |
@@ -291,7 +291,7 @@ public final class PackChunk { | |||
} | |||
int findOffset(RepositoryKey repo, AnyObjectId objId) { | |||
if (key.getRepositoryId() == repo.asInt()) | |||
if (key.getRepositoryId() == repo.asInt() && index != null) | |||
return index.findOffset(objId); | |||
return -1; | |||
} |
@@ -104,7 +104,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> { | |||
private DhtException error; | |||
Prefetcher(DhtReader reader, int objectType) { | |||
Prefetcher(DhtReader reader, int objectType, int prefetchLimitInBytes) { | |||
this.db = reader.getDatabase(); | |||
this.stats = reader.getStatistics(); | |||
this.objectType = objectType; | |||
@@ -113,7 +113,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> { | |||
this.queue = new LinkedList<ChunkKey>(); | |||
this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints(); | |||
this.averageChunkSize = reader.getInserterOptions().getChunkSize(); | |||
this.highWaterMark = reader.getOptions().getPrefetchLimit(); | |||
this.highWaterMark = prefetchLimitInBytes; | |||
int lwm = (highWaterMark / averageChunkSize) - 4; | |||
if (lwm <= 0) |
@@ -44,6 +44,7 @@ | |||
package org.eclipse.jgit.storage.dht; | |||
import java.io.IOException; | |||
import java.util.HashMap; | |||
import org.eclipse.jgit.lib.AnyObjectId; | |||
import org.eclipse.jgit.lib.ObjectLoader; | |||
@@ -55,9 +56,11 @@ final class RecentChunks { | |||
private final DhtReader.Statistics stats; | |||
private final int maxSize; | |||
private final HashMap<ChunkKey, Node> byKey; | |||
private int curSize; | |||
private int maxBytes; | |||
private int curBytes; | |||
private Node lruHead; | |||
@@ -66,38 +69,56 @@ final class RecentChunks { | |||
RecentChunks(DhtReader reader) { | |||
this.reader = reader; | |||
this.stats = reader.getStatistics(); | |||
this.maxSize = reader.getOptions().getRecentChunkCacheSize(); | |||
this.byKey = new HashMap<ChunkKey, Node>(); | |||
this.maxBytes = reader.getOptions().getChunkLimit(); | |||
} | |||
void setMaxBytes(int newMax) { | |||
maxBytes = Math.max(0, newMax); | |||
if (0 < maxBytes) | |||
prune(); | |||
else | |||
clear(); | |||
} | |||
PackChunk get(ChunkKey key) { | |||
for (Node n = lruHead; n != null; n = n.next) { | |||
if (key.equals(n.chunk.getChunkKey())) { | |||
hit(n); | |||
stats.recentChunks_Hits++; | |||
return n.chunk; | |||
} | |||
Node n = byKey.get(key); | |||
if (n != null) { | |||
hit(n); | |||
stats.recentChunks_Hits++; | |||
return n.chunk; | |||
} | |||
stats.recentChunks_Miss++; | |||
return null; | |||
} | |||
void put(PackChunk chunk) { | |||
for (Node n = lruHead; n != null; n = n.next) { | |||
if (n.chunk == chunk) { | |||
hit(n); | |||
return; | |||
} | |||
Node n = byKey.get(chunk.getChunkKey()); | |||
if (n != null && n.chunk == chunk) { | |||
hit(n); | |||
return; | |||
} | |||
Node n; | |||
if (curSize < maxSize) { | |||
n = new Node(); | |||
curSize++; | |||
} else { | |||
n = lruTail; | |||
} | |||
curBytes += chunk.getTotalSize(); | |||
prune(); | |||
n = new Node(); | |||
n.chunk = chunk; | |||
hit(n); | |||
byKey.put(chunk.getChunkKey(), n); | |||
first(n); | |||
} | |||
private void prune() { | |||
while (maxBytes < curBytes) { | |||
Node n = lruTail; | |||
if (n == null) | |||
break; | |||
PackChunk c = n.chunk; | |||
curBytes -= c.getTotalSize(); | |||
byKey.remove(c.getChunkKey()); | |||
remove(n); | |||
} | |||
} | |||
ObjectLoader open(RepositoryKey repo, AnyObjectId objId, int typeHint) | |||
@@ -164,9 +185,10 @@ final class RecentChunks { | |||
} | |||
void clear() { | |||
curSize = 0; | |||
curBytes = 0; | |||
lruHead = null; | |||
lruTail = null; | |||
byKey.clear(); | |||
} | |||
private void hit(Node n) { |