Add an update_index to every reference in a reftable, storing the exact transaction that last modified the reference. This is necessary to fix some merge race conditions. Consider updates at T1, T3 are present in two reftables. Compacting these will create a table with range [T1,T3]. If T2 arrives during or after the compaction its impossible for readers to know how to merge the [T1,T3] table with the T2 table. With an explicit update_index per reference, MergedReftable is able to individually sort each reference, merging individual entries at T3 from [T1,T3] ahead of identically named entries appearing in T2. Change-Id: Ie4065d4176a5a0207dcab9696ae05d086e042140tags/v4.9.0.201710071750-r
@@ -243,6 +243,7 @@ its value(s). Records are formatted as: | |||
varint( prefix_length ) | |||
varint( (suffix_length << 3) | value_type ) | |||
suffix | |||
varint( update_index_delta ) | |||
value? | |||
The `prefix_length` field specifies how many leading bytes of the | |||
@@ -258,6 +259,10 @@ Recovering a reference name from any `ref_record` is a simple concat: | |||
The `suffix_length` value provides the number of bytes available in | |||
`suffix` to copy from `suffix` to complete the reference name. | |||
The `update_index` that last modified the reference can be obtained by | |||
adding `update_index_delta` to the `min_update_index` from the file | |||
header: `min_update_index + update_index_delta`. | |||
The `value` follows. Its format is determined by `value_type`, one of | |||
the following: | |||
@@ -261,6 +261,41 @@ public class MergedReftableTest { | |||
} | |||
} | |||
@Test | |||
public void missedUpdate() throws IOException { | |||
ByteArrayOutputStream buf = new ByteArrayOutputStream(); | |||
ReftableWriter writer = new ReftableWriter() | |||
.setMinUpdateIndex(1) | |||
.setMaxUpdateIndex(3) | |||
.begin(buf); | |||
writer.writeRef(ref("refs/heads/a", 1), 1); | |||
writer.writeRef(ref("refs/heads/c", 3), 3); | |||
writer.finish(); | |||
byte[] base = buf.toByteArray(); | |||
byte[] delta = write(Arrays.asList( | |||
ref("refs/heads/b", 2), | |||
ref("refs/heads/c", 4)), | |||
2); | |||
MergedReftable mr = merge(base, delta); | |||
try (RefCursor rc = mr.allRefs()) { | |||
assertTrue(rc.next()); | |||
assertEquals("refs/heads/a", rc.getRef().getName()); | |||
assertEquals(id(1), rc.getRef().getObjectId()); | |||
assertEquals(1, rc.getUpdateIndex()); | |||
assertTrue(rc.next()); | |||
assertEquals("refs/heads/b", rc.getRef().getName()); | |||
assertEquals(id(2), rc.getRef().getObjectId()); | |||
assertEquals(2, rc.getUpdateIndex()); | |||
assertTrue(rc.next()); | |||
assertEquals("refs/heads/c", rc.getRef().getName()); | |||
assertEquals(id(3), rc.getRef().getObjectId()); | |||
assertEquals(3, rc.getUpdateIndex()); | |||
} | |||
} | |||
@Test | |||
public void compaction() throws IOException { | |||
List<Ref> delta1 = Arrays.asList( | |||
@@ -322,12 +357,18 @@ public class MergedReftableTest { | |||
} | |||
private byte[] write(Collection<Ref> refs) throws IOException { | |||
return write(refs, 1); | |||
} | |||
private byte[] write(Collection<Ref> refs, long updateIndex) | |||
throws IOException { | |||
ByteArrayOutputStream buffer = new ByteArrayOutputStream(); | |||
ReftableWriter writer = new ReftableWriter().begin(buffer); | |||
for (Ref r : RefComparator.sort(refs)) { | |||
writer.writeRef(r); | |||
} | |||
writer.finish(); | |||
new ReftableWriter() | |||
.setMinUpdateIndex(updateIndex) | |||
.setMaxUpdateIndex(updateIndex) | |||
.begin(buffer) | |||
.sortAndWriteRefs(refs) | |||
.finish(); | |||
return buffer.toByteArray(); | |||
} | |||
@@ -126,7 +126,7 @@ public class ReftableTest { | |||
@Test | |||
public void estimateCurrentBytesOneRef() throws IOException { | |||
Ref exp = ref(MASTER, 1); | |||
int expBytes = 24 + 4 + 5 + 3 + MASTER.length() + 20 + 68; | |||
int expBytes = 24 + 4 + 5 + 4 + MASTER.length() + 20 + 68; | |||
byte[] table; | |||
ReftableConfig cfg = new ReftableConfig(); | |||
@@ -155,7 +155,7 @@ public class ReftableTest { | |||
cfg.setIndexObjects(false); | |||
cfg.setMaxIndexLevels(1); | |||
int expBytes = 139654; | |||
int expBytes = 147860; | |||
byte[] table; | |||
ReftableWriter writer = new ReftableWriter().setConfig(cfg); | |||
try (ByteArrayOutputStream buf = new ByteArrayOutputStream()) { | |||
@@ -174,7 +174,7 @@ public class ReftableTest { | |||
public void oneIdRef() throws IOException { | |||
Ref exp = ref(MASTER, 1); | |||
byte[] table = write(exp); | |||
assertEquals(24 + 4 + 5 + 3 + MASTER.length() + 20 + 68, table.length); | |||
assertEquals(24 + 4 + 5 + 4 + MASTER.length() + 20 + 68, table.length); | |||
ReftableReader t = read(table); | |||
try (RefCursor rc = t.allRefs()) { | |||
@@ -203,7 +203,7 @@ public class ReftableTest { | |||
public void oneTagRef() throws IOException { | |||
Ref exp = tag(V1_0, 1, 2); | |||
byte[] table = write(exp); | |||
assertEquals(24 + 4 + 5 + 2 + V1_0.length() + 40 + 68, table.length); | |||
assertEquals(24 + 4 + 5 + 3 + V1_0.length() + 40 + 68, table.length); | |||
ReftableReader t = read(table); | |||
try (RefCursor rc = t.allRefs()) { | |||
@@ -224,7 +224,7 @@ public class ReftableTest { | |||
Ref exp = sym(HEAD, MASTER); | |||
byte[] table = write(exp); | |||
assertEquals( | |||
24 + 4 + 5 + 2 + HEAD.length() + 1 + MASTER.length() + 68, | |||
24 + 4 + 5 + 2 + HEAD.length() + 2 + MASTER.length() + 68, | |||
table.length); | |||
ReftableReader t = read(table); | |||
@@ -281,7 +281,7 @@ public class ReftableTest { | |||
String name = "refs/heads/gone"; | |||
Ref exp = newRef(name); | |||
byte[] table = write(exp); | |||
assertEquals(24 + 4 + 5 + 2 + name.length() + 68, table.length); | |||
assertEquals(24 + 4 + 5 + 3 + name.length() + 68, table.length); | |||
ReftableReader t = read(table); | |||
try (RefCursor rc = t.allRefs()) { | |||
@@ -425,13 +425,14 @@ public class ReftableTest { | |||
writer.finish(); | |||
byte[] table = buffer.toByteArray(); | |||
assertEquals(245, table.length); | |||
assertEquals(247, table.length); | |||
ReftableReader t = read(table); | |||
try (RefCursor rc = t.allRefs()) { | |||
assertTrue(rc.next()); | |||
assertEquals(MASTER, rc.getRef().getName()); | |||
assertEquals(id(1), rc.getRef().getObjectId()); | |||
assertEquals(1, rc.getUpdateIndex()); | |||
assertTrue(rc.next()); | |||
assertEquals(NEXT, rc.getRef().getName()); | |||
@@ -636,7 +637,7 @@ public class ReftableTest { | |||
writer.finish(); | |||
fail("expected BlockSizeTooSmallException"); | |||
} catch (BlockSizeTooSmallException e) { | |||
assertEquals(84, e.getMinimumBlockSize()); | |||
assertEquals(85, e.getMinimumBlockSize()); | |||
} | |||
} | |||
@@ -166,6 +166,10 @@ class BlockReader { | |||
return readVarint64(); | |||
} | |||
long readUpdateIndexDelta() { | |||
return readVarint64(); | |||
} | |||
Ref readRef() throws IOException { | |||
String name = RawParseUtils.decode(UTF_8, nameBuf, 0, nameLen); | |||
switch (valueType & VALUE_TYPE_MASK) { | |||
@@ -490,6 +494,7 @@ class BlockReader { | |||
void skipValue() { | |||
switch (blockType) { | |||
case REF_BLOCK_TYPE: | |||
readVarint64(); // update_index_delta | |||
switch (valueType & VALUE_TYPE_MASK) { | |||
case VALUE_NONE: | |||
return; |
@@ -354,10 +354,12 @@ class BlockWriter { | |||
static class RefEntry extends Entry { | |||
final Ref ref; | |||
final long updateIndexDelta; | |||
RefEntry(Ref ref) { | |||
RefEntry(Ref ref, long updateIndexDelta) { | |||
super(nameUtf8(ref)); | |||
this.ref = ref; | |||
this.updateIndexDelta = updateIndexDelta; | |||
} | |||
@Override | |||
@@ -380,17 +382,18 @@ class BlockWriter { | |||
@Override | |||
int valueSize() { | |||
int n = computeVarintSize(updateIndexDelta); | |||
switch (valueType()) { | |||
case VALUE_NONE: | |||
return 0; | |||
return n; | |||
case VALUE_1ID: | |||
return OBJECT_ID_LENGTH; | |||
return n + OBJECT_ID_LENGTH; | |||
case VALUE_2ID: | |||
return 2 * OBJECT_ID_LENGTH; | |||
return n + 2 * OBJECT_ID_LENGTH; | |||
case VALUE_SYMREF: | |||
if (ref.isSymbolic()) { | |||
int nameLen = nameUtf8(ref.getTarget()).length; | |||
return computeVarintSize(nameLen) + nameLen; | |||
return n + computeVarintSize(nameLen) + nameLen; | |||
} | |||
} | |||
throw new IllegalStateException(); | |||
@@ -398,6 +401,7 @@ class BlockWriter { | |||
@Override | |||
void writeValue(ReftableOutputStream os) throws IOException { | |||
os.writeVarint(updateIndexDelta); | |||
switch (valueType()) { | |||
case VALUE_NONE: | |||
return; |
@@ -100,13 +100,6 @@ public class MergedReftable extends Reftable { | |||
@Override | |||
public RefCursor seekRef(String name) throws IOException { | |||
if (name.endsWith("/")) { //$NON-NLS-1$ | |||
return seekRefPrefix(name); | |||
} | |||
return seekSingleRef(name); | |||
} | |||
private RefCursor seekRefPrefix(String name) throws IOException { | |||
MergedRefCursor m = new MergedRefCursor(); | |||
for (int i = 0; i < tables.length; i++) { | |||
m.add(new RefQueueEntry(tables[i].seekRef(name), i)); | |||
@@ -114,17 +107,6 @@ public class MergedReftable extends Reftable { | |||
return m; | |||
} | |||
private RefCursor seekSingleRef(String name) throws IOException { | |||
// Walk the tables from highest priority (end of list) to lowest. | |||
// As soon as the reference is found (queue not empty), all lower | |||
// priority tables are irrelevant as current table shadows them. | |||
MergedRefCursor m = new MergedRefCursor(); | |||
for (int i = tables.length - 1; i >= 0 && m.queue.isEmpty(); i--) { | |||
m.add(new RefQueueEntry(tables[i].seekRef(name), i)); | |||
} | |||
return m; | |||
} | |||
@Override | |||
public RefCursor byObjectId(AnyObjectId name) throws IOException { | |||
MergedRefCursor m = new MergedRefCursor(); | |||
@@ -168,6 +150,7 @@ public class MergedReftable extends Reftable { | |||
private final PriorityQueue<RefQueueEntry> queue; | |||
private RefQueueEntry head; | |||
private Ref ref; | |||
private long updateIndex; | |||
MergedRefCursor() { | |||
queue = new PriorityQueue<>(queueSize(), RefQueueEntry::compare); | |||
@@ -205,6 +188,7 @@ public class MergedReftable extends Reftable { | |||
} | |||
ref = t.rc.getRef(); | |||
updateIndex = t.rc.getUpdateIndex(); | |||
boolean include = includeDeletes || !t.rc.wasDeleted(); | |||
skipShadowedRefs(ref.getName()); | |||
add(t); | |||
@@ -239,8 +223,17 @@ public class MergedReftable extends Reftable { | |||
return ref; | |||
} | |||
@Override | |||
public long getUpdateIndex() { | |||
return updateIndex; | |||
} | |||
@Override | |||
public void close() { | |||
if (head != null) { | |||
head.rc.close(); | |||
head = null; | |||
} | |||
while (!queue.isEmpty()) { | |||
queue.remove().rc.close(); | |||
} | |||
@@ -250,6 +243,10 @@ public class MergedReftable extends Reftable { | |||
private static class RefQueueEntry { | |||
static int compare(RefQueueEntry a, RefQueueEntry b) { | |||
int cmp = a.name().compareTo(b.name()); | |||
if (cmp == 0) { | |||
// higher updateIndex shadows lower updateIndex. | |||
cmp = Long.signum(b.updateIndex() - a.updateIndex()); | |||
} | |||
if (cmp == 0) { | |||
// higher index shadows lower index, so higher index first. | |||
cmp = b.stackIdx - a.stackIdx; | |||
@@ -268,6 +265,10 @@ public class MergedReftable extends Reftable { | |||
String name() { | |||
return rc.getRef().getName(); | |||
} | |||
long updateIndex() { | |||
return rc.getUpdateIndex(); | |||
} | |||
} | |||
private class MergedLogCursor extends LogCursor { |
@@ -61,6 +61,9 @@ public abstract class RefCursor implements AutoCloseable { | |||
/** @return reference at the current position. */ | |||
public abstract Ref getRef(); | |||
/** @return updateIndex that last modified the current reference, */ | |||
public abstract long getUpdateIndex(); | |||
/** @return {@code true} if the current reference was deleted. */ | |||
public boolean wasDeleted() { | |||
Ref r = getRef(); |
@@ -220,7 +220,7 @@ public class ReftableCompactor { | |||
private void mergeRefs(MergedReftable mr) throws IOException { | |||
try (RefCursor rc = mr.allRefs()) { | |||
while (rc.next()) { | |||
writer.writeRef(rc.getRef()); | |||
writer.writeRef(rc.getRef(), rc.getUpdateIndex()); | |||
} | |||
} | |||
} |
@@ -455,6 +455,7 @@ public class ReftableReader extends Reftable { | |||
private final boolean prefix; | |||
private Ref ref; | |||
private long updateIndex; | |||
BlockReader block; | |||
RefCursorImpl(long scanEnd, byte[] match, boolean prefix) { | |||
@@ -483,6 +484,7 @@ public class ReftableReader extends Reftable { | |||
return false; | |||
} | |||
updateIndex = minUpdateIndex + block.readUpdateIndexDelta(); | |||
ref = block.readRef(); | |||
if (!includeDeletes && wasDeleted()) { | |||
continue; | |||
@@ -496,6 +498,11 @@ public class ReftableReader extends Reftable { | |||
return ref; | |||
} | |||
@Override | |||
public long getUpdateIndex() { | |||
return updateIndex; | |||
} | |||
@Override | |||
public void close() { | |||
// Do nothing. | |||
@@ -574,6 +581,7 @@ public class ReftableReader extends Reftable { | |||
private final ObjectId match; | |||
private Ref ref; | |||
private long updateIndex; | |||
private int listIdx; | |||
private LongList blockPos; | |||
@@ -647,6 +655,7 @@ public class ReftableReader extends Reftable { | |||
} | |||
block.parseKey(); | |||
updateIndex = minUpdateIndex + block.readUpdateIndexDelta(); | |||
ref = block.readRef(); | |||
ObjectId id = ref.getObjectId(); | |||
if (id != null && match.equals(id) | |||
@@ -661,6 +670,11 @@ public class ReftableReader extends Reftable { | |||
return ref; | |||
} | |||
@Override | |||
public long getUpdateIndex() { | |||
return updateIndex; | |||
} | |||
@Override | |||
public void close() { | |||
// Do nothing. |
@@ -214,7 +214,7 @@ public class ReftableWriter { | |||
public ReftableWriter sortAndWriteRefs(Collection<Ref> refsToPack) | |||
throws IOException { | |||
Iterator<RefEntry> itr = refsToPack.stream() | |||
.map(RefEntry::new) | |||
.map(r -> new RefEntry(r, maxUpdateIndex - minUpdateIndex)) | |||
.sorted(Entry::compare) | |||
.iterator(); | |||
while (itr.hasNext()) { | |||
@@ -236,7 +236,28 @@ public class ReftableWriter { | |||
* if reftable cannot be written. | |||
*/ | |||
public void writeRef(Ref ref) throws IOException { | |||
long blockPos = refs.write(new RefEntry(ref)); | |||
writeRef(ref, maxUpdateIndex); | |||
} | |||
/** | |||
* Write one reference to the reftable. | |||
* <p> | |||
* References must be passed in sorted order. | |||
* | |||
* @param ref | |||
* the reference to store. | |||
* @param updateIndex | |||
* the updateIndex that modified this reference. Must be | |||
* {@code >= minUpdateIndex} for this file. | |||
* @throws IOException | |||
* if reftable cannot be written. | |||
*/ | |||
public void writeRef(Ref ref, long updateIndex) throws IOException { | |||
if (updateIndex < minUpdateIndex) { | |||
throw new IllegalArgumentException(); | |||
} | |||
long d = updateIndex - minUpdateIndex; | |||
long blockPos = refs.write(new RefEntry(ref, d)); | |||
indexRef(ref, blockPos); | |||
} | |||