
MergedReftable.java 9.6KB

Simplify ReftableCompactor

The ReftableCompactor supported a byteLimit, but this is currently unused. The FileReftableStack has a more sophisticated strategy that amortizes compaction costs.

Rename min/maxUpdateIndex to reflogExpire{Min,Max}UpdateIndex to reflect their purpose more accurately. Since reflogs are generally pruned chronologically (oldest entries are expired first), one can only prune entries on full compaction, so they should not be set by default.

Rephrase the function Reader#minUpdateIndex and maxUpdateIndex. These vars are documented to affect log entries, but semantically, they are about ref entries. Since ref entries have their timestamps delta-compressed, it is important for the min/maxUpdateIndex values to be coherent between different tables. The logical timestamps for log entries do not have to be coherent in different tables, as the timestamp of a log entry is part of the key. For example, a table written at update index 20 may contain a tombstone log entry at timestamp 1.

Therefore, we set ReftableWriter's min/maxUpdateIndex from the merged tables we are compacting, rather than from the compaction settings (which should only control reflog expiry). The previous behavior could drop log entries erroneously, especially in the presence of tombstone log entries. Unfortunately, testing this properly requires both an API for adding log tombstones, and a more refined API for controlling automatic compaction. Hence, no test.

Change-Id: I2f4eb7866f607fddd0629809e8e61f0b9097717f
Signed-off-by: Han-Wen Nienhuys <hanwen@google.com>
4 years ago
/*
 * Copyright (C) 2017, Google Inc. and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */
package org.eclipse.jgit.internal.storage.reftable;

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;

import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.ReflogEntry;

/**
 * Merges multiple reference tables together.
 * <p>
 * A {@link org.eclipse.jgit.internal.storage.reftable.MergedReftable}
 * merge-joins multiple
 * {@link org.eclipse.jgit.internal.storage.reftable.ReftableReader} on the fly.
 * Tables higher/later in the stack shadow lower/earlier tables, hiding
 * references that have been updated/replaced.
 * <p>
 * By default deleted references are skipped and not returned to the caller.
 * {@link #setIncludeDeletes(boolean)} can be used to modify this behavior if
 * the caller needs to preserve deletions during partial compaction.
 * <p>
 * A {@code MergedReftable} is not thread-safe.
 */
public class MergedReftable extends Reftable {
    private final ReftableReader[] tables;

    /**
     * Initialize a merged table reader.
     *
     * @param tableStack
     *            stack of tables to read from. The base of the stack is at
     *            index 0, the most recent should be at the top of the stack at
     *            {@code tableStack.size() - 1}. The top of the stack (higher
     *            index) shadows the base of the stack (lower index).
     */
    public MergedReftable(List<ReftableReader> tableStack) {
        tables = tableStack.toArray(new ReftableReader[0]);

        // Tables must expose deletes to this instance to correctly
        // shadow references from lower tables.
        for (ReftableReader t : tables) {
            t.setIncludeDeletes(true);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long maxUpdateIndex() throws IOException {
        if (tables.length == 0) {
            return 0;
        }
        long maxUpdateIndex = tables[tables.length - 1].maxUpdateIndex();
        for (int i = tables.length - 2; i >= 0; i--) {
            if (maxUpdateIndex < tables[i].maxUpdateIndex()) {
                maxUpdateIndex = tables[i].maxUpdateIndex();
            }
        }
        return maxUpdateIndex;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long minUpdateIndex() throws IOException {
        if (tables.length == 0) {
            return 0;
        }
        long minUpdateIndex = tables[0].minUpdateIndex();
        // tables[0] seeds the minimum, so start at 1 and scan through the
        // last table; the original bound (i < tables.length - 1) skipped it.
        for (int i = 1; i < tables.length; i++) {
            if (tables[i].minUpdateIndex() < minUpdateIndex) {
                minUpdateIndex = tables[i].minUpdateIndex();
            }
        }
        return minUpdateIndex;
    }

    /** {@inheritDoc} */
    @Override
    public boolean hasObjectMap() throws IOException {
        boolean has = true;
        for (int i = 0; has && i < tables.length; i++) {
            has = has && tables[i].hasObjectMap();
        }
        return has;
    }

    /** {@inheritDoc} */
    @Override
    public RefCursor allRefs() throws IOException {
        MergedRefCursor m = new MergedRefCursor();
        for (int i = 0; i < tables.length; i++) {
            m.add(new RefQueueEntry(tables[i].allRefs(), i));
        }
        return m;
    }

    /** {@inheritDoc} */
    @Override
    public RefCursor seekRef(String name) throws IOException {
        MergedRefCursor m = new MergedRefCursor();
        for (int i = 0; i < tables.length; i++) {
            m.add(new RefQueueEntry(tables[i].seekRef(name), i));
        }
        return m;
    }

    /** {@inheritDoc} */
    @Override
    public RefCursor seekRefsWithPrefix(String prefix) throws IOException {
        MergedRefCursor m = new MergedRefCursor();
        for (int i = 0; i < tables.length; i++) {
            m.add(new RefQueueEntry(tables[i].seekRefsWithPrefix(prefix), i));
        }
        return m;
    }

    /** {@inheritDoc} */
    @Override
    public RefCursor byObjectId(AnyObjectId name) throws IOException {
        MergedRefCursor m = new FilteringMergedRefCursor(name);
        for (int i = 0; i < tables.length; i++) {
            m.add(new RefQueueEntry(tables[i].byObjectId(name), i));
        }
        return m;
    }

    /** {@inheritDoc} */
    @Override
    public LogCursor allLogs() throws IOException {
        MergedLogCursor m = new MergedLogCursor();
        for (int i = 0; i < tables.length; i++) {
            m.add(new LogQueueEntry(tables[i].allLogs(), i));
        }
        return m;
    }

    /** {@inheritDoc} */
    @Override
    public LogCursor seekLog(String refName, long updateIdx)
            throws IOException {
        MergedLogCursor m = new MergedLogCursor();
        for (int i = 0; i < tables.length; i++) {
            m.add(new LogQueueEntry(tables[i].seekLog(refName, updateIdx), i));
        }
        return m;
    }

    int queueSize() {
        // PriorityQueue rejects an initial capacity below 1.
        return Math.max(1, tables.length);
    }

    private class MergedRefCursor extends RefCursor {
        private final PriorityQueue<RefQueueEntry> queue;
        private RefQueueEntry head;
        private Ref ref;

        MergedRefCursor() {
            queue = new PriorityQueue<>(queueSize(), RefQueueEntry::compare);
        }

        void add(RefQueueEntry t) throws IOException {
            // Common case is many iterations over the same RefQueueEntry
            // for the bottom of the stack (scanning all refs). It's almost
            // always less than the top of the queue. Avoid the queue's
            // O(log N) insertion and removal costs for this common case.
            if (!t.rc.next()) {
                t.rc.close();
            } else if (head == null) {
                RefQueueEntry p = queue.peek();
                if (p == null || RefQueueEntry.compare(t, p) < 0) {
                    head = t;
                } else {
                    head = queue.poll();
                    queue.add(t);
                }
            } else if (RefQueueEntry.compare(t, head) > 0) {
                queue.add(t);
            } else {
                queue.add(head);
                head = t;
            }
        }

        @Override
        public boolean next() throws IOException {
            for (;;) {
                RefQueueEntry t = poll();
                if (t == null) {
                    return false;
                }

                ref = t.rc.getRef();
                boolean include = includeDeletes || !t.rc.wasDeleted();
                add(t);
                skipShadowedRefs(ref.getName());
                if (include) {
                    return true;
                }
            }
        }

        private RefQueueEntry poll() {
            RefQueueEntry e = head;
            if (e != null) {
                head = null;
                return e;
            }
            return queue.poll();
        }

        private void skipShadowedRefs(String name) throws IOException {
            for (;;) {
                RefQueueEntry t = head != null ? head : queue.peek();
                if (t != null && name.equals(t.name())) {
                    add(poll());
                } else {
                    break;
                }
            }
        }

        @Override
        public Ref getRef() {
            return ref;
        }

        @Override
        public void close() {
            if (head != null) {
                head.rc.close();
                head = null;
            }
            while (!queue.isEmpty()) {
                queue.remove().rc.close();
            }
        }
    }

    private class FilteringMergedRefCursor extends MergedRefCursor {
        final AnyObjectId filterId;
        Ref filteredRef;

        FilteringMergedRefCursor(AnyObjectId id) {
            filterId = id;
            filteredRef = null;
        }

        @Override
        public Ref getRef() {
            return filteredRef;
        }

        @Override
        public boolean next() throws IOException {
            for (;;) {
                boolean ok = super.next();
                if (!ok) {
                    return false;
                }

                // Look the name up again through the merged view and keep
                // the ref only if it still resolves to the requested id.
                String name = super.getRef().getName();
                try (RefCursor c = seekRef(name)) {
                    if (c.next()) {
                        if (filterId.equals(c.getRef().getObjectId())) {
                            filteredRef = c.getRef();
                            return true;
                        }
                    }
                }
            }
        }
    }

    private static class RefQueueEntry {
        static int compare(RefQueueEntry a, RefQueueEntry b) {
            int cmp = a.name().compareTo(b.name());
            if (cmp == 0) {
                // higher updateIndex shadows lower updateIndex.
                cmp = Long.signum(b.updateIndex() - a.updateIndex());
            }
            if (cmp == 0) {
                // higher index shadows lower index, so higher index first.
                cmp = b.stackIdx - a.stackIdx;
            }
            return cmp;
        }

        final RefCursor rc;
        final int stackIdx;

        RefQueueEntry(RefCursor rc, int stackIdx) {
            this.rc = rc;
            this.stackIdx = stackIdx;
        }

        String name() {
            return rc.getRef().getName();
        }

        long updateIndex() {
            return rc.getRef().getUpdateIndex();
        }
    }

    private class MergedLogCursor extends LogCursor {
        private final PriorityQueue<LogQueueEntry> queue;
        private String refName;
        private long updateIndex;
        private ReflogEntry entry;

        MergedLogCursor() {
            queue = new PriorityQueue<>(queueSize(), LogQueueEntry::compare);
        }

        void add(LogQueueEntry t) throws IOException {
            if (t.lc.next()) {
                queue.add(t);
            } else {
                t.lc.close();
            }
        }

        @Override
        public boolean next() throws IOException {
            for (;;) {
                LogQueueEntry t = queue.poll();
                if (t == null) {
                    return false;
                }

                refName = t.lc.getRefName();
                updateIndex = t.lc.getUpdateIndex();
                entry = t.lc.getReflogEntry();
                boolean include = includeDeletes || entry != null;
                skipShadowed(refName, updateIndex);
                add(t);
                if (include) {
                    return true;
                }
            }
        }

        private void skipShadowed(String name, long index) throws IOException {
            for (;;) {
                LogQueueEntry t = queue.peek();
                if (t != null && name.equals(t.name()) && index == t.index()) {
                    add(queue.remove());
                } else {
                    break;
                }
            }
        }

        @Override
        public String getRefName() {
            return refName;
        }

        @Override
        public long getUpdateIndex() {
            return updateIndex;
        }

        @Override
        public ReflogEntry getReflogEntry() {
            return entry;
        }

        @Override
        public void close() {
            while (!queue.isEmpty()) {
                queue.remove().lc.close();
            }
        }
    }

    private static class LogQueueEntry {
        static int compare(LogQueueEntry a, LogQueueEntry b) {
            int cmp = a.name().compareTo(b.name());
            if (cmp == 0) {
                // higher update index sorts first.
                cmp = Long.signum(b.index() - a.index());
            }
            if (cmp == 0) {
                // higher index comes first.
                cmp = b.stackIdx - a.stackIdx;
            }
            return cmp;
        }

        final LogCursor lc;
        final int stackIdx;

        LogQueueEntry(LogCursor lc, int stackIdx) {
            this.lc = lc;
            this.stackIdx = stackIdx;
        }

        String name() {
            return lc.getRefName();
        }

        long index() {
            return lc.getUpdateIndex();
        }
    }
}
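
The merge-join with shadowing that MergedRefCursor performs can be hard to follow from the cursor plumbing alone. Below is a minimal standalone sketch of the same idea using plain sorted lists instead of reftable readers. It is not JGit code: `MergeShadowSketch`, `Entry`, `Source`, `merge` and `demoStack` are hypothetical names invented for illustration, a `null` value stands in for a deletion, and the sketch leaves out both the update-index tie-break and the `head` fast path that the real cursor uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MergeShadowSketch {
    /** Hypothetical stand-in for a ref record; a null value marks a deletion. */
    static final class Entry {
        final String name;
        final String value;

        Entry(String name, String value) {
            this.name = name;
            this.value = value;
        }

        @Override
        public String toString() {
            return value == null ? name + "=<deleted>" : name + "=" + value;
        }
    }

    /** One sorted input together with its position in the stack. */
    static final class Source {
        final Iterator<Entry> it;
        final int stackIdx;
        Entry cur;

        Source(List<Entry> sorted, int stackIdx) {
            this.it = sorted.iterator();
            this.stackIdx = stackIdx;
        }

        /** Moves to the next entry; returns false when the input is exhausted. */
        boolean advance() {
            cur = it.hasNext() ? it.next() : null;
            return cur != null;
        }
    }

    /** Merges name-sorted tables; index 0 is the base, higher indexes shadow it. */
    static List<Entry> merge(List<List<Entry>> stack, boolean includeDeletes) {
        // Order by name, then by stack index descending so that the
        // most recent table wins when several tables hold the same name.
        PriorityQueue<Source> queue = new PriorityQueue<>(
                Math.max(1, stack.size()),
                (a, b) -> {
                    int cmp = a.cur.name.compareTo(b.cur.name);
                    return cmp != 0 ? cmp
                            : Integer.compare(b.stackIdx, a.stackIdx);
                });
        for (int i = 0; i < stack.size(); i++) {
            Source s = new Source(stack.get(i), i);
            if (s.advance()) {
                queue.add(s);
            }
        }

        List<Entry> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Always remove a source before advancing it, then re-add it,
            // so the queue never holds an element whose key has changed.
            Source top = queue.poll();
            Entry winner = top.cur;
            if (top.advance()) {
                queue.add(top);
            }
            // Discard entries of the same name coming from lower tables.
            while (!queue.isEmpty()
                    && queue.peek().cur.name.equals(winner.name)) {
                Source shadowed = queue.poll();
                if (shadowed.advance()) {
                    queue.add(shadowed);
                }
            }
            if (includeDeletes || winner.value != null) {
                out.add(winner);
            }
        }
        return out;
    }

    /** Sample two-table stack: the top table deletes b and replaces c. */
    static List<List<Entry>> demoStack() {
        List<Entry> base = Arrays.asList(
                new Entry("refs/heads/a", "1"),
                new Entry("refs/heads/b", "2"),
                new Entry("refs/heads/c", "3"));
        List<Entry> top = Arrays.asList(
                new Entry("refs/heads/b", null),
                new Entry("refs/heads/c", "4"),
                new Entry("refs/heads/d", "5"));
        return Arrays.asList(base, top);
    }

    public static void main(String[] args) {
        System.out.println(merge(demoStack(), false));
        System.out.println(merge(demoStack(), true));
    }
}
```

With `includeDeletes` set to false the deleted `refs/heads/b` disappears from the result, while with true it is returned as a deletion marker. That second mode corresponds to what the class Javadoc describes for partial compaction, where a deletion must survive so it can keep shadowing tables that are not part of the compaction.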