diff options
author | Laura Hamelin <haowl@google.com> | 2024-02-27 09:40:24 -0800 |
---|---|---|
committer | Laura Hamelin <haowl@google.com> | 2024-03-05 10:29:21 -0800 |
commit | fe681b34fb36cd3fe9757fad789cfb16827c142d (patch) | |
tree | a3059c94ed8e10e5374b8b072bba840ba839d497 /org.eclipse.jgit | |
parent | 07d7452b380028df214d54f2d33beecd93d05be3 (diff) | |
download | jgit-fe681b34fb36cd3fe9757fad789cfb16827c142d.tar.gz jgit-fe681b34fb36cd3fe9757fad789cfb16827c142d.zip |
DfsBlockCache: move cache table specific implementations to a new class
This move of the cache table specific implementation to its own class
and extracting the method definition to an interface will allow for
additional reworking of the dfs block cache with the goal of letting
users implement their own context specific cache tables.
This work does not intend to change how the dfs block cache is accessed,
limiting the change to DfsBlockCache internal changes only.
Change-Id: Ief6755d1dcf54b4f73f9fe8d22ccb9e2952cb0c4
Signed-off-by: Laura Hamelin <haowl@google.com>
Diffstat (limited to 'org.eclipse.jgit')
3 files changed, 933 insertions, 551 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/ClockBlockCacheTable.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/ClockBlockCacheTable.java new file mode 100644 index 0000000000..d0907bcc8d --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/ClockBlockCacheTable.java @@ -0,0 +1,563 @@ +/* + * Copyright (c) 2024, Google LLC and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Distribution License v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +package org.eclipse.jgit.internal.storage.dfs; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.LongStream; + +import org.eclipse.jgit.internal.JGitText; +import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.ReadableChannelSupplier; +import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.Ref; +import org.eclipse.jgit.internal.storage.dfs.DfsBlockCache.RefLoader; +import org.eclipse.jgit.internal.storage.pack.PackExt; + +/** + * Default implementation of the {@link DfsBlockCacheTable}. + * <p> + * This cache implements a clock replacement algorithm, giving each block at + * least one chance to have been accessed during a sweep of the cache to save + * itself from eviction. The number of swipe chances is configurable per pack + * extension. + * <p> + * Entities created by the cache are held under hard references, preventing the + * Java VM from clearing anything. Blocks are discarded by the replacement + * algorithm when adding a new block would cause the cache to exceed its + * configured maximum size. + * <p> + * Whenever a cache miss occurs, loading is invoked by exactly one thread for + * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by + * an array of locks, with the tuple hashed to a lock instance. + * <p> + * The internal hash table does not expand at runtime, instead it is fixed in + * size at cache creation time. The internal lock table used to gate load + * invocations is also fixed in size. + */ +final class ClockBlockCacheTable implements DfsBlockCacheTable { + /** Number of entries in {@link #table}. */ + private final int tableSize; + + /** Maximum number of bytes the cache should hold. */ + private final long maxBytes; + + /** + * Used to reserve space for blocks. + * <p> + * The value for blockSize must be a power of 2. + */ + private final int blockSize; + + private final Hash hash; + + /** Hash bucket directory; entries are chained below. */ + private final AtomicReferenceArray<HashEntry> table; + + /** + * Locks to prevent concurrent loads for same (PackFile,position) block. The + * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to + * cap the overall concurrent block loads. + */ + private final ReentrantLock[] loadLocks; + + /** + * A separate pool of locks per pack extension to prevent concurrent loads + * for same index or bitmap from PackFile. + */ + private final ReentrantLock[][] refLocks; + + /** Protects the clock and its related data. */ + private final ReentrantLock clockLock; + + /** Current position of the clock. */ + private Ref clockHand; + + private final DfsBlockCacheStats dfsBlockCacheStats; + + /** + * A consumer of object reference lock wait time milliseconds. May be used + * to build a metric. + */ + private final Consumer<Long> refLockWaitTime; + + /** Consumer of loading and eviction events of indexes. */ + private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer; + + /** Stores timestamps of the last eviction of indexes. */ + private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>(); + + ClockBlockCacheTable(DfsBlockCacheConfig cfg) { + this.tableSize = tableSize(cfg); + if (tableSize < 1) { + throw new IllegalArgumentException( + JGitText.get().tSizeMustBeGreaterOrEqual1); + } + int concurrencyLevel = cfg.getConcurrencyLevel(); + this.maxBytes = cfg.getBlockLimit(); + this.blockSize = cfg.getBlockSize(); + int blockSizeShift = Integer.numberOfTrailingZeros(blockSize); + this.hash = new Hash(blockSizeShift); + table = new AtomicReferenceArray<>(tableSize); + + loadLocks = new ReentrantLock[concurrencyLevel]; + for (int i = 0; i < loadLocks.length; i++) { + loadLocks[i] = new ReentrantLock(/* fair= */ true); + } + refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel]; + for (int i = 0; i < PackExt.values().length; i++) { + for (int j = 0; j < concurrencyLevel; ++j) { + refLocks[i][j] = new ReentrantLock(/* fair= */ true); + } + } + + clockLock = new ReentrantLock(/* fair= */ true); + String none = ""; //$NON-NLS-1$ + clockHand = new Ref<>( + DfsStreamKey.of(new DfsRepositoryDescription(none), none, null), + -1, 0, null); + clockHand.next = clockHand; + + this.dfsBlockCacheStats = new DfsBlockCacheStats(); + this.refLockWaitTime = cfg.getRefLockWaitTimeConsumer(); + this.indexEventConsumer = cfg.getIndexEventConsumer(); + } + + @Override + public DfsBlockCacheStats getDfsBlockCacheStats() { + return dfsBlockCacheStats; + } + + @Override + public boolean hasBlock0(DfsStreamKey key) { + HashEntry e1 = table.get(slot(key, 0)); + DfsBlock v = scan(e1, key, 0); + return v != null && v.contains(key, 0); + } + + @Override + public DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx, + ReadableChannelSupplier fileChannel) throws IOException { + final long requestedPosition = position; + position = file.alignToBlock(position); + + DfsStreamKey key = file.key; + int slot = slot(key, position); + HashEntry e1 = table.get(slot); + DfsBlock v = scan(e1, key, position); + if (v != null && v.contains(key, requestedPosition)) { + ctx.stats.blockCacheHit++; + dfsBlockCacheStats.incrementHit(key); + return v; + } + + reserveSpace(blockSize, key); + ReentrantLock regionLock = lockFor(key, position); + regionLock.lock(); + try { + HashEntry e2 = table.get(slot); + if (e2 != e1) { + v = scan(e2, key, position); + if (v != null) { + ctx.stats.blockCacheHit++; + dfsBlockCacheStats.incrementHit(key); + creditSpace(blockSize, key); + return v; + } + } + + dfsBlockCacheStats.incrementMiss(key); + boolean credit = true; + try { + v = file.readOneBlock(position, ctx, fileChannel.get()); + credit = false; + } finally { + if (credit) { + creditSpace(blockSize, key); + } + } + if (position != v.start) { + // The file discovered its blockSize and adjusted. + position = v.start; + slot = slot(key, position); + e2 = table.get(slot); + } + + Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v); + ref.markHotter(); + for (;;) { + HashEntry n = new HashEntry(HashEntry.clean(e2), ref); + if (table.compareAndSet(slot, e2, n)) { + break; + } + e2 = table.get(slot); + } + addToClock(ref, blockSize - v.size()); + } finally { + regionLock.unlock(); + } + + // If the block size changed from the default, it is possible the + // block + // that was loaded is the wrong block for the requested position. + if (v.contains(file.key, requestedPosition)) { + return v; + } + return getOrLoad(file, requestedPosition, ctx, fileChannel); + } + + @Override + public <T> Ref<T> getOrLoadRef(DfsStreamKey key, long position, + RefLoader<T> loader) throws IOException { + long start = System.nanoTime(); + int slot = slot(key, position); + HashEntry e1 = table.get(slot); + Ref<T> ref = scanRef(e1, key, position); + if (ref != null) { + dfsBlockCacheStats.incrementHit(key); + reportIndexRequested(ref, true /* cacheHit= */, start); + return ref; + } + + ReentrantLock regionLock = lockForRef(key); + long lockStart = System.currentTimeMillis(); + regionLock.lock(); + try { + HashEntry e2 = table.get(slot); + if (e2 != e1) { + ref = scanRef(e2, key, position); + if (ref != null) { + dfsBlockCacheStats.incrementHit(key); + reportIndexRequested(ref, true /* cacheHit= */, start); + return ref; + } + } + + if (refLockWaitTime != null) { + refLockWaitTime.accept( + Long.valueOf(System.currentTimeMillis() - lockStart)); + } + dfsBlockCacheStats.incrementMiss(key); + ref = loader.load(); + ref.markHotter(); + // Reserve after loading to get the size of the object + reserveSpace(ref.size, key); + for (;;) { + HashEntry n = new HashEntry(HashEntry.clean(e2), ref); + if (table.compareAndSet(slot, e2, n)) { + break; + } + e2 = table.get(slot); + } + addToClock(ref, 0); + } finally { + regionLock.unlock(); + } + reportIndexRequested(ref, /* cacheHit= */ false, start); + return ref; + } + + @Override + public void put(DfsBlock v) { + put(v.stream, v.start, v.size(), v); + } + + @Override + public <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) { + int slot = slot(key, pos); + HashEntry e1 = table.get(slot); + Ref<T> ref = scanRef(e1, key, pos); + if (ref != null) { + return ref; + } + + reserveSpace(size, key); + ReentrantLock regionLock = lockFor(key, pos); + regionLock.lock(); + try { + HashEntry e2 = table.get(slot); + if (e2 != e1) { + ref = scanRef(e2, key, pos); + if (ref != null) { + creditSpace(size, key); + return ref; + } + } + + ref = new Ref<>(key, pos, size, v); + ref.markHotter(); + for (;;) { + HashEntry n = new HashEntry(HashEntry.clean(e2), ref); + if (table.compareAndSet(slot, e2, n)) { + break; + } + e2 = table.get(slot); + } + addToClock(ref, 0); + } finally { + regionLock.unlock(); + } + return ref; + } + + @Override + public <T> Ref<T> putRef(DfsStreamKey key, long size, T v) { + return put(key, 0, size, v); + } + + @Override + public boolean contains(DfsStreamKey key, long position) { + return scan(table.get(slot(key, position)), key, position) != null; + } + + @SuppressWarnings("unchecked") + @Override + public <T> T get(DfsStreamKey key, long position) { + T val = (T) scan(table.get(slot(key, position)), key, position); + if (val == null) { + dfsBlockCacheStats.incrementMiss(key); + } else { + dfsBlockCacheStats.incrementHit(key); + } + return val; + } + + private int slot(DfsStreamKey key, long position) { + return (hash.hash(key.hash, position) >>> 1) % tableSize; + } + + @SuppressWarnings("unchecked") + private void reserveSpace(long reserve, DfsStreamKey key) { + clockLock.lock(); + try { + long live = LongStream.of(dfsBlockCacheStats.getCurrentSize()).sum() + + reserve; + if (maxBytes < live) { + Ref prev = clockHand; + Ref hand = clockHand.next; + do { + if (hand.isHot()) { + // Value was recently touched. Cache is still hot so + // give it another chance, but cool it down a bit. + hand.markColder(); + prev = hand; + hand = hand.next; + continue; + } else if (prev == hand) { + break; + } + + // No recent access since last scan, kill + // value and remove from clock. + Ref dead = hand; + hand = hand.next; + prev.next = hand; + dead.next = null; + dead.value = null; + live -= dead.size; + dfsBlockCacheStats.addToLiveBytes(dead.key, -dead.size); + dfsBlockCacheStats.incrementEvict(dead.key); + reportIndexEvicted(dead); + } while (maxBytes < live); + clockHand = prev; + } + dfsBlockCacheStats.addToLiveBytes(key, reserve); + } finally { + clockLock.unlock(); + } + } + + private void creditSpace(long credit, DfsStreamKey key) { + clockLock.lock(); + try { + dfsBlockCacheStats.addToLiveBytes(key, -credit); + } finally { + clockLock.unlock(); + } + } + + @SuppressWarnings("unchecked") + private void addToClock(Ref ref, long credit) { + clockLock.lock(); + try { + if (credit != 0) { + dfsBlockCacheStats.addToLiveBytes(ref.key, -credit); + } + Ref ptr = clockHand; + ref.next = ptr.next; + ptr.next = ref; + clockHand = ref; + } finally { + clockLock.unlock(); + } + } + + private <T> T scan(HashEntry n, DfsStreamKey key, long position) { + Ref<T> r = scanRef(n, key, position); + return r != null ? r.get() : null; + } + + @SuppressWarnings("unchecked") + private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) { + for (; n != null; n = n.next) { + Ref<T> r = n.ref; + if (r.position == position && r.key.equals(key)) { + return r.get() != null ? r : null; + } + } + return null; + } + + private ReentrantLock lockFor(DfsStreamKey key, long position) { + return loadLocks[(hash.hash(key.hash, position) >>> 1) + % loadLocks.length]; + } + + private ReentrantLock lockForRef(DfsStreamKey key) { + int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length; + return refLocks[key.packExtPos][slot]; + } + + private void reportIndexRequested(Ref<?> ref, boolean cacheHit, + long start) { + if (indexEventConsumer == null || !isIndexExtPos(ref.key.packExtPos)) { + return; + } + EvictKey evictKey = createEvictKey(ref); + Long prevEvictedTime = indexEvictionMap.get(evictKey); + long now = System.nanoTime(); + long sinceLastEvictionNanos = prevEvictedTime == null ? 0L + : now - prevEvictedTime.longValue(); + indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit, + (now - start) / 1000L /* micros */, ref.size, + Duration.ofNanos(sinceLastEvictionNanos)); + } + + private void reportIndexEvicted(Ref<?> dead) { + if (indexEventConsumer == null + || !indexEventConsumer.shouldReportEvictedEvent() + || !isIndexExtPos(dead.key.packExtPos)) { + return; + } + EvictKey evictKey = createEvictKey(dead); + Long prevEvictedTime = indexEvictionMap.get(evictKey); + long now = System.nanoTime(); + long sinceLastEvictionNanos = prevEvictedTime == null ? 0L + : now - prevEvictedTime.longValue(); + indexEvictionMap.put(evictKey, Long.valueOf(now)); + indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size, + dead.getTotalHitCount(), + Duration.ofNanos(sinceLastEvictionNanos)); + } + + private static final class HashEntry { + /** Next entry in the hash table's chain list. */ + final HashEntry next; + + /** The referenced object. */ + final Ref ref; + + HashEntry(HashEntry n, Ref r) { + next = n; + ref = r; + } + + private static HashEntry clean(HashEntry top) { + while (top != null && top.ref.next == null) { + top = top.next; + } + if (top == null) { + return null; + } + HashEntry n = clean(top.next); + return n == top.next ? top : new HashEntry(n, top.ref); + } + } + + private EvictKey createEvictKey(Ref<?> ref) { + return new EvictKey(hash, ref); + } + + private static boolean isIndexExtPos(int packExtPos) { + return packExtPos == PackExt.INDEX.getPosition() + || packExtPos == PackExt.REVERSE_INDEX.getPosition() + || packExtPos == PackExt.BITMAP_INDEX.getPosition(); + } + + private static int tableSize(DfsBlockCacheConfig cfg) { + final int wsz = cfg.getBlockSize(); + final long limit = cfg.getBlockLimit(); + if (wsz <= 0) { + throw new IllegalArgumentException( + JGitText.get().invalidWindowSize); + } + if (limit < wsz) { + throw new IllegalArgumentException( + JGitText.get().windowSizeMustBeLesserThanLimit); + } + return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE); + } + + private static final class Hash { + /** + * As {@link #blockSize} is a power of 2, bits to shift for a / + * blockSize. + */ + private final int blockSizeShift; + + Hash(int blockSizeShift) { + this.blockSizeShift = blockSizeShift; + } + + int hash(int packHash, long off) { + return packHash + (int) (off >>> blockSizeShift); + } + } + + private static final class EvictKey { + /** + * Provides the hash function to be used for this key's hashCode method. + */ + private final Hash hash; + + private final int keyHash; + + private final int packExtPos; + + private final long position; + + EvictKey(Hash hash, Ref<?> ref) { + this.hash = hash; + keyHash = ref.key.hash; + packExtPos = ref.key.packExtPos; + position = ref.position; + } + + @Override + public boolean equals(Object object) { + if (object instanceof EvictKey) { + EvictKey other = (EvictKey) object; + return keyHash == other.keyHash + && packExtPos == other.packExtPos + && position == other.position; + } + return false; + } + + @Override + public int hashCode() { + return hash.hash(keyHash, position); + } + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java index f7c460c1a8..56719cf0f4 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java @@ -12,18 +12,9 @@ package org.eclipse.jgit.internal.storage.dfs; import java.io.IOException; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import java.util.stream.LongStream; -import org.eclipse.jgit.internal.JGitText; import org.eclipse.jgit.internal.storage.pack.PackExt; /** @@ -36,31 +27,14 @@ import org.eclipse.jgit.internal.storage.pack.PackExt; * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out * these tiny reads into larger block-sized IO operations. * <p> - * Whenever a cache miss occurs, loading is invoked by exactly one thread for - * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by - * an array of locks, with the tuple hashed to a lock instance. - * <p> * Its too expensive during object access to be accurate with a least recently * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that - * typically doesn't yield a corresponding benefit to the application. This - * cache implements a clock replacement algorithm, giving each block at least - * one chance to have been accessed during a sweep of the cache to save itself - * from eviction. The number of swipe chances is configurable per pack - * extension. - * <p> - * Entities created by the cache are held under hard references, preventing the - * Java VM from clearing anything. Blocks are discarded by the replacement - * algorithm when adding a new block would cause the cache to exceed its - * configured maximum size. + * typically doesn't yield a corresponding benefit to the application. * <p> * The key tuple is passed through to methods as a pair of parameters rather * than as a single Object, thus reducing the transient memory allocations of * callers. It is more efficient to avoid the allocation, as we can't be 100% * sure that a JIT would be able to stack-allocate a key tuple. - * <p> - * The internal hash table does not expand at runtime, instead it is fixed in - * size at cache creation time. The internal lock table used to gate load - * invocations is also fixed in size. */ public final class DfsBlockCache { private static volatile DfsBlockCache cache; @@ -94,24 +68,7 @@ public final class DfsBlockCache { return cache; } - /** Number of entries in {@link #table}. */ - private final int tableSize; - - /** Hash bucket directory; entries are chained below. */ - private final AtomicReferenceArray<HashEntry> table; - - /** - * Locks to prevent concurrent loads for same (PackFile,position) block. The - * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to - * cap the overall concurrent block loads. - */ - private final ReentrantLock[] loadLocks; - - /** - * A separate pool of locks per pack extension to prevent concurrent loads - * for same index or bitmap from PackFile. - */ - private final ReentrantLock[][] refLocks; + private final DfsBlockCacheTable dfsBlockCacheTable; /** Maximum number of bytes the cache should hold. */ private final long maxBytes; @@ -131,89 +88,16 @@ public final class DfsBlockCache { */ private final int blockSize; - /** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */ - private final int blockSizeShift; - - /** - * Number of times a block was found in the cache, per pack file extension. - */ - private final AtomicReference<AtomicLong[]> statHit; - - /** - * Number of times a block was not found, and had to be loaded, per pack - * file extension. - */ - private final AtomicReference<AtomicLong[]> statMiss; - - /** - * Number of blocks evicted due to cache being full, per pack file - * extension. - */ - private final AtomicReference<AtomicLong[]> statEvict; - - /** - * Number of bytes currently loaded in the cache, per pack file extension. - */ - private final AtomicReference<AtomicLong[]> liveBytes; - - /** Protects the clock and its related data. */ - private final ReentrantLock clockLock; - - /** - * A consumer of object reference lock wait time milliseconds. May be used to build a metric. - */ - private final Consumer<Long> refLockWaitTime; - - /** Current position of the clock. */ - private Ref clockHand; - /** Limits of cache hot count per pack file extension. */ private final int[] cacheHotLimits = new int[PackExt.values().length]; - /** Consumer of loading and eviction events of indexes. */ - private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer; - - /** Stores timestamps of the last eviction of indexes. */ - private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>(); - - @SuppressWarnings("unchecked") private DfsBlockCache(DfsBlockCacheConfig cfg) { - tableSize = tableSize(cfg); - if (tableSize < 1) { - throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1); - } - - table = new AtomicReferenceArray<>(tableSize); - int concurrencyLevel = cfg.getConcurrencyLevel(); - loadLocks = new ReentrantLock[concurrencyLevel]; - for (int i = 0; i < loadLocks.length; i++) { - loadLocks[i] = new ReentrantLock(true /* fair */); - } - refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel]; - for (int i = 0; i < PackExt.values().length; i++) { - for (int j = 0; j < concurrencyLevel; ++j) { - refLocks[i][j] = new ReentrantLock(true /* fair */); - } - } - maxBytes = cfg.getBlockLimit(); - maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio()); blockSize = cfg.getBlockSize(); - blockSizeShift = Integer.numberOfTrailingZeros(blockSize); - - clockLock = new ReentrantLock(true /* fair */); - String none = ""; //$NON-NLS-1$ - clockHand = new Ref<>( - DfsStreamKey.of(new DfsRepositoryDescription(none), none, null), - -1, 0, null); - clockHand.next = clockHand; - - statHit = new AtomicReference<>(newCounters()); - statMiss = new AtomicReference<>(newCounters()); - statEvict = new AtomicReference<>(newCounters()); - liveBytes = new AtomicReference<>(newCounters()); + double streamRatio = cfg.getStreamRatio(); + maxStreamThroughCache = (long) (maxBytes * streamRatio); - refLockWaitTime = cfg.getRefLockWaitTimeConsumer(); + dfsBlockCacheTable = new ClockBlockCacheTable(cfg); for (int i = 0; i < PackExt.values().length; ++i) { Integer limit = cfg.getCacheHotMap().get(PackExt.values()[i]); @@ -223,7 +107,6 @@ public final class DfsBlockCache { cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX; } } - indexEventConsumer = cfg.getIndexEventConsumer(); } boolean shouldCopyThroughCache(long length) { @@ -236,7 +119,7 @@ public final class DfsBlockCache { * @return total number of bytes in the cache, per pack file extension. */ public long[] getCurrentSize() { - return getStatVals(liveBytes); + return dfsBlockCacheTable.getDfsBlockCacheStats().getCurrentSize(); } /** @@ -255,7 +138,7 @@ public final class DfsBlockCache { * extension. */ public long[] getHitCount() { - return getStatVals(statHit); + return dfsBlockCacheTable.getDfsBlockCacheStats().getHitCount(); } /** @@ -266,7 +149,7 @@ public final class DfsBlockCache { * extension. */ public long[] getMissCount() { - return getStatVals(statMiss); + return dfsBlockCacheTable.getDfsBlockCacheStats().getMissCount(); } /** @@ -275,16 +158,8 @@ public final class DfsBlockCache { * @return total number of requests (hit + miss), per pack file extension. */ public long[] getTotalRequestCount() { - AtomicLong[] hit = statHit.get(); - AtomicLong[] miss = statMiss.get(); - long[] cnt = new long[Math.max(hit.length, miss.length)]; - for (int i = 0; i < hit.length; i++) { - cnt[i] += hit[i].get(); - } - for (int i = 0; i < miss.length; i++) { - cnt[i] += miss[i].get(); - } - return cnt; + return dfsBlockCacheTable.getDfsBlockCacheStats() + .getTotalRequestCount(); } /** @@ -293,22 +168,7 @@ public final class DfsBlockCache { * @return hit ratios */ public long[] getHitRatio() { - AtomicLong[] hit = statHit.get(); - AtomicLong[] miss = statMiss.get(); - long[] ratio = new long[Math.max(hit.length, miss.length)]; - for (int i = 0; i < ratio.length; i++) { - if (i >= hit.length) { - ratio[i] = 0; - } else if (i >= miss.length) { - ratio[i] = 100; - } else { - long hitVal = hit[i].get(); - long missVal = miss[i].get(); - long total = hitVal + missVal; - ratio[i] = total == 0 ? 0 : hitVal * 100 / total; - } - } - return ratio; + return dfsBlockCacheTable.getDfsBlockCacheStats().getHitRatio(); } /** @@ -319,7 +179,7 @@ public final class DfsBlockCache { * file extension. */ public long[] getEvictions() { - return getStatVals(statEvict); + return dfsBlockCacheTable.getDfsBlockCacheStats().getEvictions(); } /** @@ -334,31 +194,13 @@ public final class DfsBlockCache { * @return true if block 0 (the first block) is in the cache. */ public boolean hasBlock0(DfsStreamKey key) { - HashEntry e1 = table.get(slot(key, 0)); - DfsBlock v = scan(e1, key, 0); - return v != null && v.contains(key, 0); - } - - private int hash(int packHash, long off) { - return packHash + (int) (off >>> blockSizeShift); + return dfsBlockCacheTable.hasBlock0(key); } int getBlockSize() { return blockSize; } - private static int tableSize(DfsBlockCacheConfig cfg) { - final int wsz = cfg.getBlockSize(); - final long limit = cfg.getBlockLimit(); - if (wsz <= 0) { - throw new IllegalArgumentException(JGitText.get().invalidWindowSize); - } - if (limit < wsz) { - throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit); - } - return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE); - } - /** * Look up a cached object, creating and loading it if it doesn't exist. * @@ -376,139 +218,11 @@ public final class DfsBlockCache { */ DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx, ReadableChannelSupplier fileChannel) throws IOException { - final long requestedPosition = position; - position = file.alignToBlock(position); - - DfsStreamKey key = file.key; - int slot = slot(key, position); - HashEntry e1 = table.get(slot); - DfsBlock v = scan(e1, key, position); - if (v != null && v.contains(key, requestedPosition)) { - ctx.stats.blockCacheHit++; - getStat(statHit, key).incrementAndGet(); - return v; - } - - reserveSpace(blockSize, key); - ReentrantLock regionLock = lockFor(key, position); - regionLock.lock(); - try { - HashEntry e2 = table.get(slot); - if (e2 != e1) { - v = scan(e2, key, position); - if (v != null) { - ctx.stats.blockCacheHit++; - getStat(statHit, key).incrementAndGet(); - creditSpace(blockSize, key); - return v; - } - } - - getStat(statMiss, key).incrementAndGet(); - boolean credit = true; - try { - v = file.readOneBlock(position, ctx, fileChannel.get()); - credit = false; - } finally { - if (credit) { - creditSpace(blockSize, key); - } - } - if (position != v.start) { - // The file discovered its blockSize and adjusted. - position = v.start; - slot = slot(key, position); - e2 = table.get(slot); - } - - Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v); - ref.markHotter(); - for (;;) { - HashEntry n = new HashEntry(clean(e2), ref); - if (table.compareAndSet(slot, e2, n)) { - break; - } - e2 = table.get(slot); - } - addToClock(ref, blockSize - v.size()); - } finally { - regionLock.unlock(); - } - - // If the block size changed from the default, it is possible the block - // that was loaded is the wrong block for the requested position. - if (v.contains(file.key, requestedPosition)) { - return v; - } - return getOrLoad(file, requestedPosition, ctx, fileChannel); - } - - @SuppressWarnings("unchecked") - private void reserveSpace(long reserve, DfsStreamKey key) { - clockLock.lock(); - try { - long live = LongStream.of(getCurrentSize()).sum() + reserve; - if (maxBytes < live) { - Ref prev = clockHand; - Ref hand = clockHand.next; - do { - if (hand.isHot()) { - // Value was recently touched. Cache is still hot so - // give it another chance, but cool it down a bit. - hand.markColder(); - prev = hand; - hand = hand.next; - continue; - } else if (prev == hand) - break; - - // No recent access since last scan, kill - // value and remove from clock. - Ref dead = hand; - hand = hand.next; - prev.next = hand; - dead.next = null; - dead.value = null; - live -= dead.size; - getStat(liveBytes, dead.key).addAndGet(-dead.size); - getStat(statEvict, dead.key).incrementAndGet(); - reportIndexEvicted(dead); - } while (maxBytes < live); - clockHand = prev; - } - getStat(liveBytes, key).addAndGet(reserve); - } finally { - clockLock.unlock(); - } - } - - private void creditSpace(long credit, DfsStreamKey key) { - clockLock.lock(); - try { - getStat(liveBytes, key).addAndGet(-credit); - } finally { - clockLock.unlock(); - } - } - - @SuppressWarnings("unchecked") - private void addToClock(Ref ref, long credit) { - clockLock.lock(); - try { - if (credit != 0) { - getStat(liveBytes, ref.key).addAndGet(-credit); - } - Ref ptr = clockHand; - ref.next = ptr.next; - ptr.next = ref; - clockHand = ref; - } finally { - clockLock.unlock(); - } + return dfsBlockCacheTable.getOrLoad(file, position, ctx, fileChannel); } void put(DfsBlock v) { - put(v.stream, v.start, v.size(), v); + dfsBlockCacheTable.put(v); } /** @@ -524,252 +238,41 @@ public final class DfsBlockCache { * @throws IOException * the reference was not in the cache and could not be loaded. */ - <T> Ref<T> getOrLoadRef( - DfsStreamKey key, long position, RefLoader<T> loader) - throws IOException { - long start = System.nanoTime(); - int slot = slot(key, position); - HashEntry e1 = table.get(slot); - Ref<T> ref = scanRef(e1, key, position); - if (ref != null) { - getStat(statHit, key).incrementAndGet(); - reportIndexRequested(ref, true /* cacheHit */, start); - return ref; - } - - ReentrantLock regionLock = lockForRef(key); - long lockStart = System.currentTimeMillis(); - regionLock.lock(); - try { - HashEntry e2 = table.get(slot); - if (e2 != e1) { - ref = scanRef(e2, key, position); - if (ref != null) { - getStat(statHit, key).incrementAndGet(); - reportIndexRequested(ref, true /* cacheHit */, - start); - return ref; - } - } - - if (refLockWaitTime != null) { - refLockWaitTime.accept( - Long.valueOf(System.currentTimeMillis() - lockStart)); - } - getStat(statMiss, key).incrementAndGet(); - ref = loader.load(); - ref.markHotter(); - // Reserve after loading to get the size of the object - reserveSpace(ref.size, key); - for (;;) { - HashEntry n = new HashEntry(clean(e2), ref); - if (table.compareAndSet(slot, e2, n)) { - break; - } - e2 = table.get(slot); - } - addToClock(ref, 0); - } finally { - regionLock.unlock(); - } - reportIndexRequested(ref, false /* cacheHit */, start); - return ref; + <T> Ref<T> getOrLoadRef(DfsStreamKey key, long position, + RefLoader<T> loader) throws IOException { + return dfsBlockCacheTable.getOrLoadRef(key, position, loader); } <T> Ref<T> putRef(DfsStreamKey key, long size, T v) { - return put(key, 0, size, v); + return dfsBlockCacheTable.putRef(key, size, v); } <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) { - int slot = slot(key, pos); - HashEntry e1 = table.get(slot); - Ref<T> ref = scanRef(e1, key, pos); - if (ref != null) { - return ref; - } - - reserveSpace(size, key); - ReentrantLock regionLock = lockFor(key, pos); - regionLock.lock(); - try { - HashEntry e2 = table.get(slot); - if (e2 != e1) { - ref = scanRef(e2, key, pos); - if (ref != null) { - creditSpace(size, key); - return ref; - } - } - - ref = new Ref<>(key, pos, size, v); - ref.markHotter(); - for (;;) { - HashEntry n = new HashEntry(clean(e2), ref); - if (table.compareAndSet(slot, e2, n)) { - break; - } - e2 = table.get(slot); - } - addToClock(ref, 0); - } finally { - regionLock.unlock(); - } - return ref; + return dfsBlockCacheTable.put(key, pos, size, v); } boolean contains(DfsStreamKey key, long position) { - return scan(table.get(slot(key, position)), key, position) != null; + return dfsBlockCacheTable.contains(key, position); } - @SuppressWarnings("unchecked") <T> T get(DfsStreamKey key, long position) { - T val = (T) scan(table.get(slot(key, position)), key, position); - if (val == null) { - getStat(statMiss, key).incrementAndGet(); - } else { - getStat(statHit, key).incrementAndGet(); - } - return val; - } - - private <T> T scan(HashEntry n, DfsStreamKey key, long position) { - Ref<T> r = scanRef(n, key, position); - return r != null ? r.get() : null; - } - - @SuppressWarnings("unchecked") - private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) { - for (; n != null; n = n.next) { - Ref<T> r = n.ref; - if (r.position == position && r.key.equals(key)) { - return r.get() != null ? r : null; - } - } - return null; - } - - private int slot(DfsStreamKey key, long position) { - return (hash(key.hash, position) >>> 1) % tableSize; - } - - private ReentrantLock lockFor(DfsStreamKey key, long position) { - return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length]; - } - - private ReentrantLock lockForRef(DfsStreamKey key) { - int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length; - return refLocks[key.packExtPos][slot]; - } - - private static AtomicLong[] newCounters() { - AtomicLong[] ret = new AtomicLong[PackExt.values().length]; - for (int i = 0; i < ret.length; i++) { - ret[i] = new AtomicLong(); - } - return ret; - } - - private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats, - DfsStreamKey key) { - int pos = key.packExtPos; - while (true) { - AtomicLong[] vals = stats.get(); - if (pos < vals.length) { - return vals[pos]; - } - AtomicLong[] expect = vals; - vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)]; - System.arraycopy(expect, 0, vals, 0, expect.length); - for (int i = expect.length; i < vals.length; i++) { - vals[i] = new AtomicLong(); - } - if (stats.compareAndSet(expect, vals)) { - return vals[pos]; - } - } - } - - private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) { - AtomicLong[] stats = stat.get(); - long[] cnt = new long[stats.length]; - for (int i = 0; i < stats.length; i++) { - cnt[i] = stats[i].get(); - } - return cnt; - } - - private static HashEntry clean(HashEntry top) { - while (top != null && top.ref.next == null) { - top = top.next; - } - if (top == null) { - return null; - } - HashEntry n = clean(top.next); - return n == top.next ? top : new HashEntry(n, top.ref); - } - - private void reportIndexRequested(Ref<?> ref, boolean cacheHit, - long start) { - if (indexEventConsumer == null - || !isIndexExtPos(ref.key.packExtPos)) { - return; - } - EvictKey evictKey = new EvictKey(ref); - Long prevEvictedTime = indexEvictionMap.get(evictKey); - long now = System.nanoTime(); - long sinceLastEvictionNanos = prevEvictedTime == null ? 0L - : now - prevEvictedTime.longValue(); - indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit, - (now - start) / 1000L /* micros */, ref.size, - Duration.ofNanos(sinceLastEvictionNanos)); - } - - private void reportIndexEvicted(Ref<?> dead) { - if (indexEventConsumer == null - || !indexEventConsumer.shouldReportEvictedEvent() - || !isIndexExtPos(dead.key.packExtPos)) { - return; - } - EvictKey evictKey = new EvictKey(dead); - Long prevEvictedTime = indexEvictionMap.get(evictKey); - long now = System.nanoTime(); - long sinceLastEvictionNanos = prevEvictedTime == null ? 0L - : now - prevEvictedTime.longValue(); - indexEvictionMap.put(evictKey, Long.valueOf(now)); - indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size, - dead.totalHitCount.get(), - Duration.ofNanos(sinceLastEvictionNanos)); - } - - private static boolean isIndexExtPos(int packExtPos) { - return packExtPos == PackExt.INDEX.getPosition() - || packExtPos == PackExt.REVERSE_INDEX.getPosition() - || packExtPos == PackExt.BITMAP_INDEX.getPosition(); - } - - private static final class HashEntry { - /** Next entry in the hash table's chain list. */ - final HashEntry next; - - /** The referenced object. */ - final Ref ref; - - HashEntry(HashEntry n, Ref r) { - next = n; - ref = r; - } + return dfsBlockCacheTable.get(key, position); } static final class Ref<T> { final DfsStreamKey key; + final long position; + final long size; + volatile T value; + Ref next; private volatile int hotCount; - private AtomicInteger totalHitCount = new AtomicInteger(); + + private final AtomicInteger totalHitCount = new AtomicInteger(); Ref(DfsStreamKey key, long position, long size, T v) { this.key = key; @@ -804,33 +307,9 @@ public final class DfsBlockCache { boolean isHot() { return hotCount > 0; } - } - - private static final class EvictKey { - private final int keyHash; - private final int packExtPos; - private final long position; - - EvictKey(Ref<?> ref) { - keyHash = ref.key.hash; - packExtPos = ref.key.packExtPos; - position = ref.position; - } - - @Override - public boolean equals(Object object) { - if (object instanceof EvictKey) { - EvictKey other = (EvictKey) object; - return keyHash == other.keyHash - && packExtPos == other.packExtPos - && position == other.position; - } - return false; - } - @Override - public int hashCode() { - return DfsBlockCache.getInstance().hash(keyHash, position); + int getTotalHitCount() { + return totalHitCount.get(); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTable.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTable.java new file mode 100644 index 0000000000..701d1fdce3 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCacheTable.java @@ -0,0 +1,340 @@ +/* + * Copyright (c) 2024, Google LLC and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Distribution License v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +package org.eclipse.jgit.internal.storage.dfs; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jgit.internal.storage.pack.PackExt; + +/** + * Block cache table. + */ +public interface DfsBlockCacheTable { + /** + * Quickly check if the cache contains block 0 of the given stream. + * <p> + * This can be useful for sophisticated pre-read algorithms to quickly + * determine if a file is likely already in cache, especially small + * reftables which may be smaller than a typical DFS block size. + * + * @param key + * the file to check. + * @return true if block 0 (the first block) is in the cache. + */ + boolean hasBlock0(DfsStreamKey key); + + /** + * Look up a cached object, creating and loading it if it doesn't exist. + * + * @param file + * the pack that "contains" the cached object. + * @param position + * offset within <code>pack</code> of the object. + * @param dfsReader + * current thread's reader. + * @param fileChannel + * supplier for channel to read {@code pack}. + * @return the object reference. + * @throws IOException + * the reference was not in the cache and could not be loaded. + */ + DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader dfsReader, + DfsBlockCache.ReadableChannelSupplier fileChannel) + throws IOException; + + /** + * Look up a cached object, creating and loading it if it doesn't exist. + * + * @param key + * the stream key of the pack. + * @param position + * the position in the key. The default should be 0. + * @param loader + * the function to load the reference. + * @return the object reference. + * @throws IOException + * the reference was not in the cache and could not be loaded. + */ + <T> DfsBlockCache.Ref<T> getOrLoadRef(DfsStreamKey key, long position, + DfsBlockCache.RefLoader<T> loader) throws IOException; + + /** + * Put a block in the block cache. + * + * @param v + * the block to put in the cache. + */ + void put(DfsBlock v); + + /** + * Put a block in the block cache. + * + * @param key + * the stream key of the pack. + * @param pos + * the position in the key. + * @param size + * the size of the object. + * @param v + * the object to put in the block cache. + * @return the object reference. + */ + <T> DfsBlockCache.Ref<T> put(DfsStreamKey key, long pos, long size, T v); + + /** + * Put an object in the block cache. + * + * @param key + * the stream key of the pack. + * @param size + * the size of the object. + * @param v + * the object to put in the block cache. + * @return the object reference. + */ + <T> DfsBlockCache.Ref<T> putRef(DfsStreamKey key, long size, T v); + + /** + * Check if the block cache contains an object identified by (key, + * position). + * + * @param key + * the stream key of the pack. + * @param position + * the position in the key. + * @return if the block cache contains the object identified by (key, + * position). + */ + boolean contains(DfsStreamKey key, long position); + + /** + * Get the object identified by (key, position) from the block cache. + * + * @param key + * the stream key of the pack. + * @param position + * the position in the key. + * @return the object identified by (key, position). + */ + <T> T get(DfsStreamKey key, long position); + + /** + * Get the DfsBlockCacheStats object for this block cache table's + * statistics. + * + * @return the DfsBlockCacheStats tracking this block cache table's + * statistics. + */ + DfsBlockCacheStats getDfsBlockCacheStats(); + + /** + * Keeps track of stats for a Block Cache table. + */ + class DfsBlockCacheStats { + /** + * Number of times a block was found in the cache, per pack file + * extension. + */ + private final AtomicReference<AtomicLong[]> statHit; + + /** + * Number of times a block was not found, and had to be loaded, per pack + * file extension. + */ + private final AtomicReference<AtomicLong[]> statMiss; + + /** + * Number of blocks evicted due to cache being full, per pack file + * extension. + */ + private final AtomicReference<AtomicLong[]> statEvict; + + /** + * Number of bytes currently loaded in the cache, per pack file + * extension. + */ + private final AtomicReference<AtomicLong[]> liveBytes; + + DfsBlockCacheStats() { + statHit = new AtomicReference<>(newCounters()); + statMiss = new AtomicReference<>(newCounters()); + statEvict = new AtomicReference<>(newCounters()); + liveBytes = new AtomicReference<>(newCounters()); + } + + /** + * Increment the {@code statHit} count. + * + * @param key + * key identifying which liveBytes entry to update. + */ + void incrementHit(DfsStreamKey key) { + getStat(statHit, key).incrementAndGet(); + } + + /** + * Increment the {@code statMiss} count. + * + * @param key + * key identifying which liveBytes entry to update. + */ + void incrementMiss(DfsStreamKey key) { + getStat(statMiss, key).incrementAndGet(); + } + + /** + * Increment the {@code statEvict} count. + * + * @param key + * key identifying which liveBytes entry to update. + */ + void incrementEvict(DfsStreamKey key) { + getStat(statEvict, key).incrementAndGet(); + } + + /** + * Add {@code size} to the {@code liveBytes} count. + * + * @param key + * key identifying which liveBytes entry to update. + * @param size + * amount to increment the count by. + */ + void addToLiveBytes(DfsStreamKey key, long size) { + getStat(liveBytes, key).addAndGet(size); + } + + /** + * Get total number of bytes in the cache, per pack file extension. + * + * @return total number of bytes in the cache, per pack file extension. + */ + long[] getCurrentSize() { + return getStatVals(liveBytes); + } + + /** + * Get number of requests for items in the cache, per pack file + * extension. + * + * @return the number of requests for items in the cache, per pack file + * extension. + */ + long[] getHitCount() { + return getStatVals(statHit); + } + + /** + * Get number of requests for items not in the cache, per pack file + * extension. + * + * @return the number of requests for items not in the cache, per pack + * file extension. + */ + long[] getMissCount() { + return getStatVals(statMiss); + } + + /** + * Get total number of requests (hit + miss), per pack file extension. + * + * @return total number of requests (hit + miss), per pack file + * extension. + */ + long[] getTotalRequestCount() { + AtomicLong[] hit = statHit.get(); + AtomicLong[] miss = statMiss.get(); + long[] cnt = new long[Math.max(hit.length, miss.length)]; + for (int i = 0; i < hit.length; i++) { + cnt[i] += hit[i].get(); + } + for (int i = 0; i < miss.length; i++) { + cnt[i] += miss[i].get(); + } + return cnt; + } + + /** + * Get hit ratios. + * + * @return hit ratios. + */ + long[] getHitRatio() { + AtomicLong[] hit = statHit.get(); + AtomicLong[] miss = statMiss.get(); + long[] ratio = new long[Math.max(hit.length, miss.length)]; + for (int i = 0; i < ratio.length; i++) { + if (i >= hit.length) { + ratio[i] = 0; + } else if (i >= miss.length) { + ratio[i] = 100; + } else { + long hitVal = hit[i].get(); + long missVal = miss[i].get(); + long total = hitVal + missVal; + ratio[i] = total == 0 ? 0 : hitVal * 100 / total; + } + } + return ratio; + } + + /** + * Get number of evictions performed due to cache being full, per pack + * file extension. + * + * @return the number of evictions performed due to cache being full, + * per pack file extension. + */ + long[] getEvictions() { + return getStatVals(statEvict); + } + + private static AtomicLong[] newCounters() { + AtomicLong[] ret = new AtomicLong[PackExt.values().length]; + for (int i = 0; i < ret.length; i++) { + ret[i] = new AtomicLong(); + } + return ret; + } + + private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) { + AtomicLong[] stats = stat.get(); + long[] cnt = new long[stats.length]; + for (int i = 0; i < stats.length; i++) { + cnt[i] = stats[i].get(); + } + return cnt; + } + + private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats, + DfsStreamKey key) { + int pos = key.packExtPos; + while (true) { + AtomicLong[] vals = stats.get(); + if (pos < vals.length) { + return vals[pos]; + } + AtomicLong[] expect = vals; + vals = new AtomicLong[Math.max(pos + 1, + PackExt.values().length)]; + System.arraycopy(expect, 0, vals, 0, expect.length); + for (int i = expect.length; i < vals.length; i++) { + vals[i] = new AtomicLong(); + } + if (stats.compareAndSet(expect, vals)) { + return vals[pos]; + } + } + } + } +}
\ No newline at end of file |