/*
* Copyright (c) 2024, Google LLC and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
package org.eclipse.jgit.internal.storage.dfs;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.LongStream;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.ReadableChannelSupplier;
import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.Ref;
import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.RefLoader;
import org.eclipse.jgit.internal.storage.pack.PackExt;
/**
 * Default implementation of the {@link DfsBlockCacheTable}.
 * <p>
 * This cache implements a clock replacement algorithm, giving each block at
 * least one chance to have been accessed during a sweep of the cache to save
 * itself from eviction. The number of sweep chances is configurable per pack
 * extension.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given {@code (DfsStreamKey,position)} key tuple. This is ensured by an
 * array of locks, with the tuple hashed to a lock instance.
 * <p>
 * The internal hash table does not expand at runtime; instead, it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
 */
final class ClockBlockCacheTable implements DfsBlockCacheTable {
/** Table name. */
private final String name;

/** Number of entries in {@link #table}. */
private final int tableSize;

/** Maximum number of bytes the cache should hold. */
private final long maxBytes;

/**
 * Used to reserve space for blocks.
 * <p>
 * The value for blockSize must be a power of 2.
 */
private final int blockSize;

/**
 * Mixes a stream key hash with a block position; used to pick both the
 * bucket in {@link #table} and the lock in {@link #loadLocks}.
 */
private final Hash hash;

/** Hash bucket directory; entries are chained below. */
private final AtomicReferenceArray<HashEntry> table;

/**
 * Locks to prevent concurrent loads for same (PackFile,position) block. The
 * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to
 * cap the overall concurrent block loads.
 */
private final ReentrantLock[] loadLocks;

/**
 * A separate pool of locks per pack extension to prevent concurrent loads
 * for same index or bitmap from PackFile.
 */
private final ReentrantLock[][] refLocks;

/** Protects the clock and its related data. */
private final ReentrantLock clockLock;

/** Current position of the clock. */
private Ref clockHand;

/** Hit, miss, eviction and live-byte counters maintained by this table. */
private final DfsBlockCacheStats dfsBlockCacheStats;

/**
 * A consumer of object reference lock wait time milliseconds. May be used
 * to build a metric. May be {@code null}, in which case nothing is
 * reported.
 */
private final Consumer<Long> refLockWaitTime;

/**
 * Consumer of loading and eviction events of indexes. May be {@code null},
 * in which case no events are reported.
 */
private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;

/**
 * Stores timestamps of the last eviction of indexes, as
 * {@link System#nanoTime()} readings.
 */
private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();
/**
 * Builds a fixed-size cache table from the given configuration.
 * <p>
 * The bucket directory, the load locks and the per-extension reference
 * locks are all allocated here and never resized. The clock starts out as
 * a ring containing a single placeholder entry that points to itself.
 *
 * @param cfg
 *            source of the block size, block limit, concurrency level,
 *            table name and optional event consumers.
 * @throws IllegalArgumentException
 *             if the configuration is rejected by {@code tableSize(cfg)} or
 *             the computed table size is less than 1.
 */
ClockBlockCacheTable(DfsBlockCacheConfig cfg) {
	this.tableSize = tableSize(cfg);
	if (tableSize < 1) {
		throw new IllegalArgumentException(
				JGitText.get().tSizeMustBeGreaterOrEqual1);
	}
	final int concurrencyLevel = cfg.getConcurrencyLevel();
	this.maxBytes = cfg.getBlockLimit();
	this.blockSize = cfg.getBlockSize();
	// blockSize is a power of 2, so its trailing zero count is log2.
	this.hash = new Hash(Integer.numberOfTrailingZeros(blockSize));
	this.table = new AtomicReferenceArray<>(tableSize);

	this.loadLocks = new ReentrantLock[concurrencyLevel];
	for (int i = 0; i < concurrencyLevel; i++) {
		loadLocks[i] = new ReentrantLock(/* fair= */ true);
	}

	// One pool of concurrencyLevel locks for every pack extension.
	final int extCount = PackExt.values().length;
	this.refLocks = new ReentrantLock[extCount][concurrencyLevel];
	for (ReentrantLock[] pool : refLocks) {
		for (int j = 0; j < pool.length; ++j) {
			pool[j] = new ReentrantLock(/* fair= */ true);
		}
	}
	this.clockLock = new ReentrantLock(/* fair= */ true);

	// Placeholder entry that closes the clock ring onto itself.
	final String none = ""; //$NON-NLS-1$
	Ref sentinel = new Ref<>(
			DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
			-1, 0, null);
	sentinel.next = sentinel;
	this.clockHand = sentinel;

	this.name = cfg.getName();
	this.dfsBlockCacheStats = new DfsBlockCacheStats(this.name);
	this.refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
	this.indexEventConsumer = cfg.getIndexEventConsumer();
}
/**
 * Exposes the statistics collected by this table.
 *
 * @return an unmodifiable list whose only element is this table's
 *         statistics object.
 */
@Override
public List getBlockCacheStats() {
	final DfsBlockCacheStats stats = dfsBlockCacheStats;
	return List.of(stats);
}
/**
 * Returns the name this table was configured with.
 *
 * @return the table name taken from the configuration at construction.
 */
@Override
public String getName() {
	return this.name;
}
/**
 * Tells whether the block at position 0 of the given stream is cached.
 *
 * @param key
 *            stream to look up.
 * @return {@code true} only if a block is found at position 0 and that
 *         block reports containing {@code (key, 0)}.
 */
@Override
public boolean hasBlock0(DfsStreamKey key) {
	HashEntry chain = table.get(slot(key, 0));
	DfsBlock block0 = scan(chain, key, 0);
	if (block0 == null) {
		return false;
	}
	return block0.contains(key, 0);
}
/**
 * Looks up the block of {@code file} that covers {@code position}, reading
 * it from the file when it is not cached.
 * <p>
 * The first lookup is done without any lock. On a miss, space for one
 * block is reserved, the load lock for the (key, aligned position) pair is
 * taken, and the bucket is scanned again if it changed in the meantime,
 * before the block is actually read.
 *
 * @param file
 *            file to read from; {@code file.key} identifies the stream.
 * @param position
 *            requested offset; it is aligned with
 *            {@code file.alignToBlock} before the lookup.
 * @param ctx
 *            reader whose {@code stats.blockCacheHit} counter is
 *            incremented on a hit and which is passed to
 *            {@code file.readOneBlock}.
 * @param fileChannel
 *            supplier that is only asked for a channel when a block has to
 *            be read.
 * @return the cached or freshly loaded block.
 * @throws IOException
 *             if loading the block fails.
 */
@Override
public DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
ReadableChannelSupplier fileChannel) throws IOException {
// Keep the unaligned offset: it is what the returned block must contain.
final long requestedPosition = position;
position = file.alignToBlock(position);
DfsStreamKey key = file.key;
int slot = slot(key, position);
HashEntry e1 = table.get(slot);
DfsBlock v = scan(e1, key, position);
if (v != null && v.contains(key, requestedPosition)) {
ctx.stats.blockCacheHit++;
dfsBlockCacheStats.incrementHit(key);
return v;
}
// Reserve a full block up front; the reservation is returned through
// creditSpace() on every path that does not end up caching a block.
reserveSpace(blockSize, key);
ReentrantLock regionLock = lockFor(key, position);
regionLock.lock();
try {
HashEntry e2 = table.get(slot);
// Only rescan when the bucket head changed since the unlocked lookup.
if (e2 != e1) {
v = scan(e2, key, position);
// NOTE(review): unlike the unlocked lookup above, this hit is returned
// without checking v.contains(key, requestedPosition) - confirm intended.
if (v != null) {
ctx.stats.blockCacheHit++;
dfsBlockCacheStats.incrementHit(key);
creditSpace(blockSize, key);
return v;
}
}
dfsBlockCacheStats.incrementMiss(key);
// Stays true if readOneBlock() throws, so the reservation is given back.
boolean credit = true;
try {
v = file.readOneBlock(position, ctx, fileChannel.get());
credit = false;
} finally {
if (credit) {
creditSpace(blockSize, key);
}
}
if (position != v.start) {
// The file discovered its blockSize and adjusted.
position = v.start;
slot = slot(key, position);
e2 = table.get(slot);
}
Ref ref = new Ref<>(key, position, v.size(), v);
ref.markHotter();
// Publish a new bucket head that chains to the cleaned old chain; retry
// with the current head whenever another thread changed the bucket first.
for (;;) {
HashEntry n = new HashEntry(HashEntry.clean(e2), ref);
if (table.compareAndSet(slot, e2, n)) {
break;
}
e2 = table.get(slot);
}
// Return the part of the reservation the block did not actually use.
addToClock(ref, blockSize - v.size());
} finally {
regionLock.unlock();
}
// If the block size changed from the default, it is possible the block
// that was loaded is the wrong block for the requested position.
if (v.contains(file.key, requestedPosition)) {
return v;
}
// Retry with the original offset; the loaded block stays in the cache.
return getOrLoad(file, requestedPosition, ctx, fileChannel);
}
/**
 * Looks up the reference cached for {@code (key, position)} and invokes
 * {@code loader} to create it on a miss.
 * <p>
 * The first lookup is lock free. On a miss the per-extension reference
 * lock for {@code key} is taken and the bucket is scanned again if it
 * changed, so that the loader runs once per lock holder. Space is reserved
 * only after loading, because the size is not known before.
 * <p>
 * The time spent waiting for the lock is reported in milliseconds to the
 * configured wait-time consumer, if any, but only on the path that goes on
 * to load. It is measured with {@link System#nanoTime()}, like the overall
 * request time, so wall-clock adjustments cannot skew the reported value.
 *
 * @param key
 *            stream the reference belongs to.
 * @param position
 *            position of the reference within the stream.
 * @param loader
 *            invoked under the reference lock when nothing is cached.
 * @return the cached or newly loaded reference.
 * @throws IOException
 *             if the loader fails.
 */
@Override
public Ref getOrLoadRef(DfsStreamKey key, long position,
		RefLoader loader) throws IOException {
	long start = System.nanoTime();
	int slot = slot(key, position);
	HashEntry e1 = table.get(slot);
	Ref ref = scanRef(e1, key, position);
	if (ref != null) {
		dfsBlockCacheStats.incrementHit(key);
		reportIndexRequested(ref, /* cacheHit= */ true, start);
		return ref;
	}

	ReentrantLock regionLock = lockForRef(key);
	// Monotonic clock: this is an elapsed-time measurement.
	long lockStart = System.nanoTime();
	regionLock.lock();
	try {
		HashEntry e2 = table.get(slot);
		// Only rescan when the bucket head changed since the first lookup.
		if (e2 != e1) {
			ref = scanRef(e2, key, position);
			if (ref != null) {
				dfsBlockCacheStats.incrementHit(key);
				reportIndexRequested(ref, /* cacheHit= */ true, start);
				return ref;
			}
		}

		if (refLockWaitTime != null) {
			long waitedMillis = (System.nanoTime() - lockStart)
					/ 1_000_000L;
			refLockWaitTime.accept(Long.valueOf(waitedMillis));
		}
		dfsBlockCacheStats.incrementMiss(key);
		ref = loader.load();
		ref.markHotter();
		// Reserve after loading to get the size of the object
		reserveSpace(ref.size, key);
		// Publish a new bucket head; retry if another thread won the race.
		for (;;) {
			HashEntry n = new HashEntry(HashEntry.clean(e2), ref);
			if (table.compareAndSet(slot, e2, n)) {
				break;
			}
			e2 = table.get(slot);
		}
		addToClock(ref, 0);
	} finally {
		regionLock.unlock();
	}
	reportIndexRequested(ref, /* cacheHit= */ false, start);
	return ref;
}
/**
 * Caches the given block under its own stream key and start position.
 *
 * @param v
 *            block to insert; its {@code stream}, {@code start} and
 *            {@code size()} supply the key, position and size.
 */
@Override
public void put(DfsBlock v) {
	final DfsStreamKey stream = v.stream;
	final long start = v.start;
	final long size = v.size();
	put(stream, start, size, v);
}
/**
 * Caches {@code v} under {@code (key, pos)} unless an entry is already
 * present.
 * <p>
 * Space for {@code size} bytes is reserved before the load lock is taken.
 * If another thread inserted the same entry in the meantime, the
 * reservation is credited back and the existing reference is returned.
 *
 * @param <T>
 *            type of the cached value.
 * @param key
 *            stream the value belongs to.
 * @param pos
 *            position of the value within the stream.
 * @param size
 *            number of bytes to account for the value.
 * @param v
 *            value to cache.
 * @return the reference now cached for {@code (key, pos)}: the existing
 *         one if there was one, otherwise a new reference wrapping
 *         {@code v}.
 */
@Override
public <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
	int slot = slot(key, pos);
	HashEntry e1 = table.get(slot);
	Ref<T> ref = scanRef(e1, key, pos);
	if (ref != null) {
		return ref;
	}

	reserveSpace(size, key);
	ReentrantLock regionLock = lockFor(key, pos);
	regionLock.lock();
	try {
		HashEntry e2 = table.get(slot);
		// Only rescan when the bucket head changed since the first lookup.
		if (e2 != e1) {
			ref = scanRef(e2, key, pos);
			if (ref != null) {
				creditSpace(size, key);
				return ref;
			}
		}

		ref = new Ref<>(key, pos, size, v);
		ref.markHotter();
		// Publish a new bucket head; retry if another thread won the race.
		for (;;) {
			HashEntry n = new HashEntry(HashEntry.clean(e2), ref);
			if (table.compareAndSet(slot, e2, n)) {
				break;
			}
			e2 = table.get(slot);
		}
		addToClock(ref, 0);
	} finally {
		regionLock.unlock();
	}
	return ref;
}
/**
 * Caches {@code v} for the given stream at position 0.
 *
 * @param <T>
 *            type of the cached value.
 * @param key
 *            stream the value belongs to.
 * @param size
 *            number of bytes to account for the value.
 * @param v
 *            value to cache.
 * @return the reference cached for {@code (key, 0)}, as returned by
 *         {@link #put(DfsStreamKey, long, long, Object)}.
 */
@Override
public <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
	return put(key, 0, size, v);
}
/**
 * Tells whether a value is currently cached for {@code (key, position)}.
 * This lookup does not touch the hit or miss counters.
 *
 * @param key
 *            stream to look up.
 * @param position
 *            position within the stream.
 * @return {@code true} if the bucket scan finds a non-null value.
 */
@Override
public boolean contains(DfsStreamKey key, long position) {
	int bucket = slot(key, position);
	Object cached = scan(table.get(bucket), key, position);
	return cached != null;
}
/**
 * Returns the value cached for {@code (key, position)} without loading
 * anything, and records the lookup as a hit or a miss.
 *
 * @param <T>
 *            type the caller expects; the cast is unchecked, so the caller
 *            is responsible for asking for the type that was stored.
 * @param key
 *            stream to look up.
 * @param position
 *            position within the stream.
 * @return the cached value, or {@code null} if nothing is cached.
 */
@SuppressWarnings("unchecked")
@Override
public <T> T get(DfsStreamKey key, long position) {
	T val = (T) scan(table.get(slot(key, position)), key, position);
	if (val == null) {
		dfsBlockCacheStats.incrementMiss(key);
	} else {
		dfsBlockCacheStats.incrementHit(key);
	}
	return val;
}
/**
 * Maps a (stream key, position) pair to its bucket index in the table.
 *
 * @param key
 *            stream key whose {@code hash} is mixed in.
 * @param position
 *            position within the stream.
 * @return an index in the range {@code [0, tableSize)}.
 */
private int slot(DfsStreamKey key, long position) {
	int mixed = hash.hash(key.hash, position);
	// Dropping the low bit with an unsigned shift makes the value non-negative.
	int nonNegative = mixed >>> 1;
	return nonNegative % tableSize;
}
/**
 * Accounts {@code reserve} bytes against {@code key}, first evicting
 * entries from the clock if the cache would otherwise exceed
 * {@code maxBytes}.
 * <p>
 * The whole operation runs under {@code clockLock}. The reservation is
 * always recorded, even if the sweep could not free enough space.
 *
 * @param reserve
 *            number of bytes about to be added to the cache.
 * @param key
 *            stream the reserved bytes are accounted to.
 */
@SuppressWarnings("unchecked")
private void reserveSpace(long reserve, DfsStreamKey key) {
clockLock.lock();
try {
// Projected size: everything currently accounted for plus the new bytes.
long live = LongStream.of(dfsBlockCacheStats.getCurrentSize()).sum()
+ reserve;
if (maxBytes < live) {
// Invariant throughout the sweep: prev.next == hand.
Ref prev = clockHand;
Ref hand = clockHand.next;
do {
if (hand.isHot()) {
// Value was recently touched. Cache is still hot so
// give it another chance, but cool it down a bit.
hand.markColder();
prev = hand;
hand = hand.next;
// Jumps to the loop condition below, not to the top of the body.
continue;
} else if (prev == hand) {
// The ring is down to a single entry; nothing is left to evict.
break;
}
// No recent access since last scan, kill
// value and remove from clock.
Ref dead = hand;
hand = hand.next;
prev.next = hand;
// A null next marks the entry as dead for HashEntry.clean().
dead.next = null;
dead.value = null;
live -= dead.size;
dfsBlockCacheStats.addToLiveBytes(dead.key, -dead.size);
dfsBlockCacheStats.incrementEvict(dead.key);
reportIndexEvicted(dead);
} while (maxBytes < live);
clockHand = prev;
}
dfsBlockCacheStats.addToLiveBytes(key, reserve);
} finally {
clockLock.unlock();
}
}
/**
 * Gives back bytes that were reserved but will not be used.
 *
 * @param credit
 *            number of bytes to subtract from the live byte count.
 * @param key
 *            stream the bytes were accounted to.
 */
private void creditSpace(long credit, DfsStreamKey key) {
	final long delta = -credit;
	clockLock.lock();
	try {
		dfsBlockCacheStats.addToLiveBytes(key, delta);
	} finally {
		clockLock.unlock();
	}
}
/**
 * Links a new entry into the clock ring directly after the current hand
 * and moves the hand onto it.
 *
 * @param ref
 *            entry to add to the clock.
 * @param credit
 *            bytes of an earlier reservation that turned out to be unused
 *            and are subtracted from the live byte count; 0 for none.
 */
@SuppressWarnings("unchecked")
private void addToClock(Ref ref, long credit) {
	clockLock.lock();
	try {
		if (credit != 0) {
			dfsBlockCacheStats.addToLiveBytes(ref.key, -credit);
		}
		// Splice ref in between the hand and the hand's successor.
		ref.next = clockHand.next;
		clockHand.next = ref;
		clockHand = ref;
	} finally {
		clockLock.unlock();
	}
}
/**
 * Scans a bucket chain for the value cached under {@code (key, position)}.
 *
 * @param <T>
 *            type the caller expects the cached value to have.
 * @param n
 *            head of the bucket chain; may be {@code null}.
 * @param key
 *            stream to look for.
 * @param position
 *            position to look for.
 * @return the cached value, or {@code null} if there is no live entry.
 */
private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
	Ref<T> r = scanRef(n, key, position);
	return r != null ? r.get() : null;
}
/**
 * Scans a bucket chain for the reference stored under
 * {@code (key, position)}.
 * <p>
 * The scan stops at the first entry whose position and key match. If that
 * entry no longer holds a value, {@code null} is returned and later
 * entries in the chain are not considered.
 *
 * @param <T>
 *            type the caller expects the reference to hold; the conversion
 *            from the untyped chain entry is unchecked.
 * @param n
 *            head of the bucket chain; may be {@code null}.
 * @param key
 *            stream to look for.
 * @param position
 *            position to look for.
 * @return the matching reference if it still holds a value, otherwise
 *         {@code null}.
 */
@SuppressWarnings("unchecked")
private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
	for (; n != null; n = n.next) {
		Ref<T> r = n.ref;
		if (r.position == position && r.key.equals(key)) {
			return r.get() != null ? r : null;
		}
	}
	return null;
}
/**
 * Picks the load lock that guards the given (stream key, position) pair.
 *
 * @param key
 *            stream key whose {@code hash} is mixed in.
 * @param position
 *            position within the stream.
 * @return the lock from {@code loadLocks} that the pair hashes to.
 */
private ReentrantLock lockFor(DfsStreamKey key, long position) {
	// Unsigned shift keeps the index non-negative.
	int nonNegative = hash.hash(key.hash, position) >>> 1;
	return loadLocks[nonNegative % loadLocks.length];
}
/**
 * Picks the reference lock for a stream from the pool that belongs to the
 * stream's pack extension.
 *
 * @param key
 *            stream key; {@code packExtPos} selects the pool and
 *            {@code hash} selects the lock within it.
 * @return the lock the key hashes to.
 */
private ReentrantLock lockForRef(DfsStreamKey key) {
	ReentrantLock[] pool = refLocks[key.packExtPos];
	// Unsigned shift keeps the index non-negative.
	return pool[(key.hash >>> 1) % pool.length];
}
private void reportIndexRequested(Ref> ref, boolean cacheHit,
long start) {
if (indexEventConsumer == null || !isIndexExtPos(ref.key.packExtPos)) {
return;
}
EvictKey evictKey = createEvictKey(ref);
Long prevEvictedTime = indexEvictionMap.get(evictKey);
long now = System.nanoTime();
long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
: now - prevEvictedTime.longValue();
indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
(now - start) / 1000L /* micros */, ref.size,
Duration.ofNanos(sinceLastEvictionNanos));
}
private void reportIndexEvicted(Ref> dead) {
if (indexEventConsumer == null
|| !indexEventConsumer.shouldReportEvictedEvent()
|| !isIndexExtPos(dead.key.packExtPos)) {
return;
}
EvictKey evictKey = createEvictKey(dead);
Long prevEvictedTime = indexEvictionMap.get(evictKey);
long now = System.nanoTime();
long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
: now - prevEvictedTime.longValue();
indexEvictionMap.put(evictKey, Long.valueOf(now));
indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
dead.getTotalHitCount(),
Duration.ofNanos(sinceLastEvictionNanos));
}
/** Immutable node of a bucket chain in the hash table. */
private static final class HashEntry {
	/** Next entry in the hash table's chain list. */
	final HashEntry next;

	/** The referenced object. */
	final Ref ref;

	HashEntry(HashEntry n, Ref r) {
		next = n;
		ref = r;
	}

	/**
	 * Returns a chain with the same live entries as {@code top}, in the
	 * same order, but without entries whose reference has been unlinked
	 * from the clock ({@code ref.next == null}).
	 * <p>
	 * Existing nodes are reused for any suffix that needs no change; new
	 * nodes are only created in front of a removed entry.
	 *
	 * @param top
	 *            head of the chain to clean; may be {@code null}.
	 * @return head of the cleaned chain, or {@code null} if no live entry
	 *         remains.
	 */
	private static HashEntry clean(HashEntry top) {
		// Skip over the leading run of dead entries.
		HashEntry live = top;
		for (; live != null; live = live.next) {
			if (live.ref.next != null) {
				break;
			}
		}
		if (live == null) {
			return null;
		}
		HashEntry cleanedTail = clean(live.next);
		if (cleanedTail == live.next) {
			// Tail is unchanged, so this node can be kept as is.
			return live;
		}
		return new HashEntry(cleanedTail, live.ref);
	}
}
private EvictKey createEvictKey(Ref> ref) {
return new EvictKey(hash, ref);
}
/**
 * Tells whether a pack extension position denotes one of the index
 * extensions: the index, the reverse index or the bitmap index.
 *
 * @param packExtPos
 *            pack extension position to classify.
 * @return {@code true} if the position equals that of {@code INDEX},
 *         {@code REVERSE_INDEX} or {@code BITMAP_INDEX}.
 */
private static boolean isIndexExtPos(int packExtPos) {
	if (packExtPos == PackExt.INDEX.getPosition()) {
		return true;
	}
	if (packExtPos == PackExt.REVERSE_INDEX.getPosition()) {
		return true;
	}
	return packExtPos == PackExt.BITMAP_INDEX.getPosition();
}
/**
 * Computes the number of hash buckets for a configuration: two and a half
 * buckets per block that fits into the block limit, capped at
 * {@link Integer#MAX_VALUE}.
 *
 * @param cfg
 *            configuration supplying the block size and block limit.
 * @return the number of buckets to allocate.
 * @throws IllegalArgumentException
 *             if the block size is not positive or the block limit is
 *             smaller than the block size.
 */
private static int tableSize(DfsBlockCacheConfig cfg) {
	final int wsz = cfg.getBlockSize();
	final long limit = cfg.getBlockLimit();
	if (wsz <= 0) {
		throw new IllegalArgumentException(
				JGitText.get().invalidWindowSize);
	}
	if (limit < wsz) {
		throw new IllegalArgumentException(
				JGitText.get().windowSizeMustBeLesserThanLimit);
	}
	final long blocks = limit / wsz;
	final long buckets = 5 * blocks / 2;
	return (int) Math.min(buckets, Integer.MAX_VALUE);
}
/** Hash function combining a pack hash with a block number. */
private static final class Hash {
	/**
	 * Number of bits to shift an offset right to divide it by the block
	 * size, which is a power of 2.
	 */
	private final int blockSizeShift;

	Hash(int blockSizeShift) {
		this.blockSizeShift = blockSizeShift;
	}

	/**
	 * Adds the block number of {@code off} to {@code packHash}.
	 *
	 * @param packHash
	 *            hash of the stream key.
	 * @param off
	 *            offset within the stream.
	 * @return the sum of the pack hash and the low 32 bits of the block
	 *         number; the addition may wrap around.
	 */
	int hash(int packHash, long off) {
		final long blockNumber = off >>> blockSizeShift;
		return packHash + (int) blockNumber;
	}
}
private static final class EvictKey {
/**
* Provides the hash function to be used for this key's hashCode method.
*/
private final Hash hash;
private final int keyHash;
private final int packExtPos;
private final long position;
EvictKey(Hash hash, Ref> ref) {
this.hash = hash;
keyHash = ref.key.hash;
packExtPos = ref.key.packExtPos;
position = ref.position;
}
@Override
public boolean equals(Object object) {
if (object instanceof EvictKey) {
EvictKey other = (EvictKey) object;
return keyHash == other.keyHash
&& packExtPos == other.packExtPos
&& position == other.position;
}
return false;
}
@Override
public int hashCode() {
return hash.hash(keyHash, position);
}
}
}