You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DfsBlockCache.java 20KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733
  /*
   * Copyright (C) 2008-2011, Google Inc.
   * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org> and others
   *
   * This program and the accompanying materials are made available under the
   * terms of the Eclipse Distribution License v. 1.0 which is available at
   * https://www.eclipse.org/org/documents/edl-v10.php.
   *
   * SPDX-License-Identifier: BSD-3-Clause
   */
  11. package org.eclipse.jgit.internal.storage.dfs;
  import java.io.IOException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.atomic.AtomicReferenceArray;
  import java.util.concurrent.locks.ReentrantLock;
  import java.util.function.Consumer;
  import java.util.stream.LongStream;

  import org.eclipse.jgit.internal.JGitText;
  import org.eclipse.jgit.internal.storage.pack.PackExt;
  21. /**
  22. * Caches slices of a
  23. * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
  24. * faster read access.
  25. * <p>
  26. * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
  27. * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
  28. * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
  29. * these tiny reads into larger block-sized IO operations.
  30. * <p>
  31. * Whenever a cache miss occurs, loading is invoked by exactly one thread for
  32. * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
  33. * an array of locks, with the tuple hashed to a lock instance.
  34. * <p>
  35. * Its too expensive during object access to be accurate with a least recently
  36. * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
  37. * typically doesn't yield a corresponding benefit to the application. This
  38. * cache implements a clock replacement algorithm, giving each block one chance
  39. * to have been accessed during a sweep of the cache to save itself from
  40. * eviction.
  41. * <p>
  42. * Entities created by the cache are held under hard references, preventing the
  43. * Java VM from clearing anything. Blocks are discarded by the replacement
  44. * algorithm when adding a new block would cause the cache to exceed its
  45. * configured maximum size.
  46. * <p>
  47. * The key tuple is passed through to methods as a pair of parameters rather
  48. * than as a single Object, thus reducing the transient memory allocations of
  49. * callers. It is more efficient to avoid the allocation, as we can't be 100%
  50. * sure that a JIT would be able to stack-allocate a key tuple.
  51. * <p>
  52. * The internal hash table does not expand at runtime, instead it is fixed in
  53. * size at cache creation time. The internal lock table used to gate load
  54. * invocations is also fixed in size.
  55. */
  56. public final class DfsBlockCache {
  57. private static volatile DfsBlockCache cache;
  58. static {
  59. reconfigure(new DfsBlockCacheConfig());
  60. }
  61. /**
  62. * Modify the configuration of the window cache.
  63. * <p>
  64. * The new configuration is applied immediately, and the existing cache is
  65. * cleared.
  66. *
  67. * @param cfg
  68. * the new window cache configuration.
  69. * @throws java.lang.IllegalArgumentException
  70. * the cache configuration contains one or more invalid
  71. * settings, usually too low of a limit.
  72. */
  73. public static void reconfigure(DfsBlockCacheConfig cfg) {
  74. cache = new DfsBlockCache(cfg);
  75. }
  76. /**
  77. * Get the currently active DfsBlockCache.
  78. *
  79. * @return the currently active DfsBlockCache.
  80. */
  81. public static DfsBlockCache getInstance() {
  82. return cache;
  83. }
  84. /** Number of entries in {@link #table}. */
  85. private final int tableSize;
  86. /** Hash bucket directory; entries are chained below. */
  87. private final AtomicReferenceArray<HashEntry> table;
  88. /**
  89. * Locks to prevent concurrent loads for same (PackFile,position) block. The
  90. * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to
  91. * cap the overall concurrent block loads.
  92. */
  93. private final ReentrantLock[] loadLocks;
  94. /**
  95. * A separate pool of locks to prevent concurrent loads for same index or bitmap from PackFile.
  96. */
  97. private final ReentrantLock[] refLocks;
  98. /** Maximum number of bytes the cache should hold. */
  99. private final long maxBytes;
  100. /** Pack files smaller than this size can be copied through the cache. */
  101. private final long maxStreamThroughCache;
  102. /**
  103. * Suggested block size to read from pack files in.
  104. * <p>
  105. * If a pack file does not have a native block size, this size will be used.
  106. * <p>
  107. * If a pack file has a native size, a whole multiple of the native size
  108. * will be used until it matches this size.
  109. * <p>
  110. * The value for blockSize must be a power of 2.
  111. */
  112. private final int blockSize;
  113. /** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
  114. private final int blockSizeShift;
  115. /**
  116. * Number of times a block was found in the cache, per pack file extension.
  117. */
  118. private final AtomicReference<AtomicLong[]> statHit;
  119. /**
  120. * Number of times a block was not found, and had to be loaded, per pack
  121. * file extension.
  122. */
  123. private final AtomicReference<AtomicLong[]> statMiss;
  124. /**
  125. * Number of blocks evicted due to cache being full, per pack file
  126. * extension.
  127. */
  128. private final AtomicReference<AtomicLong[]> statEvict;
  129. /**
  130. * Number of bytes currently loaded in the cache, per pack file extension.
  131. */
  132. private final AtomicReference<AtomicLong[]> liveBytes;
  133. /** Protects the clock and its related data. */
  134. private final ReentrantLock clockLock;
  135. /**
  136. * A consumer of object reference lock wait time milliseconds. May be used to build a metric.
  137. */
  138. private final Consumer<Long> refLockWaitTime;
  139. /** Current position of the clock. */
  140. private Ref clockHand;
  141. @SuppressWarnings("unchecked")
  142. private DfsBlockCache(DfsBlockCacheConfig cfg) {
  143. tableSize = tableSize(cfg);
  144. if (tableSize < 1) {
  145. throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
  146. }
  147. table = new AtomicReferenceArray<>(tableSize);
  148. loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
  149. for (int i = 0; i < loadLocks.length; i++) {
  150. loadLocks[i] = new ReentrantLock(true /* fair */);
  151. }
  152. refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
  153. for (int i = 0; i < refLocks.length; i++) {
  154. refLocks[i] = new ReentrantLock(true /* fair */);
  155. }
  156. maxBytes = cfg.getBlockLimit();
  157. maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
  158. blockSize = cfg.getBlockSize();
  159. blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
  160. clockLock = new ReentrantLock(true /* fair */);
  161. String none = ""; //$NON-NLS-1$
  162. clockHand = new Ref<>(
  163. DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
  164. -1, 0, null);
  165. clockHand.next = clockHand;
  166. statHit = new AtomicReference<>(newCounters());
  167. statMiss = new AtomicReference<>(newCounters());
  168. statEvict = new AtomicReference<>(newCounters());
  169. liveBytes = new AtomicReference<>(newCounters());
  170. refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
  171. }
  172. boolean shouldCopyThroughCache(long length) {
  173. return length <= maxStreamThroughCache;
  174. }
  175. /**
  176. * Get total number of bytes in the cache, per pack file extension.
  177. *
  178. * @return total number of bytes in the cache, per pack file extension.
  179. */
  180. public long[] getCurrentSize() {
  181. return getStatVals(liveBytes);
  182. }
  183. /**
  184. * Get 0..100, defining how full the cache is.
  185. *
  186. * @return 0..100, defining how full the cache is.
  187. */
  188. public long getFillPercentage() {
  189. return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
  190. }
  191. /**
  192. * Get number of requests for items in the cache, per pack file extension.
  193. *
  194. * @return number of requests for items in the cache, per pack file
  195. * extension.
  196. */
  197. public long[] getHitCount() {
  198. return getStatVals(statHit);
  199. }
  200. /**
  201. * Get number of requests for items not in the cache, per pack file
  202. * extension.
  203. *
  204. * @return number of requests for items not in the cache, per pack file
  205. * extension.
  206. */
  207. public long[] getMissCount() {
  208. return getStatVals(statMiss);
  209. }
  210. /**
  211. * Get total number of requests (hit + miss), per pack file extension.
  212. *
  213. * @return total number of requests (hit + miss), per pack file extension.
  214. */
  215. public long[] getTotalRequestCount() {
  216. AtomicLong[] hit = statHit.get();
  217. AtomicLong[] miss = statMiss.get();
  218. long[] cnt = new long[Math.max(hit.length, miss.length)];
  219. for (int i = 0; i < hit.length; i++) {
  220. cnt[i] += hit[i].get();
  221. }
  222. for (int i = 0; i < miss.length; i++) {
  223. cnt[i] += miss[i].get();
  224. }
  225. return cnt;
  226. }
  227. /**
  228. * Get hit ratios
  229. *
  230. * @return hit ratios
  231. */
  232. public long[] getHitRatio() {
  233. AtomicLong[] hit = statHit.get();
  234. AtomicLong[] miss = statMiss.get();
  235. long[] ratio = new long[Math.max(hit.length, miss.length)];
  236. for (int i = 0; i < ratio.length; i++) {
  237. if (i >= hit.length) {
  238. ratio[i] = 0;
  239. } else if (i >= miss.length) {
  240. ratio[i] = 100;
  241. } else {
  242. long hitVal = hit[i].get();
  243. long missVal = miss[i].get();
  244. long total = hitVal + missVal;
  245. ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
  246. }
  247. }
  248. return ratio;
  249. }
  250. /**
  251. * Get number of evictions performed due to cache being full, per pack file
  252. * extension.
  253. *
  254. * @return number of evictions performed due to cache being full, per pack
  255. * file extension.
  256. */
  257. public long[] getEvictions() {
  258. return getStatVals(statEvict);
  259. }
  260. /**
  261. * Quickly check if the cache contains block 0 of the given stream.
  262. * <p>
  263. * This can be useful for sophisticated pre-read algorithms to quickly
  264. * determine if a file is likely already in cache, especially small
  265. * reftables which may be smaller than a typical DFS block size.
  266. *
  267. * @param key
  268. * the file to check.
  269. * @return true if block 0 (the first block) is in the cache.
  270. */
  271. public boolean hasBlock0(DfsStreamKey key) {
  272. HashEntry e1 = table.get(slot(key, 0));
  273. DfsBlock v = scan(e1, key, 0);
  274. return v != null && v.contains(key, 0);
  275. }
  276. private int hash(int packHash, long off) {
  277. return packHash + (int) (off >>> blockSizeShift);
  278. }
  279. int getBlockSize() {
  280. return blockSize;
  281. }
  282. private static int tableSize(DfsBlockCacheConfig cfg) {
  283. final int wsz = cfg.getBlockSize();
  284. final long limit = cfg.getBlockLimit();
  285. if (wsz <= 0) {
  286. throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
  287. }
  288. if (limit < wsz) {
  289. throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
  290. }
  291. return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
  292. }
  293. /**
  294. * Look up a cached object, creating and loading it if it doesn't exist.
  295. *
  296. * @param file
  297. * the pack that "contains" the cached object.
  298. * @param position
  299. * offset within <code>pack</code> of the object.
  300. * @param ctx
  301. * current thread's reader.
  302. * @param fileChannel
  303. * supplier for channel to read {@code pack}.
  304. * @return the object reference.
  305. * @throws IOException
  306. * the reference was not in the cache and could not be loaded.
  307. */
  308. DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
  309. ReadableChannelSupplier fileChannel) throws IOException {
  310. final long requestedPosition = position;
  311. position = file.alignToBlock(position);
  312. DfsStreamKey key = file.key;
  313. int slot = slot(key, position);
  314. HashEntry e1 = table.get(slot);
  315. DfsBlock v = scan(e1, key, position);
  316. if (v != null && v.contains(key, requestedPosition)) {
  317. ctx.stats.blockCacheHit++;
  318. getStat(statHit, key).incrementAndGet();
  319. return v;
  320. }
  321. reserveSpace(blockSize, key);
  322. ReentrantLock regionLock = lockFor(key, position);
  323. regionLock.lock();
  324. try {
  325. HashEntry e2 = table.get(slot);
  326. if (e2 != e1) {
  327. v = scan(e2, key, position);
  328. if (v != null) {
  329. ctx.stats.blockCacheHit++;
  330. getStat(statHit, key).incrementAndGet();
  331. creditSpace(blockSize, key);
  332. return v;
  333. }
  334. }
  335. getStat(statMiss, key).incrementAndGet();
  336. boolean credit = true;
  337. try {
  338. v = file.readOneBlock(position, ctx, fileChannel.get());
  339. credit = false;
  340. } finally {
  341. if (credit) {
  342. creditSpace(blockSize, key);
  343. }
  344. }
  345. if (position != v.start) {
  346. // The file discovered its blockSize and adjusted.
  347. position = v.start;
  348. slot = slot(key, position);
  349. e2 = table.get(slot);
  350. }
  351. Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
  352. ref.hot = true;
  353. for (;;) {
  354. HashEntry n = new HashEntry(clean(e2), ref);
  355. if (table.compareAndSet(slot, e2, n)) {
  356. break;
  357. }
  358. e2 = table.get(slot);
  359. }
  360. addToClock(ref, blockSize - v.size());
  361. } finally {
  362. regionLock.unlock();
  363. }
  364. // If the block size changed from the default, it is possible the block
  365. // that was loaded is the wrong block for the requested position.
  366. if (v.contains(file.key, requestedPosition)) {
  367. return v;
  368. }
  369. return getOrLoad(file, requestedPosition, ctx, fileChannel);
  370. }
  371. @SuppressWarnings("unchecked")
  372. private void reserveSpace(long reserve, DfsStreamKey key) {
  373. clockLock.lock();
  374. try {
  375. long live = LongStream.of(getCurrentSize()).sum() + reserve;
  376. if (maxBytes < live) {
  377. Ref prev = clockHand;
  378. Ref hand = clockHand.next;
  379. do {
  380. if (hand.hot) {
  381. // Value was recently touched. Clear
  382. // hot and give it another chance.
  383. hand.hot = false;
  384. prev = hand;
  385. hand = hand.next;
  386. continue;
  387. } else if (prev == hand)
  388. break;
  389. // No recent access since last scan, kill
  390. // value and remove from clock.
  391. Ref dead = hand;
  392. hand = hand.next;
  393. prev.next = hand;
  394. dead.next = null;
  395. dead.value = null;
  396. live -= dead.size;
  397. getStat(liveBytes, dead.key).addAndGet(-dead.size);
  398. getStat(statEvict, dead.key).incrementAndGet();
  399. } while (maxBytes < live);
  400. clockHand = prev;
  401. }
  402. getStat(liveBytes, key).addAndGet(reserve);
  403. } finally {
  404. clockLock.unlock();
  405. }
  406. }
  407. private void creditSpace(long credit, DfsStreamKey key) {
  408. clockLock.lock();
  409. try {
  410. getStat(liveBytes, key).addAndGet(-credit);
  411. } finally {
  412. clockLock.unlock();
  413. }
  414. }
  415. @SuppressWarnings("unchecked")
  416. private void addToClock(Ref ref, long credit) {
  417. clockLock.lock();
  418. try {
  419. if (credit != 0) {
  420. getStat(liveBytes, ref.key).addAndGet(-credit);
  421. }
  422. Ref ptr = clockHand;
  423. ref.next = ptr.next;
  424. ptr.next = ref;
  425. clockHand = ref;
  426. } finally {
  427. clockLock.unlock();
  428. }
  429. }
  430. void put(DfsBlock v) {
  431. put(v.stream, v.start, v.size(), v);
  432. }
  433. /**
  434. * Look up a cached object, creating and loading it if it doesn't exist.
  435. *
  436. * @param key
  437. * the stream key of the pack.
  438. * @param position
  439. * the position in the key. The default should be 0.
  440. * @param loader
  441. * the function to load the reference.
  442. * @return the object reference.
  443. * @throws IOException
  444. * the reference was not in the cache and could not be loaded.
  445. */
  446. <T> Ref<T> getOrLoadRef(
  447. DfsStreamKey key, long position, RefLoader<T> loader)
  448. throws IOException {
  449. int slot = slot(key, position);
  450. HashEntry e1 = table.get(slot);
  451. Ref<T> ref = scanRef(e1, key, position);
  452. if (ref != null) {
  453. getStat(statHit, key).incrementAndGet();
  454. return ref;
  455. }
  456. ReentrantLock regionLock = lockForRef(key);
  457. long lockStart = System.currentTimeMillis();
  458. regionLock.lock();
  459. try {
  460. HashEntry e2 = table.get(slot);
  461. if (e2 != e1) {
  462. ref = scanRef(e2, key, position);
  463. if (ref != null) {
  464. getStat(statHit, key).incrementAndGet();
  465. return ref;
  466. }
  467. }
  468. if (refLockWaitTime != null) {
  469. refLockWaitTime.accept(
  470. Long.valueOf(System.currentTimeMillis() - lockStart));
  471. }
  472. getStat(statMiss, key).incrementAndGet();
  473. ref = loader.load();
  474. ref.hot = true;
  475. // Reserve after loading to get the size of the object
  476. reserveSpace(ref.size, key);
  477. for (;;) {
  478. HashEntry n = new HashEntry(clean(e2), ref);
  479. if (table.compareAndSet(slot, e2, n)) {
  480. break;
  481. }
  482. e2 = table.get(slot);
  483. }
  484. addToClock(ref, 0);
  485. } finally {
  486. regionLock.unlock();
  487. }
  488. return ref;
  489. }
  490. <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
  491. return put(key, 0, size, v);
  492. }
  493. <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
  494. int slot = slot(key, pos);
  495. HashEntry e1 = table.get(slot);
  496. Ref<T> ref = scanRef(e1, key, pos);
  497. if (ref != null) {
  498. return ref;
  499. }
  500. reserveSpace(size, key);
  501. ReentrantLock regionLock = lockFor(key, pos);
  502. regionLock.lock();
  503. try {
  504. HashEntry e2 = table.get(slot);
  505. if (e2 != e1) {
  506. ref = scanRef(e2, key, pos);
  507. if (ref != null) {
  508. creditSpace(size, key);
  509. return ref;
  510. }
  511. }
  512. ref = new Ref<>(key, pos, size, v);
  513. ref.hot = true;
  514. for (;;) {
  515. HashEntry n = new HashEntry(clean(e2), ref);
  516. if (table.compareAndSet(slot, e2, n)) {
  517. break;
  518. }
  519. e2 = table.get(slot);
  520. }
  521. addToClock(ref, 0);
  522. } finally {
  523. regionLock.unlock();
  524. }
  525. return ref;
  526. }
  527. boolean contains(DfsStreamKey key, long position) {
  528. return scan(table.get(slot(key, position)), key, position) != null;
  529. }
  530. @SuppressWarnings("unchecked")
  531. <T> T get(DfsStreamKey key, long position) {
  532. T val = (T) scan(table.get(slot(key, position)), key, position);
  533. if (val == null) {
  534. getStat(statMiss, key).incrementAndGet();
  535. } else {
  536. getStat(statHit, key).incrementAndGet();
  537. }
  538. return val;
  539. }
  540. private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
  541. Ref<T> r = scanRef(n, key, position);
  542. return r != null ? r.get() : null;
  543. }
  544. @SuppressWarnings("unchecked")
  545. private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
  546. for (; n != null; n = n.next) {
  547. Ref<T> r = n.ref;
  548. if (r.position == position && r.key.equals(key)) {
  549. return r.get() != null ? r : null;
  550. }
  551. }
  552. return null;
  553. }
  554. private int slot(DfsStreamKey key, long position) {
  555. return (hash(key.hash, position) >>> 1) % tableSize;
  556. }
  557. private ReentrantLock lockFor(DfsStreamKey key, long position) {
  558. return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
  559. }
  560. private ReentrantLock lockForRef(DfsStreamKey key) {
  561. return refLocks[(key.hash >>> 1) % refLocks.length];
  562. }
  563. private static AtomicLong[] newCounters() {
  564. AtomicLong[] ret = new AtomicLong[PackExt.values().length];
  565. for (int i = 0; i < ret.length; i++) {
  566. ret[i] = new AtomicLong();
  567. }
  568. return ret;
  569. }
  570. private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
  571. DfsStreamKey key) {
  572. int pos = key.packExtPos;
  573. while (true) {
  574. AtomicLong[] vals = stats.get();
  575. if (pos < vals.length) {
  576. return vals[pos];
  577. }
  578. AtomicLong[] expect = vals;
  579. vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
  580. System.arraycopy(expect, 0, vals, 0, expect.length);
  581. for (int i = expect.length; i < vals.length; i++) {
  582. vals[i] = new AtomicLong();
  583. }
  584. if (stats.compareAndSet(expect, vals)) {
  585. return vals[pos];
  586. }
  587. }
  588. }
  589. private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
  590. AtomicLong[] stats = stat.get();
  591. long[] cnt = new long[stats.length];
  592. for (int i = 0; i < stats.length; i++) {
  593. cnt[i] = stats[i].get();
  594. }
  595. return cnt;
  596. }
  597. private static HashEntry clean(HashEntry top) {
  598. while (top != null && top.ref.next == null)
  599. top = top.next;
  600. if (top == null) {
  601. return null;
  602. }
  603. HashEntry n = clean(top.next);
  604. return n == top.next ? top : new HashEntry(n, top.ref);
  605. }
  606. private static final class HashEntry {
  607. /** Next entry in the hash table's chain list. */
  608. final HashEntry next;
  609. /** The referenced object. */
  610. final Ref ref;
  611. HashEntry(HashEntry n, Ref r) {
  612. next = n;
  613. ref = r;
  614. }
  615. }
  616. static final class Ref<T> {
  617. final DfsStreamKey key;
  618. final long position;
  619. final long size;
  620. volatile T value;
  621. Ref next;
  622. volatile boolean hot;
  623. Ref(DfsStreamKey key, long position, long size, T v) {
  624. this.key = key;
  625. this.position = position;
  626. this.size = size;
  627. this.value = v;
  628. }
  629. T get() {
  630. T v = value;
  631. if (v != null) {
  632. hot = true;
  633. }
  634. return v;
  635. }
  636. boolean has() {
  637. return value != null;
  638. }
  639. }
  640. @FunctionalInterface
  641. interface RefLoader<T> {
  642. Ref<T> load() throws IOException;
  643. }
  644. /**
  645. * Supplier for readable channel
  646. */
  647. @FunctionalInterface
  648. interface ReadableChannelSupplier {
  649. /**
  650. * @return ReadableChannel
  651. * @throws IOException
  652. */
  653. ReadableChannel get() throws IOException;
  654. }
  655. }