By making DfsStreamKey a deterministic function of the pack description
and file extension, DfsBlockCache can stop retaining a map of every
DfsPackDescription it has ever seen. This fixes a long-standing memory
leak in DfsBlockCache. The refactoring also simplifies setting up more
lightweight objects around streams.

Change-Id: I051e7b96f5454c6b0a0e652d8f4a69c0bed7f6f4
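
The property the message relies on can be sketched as follows. This is an
illustration against the API this change adds (DfsStreamKey.of and value-based
equals/hashCode, both visible in the diff below), not code from the commit; the
pack name is invented. Two keys computed from the same stream name are equal by
value, so any holder of a DfsPackDescription can recompute the key instead of
looking a canonical instance up in a cache-wide map.

import org.eclipse.jgit.internal.storage.dfs.DfsStreamKey;

public class StreamKeyDemo {
	public static void main(String[] args) {
		// "pack-1234.pack" is an invented name for illustration.
		// Equal names yield equal keys; the old implementation hashed on
		// object identity, so equality required the same instance.
		DfsStreamKey a = DfsStreamKey.of("pack-1234.pack");
		DfsStreamKey b = DfsStreamKey.of("pack-1234.pack");
		System.out.println(a.equals(b));                  // true
		System.out.println(a.hashCode() == b.hashCode()); // true
	}
}

This is why getOrCreate(...) and the packCache map are deleted below:
DfsPackFile instances can be constructed directly and still share cached
blocks, because independently computed keys collide in the block cache.
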
@@ -63,7 +63,7 @@ public class DeltaBaseCacheTest {
 	@Before
 	public void setUp() {
-		key = new DfsStreamKey();
+		key = DfsStreamKey.of("test.key");
 		cache = new DeltaBaseCache(SZ);
 		rng = new TestRng(getClass().getSimpleName());
 	}

@@ -61,7 +61,7 @@ public abstract class BlockBasedFile {
 	final DfsStreamKey key;

 	/** Description of the associated pack file's storage. */
-	final DfsPackDescription packDesc;
+	final DfsPackDescription desc;

 	final PackExt ext;

 	/**
@@ -84,16 +84,15 @@ public abstract class BlockBasedFile {
 	/** True once corruption has been detected that cannot be worked around. */
 	volatile boolean invalid;

-	BlockBasedFile(DfsBlockCache cache, DfsStreamKey key,
-			DfsPackDescription packDesc, PackExt ext) {
+	BlockBasedFile(DfsBlockCache cache, DfsPackDescription desc, PackExt ext) {
 		this.cache = cache;
-		this.key = key;
-		this.packDesc = packDesc;
+		this.key = desc.getStreamKey(ext);
+		this.desc = desc;
 		this.ext = ext;
 	}

 	String getFileName() {
-		return packDesc.getFileName(ext);
+		return desc.getFileName(ext);
 	}

 	boolean invalid() {
@@ -138,7 +137,7 @@ public abstract class BlockBasedFile {
 		ctx.stats.readBlock++;
 		long start = System.nanoTime();
 		ReadableChannel rc = fileChannel != null ? fileChannel
-				: ctx.db.openFile(packDesc, ext);
+				: ctx.db.openFile(desc, ext);
 		try {
 			int size = blockSize(rc);
 			pos = (pos / size) * size;

@@ -45,10 +45,6 @@
 package org.eclipse.jgit.internal.storage.dfs;

 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.concurrent.locks.ReentrantLock;
@@ -109,14 +105,7 @@ public final class DfsBlockCache {
 	 * settings, usually too low of a limit.
 	 */
 	public static void reconfigure(DfsBlockCacheConfig cfg) {
-		DfsBlockCache nc = new DfsBlockCache(cfg);
-		DfsBlockCache oc = cache;
-		cache = nc;
-		if (oc != null) {
-			for (DfsPackFile pack : oc.getPackFiles())
-				pack.key.cachedSize.set(0);
-		}
+		cache = new DfsBlockCache(cfg);
 	}

 	/** @return the currently active DfsBlockCache. */
@@ -154,12 +143,6 @@ public final class DfsBlockCache {
 	/** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
 	private final int blockSizeShift;

-	/** Cache of pack files, indexed by description. */
-	private final Map<DfsPackDescription, DfsPackFile> packCache;
-
-	/** View of pack files in the pack cache. */
-	private final Collection<DfsPackFile> packFiles;
-
 	/** Number of times a block was found in the cache. */
 	private final AtomicLong statHit;
@@ -195,13 +178,9 @@ public final class DfsBlockCache {
 		blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

 		clockLock = new ReentrantLock(true /* fair */);
-		clockHand = new Ref<>(new DfsStreamKey(), -1, 0, null);
+		clockHand = new Ref<>(DfsStreamKey.of(""), -1, 0, null); //$NON-NLS-1$
 		clockHand.next = clockHand;

-		packCache = new ConcurrentHashMap<>(
-				16, 0.75f, 1);
-		packFiles = Collections.unmodifiableCollection(packCache.values());
-
 		statHit = new AtomicLong();
 		statMiss = new AtomicLong();
 	}
@@ -250,38 +229,6 @@ public final class DfsBlockCache {
 		return statEvict;
 	}

-	/**
-	 * Get the pack files stored in this cache.
-	 *
-	 * @return a collection of pack files, some of which may not actually be
-	 *         present; the caller should check the pack's cached size.
-	 */
-	public Collection<DfsPackFile> getPackFiles() {
-		return packFiles;
-	}
-
-	DfsPackFile getOrCreate(DfsPackDescription dsc, DfsStreamKey key) {
-		// TODO This table grows without bound. It needs to clean up
-		// entries that aren't in cache anymore, and aren't being used
-		// by a live DfsObjDatabase reference.
-		DfsPackFile pack = packCache.get(dsc);
-		if (pack != null && !pack.invalid()) {
-			return pack;
-		}
-
-		// 'pack' either didn't exist or was invalid. Compute a new
-		// entry atomically (guaranteed by ConcurrentHashMap).
-		return packCache.compute(dsc, (k, v) -> {
-			if (v != null && !v.invalid()) { // valid value added by
-				return v; // another thread
-			} else {
-				return new DfsPackFile(
-						this, dsc, key != null ? key : new DfsStreamKey());
-			}
-		});
-	}
-
 	private int hash(int packHash, long off) {
 		return packHash + (int) (off >>> blockSizeShift);
 	}
@@ -361,7 +308,6 @@ public final class DfsBlockCache {
 			e2 = table.get(slot);
 		}

-		key.cachedSize.addAndGet(v.size());
 		Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
 		ref.hot = true;
 		for (;;) {
@@ -409,7 +355,6 @@ public final class DfsBlockCache {
 			dead.next = null;
 			dead.value = null;
 			live -= dead.size;
-			dead.pack.cachedSize.addAndGet(-dead.size);
 			statEvict++;
 		} while (maxBytes < live);
 		clockHand = prev;
@@ -465,7 +410,6 @@ public final class DfsBlockCache {
 			}
 		}

-		key.cachedSize.addAndGet(size);
 		ref = new Ref<>(key, pos, size, v);
 		ref.hot = true;
 		for (;;) {
@@ -495,31 +439,27 @@ public final class DfsBlockCache {
 		return val;
 	}

-	private <T> T scan(HashEntry n, DfsStreamKey pack, long position) {
-		Ref<T> r = scanRef(n, pack, position);
+	private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
+		Ref<T> r = scanRef(n, key, position);
 		return r != null ? r.get() : null;
 	}

 	@SuppressWarnings("unchecked")
-	private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey pack, long position) {
+	private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
 		for (; n != null; n = n.next) {
 			Ref<T> r = n.ref;
-			if (r.pack == pack && r.position == position)
+			if (r.position == position && r.key.equals(key))
 				return r.get() != null ? r : null;
 		}
 		return null;
 	}

-	void remove(DfsPackFile pack) {
-		packCache.remove(pack.getPackDescription());
-	}
-
-	private int slot(DfsStreamKey pack, long position) {
-		return (hash(pack.hash, position) >>> 1) % tableSize;
+	private int slot(DfsStreamKey key, long position) {
+		return (hash(key.hash, position) >>> 1) % tableSize;
 	}

-	private ReentrantLock lockFor(DfsStreamKey pack, long position) {
-		return loadLocks[(hash(pack.hash, position) >>> 1) % loadLocks.length];
+	private ReentrantLock lockFor(DfsStreamKey key, long position) {
+		return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
 	}

 	private static HashEntry clean(HashEntry top) {
@@ -545,15 +485,15 @@ public final class DfsBlockCache {
 	}

 	static final class Ref<T> {
-		final DfsStreamKey pack;
+		final DfsStreamKey key;
 		final long position;
 		final int size;
 		volatile T value;
 		Ref next;
 		volatile boolean hot;

-		Ref(DfsStreamKey pack, long position, int size, T v) {
-			this.pack = pack;
+		Ref(DfsStreamKey key, long position, int size, T v) {
+			this.key = key;
 			this.position = position;
 			this.size = size;
 			this.value = v;

@@ -587,8 +587,6 @@ public class DfsGarbageCollector {
 		pack.setLastModified(startTimeMillis);
 		newPackStats.add(stats);
 		newPackObj.add(pw.getObjectSet());
-
-		DfsBlockCache.getInstance().getOrCreate(pack, null);
 		return pack;
 	}
}

@@ -221,7 +221,7 @@ public class DfsInserter extends ObjectInserter {
 		db.commitPack(Collections.singletonList(packDsc), null);
 		rollback = false;

-		DfsPackFile p = cache.getOrCreate(packDsc, packKey);
+		DfsPackFile p = new DfsPackFile(cache, packDsc);
 		if (index != null)
 			p.setPackIndex(index);
 		db.addPack(p);
@@ -282,7 +282,7 @@ public class DfsInserter extends ObjectInserter {
 		rollback = true;
 		packDsc = db.newPack(DfsObjDatabase.PackSource.INSERT);
 		packOut = new PackStream(db.writeFile(packDsc, PACK));
-		packKey = new DfsStreamKey();
+		packKey = packDsc.getStreamKey(PACK);

 		// Write the header as though it were a single object pack.
 		byte[] buf = packOut.hdrBuf;

@@ -465,7 +465,7 @@ public abstract class DfsObjDatabase extends ObjectDatabase {
 			if (oldPack != null) {
 				list.add(oldPack);
 			} else {
-				list.add(cache.getOrCreate(dsc, null));
+				list.add(new DfsPackFile(cache, dsc));
 				foundNew = true;
 			}
 		}

@@ -138,6 +138,15 @@ public class DfsPackDescription implements Comparable<DfsPackDescription> {
 		return packName + '.' + ext.getExtension();
 	}

+	/**
+	 * @param ext
+	 *            the file extension.
+	 * @return cache key for use by the block cache.
+	 */
+	public DfsStreamKey getStreamKey(PackExt ext) {
+		return DfsStreamKey.of(getFileName(ext));
+	}
+
 	/** @return the source of the pack. */
 	public PackSource getPackSource() {
 		return packSource;

@@ -72,7 +72,6 @@ import org.eclipse.jgit.internal.storage.file.PackBitmapIndex;
 import org.eclipse.jgit.internal.storage.file.PackIndex;
 import org.eclipse.jgit.internal.storage.file.PackReverseIndex;
 import org.eclipse.jgit.internal.storage.pack.BinaryDelta;
-import org.eclipse.jgit.internal.storage.pack.PackExt;
 import org.eclipse.jgit.internal.storage.pack.PackOutputStream;
 import org.eclipse.jgit.internal.storage.pack.StoredObjectRepresentation;
 import org.eclipse.jgit.lib.AbbreviatedObjectId;
@@ -89,8 +88,8 @@ import org.eclipse.jgit.util.LongList;
  * objects are similar.
  */
 public final class DfsPackFile extends BlockBasedFile {
-	final DfsStreamKey idxKey = new DfsStreamKey();
-	final DfsStreamKey reverseIdxKey = new DfsStreamKey();
+	final DfsStreamKey idxKey;
+	final DfsStreamKey reverseIdxKey;
 	DfsStreamKey bitmapKey;

 	/**
@@ -125,11 +124,11 @@ public final class DfsPackFile extends BlockBasedFile {
 	 *            cache that owns the pack data.
 	 * @param desc
 	 *            description of the pack within the DFS.
-	 * @param key
-	 *            interned key used to identify blocks in the block cache.
 	 */
-	DfsPackFile(DfsBlockCache cache, DfsPackDescription desc, DfsStreamKey key) {
-		super(cache, key, desc, PACK);
+	DfsPackFile(DfsBlockCache cache, DfsPackDescription desc) {
+		super(cache, desc, PACK);
+		idxKey = desc.getStreamKey(INDEX);
+		reverseIdxKey = idxKey.derive("r"); //$NON-NLS-1$
 		length = desc.getFileSize(PACK);
 		if (length <= 0)
 			length = -1;
@@ -137,7 +136,7 @@ public final class DfsPackFile extends BlockBasedFile {
 	/** @return description that was originally used to configure this pack file. */
 	public DfsPackDescription getPackDescription() {
-		return packDesc;
+		return desc;
 	}

 	/**
@@ -148,11 +147,6 @@ public final class DfsPackFile extends BlockBasedFile {
 		return idxref != null && idxref.has();
 	}

-	/** @return bytes cached in memory for this pack, excluding the index. */
-	public long getCachedSize() {
-		return key.cachedSize.get();
-	}
-
 	void setPackIndex(PackIndex idx) {
 		long objCnt = idx.getObjectCount();
 		int recSize = Constants.OBJECT_ID_LENGTH + 8;
@@ -200,7 +194,7 @@ public final class DfsPackFile extends BlockBasedFile {
 		try {
 			ctx.stats.readIdx++;
 			long start = System.nanoTime();
-			ReadableChannel rc = ctx.db.openFile(packDesc, INDEX);
+			ReadableChannel rc = ctx.db.openFile(desc, INDEX);
 			try {
 				InputStream in = Channels.newInputStream(rc);
 				int wantSize = 8192;
@@ -219,14 +213,14 @@ public final class DfsPackFile extends BlockBasedFile {
 				invalid = true;
 				IOException e2 = new IOException(MessageFormat.format(
 						DfsText.get().shortReadOfIndex,
-						packDesc.getFileName(INDEX)));
+						desc.getFileName(INDEX)));
 				e2.initCause(e);
 				throw e2;
 			} catch (IOException e) {
 				invalid = true;
 				IOException e2 = new IOException(MessageFormat.format(
 						DfsText.get().cannotReadIndex,
-						packDesc.getFileName(INDEX)));
+						desc.getFileName(INDEX)));
 				e2.initCause(e);
 				throw e2;
 			}
@@ -237,7 +231,7 @@ public final class DfsPackFile extends BlockBasedFile {
 	}

 	final boolean isGarbage() {
-		return packDesc.getPackSource() == UNREACHABLE_GARBAGE;
+		return desc.getPackSource() == UNREACHABLE_GARBAGE;
 	}

 	PackBitmapIndex getBitmapIndex(DfsReader ctx) throws IOException {
@@ -250,7 +244,7 @@ public final class DfsPackFile extends BlockBasedFile {
 			return idx;
 		}

-		if (!packDesc.hasFileExt(PackExt.BITMAP_INDEX))
+		if (!desc.hasFileExt(BITMAP_INDEX))
 			return null;

 		synchronized (initLock) {
@@ -261,14 +255,14 @@ public final class DfsPackFile extends BlockBasedFile {
 				return idx;
 			}

 			if (bitmapKey == null) {
-				bitmapKey = new DfsStreamKey();
+				bitmapKey = desc.getStreamKey(BITMAP_INDEX);
 			}
 			long size;
 			PackBitmapIndex idx;
 			try {
 				ctx.stats.readBitmap++;
 				long start = System.nanoTime();
-				ReadableChannel rc = ctx.db.openFile(packDesc, BITMAP_INDEX);
+				ReadableChannel rc = ctx.db.openFile(desc, BITMAP_INDEX);
 				try {
 					InputStream in = Channels.newInputStream(rc);
 					int wantSize = 8192;
@@ -289,13 +283,13 @@ public final class DfsPackFile extends BlockBasedFile {
 				} catch (EOFException e) {
 					IOException e2 = new IOException(MessageFormat.format(
 							DfsText.get().shortReadOfIndex,
-							packDesc.getFileName(BITMAP_INDEX)));
+							desc.getFileName(BITMAP_INDEX)));
 					e2.initCause(e);
 					throw e2;
 				} catch (IOException e) {
 					IOException e2 = new IOException(MessageFormat.format(
 							DfsText.get().cannotReadIndex,
-							packDesc.getFileName(BITMAP_INDEX)));
+							desc.getFileName(BITMAP_INDEX)));
 					e2.initCause(e);
 					throw e2;
 				}
@@ -377,7 +371,6 @@ public final class DfsPackFile extends BlockBasedFile {
 	/** Release all memory used by this DfsPackFile instance. */
 	public void close() {
-		cache.remove(this);
 		index = null;
 		reverseIndex = null;
 	}
@@ -445,7 +438,7 @@ public final class DfsPackFile extends BlockBasedFile {
 		} else {
 			b = cache.get(key, alignToBlock(position));
 			if (b == null) {
-				rc = ctx.db.openFile(packDesc, PACK);
+				rc = ctx.db.openFile(desc, PACK);
 				int sz = ctx.getOptions().getStreamPackBufferSize();
 				if (sz > 0) {
 					rc.setReadAheadBytes(sz);
@@ -469,7 +462,7 @@ public final class DfsPackFile extends BlockBasedFile {
 	private long copyPackBypassCache(PackOutputStream out, DfsReader ctx)
 			throws IOException {
-		try (ReadableChannel rc = ctx.db.openFile(packDesc, PACK)) {
+		try (ReadableChannel rc = ctx.db.openFile(desc, PACK)) {
 			ByteBuffer buf = newCopyBuffer(out, rc);
 			if (ctx.getOptions().getStreamPackBufferSize() > 0)
 				rc.setReadAheadBytes(ctx.getOptions().getStreamPackBufferSize());

@@ -155,7 +155,7 @@ public class DfsPackParser extends PackParser {
 		objdb.commitPack(Collections.singletonList(packDsc), null);
 		rollback = false;

-		DfsPackFile p = blockCache.getOrCreate(packDsc, packKey);
+		DfsPackFile p = new DfsPackFile(blockCache, packDsc);
 		p.setBlockSize(blockSize);
 		if (packIndex != null)
 			p.setPackIndex(packIndex);
@@ -206,9 +206,9 @@ public class DfsPackParser extends PackParser {
 		}

 		packDsc = objdb.newPack(DfsObjDatabase.PackSource.RECEIVE);
-		packKey = new DfsStreamKey();
-
 		out = objdb.writeFile(packDsc, PACK);
+		packKey = packDsc.getStreamKey(PACK);
+
 		int size = out.blockSize();
 		if (size <= 0)
 			size = blockCache.getBlockSize();

@@ -43,15 +43,89 @@
 package org.eclipse.jgit.internal.storage.dfs;

-import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;

 /** Key used by {@link DfsBlockCache} to disambiguate streams. */
-final class DfsStreamKey {
+public abstract class DfsStreamKey {
+	/**
+	 * @param name
+	 *            compute the key from a string name.
+	 * @return key for {@code name}
+	 */
+	public static DfsStreamKey of(String name) {
+		return of(name.getBytes(UTF_8));
+	}
+
+	/**
+	 * @param name
+	 *            compute the key from a byte array. The key takes ownership of
+	 *            the passed {@code byte[] name}.
+	 * @return key for {@code name}
+	 */
+	public static DfsStreamKey of(byte[] name) {
+		return new ByteArrayDfsStreamKey(name);
+	}
+
 	final int hash;

-	final AtomicLong cachedSize = new AtomicLong();
-
-	DfsStreamKey() {
+	/**
+	 * @param hash
+	 *            hash of the other identifying components of the key.
+	 */
+	protected DfsStreamKey(int hash) {
 		// Multiply by 31 here so we can more directly combine with another
 		// value without doing the multiply there.
-		hash = System.identityHashCode(this) * 31;
+		this.hash = hash * 31;
 	}
+
+	/**
+	 * Derive a new StreamKey based on this existing key.
+	 *
+	 * @param suffix
+	 *            a derivation suffix.
+	 * @return derived stream key.
+	 */
+	public abstract DfsStreamKey derive(String suffix);
+
+	@Override
+	public int hashCode() {
+		return hash;
+	}
+
+	@Override
+	public abstract boolean equals(Object o);
+
+	@SuppressWarnings("boxing")
+	@Override
+	public String toString() {
+		return String.format("DfsStreamKey[hash=%08x]", hash); //$NON-NLS-1$
+	}
+
+	private static final class ByteArrayDfsStreamKey extends DfsStreamKey {
+		private final byte[] name;
+
+		ByteArrayDfsStreamKey(byte[] name) {
+			super(Arrays.hashCode(name));
+			this.name = name;
+		}
+
+		@Override
+		public DfsStreamKey derive(String suffix) {
+			byte[] s = suffix.getBytes(UTF_8);
+			byte[] n = Arrays.copyOf(name, name.length + s.length);
+			System.arraycopy(s, 0, n, name.length, s.length);
+			return new ByteArrayDfsStreamKey(n);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (o instanceof ByteArrayDfsStreamKey) {
+				ByteArrayDfsStreamKey k = (ByteArrayDfsStreamKey) o;
+				return hash == k.hash && Arrays.equals(name, k.name);
+			}
+			return false;
+		}
+	}
 }
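
A quick check of the derive() semantics added above, again an illustrative
sketch rather than code from the commit (the file name is invented):
ByteArrayDfsStreamKey hashes and compares the raw name bytes, so a derived key
is equal to a key built directly from the concatenated name. This is the
mechanism DfsPackFile uses to build reverseIdxKey from idxKey.

import org.eclipse.jgit.internal.storage.dfs.DfsStreamKey;

public class DeriveDemo {
	public static void main(String[] args) {
		// "pack-cafe.idx" is an invented name for illustration.
		// derive("r") appends the suffix bytes to the key's name.
		DfsStreamKey idx = DfsStreamKey.of("pack-cafe.idx");
		DfsStreamKey rev = idx.derive("r");
		System.out.println(rev.equals(DfsStreamKey.of("pack-cafe.idxr"))); // true
	}
}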