Browse Source

Refactor TemporaryBuffer to support reuse in other contexts

Later we are going to add support for smart HTTP, which requires us to
buffer at least some of the request created by a client before we ship
it to the server.  For many requests, we can fit it completely into a
1 MiB buffer, but if it doesn't we can drop back to using the chunked
transfer encoding to send an unknown stream length.

Rather than recoding the block based memory buffer, we refactor the
local file overflow strategy into a subclass, allowing the HTTP client
code to replace this portion of the logic with its own approach to
start the chunked encoding request.

Change-Id: Iac61ea1017b14e0ad3c4425efc3d75718b71bb8e
Signed-off-by: Shawn O. Pearce <sop@google.com>
tags/v0.7.0
Shawn O. Pearce 14 years ago
parent
commit
3f8fdc0325

+ 2
- 2
org.eclipse.jgit.test/exttst/org/eclipse/jgit/patch/EGitPatchHistoryTest.java View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2008, Google Inc.
* Copyright (C) 2008-2009, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -211,7 +211,7 @@ public class EGitPatchHistoryTest extends TestCase {
buf.destroy();
}
commitId = line.substring("commit ".length());
buf = new TemporaryBuffer();
buf = new TemporaryBuffer.LocalFile();
} else if (buf != null) {
buf.write(line.getBytes("ISO-8859-1"));
buf.write('\n');

+ 25
- 13
org.eclipse.jgit.test/tst/org/eclipse/jgit/util/TemporaryBufferTest.java View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2008, Google Inc.
* Copyright (C) 2008-2009, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -54,7 +54,7 @@ import junit.framework.TestCase;

public class TemporaryBufferTest extends TestCase {
public void testEmpty() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
try {
b.close();
assertEquals(0, b.length());
@@ -67,7 +67,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testOneByte() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte test = (byte) new TestRng(getName()).nextInt();
try {
b.write(test);
@@ -93,7 +93,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testOneBlock_BulkWrite() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.Block.SZ);
try {
@@ -123,7 +123,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testOneBlockAndHalf_BulkWrite() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.Block.SZ * 3 / 2);
try {
@@ -153,7 +153,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testOneBlockAndHalf_SingleWrite() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.Block.SZ * 3 / 2);
try {
@@ -181,7 +181,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testOneBlockAndHalf_Copy() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.Block.SZ * 3 / 2);
try {
@@ -210,7 +210,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testLarge_SingleWrite() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.DEFAULT_IN_CORE_LIMIT * 3);
try {
@@ -237,7 +237,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testInCoreLimit_SwitchOnAppendByte() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.DEFAULT_IN_CORE_LIMIT + 1);
try {
@@ -265,7 +265,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testInCoreLimit_SwitchBeforeAppendByte() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.DEFAULT_IN_CORE_LIMIT * 3);
try {
@@ -293,7 +293,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testInCoreLimit_SwitchOnCopy() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final byte[] test = new TestRng(getName())
.nextBytes(TemporaryBuffer.DEFAULT_IN_CORE_LIMIT * 2);
try {
@@ -324,7 +324,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testDestroyWhileOpen() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
try {
b.write(new TestRng(getName())
.nextBytes(TemporaryBuffer.DEFAULT_IN_CORE_LIMIT * 2));
@@ -334,7 +334,7 @@ public class TemporaryBufferTest extends TestCase {
}

public void testRandomWrites() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
final TestRng rng = new TestRng(getName());
final int max = TemporaryBuffer.DEFAULT_IN_CORE_LIMIT * 2;
final byte[] expect = new byte[max];
@@ -379,4 +379,16 @@ public class TemporaryBufferTest extends TestCase {
}
}

public void testHeap() throws IOException {
final TemporaryBuffer b = new TemporaryBuffer.Heap(2 * 8 * 1024);
final byte[] r = new byte[8 * 1024];
b.write(r);
b.write(r);
try {
b.write(1);
fail("accepted too many bytes of data");
} catch (IOException e) {
assertEquals("In-memory buffer limit exceeded", e.getMessage());
}
}
}

+ 1
- 1
org.eclipse.jgit/src/org/eclipse/jgit/dircache/DirCache.java View File

@@ -518,7 +518,7 @@ public class DirCache {
}

if (tree != null) {
final TemporaryBuffer bb = new TemporaryBuffer();
final TemporaryBuffer bb = new TemporaryBuffer.LocalFile();
tree.write(tmp, bb);
bb.close();


+ 1
- 1
org.eclipse.jgit/src/org/eclipse/jgit/patch/FileHeader.java View File

@@ -289,7 +289,7 @@ public class FileHeader {
final TemporaryBuffer[] tmp = new TemporaryBuffer[getParentCount() + 1];
try {
for (int i = 0; i < tmp.length; i++)
tmp[i] = new TemporaryBuffer();
tmp[i] = new TemporaryBuffer.LocalFile();
for (final HunkHeader h : getHunks())
h.extractFileLines(tmp);


+ 2
- 2
org.eclipse.jgit/src/org/eclipse/jgit/patch/Patch.java View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2008, Google Inc.
* Copyright (C) 2008-2009, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -138,7 +138,7 @@ public class Patch {
}

private static byte[] readFully(final InputStream is) throws IOException {
final TemporaryBuffer b = new TemporaryBuffer();
final TemporaryBuffer b = new TemporaryBuffer.LocalFile();
try {
b.copy(is);
b.close();

+ 1
- 1
org.eclipse.jgit/src/org/eclipse/jgit/transport/AmazonS3.java View File

@@ -450,7 +450,7 @@ public class AmazonS3 {
final ProgressMonitor monitor, final String monitorTask)
throws IOException {
final MessageDigest md5 = newMD5();
final TemporaryBuffer buffer = new TemporaryBuffer() {
final TemporaryBuffer buffer = new TemporaryBuffer.LocalFile() {
@Override
public void close() throws IOException {
super.close();

+ 206
- 103
org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java View File

@@ -1,4 +1,5 @@
/*
* Copyright (C) 2008-2009, Google Inc.
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
*
@@ -56,17 +57,14 @@ import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ProgressMonitor;

/**
* A fully buffered output stream using local disk storage for large data.
* A fully buffered output stream.
* <p>
* Initially this output stream buffers to memory, like ByteArrayOutputStream
* might do, but it shifts to using an on disk temporary file if the output gets
* too large.
* <p>
* The content of this buffered stream may be sent to another OutputStream only
* after this stream has been properly closed by {@link #close()}.
* Subclasses determine the behavior when the in-memory buffer capacity has been
* exceeded and additional bytes are still being received for output.
*/
public class TemporaryBuffer extends OutputStream {
static final int DEFAULT_IN_CORE_LIMIT = 1024 * 1024;
public abstract class TemporaryBuffer extends OutputStream {
/** Default limit for in-core storage. */
protected static final int DEFAULT_IN_CORE_LIMIT = 1024 * 1024;

/** Chain of data, if we are still completely in-core; otherwise null. */
private ArrayList<Block> blocks;
@@ -79,35 +77,32 @@ public class TemporaryBuffer extends OutputStream {
*/
private int inCoreLimit;

/** If {@link #inCoreLimit} has been reached, remainder goes here. */
private OutputStream overflow;

/**
* Location of our temporary file if we are on disk; otherwise null.
* <p>
* If we exceeded the {@link #inCoreLimit} we nulled out {@link #blocks} and
* created this file instead. All output goes here through {@link #diskOut}.
* Create a new empty temporary buffer.
*
* @param limit
* maximum number of bytes to store in memory before entering the
* overflow output path.
*/
private File onDiskFile;

/** If writing to {@link #onDiskFile} this is a buffered stream to it. */
private OutputStream diskOut;

/** Create a new empty temporary buffer. */
public TemporaryBuffer() {
inCoreLimit = DEFAULT_IN_CORE_LIMIT;
blocks = new ArrayList<Block>(inCoreLimit / Block.SZ);
blocks.add(new Block());
protected TemporaryBuffer(final int limit) {
inCoreLimit = limit;
reset();
}

@Override
public void write(final int b) throws IOException {
if (blocks == null) {
diskOut.write(b);
if (overflow != null) {
overflow.write(b);
return;
}

Block s = last();
if (s.isFull()) {
if (reachedInCoreLimit()) {
diskOut.write(b);
overflow.write(b);
return;
}

@@ -119,7 +114,7 @@ public class TemporaryBuffer extends OutputStream {

@Override
public void write(final byte[] b, int off, int len) throws IOException {
if (blocks != null) {
if (overflow == null) {
while (len > 0) {
Block s = last();
if (s.isFull()) {
@@ -139,7 +134,7 @@ public class TemporaryBuffer extends OutputStream {
}

if (len > 0)
diskOut.write(b, off, len);
overflow.write(b, off, len);
}

/**
@@ -172,38 +167,7 @@ public class TemporaryBuffer extends OutputStream {
final byte[] tmp = new byte[Block.SZ];
int n;
while ((n = in.read(tmp)) > 0)
diskOut.write(tmp, 0, n);
}

private Block last() {
return blocks.get(blocks.size() - 1);
}

private boolean reachedInCoreLimit() throws IOException {
if (blocks.size() * Block.SZ < inCoreLimit)
return false;

onDiskFile = File.createTempFile("jgit_", ".buffer");
diskOut = new FileOutputStream(onDiskFile);

final Block last = blocks.remove(blocks.size() - 1);
for (final Block b : blocks)
diskOut.write(b.buffer, 0, b.count);
blocks = null;

diskOut = new BufferedOutputStream(diskOut, Block.SZ);
diskOut.write(last.buffer, 0, last.count);
return true;
}

public void close() throws IOException {
if (diskOut != null) {
try {
diskOut.close();
} finally {
diskOut = null;
}
}
overflow.write(tmp, 0, n);
}

/**
@@ -214,9 +178,6 @@ public class TemporaryBuffer extends OutputStream {
* @return total length of the buffer, in bytes.
*/
public long length() {
if (onDiskFile != null)
return onDiskFile.length();

final Block last = last();
return ((long) blocks.size()) * Block.SZ - (Block.SZ - last.count);
}
@@ -236,21 +197,11 @@ public class TemporaryBuffer extends OutputStream {
final long len = length();
if (Integer.MAX_VALUE < len)
throw new OutOfMemoryError("Length exceeds maximum array size");

final byte[] out = new byte[(int) len];
if (blocks != null) {
int outPtr = 0;
for (final Block b : blocks) {
System.arraycopy(b.buffer, 0, out, outPtr, b.count);
outPtr += b.count;
}
} else {
final FileInputStream in = new FileInputStream(onDiskFile);
try {
IO.readFully(in, out, 0, (int) len);
} finally {
in.close();
}
int outPtr = 0;
for (final Block b : blocks) {
System.arraycopy(b.buffer, 0, out, outPtr, b.count);
outPtr += b.count;
}
return out;
}
@@ -265,8 +216,8 @@ public class TemporaryBuffer extends OutputStream {
* stream to send this buffer's complete content to.
* @param pm
* if not null progress updates are sent here. Caller should
* initialize the task and the number of work units to
* <code>{@link #length()}/1024</code>.
* initialize the task and the number of work units to <code>
* {@link #length()}/1024</code>.
* @throws IOException
* an error occurred reading from a temporary file on the local
* system, or writing to the output stream.
@@ -275,16 +226,150 @@ public class TemporaryBuffer extends OutputStream {
throws IOException {
if (pm == null)
pm = NullProgressMonitor.INSTANCE;
if (blocks != null) {
// Everything is in core so we can stream directly to the output.
//
for (final Block b : blocks) {
os.write(b.buffer, 0, b.count);
pm.update(b.count / 1024);
for (final Block b : blocks) {
os.write(b.buffer, 0, b.count);
pm.update(b.count / 1024);
}
}

/** Reset this buffer for reuse, purging all buffered content. */
public void reset() {
if (overflow != null) {
destroy();
}
blocks = new ArrayList<Block>(inCoreLimit / Block.SZ);
blocks.add(new Block());
}

/**
* Open the overflow output stream, so the remaining output can be stored.
*
* @return the output stream to receive the buffered content, followed by
* the remaining output.
* @throws IOException
* the buffer cannot create the overflow stream.
*/
protected abstract OutputStream overflow() throws IOException;

private Block last() {
return blocks.get(blocks.size() - 1);
}

private boolean reachedInCoreLimit() throws IOException {
if (blocks.size() * Block.SZ < inCoreLimit)
return false;

overflow = overflow();

final Block last = blocks.remove(blocks.size() - 1);
for (final Block b : blocks)
overflow.write(b.buffer, 0, b.count);
blocks = null;

overflow = new BufferedOutputStream(overflow, Block.SZ);
overflow.write(last.buffer, 0, last.count);
return true;
}

public void close() throws IOException {
if (overflow != null) {
try {
overflow.close();
} finally {
overflow = null;
}
}
}

/** Clear this buffer so it has no data, and cannot be used again. */
public void destroy() {
blocks = null;

if (overflow != null) {
try {
overflow.close();
} catch (IOException err) {
// We shouldn't encounter an error closing the file.
} finally {
overflow = null;
}
}
}

/**
* A fully buffered output stream using local disk storage for large data.
* <p>
* Initially this output stream buffers to memory and is therefore similar
* to ByteArrayOutputStream, but it shifts to using an on disk temporary
* file if the output gets too large.
* <p>
* The content of this buffered stream may be sent to another OutputStream
* only after this stream has been properly closed by {@link #close()}.
*/
public static class LocalFile extends TemporaryBuffer {
/**
* Location of our temporary file if we are on disk; otherwise null.
* <p>
* If we exceeded the {@link #inCoreLimit} we nulled out {@link #blocks}
* and created this file instead. All output goes here through
* {@link #overflow}.
*/
private File onDiskFile;

/** Create a new temporary buffer. */
public LocalFile() {
this(DEFAULT_IN_CORE_LIMIT);
}

/**
* Create a new temporary buffer, limiting memory usage.
*
* @param inCoreLimit
* maximum number of bytes to store in memory. Storage beyond
* this limit will use the local file.
*/
public LocalFile(final int inCoreLimit) {
super(inCoreLimit);
}

protected OutputStream overflow() throws IOException {
onDiskFile = File.createTempFile("jgit_", ".buffer");
return new FileOutputStream(onDiskFile);
}

public long length() {
if (onDiskFile == null) {
return super.length();
}
} else {
// Reopen the temporary file and copy the contents.
//
return onDiskFile.length();
}

public byte[] toByteArray() throws IOException {
if (onDiskFile == null) {
return super.toByteArray();
}

final long len = length();
if (Integer.MAX_VALUE < len)
throw new OutOfMemoryError("Length exceeds maximum array size");
final byte[] out = new byte[(int) len];
final FileInputStream in = new FileInputStream(onDiskFile);
try {
IO.readFully(in, out, 0, (int) len);
} finally {
in.close();
}
return out;
}

public void writeTo(final OutputStream os, ProgressMonitor pm)
throws IOException {
if (onDiskFile == null) {
super.writeTo(os, pm);
return;
}
if (pm == null)
pm = NullProgressMonitor.INSTANCE;
final FileInputStream in = new FileInputStream(onDiskFile);
try {
int cnt;
@@ -297,26 +382,44 @@ public class TemporaryBuffer extends OutputStream {
in.close();
}
}
}

/** Clear this buffer so it has no data, and cannot be used again. */
public void destroy() {
blocks = null;
@Override
public void destroy() {
super.destroy();

if (diskOut != null) {
try {
diskOut.close();
} catch (IOException err) {
// We shouldn't encounter an error closing the file.
} finally {
diskOut = null;
if (onDiskFile != null) {
try {
if (!onDiskFile.delete())
onDiskFile.deleteOnExit();
} finally {
onDiskFile = null;
}
}
}
}

/**
* A temporary buffer that will never exceed its in-memory limit.
* <p>
* If the in-memory limit is reached an IOException is thrown, rather than
* attempting to spool to local disk.
*/
public static class Heap extends TemporaryBuffer {
/**
* Create a new heap buffer with a maximum storage limit.
*
* @param limit
* maximum number of bytes that can be stored in this buffer.
* Storing beyond this many will cause an IOException to be
* thrown during write.
*/
public Heap(final int limit) {
super(limit);
}

if (onDiskFile != null) {
if (!onDiskFile.delete())
onDiskFile.deleteOnExit();
onDiskFile = null;
@Override
protected OutputStream overflow() throws IOException {
throw new IOException("In-memory buffer limit exceeded");
}
}


Loading…
Cancel
Save