Browse Source

Add a method to ObjectInserter to read back inserted objects

In the DFS implementation, flushing an inserter writes a new pack to
the storage system and is potentially very slow, but was the only way
to ensure previously-inserted objects were available.  For some tasks,
like performing a series of three-way merges, the total size of all
inserted objects may be small enough to avoid flushing the in-memory
buffered data.

DfsOutputStream already provides a read method to read back from the
not-yet-flushed data, so use this to provide an ObjectReader in the
DFS case.

In the file-backed case, objects are written out loosely on the fly,
so the implementation can just return the existing WindowCursor.

Change-Id: I454fdfb88f4d215e31b7da2b2a069853b197b3dd
tags/v3.5.0.201409071800-rc1
Dave Borowitz 11 years ago
parent
commit
e1856dbf44

+ 175
- 0
org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/dfs/DfsInserterTest.java View File

@@ -0,0 +1,175 @@
/*
* Copyright (C) 2013, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.internal.storage.dfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.zip.Deflater;

import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.JGitTestUtil;
import org.eclipse.jgit.junit.TestRng;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.RawParseUtils;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for reading objects back from a {@link DfsInserter} before the
 * inserter has been flushed to the object database.
 */
public class DfsInserterTest {
	InMemoryRepository db;

	@Before
	public void setUp() {
		db = new InMemoryRepository(new DfsRepositoryDescription("test"));
	}

	@Test
	public void testInserterDiscardsPack() throws IOException {
		// Releasing without flushing must not publish a pack.
		ObjectInserter inserter = db.newObjectInserter();
		inserter.insert(Constants.OBJ_BLOB, Constants.encode("foo"));
		inserter.insert(Constants.OBJ_BLOB, Constants.encode("bar"));
		assertEquals(0, db.getObjectDatabase().listPacks().size());

		inserter.release();
		assertEquals(0, db.getObjectDatabase().listPacks().size());
	}

	@Test
	public void testReadFromInserterSmallObjects() throws IOException {
		ObjectInserter inserter = db.newObjectInserter();
		ObjectId blob1 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("foo"));
		ObjectId blob2 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("bar"));
		assertEquals(0, db.getObjectDatabase().listPacks().size());

		// The inserter's own reader sees buffered objects without a flush.
		ObjectReader rdr = inserter.newReader();
		assertEquals("foo", readString(rdr.open(blob1)));
		assertEquals("bar", readString(rdr.open(blob2)));
		assertEquals(0, db.getObjectDatabase().listPacks().size());
		inserter.flush();
		assertEquals(1, db.getObjectDatabase().listPacks().size());
	}

	@Test
	public void testReadFromInserterLargerObjects() throws IOException {
		// Force streaming: the 8 KiB blob exceeds the 512 byte stream
		// threshold, and cache blocks are capped at 512 bytes.
		db.getObjectDatabase().getReaderOptions().setStreamFileThreshold(512);
		DfsBlockCacheConfig cacheConfig = new DfsBlockCacheConfig();
		cacheConfig.setBlockSize(512);
		cacheConfig.setBlockLimit(2048);
		DfsBlockCache.reconfigure(cacheConfig);

		byte[] data = new TestRng(JGitTestUtil.getName()).nextBytes(8192);
		DfsInserter inserter = (DfsInserter) db.newObjectInserter();
		inserter.setCompressionLevel(Deflater.NO_COMPRESSION);
		ObjectId blob = inserter.insert(Constants.OBJ_BLOB, data);
		assertEquals(0, db.getObjectDatabase().listPacks().size());

		ObjectReader rdr = inserter.newReader();
		assertTrue(Arrays.equals(data, readStream(rdr.open(blob))));
		assertEquals(0, db.getObjectDatabase().listPacks().size());
		inserter.flush();

		List<DfsPackDescription> packs = db.getObjectDatabase().listPacks();
		assertEquals(1, packs.size());
		assertTrue(packs.get(0).getFileSize(PackExt.PACK) > 2048);
	}

	@Test
	public void testReadFromFallback() throws IOException {
		ObjectInserter inserter = db.newObjectInserter();
		ObjectId blob1 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("foo"));
		inserter.flush();
		ObjectId blob2 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("bar"));
		assertEquals(1, db.getObjectDatabase().listPacks().size());

		// Reader must see both the flushed and the still-buffered object.
		ObjectReader rdr = inserter.newReader();
		assertEquals("foo", readString(rdr.open(blob1)));
		assertEquals("bar", readString(rdr.open(blob2)));
		assertEquals(1, db.getObjectDatabase().listPacks().size());
		inserter.flush();
		assertEquals(2, db.getObjectDatabase().listPacks().size());
	}

	@Test
	public void testReaderResolve() throws IOException {
		ObjectInserter inserter = db.newObjectInserter();
		ObjectId blob1 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("foo"));
		inserter.flush();
		ObjectId blob2 = inserter.insert(Constants.OBJ_BLOB, Constants.encode("bar"));
		String abbr1 = ObjectId.toString(blob1).substring(0, 4);
		String abbr2 = ObjectId.toString(blob2).substring(0, 4);
		assertFalse(abbr1.equals(abbr2));

		// Abbreviations resolve against flushed and buffered objects alike.
		ObjectReader rdr = inserter.newReader();
		Collection<ObjectId> matches;
		matches = rdr.resolve(AbbreviatedObjectId.fromString(abbr1));
		assertEquals(1, matches.size());
		assertEquals(blob1, matches.iterator().next());

		matches = rdr.resolve(AbbreviatedObjectId.fromString(abbr2));
		assertEquals(1, matches.size());
		assertEquals(blob2, matches.iterator().next());
	}

	// Decode a loader's full contents as a string.
	private static String readString(ObjectLoader loader) throws IOException {
		return RawParseUtils.decode(readStream(loader));
	}

	// Drain a loader's stream fully into a byte array.
	private static byte[] readStream(ObjectLoader loader) throws IOException {
		ByteBuffer buffered = IO.readWholeStream(loader.openStream(), 64);
		byte[] bytes = new byte[buffered.remaining()];
		buffered.get(bytes);
		return bytes;
	}
}

+ 8
- 0
org.eclipse.jgit/.settings/.api_filters View File

@@ -8,6 +8,14 @@
</message_arguments>
</filter>
</resource>
<resource path="src/org/eclipse/jgit/lib/ObjectInserter.java" type="org.eclipse.jgit.lib.ObjectInserter">
<filter id="336695337">
<message_arguments>
<message_argument value="org.eclipse.jgit.lib.ObjectInserter"/>
<message_argument value="newReader()"/>
</message_arguments>
</filter>
</resource>
<resource path="src/org/eclipse/jgit/merge/ResolveMerger.java" type="org.eclipse.jgit.merge.ResolveMerger">
<filter comment="Doesn't break consumers. Breaking providers is allowed also in minor versions." id="338792546">
<message_arguments>

+ 2
- 0
org.eclipse.jgit/resources/org/eclipse/jgit/internal/storage/dfs/DfsText.properties View File

@@ -1,4 +1,6 @@
cannotReadIndex=Cannot read index {0}
cannotReadBackDelta=Cannot read delta type {0}
shortReadOfBlock=Short read of block at {0} in pack {1}; expected {2} bytes, received only {3}
shortReadOfIndex=Short read of index {0}
unexpectedEofInPack=Unexpected EOF in partially created pack
willNotStoreEmptyPack=Cannot store empty pack

+ 287
- 1
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsInserter.java View File

@@ -45,24 +45,44 @@ package org.eclipse.jgit.internal.storage.dfs;

import static org.eclipse.jgit.internal.storage.pack.PackExt.INDEX;
import static org.eclipse.jgit.internal.storage.pack.PackExt.PACK;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.CRC32;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import org.eclipse.jgit.errors.CorruptObjectException;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.file.PackIndex;
import org.eclipse.jgit.internal.storage.file.PackIndexWriter;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.lib.AbbreviatedObjectId;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdOwnerMap;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.util.BlockList;
import org.eclipse.jgit.util.IO;
@@ -76,6 +96,7 @@ public class DfsInserter extends ObjectInserter {
private static final int INDEX_VERSION = 2;

private final DfsObjDatabase db;
private int compression = Deflater.BEST_COMPRESSION;

private List<PackedObjectInfo> objectList;
private ObjectIdOwnerMap<PackedObjectInfo> objectMap;
@@ -96,11 +117,20 @@ public class DfsInserter extends ObjectInserter {
this.db = db;
}

// Set the zlib level used when deflating objects into the in-progress
// pack (e.g. Deflater.NO_COMPRESSION in tests to inflate pack size).
void setCompressionLevel(int compression) {
this.compression = compression;
}

@Override
public DfsPackParser newPackParser(InputStream in) throws IOException {
// Parse an incoming pack stream directly into the DFS object database.
return new DfsPackParser(db, this, in);
}

@Override
public ObjectReader newReader() {
// Reader that can also see objects buffered by this inserter before
// flush(); see the Reader class below.
return new Reader();
}

@Override
public ObjectId insert(int type, byte[] data, int off, int len)
throws IOException {
@@ -309,7 +339,7 @@ public class DfsInserter extends ObjectInserter {
hdrBuf = new byte[32];
md = Constants.newMessageDigest();
crc32 = new CRC32();
deflater = new Deflater(Deflater.BEST_COMPRESSION);
deflater = new Deflater(compression);
compress = new DeflaterOutputStream(this, deflater, 8192);

int size = out.blockSize();
@@ -403,10 +433,266 @@ public class DfsInserter extends ObjectInserter {
return packHash;
}

// Read back raw pack bytes that were written but not yet flushed to
// storage. Bytes before currPos come from completed blocks (via the
// block cache); bytes at or after currPos come from the in-memory
// write buffer currBuf. Returns the number of bytes copied, which may
// be less than cnt.
int read(long pos, byte[] dst, int ptr, int cnt) throws IOException {
int r = 0;
// First satisfy as much as possible from already-completed blocks.
while (pos < currPos && r < cnt) {
DfsBlock b = getOrLoadBlock(pos);
int n = b.copy(pos, dst, ptr + r, cnt - r);
pos += n;
r += n;
}
// Then copy any remainder from the current (unflushed) buffer.
if (currPos <= pos && r < cnt) {
int s = (int) (pos - currPos);
int n = Math.min(currPtr - s, cnt - r);
System.arraycopy(currBuf, s, dst, ptr + r, n);
r += n;
}
return r;
}

// Inflate len bytes of an object starting at compressed offset pos in
// the in-progress pack. Returns null if the result buffer cannot be
// allocated, signalling the caller to fall back to streaming.
byte[] inflate(DfsReader ctx, long pos, int len) throws IOException,
DataFormatException {
byte[] dstbuf;
try {
dstbuf = new byte[len];
} catch (OutOfMemoryError noMemory) {
return null; // Caller will switch to large object streaming.
}

Inflater inf = ctx.inflater();
DfsBlock b = setInput(inf, pos);
for (int dstoff = 0;;) {
int n = inf.inflate(dstbuf, dstoff, dstbuf.length - dstoff);
if (n > 0)
dstoff += n;
else if (inf.needsInput() && b != null) {
// Exhausted this block; feed the inflater the next one.
pos += b.remaining(pos);
b = setInput(inf, pos);
} else if (inf.needsInput())
// b == null means we were already reading the tail buffer;
// there is no more data to supply.
throw new EOFException(DfsText.get().unexpectedEofInPack);
else if (inf.finished())
return dstbuf;
else
throw new DataFormatException();
}
}

/**
 * Prime the inflater with compressed bytes starting at {@code pos}.
 *
 * @return the backing DfsBlock when the data came from a completed
 *         block, or null when it came from the in-memory tail buffer
 *         (currBuf), meaning no further input is available.
 */
private DfsBlock setInput(Inflater inf, long pos) throws IOException {
	if (pos < currPos) {
		DfsBlock b = getOrLoadBlock(pos);
		b.setInput(inf, pos);
		return b;
	}
	// Offset into the unflushed tail buffer. The length must be the
	// remaining valid bytes (currPtr - s), not currPtr: passing currPtr
	// here can exceed the buffer bounds when s > 0 and trigger an
	// ArrayIndexOutOfBoundsException from Inflater.setInput.
	int s = (int) (pos - currPos);
	inf.setInput(currBuf, s, currPtr - s);
	return null;
}

// Fetch the cache block containing pos, reading it back from the
// unflushed pack stream on a cache miss and registering it in the cache.
private DfsBlock getOrLoadBlock(long pos) throws IOException {
long s = toBlockStart(pos);
DfsBlock b = cache.get(packKey, s);
if (b != null)
return b;

byte[] d = new byte[blockSize];
// out.read may return short counts; loop until the block is full.
// Only called for pos < currPos, so a whole block is expected to
// exist; anything less indicates a truncated pack.
for (int p = 0; p < blockSize;) {
int n = out.read(s + p, ByteBuffer.wrap(d, p, blockSize - p));
if (n <= 0)
throw new EOFException(DfsText.get().unexpectedEofInPack);
p += n;
}
b = new DfsBlock(packKey, s, d);
cache.put(b);
return b;
}

// Align pos down to the starting offset of its containing cache block.
private long toBlockStart(long pos) {
	return pos - (pos % blockSize);
}

@Override
public void close() throws IOException {
// Free the deflater's native resources before closing the output.
deflater.end();
out.close();
}
}

/**
 * ObjectReader that first consults the objects buffered by this
 * inserter, then falls back to the repository via a DfsReader. Intended
 * for use on the inserting thread only.
 */
private class Reader extends ObjectReader {
private final DfsReader ctx = new DfsReader(db);

@Override
public ObjectReader newReader() {
// A fresh reader sees only flushed state, not this inserter's buffer.
return db.newReader();
}

@Override
public Collection<ObjectId> resolve(AbbreviatedObjectId id)
throws IOException {
// Merge abbreviation matches from the repository with matches from
// the not-yet-flushed objects recorded by this inserter.
Collection<ObjectId> stored = ctx.resolve(id);
if (objectList == null)
return stored;

Set<ObjectId> r = new HashSet<ObjectId>(stored.size() + 2);
r.addAll(stored);
for (PackedObjectInfo obj : objectList) {
if (id.prefixCompare(obj) == 0)
r.add(obj.copy());
}
return r;
}

@Override
public ObjectLoader open(AnyObjectId objectId, int typeHint)
throws IOException {
// Objects not buffered by this inserter come from the repository.
if (objectMap == null)
return ctx.open(objectId, typeHint);

PackedObjectInfo obj = objectMap.get(objectId);
if (obj == null)
return ctx.open(objectId, typeHint);

// Read the object's pack header back from the in-progress pack.
byte[] buf = buffer();
int cnt = packOut.read(obj.getOffset(), buf, 0, 20);
if (cnt <= 0)
throw new EOFException(DfsText.get().unexpectedEofInPack);

// First header byte: continuation bit, type in bits 6-4, and the
// low 4 bits of the inflated size.
int c = buf[0] & 0xff;
int type = (c >> 4) & 7;
if (type == OBJ_OFS_DELTA || type == OBJ_REF_DELTA)
throw new IOException(MessageFormat.format(
DfsText.get().cannotReadBackDelta, Integer.toString(type)));

// Remaining size bytes follow the pack varint encoding: 7 data
// bits per byte, continuation flagged by the high bit.
long sz = c & 0x0f;
int ptr = 1;
int shift = 4;
while ((c & 0x80) != 0) {
if (ptr >= cnt)
throw new EOFException(DfsText.get().unexpectedEofInPack);
c = buf[ptr++] & 0xff;
sz += ((long) (c & 0x7f)) << shift;
shift += 7;
}

// Compressed data starts immediately after the header.
long zpos = obj.getOffset() + ptr;
if (sz < ctx.getStreamFileThreshold()) {
// Small enough to inflate fully into memory.
byte[] data = inflate(obj, zpos, (int) sz);
if (data != null)
return new ObjectLoader.SmallObject(type, data);
}
// Too large (or allocation failed): stream it back instead.
return new StreamLoader(obj.copy(), type, sz, packKey, zpos);
}

// Inflate a buffered object, translating zlib corruption into a
// CorruptObjectException carrying the object's pack location.
private byte[] inflate(PackedObjectInfo obj, long zpos, int sz)
throws IOException, CorruptObjectException {
try {
return packOut.inflate(ctx, zpos, sz);
} catch (DataFormatException dfe) {
CorruptObjectException coe = new CorruptObjectException(
MessageFormat.format(
JGitText.get().objectAtHasBadZlibStream,
Long.valueOf(obj.getOffset()),
packDsc.getFileName(PackExt.PACK)));
coe.initCause(dfe);
throw coe;
}
}

@Override
public Set<ObjectId> getShallowCommits() throws IOException {
return ctx.getShallowCommits();
}

@Override
public void release() {
ctx.release();
}
}

/**
 * Loader that streams a large object back out of the pack data written
 * by this inserter, or — after a flush has replaced the pack — through
 * the normal repository code path.
 */
private class StreamLoader extends ObjectLoader {
private final ObjectId id;
private final int type;
private final long size;

// Pack the object was written into, and offset of its compressed data.
private final DfsPackKey srcPack;
private final long pos;

StreamLoader(ObjectId id, int type, long sz,
DfsPackKey key, long pos) {
this.id = id;
this.type = type;
this.size = sz;
this.srcPack = key;
this.pos = pos;
}

@Override
public ObjectStream openStream() throws IOException {
final DfsReader ctx = new DfsReader(db);
if (srcPack != packKey) {
try {
// Post DfsInserter.flush() use the normal code path.
// The newly created pack is registered in the cache.
return ctx.open(id, type).openStream();
} finally {
ctx.release();
}
}

// Still unflushed: inflate directly out of the in-progress pack
// via ReadBackStream, bounded to size by the Filter wrapper.
int bufsz = 8192;
final Inflater inf = ctx.inflater();
return new ObjectStream.Filter(type,
size, new BufferedInputStream(new InflaterInputStream(
new ReadBackStream(pos), inf, bufsz), bufsz)) {
@Override
public void close() throws IOException {
// Release the reader context (and its inflater) with the stream.
ctx.release();
super.close();
}
};
}

@Override
public int getType() {
return type;
}

@Override
public long getSize() {
return size;
}

@Override
public boolean isLarge() {
// Always report large so callers use openStream(), never cached bytes.
return true;
}

@Override
public byte[] getCachedBytes() throws LargeObjectException {
throw new LargeObjectException.ExceedsLimit(
db.getReaderOptions().getStreamFileThreshold(), size);
}
}

/**
 * InputStream view over bytes already written to the in-progress pack,
 * beginning at a fixed offset and advancing as data is consumed.
 */
private final class ReadBackStream extends InputStream {
	private long pos;

	ReadBackStream(long offset) {
		pos = offset;
	}

	@Override
	public int read() throws IOException {
		byte[] one = new byte[1];
		return read(one, 0, 1) == 1 ? one[0] & 0xff : -1;
	}

	@Override
	public int read(byte[] buf, int ptr, int len) throws IOException {
		// NOTE(review): packOut.read returns 0 (not -1) when no bytes are
		// available, which bends the InputStream EOF contract; assumes
		// callers never read past the object's end -- TODO confirm.
		int n = packOut.read(pos, buf, ptr, len);
		if (n > 0)
			pos += n;
		return n;
	}
}
}

+ 2
- 0
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsText.java View File

@@ -55,7 +55,9 @@ public class DfsText extends TranslationBundle {

// @formatter:off
/***/ public String cannotReadIndex;
/***/ public String cannotReadBackDelta;
/***/ public String shortReadOfBlock;
/***/ public String shortReadOfIndex;
/***/ public String unexpectedEofInPack;
/***/ public String willNotStoreEmptyPack;
}

+ 7
- 1
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/file/ObjectDirectoryInserter.java View File

@@ -63,6 +63,7 @@ import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.transport.PackParser;
import org.eclipse.jgit.util.FileUtils;
import org.eclipse.jgit.util.IO;
@@ -130,9 +131,14 @@ class ObjectDirectoryInserter extends ObjectInserter {
return new ObjectDirectoryPackParser(db, in);
}

@Override
public ObjectReader newReader() {
// Loose objects are written to disk as they are inserted, so the
// standard WindowCursor can already read back anything unflushed.
return new WindowCursor(db);
}

@Override
public void flush() throws IOException {
// Do nothing. Objects are immediately visible.
// Do nothing. Loose objects are immediately visible.
}

@Override

+ 24
- 0
org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectInserter.java View File

@@ -80,6 +80,11 @@ public abstract class ObjectInserter {
throw new UnsupportedOperationException();
}

@Override
public ObjectReader newReader() {
// Not supported here; this implementation stores nothing that could
// be read back (presumably the id-only Formatter -- confirm context).
throw new UnsupportedOperationException();
}

@Override
public void flush() throws IOException {
// Do nothing.
@@ -136,6 +141,10 @@ public abstract class ObjectInserter {
return delegate().newPackParser(in);
}

// Forward to the wrapped inserter.
public ObjectReader newReader() {
return delegate().newReader();
}

// Forward to the wrapped inserter.
public void flush() throws IOException {
delegate().flush();
}
@@ -380,6 +389,21 @@ public abstract class ObjectInserter {
*/
public abstract PackParser newPackParser(InputStream in) throws IOException;

/**
* Open a reader for objects that may have been written by this inserter.
* <p>
* The returned reader allows the calling thread to read back recently
* inserted objects without first calling {@code flush()} to make them
* visible to the repository. The returned reader should only be used from
* the same thread as the inserter. Objects written by this inserter may not
* be visible to {@code this.newReader().newReader()}.
*
* @since 3.5
* @return reader for any object, including an object recently inserted by
* this inserter since the last flush.
*/
public abstract ObjectReader newReader();

/**
* Make all inserted objects visible.
* <p>

Loading…
Cancel
Save