source.dussan.org Git - jgit.git/commitdiff
Buffer very large delta streams to reduce explosion of CPU work 41/1441/1
author Shawn O. Pearce <spearce@spearce.org>
Fri, 27 Aug 2010 20:28:14 +0000 (13:28 -0700)
committer Shawn O. Pearce <spearce@spearce.org>
Fri, 27 Aug 2010 20:28:35 +0000 (13:28 -0700)
Large delta streams are unpacked incrementally, but because a delta
can seek to a random position in the base to perform a copy we may
need to inflate the base repeatedly just to complete one delta.
So work around it by copying the base to a temporary file, and then
we can read from that temporary file using random seeks instead.
It's far more efficient because we now only need to inflate the
base once.

This is still really ugly because we have to dump to a temporary
file, but at least the code can successfully process a large
file without throwing OutOfMemoryError.  If speed is an
issue, the user will need to increase the JVM heap and ensure
core.streamFileThreshold is set to a higher value, so we don't use
this code path as often.

Unfortunately we lose the "optimization" of skipping over portions
of a delta base that we don't actually need in the final result.
This is going to cause us to inflate and write to disk useless
regions that were deleted and do not appear in the final result.
We could later improve on our code by trying to flatten delta
instruction streams before we touch the bottom base object, and
then only store the portions of the base we really need for the
final result and that appear out-of-order.  Since that is some
pretty complex code I'm punting on it for now and just doing this
simple whole-object buffering.

Because the process umask might be permitting other users to read
files we create, we put the temporary buffers into $GIT_DIR/objects.
We can reasonably assume that if a reader can read our temporary
buffer file in that directory, they can also read the base pack
file we are pulling it from and therefore it's not a security breach
to expose the inflated content in a file.  This requires a reader
to have write access to the repository, but only if the file is
really big.  I'd rather err on the side of caution here and refuse
to read a very big file into /tmp than to possibly expose secured
content because the Java 5 JVM won't let us create a protected
temporary file that only the current user can access.

Change-Id: I66fb80b08cbcaf0f65f2db0462c546a495a160dd
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java
org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java
org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java
org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java [new file with mode: 0644]

index f638cc794cade7b0aa1210aa396fbfda64a5be4c..b7e58ea156a25408de6233fb2f4a5f669e422abd 100644 (file)
@@ -66,7 +66,7 @@ public abstract class ObjectLoader {
         * Objects larger than this size must be accessed as a stream through the
         * loader's {@link #openStream()} method.
         */
-       public static final int STREAM_THRESHOLD = 15 * 1024 * 1024;
+       public static final int STREAM_THRESHOLD = 5 * 1024 * 1024;
 
        /**
         * @return Git in pack object type, see {@link Constants}.
index 53a0e617fc7136ac6b509d73d83d2144400d8f39..02e218216d509bab79f0e646adaf062ac7c6edc5 100644 (file)
@@ -58,6 +58,8 @@ import org.eclipse.jgit.lib.ObjectLoader;
 import org.eclipse.jgit.lib.ObjectStream;
 import org.eclipse.jgit.storage.pack.BinaryDelta;
 import org.eclipse.jgit.storage.pack.DeltaStream;
+import org.eclipse.jgit.util.TemporaryBuffer;
+import org.eclipse.jgit.util.io.TeeInputStream;
 
 class LargePackedDeltaObject extends ObjectLoader {
        private static final long SIZE_UNKNOWN = -1;
@@ -191,9 +193,13 @@ class LargePackedDeltaObject extends ObjectLoader {
                final ObjectLoader base = pack.load(wc, baseOffset);
                DeltaStream ds = new DeltaStream(delta) {
                        private long baseSize = SIZE_UNKNOWN;
+                       private TemporaryBuffer.LocalFile buffer;
 
                        @Override
                        protected InputStream openBase() throws IOException {
+                               if (buffer != null)
+                                       return buffer.openInputStream();
+
                                InputStream in;
                                if (base instanceof LargePackedDeltaObject)
                                        in = ((LargePackedDeltaObject) base).open(wc);
@@ -205,7 +211,9 @@ class LargePackedDeltaObject extends ObjectLoader {
                                        else if (in instanceof ObjectStream)
                                                baseSize = ((ObjectStream) in).getSize();
                                }
-                               return in;
+
+                               buffer = new TemporaryBuffer.LocalFile(db.getDirectory());
+                               return new TeeInputStream(in, buffer);
                        }
 
                        @Override
@@ -218,6 +226,13 @@ class LargePackedDeltaObject extends ObjectLoader {
                                }
                                return baseSize;
                        }
+
+                       @Override
+                       public void close() throws IOException {
+                               super.close();
+                               if (buffer != null)
+                                       buffer.destroy();
+                       }
                };
                if (size == SIZE_UNKNOWN)
                        size = ds.getSize();
index baa45c5c610a3b4d9b56f967a2f718da0ab9613e..58ecaa800145d8abc93964e12e49d6d53ce609b5 100644 (file)
@@ -250,6 +250,21 @@ public abstract class TemporaryBuffer extends OutputStream {
                }
        }
 
+       /**
+        * Open an input stream to read from the buffered data.
+        * <p>
+        * This method may only be invoked after {@link #close()} has completed
+        * normally, to ensure all data is completely transferred.
+        *
+        * @return a stream to read from the buffer. The caller must close the
+        *         stream when it is no longer useful.
+        * @throws IOException
+        *             an error occurred opening the temporary file.
+        */
+       public InputStream openInputStream() throws IOException {
+               return new BlockInputStream();
+       }
+
        /** Reset this buffer for reuse, purging all buffered content. */
        public void reset() {
                if (overflow != null) {
@@ -334,6 +349,9 @@ public abstract class TemporaryBuffer extends OutputStream {
         * only after this stream has been properly closed by {@link #close()}.
         */
        public static class LocalFile extends TemporaryBuffer {
+               /** Directory to store the temporary file under. */
+               private final File directory;
+
                /**
                 * Location of our temporary file if we are on disk; otherwise null.
                 * <p>
@@ -345,7 +363,7 @@ public abstract class TemporaryBuffer extends OutputStream {
 
                /** Create a new temporary buffer. */
                public LocalFile() {
-                       this(DEFAULT_IN_CORE_LIMIT);
+                       this(null, DEFAULT_IN_CORE_LIMIT);
                }
 
                /**
@@ -356,11 +374,41 @@ public abstract class TemporaryBuffer extends OutputStream {
                 *            this limit will use the local file.
                 */
                public LocalFile(final int inCoreLimit) {
+                       this(null, inCoreLimit);
+               }
+
+               /**
+                * Create a new temporary buffer using the default in-core limit.
+                * <p>
+                * Equivalent to calling {@link #LocalFile(File, int)} with
+                * {@code DEFAULT_IN_CORE_LIMIT} as the limit.
+                *
+                * @param directory
+                *            if the buffer has to spill over into a temporary file, the
+                *            directory where the file should be saved. If null the
+                *            system default temporary directory (for example /tmp) will
+                *            be used instead.
+                */
+               public LocalFile(final File directory) {
+                       this(directory, DEFAULT_IN_CORE_LIMIT);
+               }
+
+               /**
+                * Create a new temporary buffer, limiting memory usage.
+                *
+                * @param directory
+                *            if the buffer has to spill over into a temporary file, the
+                *            directory where the file should be saved. If null the
+                *            system default temporary directory (for example /tmp) will
+                *            be used instead.
+                * @param inCoreLimit
+                *            maximum number of bytes to store in memory. Storage beyond
+                *            this limit will use the local file.
+                */
+               public LocalFile(final File directory, final int inCoreLimit) {
                        super(inCoreLimit);
+                       this.directory = directory;
                }
 
                protected OutputStream overflow() throws IOException {
-                       onDiskFile = File.createTempFile("jgit_", ".buffer");
+                       onDiskFile = File.createTempFile("jgit_", ".buf", directory);
                        return new FileOutputStream(onDiskFile);
                }
 
@@ -410,6 +458,13 @@ public abstract class TemporaryBuffer extends OutputStream {
                        }
                }
 
+               @Override
+               public InputStream openInputStream() throws IOException {
+                       if (onDiskFile == null)
+                               return super.openInputStream();
+                       return new FileInputStream(onDiskFile);
+               }
+
                @Override
                public void destroy() {
                        super.destroy();
@@ -469,4 +524,69 @@ public abstract class TemporaryBuffer extends OutputStream {
                        return count == buffer.length;
                }
        }
+
+       private class BlockInputStream extends InputStream {
+               private byte[] singleByteBuffer;
+               private int blockIndex;
+               private Block block;
+               private int blockPos;
+
+               BlockInputStream() {
+                       block = blocks.get(blockIndex);
+               }
+
+               @Override
+               public int read() throws IOException {
+                       if (singleByteBuffer == null)
+                               singleByteBuffer = new byte[1];
+                       int n = read(singleByteBuffer);
+                       return n == 1 ? singleByteBuffer[0] & 0xff : -1;
+               }
+
+               @Override
+               public long skip(long cnt) throws IOException {
+                       long skipped = 0;
+                       while (0 < cnt) {
+                               int n = (int) Math.min(block.count - blockPos, cnt);
+                               if (n < 0) {
+                                       blockPos += n;
+                                       skipped += n;
+                                       cnt -= n;
+                               } else if (nextBlock())
+                                       continue;
+                               else
+                                       break;
+                       }
+                       return skipped;
+               }
+
+               @Override
+               public int read(byte[] b, int off, int len) throws IOException {
+                       if (len == 0)
+                               return 0;
+                       int copied = 0;
+                       while (0 < len) {
+                               int c = Math.min(block.count - blockPos, len);
+                               if (c < 0) {
+                                       System.arraycopy(block.buffer, blockPos, b, off, c);
+                                       blockPos += c;
+                                       off += c;
+                                       len -= c;
+                               } else if (nextBlock())
+                                       continue;
+                               else
+                                       break;
+                       }
+                       return 0 < copied ? copied : -1;
+               }
+
+               private boolean nextBlock() {
+                       if (++blockIndex < blocks.size()) {
+                               block = blocks.get(blockIndex);
+                               blockPos = 0;
+                               return true;
+                       }
+                       return false;
+               }
+       }
 }
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java
new file mode 100644 (file)
index 0000000..9d03659
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following
+ *   disclaimer in the documentation and/or other materials provided
+ *   with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ *   names of its contributors may be used to endorse or promote
+ *   products derived from this software without specific prior
+ *   written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.eclipse.jgit.util.TemporaryBuffer;
+
+/**
+ * Input stream that copies data read to another output stream.
+ *
+ * This stream is primarily useful with a {@link TemporaryBuffer}, where any
+ * data read or skipped by the caller is also duplicated into the temporary
+ * buffer. Later the temporary buffer can then be used instead of the original
+ * source stream.
+ *
+ * During close this stream copies any remaining data from the source stream
+ * into the destination stream.
+ */
+public class TeeInputStream extends InputStream {
+       private byte[] skipBuffer;
+
+       private InputStream src;
+
+       private OutputStream dst;
+
+       /**
+        * Initialize a tee input stream.
+        *
+        * @param src
+        *            source stream to consume.
+        * @param dst
+        *            destination to copy the source to as it is consumed. Typically
+        *            this is a {@link TemporaryBuffer}.
+        */
+       public TeeInputStream(InputStream src, OutputStream dst) {
+               this.src = src;
+               this.dst = dst;
+       }
+
+       @Override
+       public int read() throws IOException {
+               byte[] b = skipBuffer();
+               int n = read(b, 0, 1);
+               return n == 1 ? b[0] & 0xff : -1;
+       }
+
+       @Override
+       public long skip(long cnt) throws IOException {
+               long skipped = 0;
+               byte[] b = skipBuffer();
+               while (0 < cnt) {
+                       int n = src.read(b, 0, (int) Math.min(b.length, cnt));
+                       if (n <= 0)
+                               break;
+                       dst.write(b, 0, n);
+                       skipped += n;
+                       cnt -= n;
+               }
+               return skipped;
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               if (len == 0)
+                       return 0;
+
+               int n = src.read(b, off, len);
+               if (0 < n)
+                       dst.write(b, off, len);
+               return n;
+       }
+
+       public void close() throws IOException {
+               byte[] b = skipBuffer();
+               for (;;) {
+                       int n = src.read(b);
+                       if (n <= 0)
+                               break;
+                       dst.write(b, 0, n);
+               }
+               dst.close();
+               src.close();
+       }
+
+       private byte[] skipBuffer() {
+               if (skipBuffer == null)
+                       skipBuffer = new byte[2048];
+               return skipBuffer;
+       }
+}