]> source.dussan.org Git - jgit.git/commitdiff
Refactor SideBandOutputStream to be buffered 87/287/4
authorShawn O. Pearce <spearce@spearce.org>
Tue, 9 Feb 2010 03:10:50 +0000 (19:10 -0800)
committerShawn O. Pearce <spearce@spearce.org>
Sat, 13 Mar 2010 00:07:45 +0000 (16:07 -0800)
Instead of relying on our callers to wrap us up inside of a
BufferedOutputStream and using the proper block sizing, do the
buffering directly inside of SideBandOutputStream.  This ensures
we don't get large write-throughs from BufferedOutputStream that
might overflow the configured packet size.

The constructor of SideBandOutputStream is also beefed up to check
its arguments and ensure they are within acceptable ranges for the
current side-band protocol.

Change-Id: Ic14567327d03c9e972f9734b8228178bc448867d
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java
org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java
org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java
org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java
org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java

index 6eb98ac1264b6b03e0fab556f5e4766f487bf54e..c66d4d52a4ae46618cefeeab15db2ff72206c8be 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2009, Google Inc.
+ * Copyright (C) 2009-2010, Google Inc.
  * and other copyright owners as documented in the project's IP log.
  *
  * This program and the accompanying materials are made available
@@ -119,8 +119,7 @@ public class PacketLineOutTest extends TestCase {
        }
 
        public void testWritePacket3() throws IOException {
-               final int buflen = SideBandOutputStream.MAX_BUF
-                               - SideBandOutputStream.HDR_SIZE;
+               final int buflen = SideBandOutputStream.MAX_BUF - 5;
                final byte[] buf = new byte[buflen];
                for (int i = 0; i < buf.length; i++) {
                        buf[i] = (byte) i;
@@ -137,23 +136,6 @@ public class PacketLineOutTest extends TestCase {
                }
        }
 
-       // writeChannelPacket
-
-       public void testWriteChannelPacket1() throws IOException {
-               out.writeChannelPacket(1, new byte[] { 'a' }, 0, 1);
-               assertBuffer("0006\001a");
-       }
-
-       public void testWriteChannelPacket2() throws IOException {
-               out.writeChannelPacket(2, new byte[] { 'b' }, 0, 1);
-               assertBuffer("0006\002b");
-       }
-
-       public void testWriteChannelPacket3() throws IOException {
-               out.writeChannelPacket(3, new byte[] { 'c' }, 0, 1);
-               assertBuffer("0006\003c");
-       }
-
        // flush
 
        public void testFlush() throws IOException {
index 3c79f138c8f50d2109b4c8cd858e46d40c04e7fd..61c894e4146bfae2d8d8be11171acbeeb5ce514c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2009, Google Inc.
+ * Copyright (C) 2009-2010, Google Inc.
  * and other copyright owners as documented in the project's IP log.
  *
  * This program and the accompanying materials are made available
 
 package org.eclipse.jgit.transport;
 
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_ERROR;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS;
+import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;
+import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF;
+import static org.eclipse.jgit.transport.SideBandOutputStream.SMALL_BUF;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -58,62 +65,90 @@ import org.eclipse.jgit.lib.Constants;
 public class SideBandOutputStreamTest extends TestCase {
        private ByteArrayOutputStream rawOut;
 
-       private PacketLineOut pckOut;
-
        protected void setUp() throws Exception {
                super.setUp();
                rawOut = new ByteArrayOutputStream();
-               pckOut = new PacketLineOut(rawOut);
        }
 
        public void testWrite_CH_DATA() throws IOException {
                final SideBandOutputStream out;
-               out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+               out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
                out.write(new byte[] { 'a', 'b', 'c' });
+               out.flush();
                assertBuffer("0008\001abc");
        }
 
        public void testWrite_CH_PROGRESS() throws IOException {
                final SideBandOutputStream out;
-               out = new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, pckOut);
+               out = new SideBandOutputStream(CH_PROGRESS, SMALL_BUF, rawOut);
                out.write(new byte[] { 'a', 'b', 'c' });
+               out.flush();
                assertBuffer("0008\002abc");
        }
 
        public void testWrite_CH_ERROR() throws IOException {
                final SideBandOutputStream out;
-               out = new SideBandOutputStream(SideBandOutputStream.CH_ERROR, pckOut);
+               out = new SideBandOutputStream(CH_ERROR, SMALL_BUF, rawOut);
                out.write(new byte[] { 'a', 'b', 'c' });
+               out.flush();
                assertBuffer("0008\003abc");
        }
 
        public void testWrite_Small() throws IOException {
                final SideBandOutputStream out;
-               out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+               out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
+               out.write('a');
+               out.write('b');
+               out.write('c');
+               out.flush();
+               assertBuffer("0008\001abc");
+       }
+
+       public void testWrite_SmallBlocks1() throws IOException {
+               final SideBandOutputStream out;
+               out = new SideBandOutputStream(CH_DATA, 6, rawOut);
                out.write('a');
                out.write('b');
                out.write('c');
+               out.flush();
                assertBuffer("0006\001a0006\001b0006\001c");
        }
 
+       public void testWrite_SmallBlocks2() throws IOException {
+               final SideBandOutputStream out;
+               out = new SideBandOutputStream(CH_DATA, 6, rawOut);
+               out.write(new byte[] { 'a', 'b', 'c' });
+               out.flush();
+               assertBuffer("0006\001a0006\001b0006\001c");
+       }
+
+       public void testWrite_SmallBlocks3() throws IOException {
+               final SideBandOutputStream out;
+               out = new SideBandOutputStream(CH_DATA, 7, rawOut);
+               out.write('a');
+               out.write(new byte[] { 'b', 'c' });
+               out.flush();
+               assertBuffer("0007\001ab0006\001c");
+       }
+
        public void testWrite_Large() throws IOException {
-               final int buflen = SideBandOutputStream.MAX_BUF
-                               - SideBandOutputStream.HDR_SIZE;
+               final int buflen = MAX_BUF - HDR_SIZE;
                final byte[] buf = new byte[buflen];
                for (int i = 0; i < buf.length; i++) {
                        buf[i] = (byte) i;
                }
 
                final SideBandOutputStream out;
-               out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+               out = new SideBandOutputStream(CH_DATA, MAX_BUF, rawOut);
                out.write(buf);
+               out.flush();
 
                final byte[] act = rawOut.toByteArray();
-               final String explen = Integer.toString(buf.length + 5, 16);
-               assertEquals(5 + buf.length, act.length);
+               final String explen = Integer.toString(buf.length + HDR_SIZE, 16);
+               assertEquals(HDR_SIZE + buf.length, act.length);
                assertEquals(new String(act, 0, 4, "UTF-8"), explen);
                assertEquals(1, act[4]);
-               for (int i = 0, j = 5; i < buf.length; i++, j++) {
+               for (int i = 0, j = HDR_SIZE; i < buf.length; i++, j++) {
                        assertEquals(buf[i], act[j]);
                }
        }
@@ -132,17 +167,63 @@ public class SideBandOutputStreamTest extends TestCase {
                        }
                };
 
-               new SideBandOutputStream(SideBandOutputStream.CH_DATA,
-                               new PacketLineOut(mockout)).flush();
-               assertEquals(0, flushCnt[0]);
-
-               new SideBandOutputStream(SideBandOutputStream.CH_ERROR,
-                               new PacketLineOut(mockout)).flush();
+               new SideBandOutputStream(CH_DATA, SMALL_BUF, mockout).flush();
                assertEquals(1, flushCnt[0]);
+       }
+
+       public void testConstructor_RejectsBadChannel() {
+               try {
+                       new SideBandOutputStream(-1, MAX_BUF, rawOut);
+                       fail("Accepted -1 channel number");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("channel -1 must be in range [0, 255]", e.getMessage());
+               }
 
-               new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
-                               new PacketLineOut(mockout)).flush();
-               assertEquals(2, flushCnt[0]);
+               try {
+                       new SideBandOutputStream(0, MAX_BUF, rawOut);
+                       fail("Accepted 0 channel number");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("channel 0 must be in range [0, 255]", e.getMessage());
+               }
+
+               try {
+                       new SideBandOutputStream(256, MAX_BUF, rawOut);
+                       fail("Accepted 256 channel number");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("channel 256 must be in range [0, 255]", e
+                                       .getMessage());
+               }
+       }
+
+       public void testConstructor_RejectsBadBufferSize() {
+               try {
+                       new SideBandOutputStream(CH_DATA, -1, rawOut);
+                       fail("Accepted -1 for buffer size");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("packet size -1 must be >= 5", e.getMessage());
+               }
+
+               try {
+                       new SideBandOutputStream(CH_DATA, 0, rawOut);
+                       fail("Accepted 0 for buffer size");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("packet size 0 must be >= 5", e.getMessage());
+               }
+
+               try {
+                       new SideBandOutputStream(CH_DATA, 1, rawOut);
+                       fail("Accepted 1 for buffer size");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("packet size 1 must be >= 5", e.getMessage());
+               }
+
+               try {
+                       new SideBandOutputStream(CH_DATA, Integer.MAX_VALUE, rawOut);
+                       fail("Accepted " + Integer.MAX_VALUE + " for buffer size");
+               } catch (IllegalArgumentException e) {
+                       assertEquals("packet size " + Integer.MAX_VALUE
+                                       + " must be <= 65520", e.getMessage());
+               }
        }
 
        private void assertBuffer(final String exp) throws IOException {
index 81dd4f6a15d6a709de0432ccaacce7f0e68e7c8d..51506b20aa3ae072058f022e3a4ba6b659fdd7c1 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
  * Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
  * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
  * and other copyright owners as documented in the project's IP log.
@@ -105,14 +105,6 @@ public class PacketLineOut {
                out.write(packet);
        }
 
-       void writeChannelPacket(final int channel, final byte[] buf, int off,
-                       int len) throws IOException {
-               formatLength(len + 5);
-               lenbuffer[4] = (byte) channel;
-               out.write(lenbuffer, 0, 5);
-               out.write(buf, off, len);
-       }
-
        /**
         * Write a packet end marker, sometimes referred to as a flush command.
         * <p>
@@ -149,6 +141,10 @@ public class PacketLineOut {
                        '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
 
        private void formatLength(int w) {
+               formatLength(lenbuffer, w);
+       }
+
+       static void formatLength(byte[] lenbuffer, int w) {
                int o = 3;
                while (o >= 0 && w != 0) {
                        lenbuffer[o--] = hexchar[w & 0xf];
index 5e50fd89b3b0dfd8b333c6c08cb52ca3924ec572..6e0a52627ed1f1fc1bf8bd6c870052b4bb68c8fb 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
  * and other copyright owners as documented in the project's IP log.
  *
  * This program and the accompanying materials are made available
@@ -47,11 +47,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 /**
- * Multiplexes data and progress messages
+ * Multiplexes data and progress messages.
  * <p>
- * To correctly use this class you must wrap it in a BufferedOutputStream with a
- * buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF},
- * minus {@link #HDR_SIZE}.
+ * This stream is buffered at packet sizes, so the caller doesn't need to wrap
+ * it in yet another buffered stream.
  */
 class SideBandOutputStream extends OutputStream {
        static final int CH_DATA = SideBandInputStream.CH_DATA;
@@ -66,34 +65,93 @@ class SideBandOutputStream extends OutputStream {
 
        static final int HDR_SIZE = 5;
 
-       private final int channel;
+       private final OutputStream out;
 
-       private final PacketLineOut pckOut;
+       private final byte[] buffer;
 
-       private byte[] singleByteBuffer;
+       /**
+        * Number of bytes in {@link #buffer} that are valid data.
+        * <p>
+        * Initialized to {@link #HDR_SIZE} if there is no application data in the
+        * buffer, as the packet header always appears at the start of the buffer.
+        */
+       private int cnt;
 
-       SideBandOutputStream(final int chan, final PacketLineOut out) {
-               channel = chan;
-               pckOut = out;
+       /**
+        * Create a new stream to write side band packets.
+        *
+        * @param chan
+        *            channel number to prefix all packets with, so the remote side
+        *            can demultiplex the stream and get back the original data.
+        *            Must be in the range [0, 255].
+        * @param sz
+        *            maximum size of a data packet within the stream. The remote
+        *            side needs to agree to the packet size to prevent buffer
+        *            overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF).
+        * @param os
+        *            stream that the packets are written onto. This stream should
+        *            be attached to a SideBandInputStream on the remote side.
+        */
+       SideBandOutputStream(final int chan, final int sz, final OutputStream os) {
+               if (chan <= 0 || chan > 255)
+                       throw new IllegalArgumentException("channel " + chan
+                                       + " must be in range [0, 255]");
+               if (sz <= HDR_SIZE)
+                       throw new IllegalArgumentException("packet size " + sz
+                                       + " must be >= " + HDR_SIZE);
+               else if (MAX_BUF < sz)
+                       throw new IllegalArgumentException("packet size " + sz
+                                       + " must be <= " + MAX_BUF);
+
+               out = os;
+               buffer = new byte[sz];
+               buffer[4] = (byte) chan;
+               cnt = HDR_SIZE;
        }
 
        @Override
        public void flush() throws IOException {
-               if (channel != CH_DATA)
-                       pckOut.flush();
+               if (HDR_SIZE < cnt)
+                       writeBuffer();
+               out.flush();
        }
 
        @Override
-       public void write(final byte[] b, final int off, final int len)
-                       throws IOException {
-               pckOut.writeChannelPacket(channel, b, off, len);
+       public void write(final byte[] b, int off, int len) throws IOException {
+               while (0 < len) {
+                       int capacity = buffer.length - cnt;
+                       if (cnt == HDR_SIZE && capacity < len) {
+                               // Our block to write is bigger than the packet size,
+                               // stream it out as-is to avoid unnecessary copies.
+                               PacketLineOut.formatLength(buffer, buffer.length);
+                               out.write(buffer, 0, HDR_SIZE);
+                               out.write(b, off, capacity);
+                               off += capacity;
+                               len -= capacity;
+
+                       } else {
+                               if (capacity == 0)
+                                       writeBuffer();
+
+                               int n = Math.min(len, capacity);
+                               System.arraycopy(b, off, buffer, cnt, n);
+                               cnt += n;
+                               off += n;
+                               len -= n;
+                       }
+               }
        }
 
        @Override
        public void write(final int b) throws IOException {
-               if (singleByteBuffer == null)
-                       singleByteBuffer = new byte[1];
-               singleByteBuffer[0] = (byte) b;
-               write(singleByteBuffer);
+               if (cnt == buffer.length)
+                       writeBuffer();
+               buffer[cnt++] = (byte) b;
+       }
+
+       private void writeBuffer() throws IOException {
+               PacketLineOut.formatLength(buffer, cnt);
+               out.write(buffer, 0, cnt);
+               cnt = HDR_SIZE;
        }
 }
index 89d338c897bfcc7e6294c5e0c0a1b7444c404692..efce7b1da7c2b7fb68b0a69527a99db89b17d2be 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
  * and other copyright owners as documented in the project's IP log.
  *
  * This program and the accompanying materials are made available
@@ -43,7 +43,7 @@
 
 package org.eclipse.jgit.transport;
 
-import java.io.BufferedOutputStream;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 
@@ -66,12 +66,8 @@ class SideBandProgressMonitor implements ProgressMonitor {
 
        private int totalWork;
 
-       SideBandProgressMonitor(final PacketLineOut pckOut) {
-               final int bufsz = SideBandOutputStream.SMALL_BUF
-                               - SideBandOutputStream.HDR_SIZE;
-               out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream(
-                               new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
-                                               pckOut), bufsz), Constants.CHARSET));
+       SideBandProgressMonitor(final OutputStream os) {
+               out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET));
        }
 
        public void start(final int totalTasks) {
index b76b22b77ee3c21dd22a9a6cc2046217db01c920..39c4243bad7bf96ec3424b476a3c5ed0f017ea04 100644 (file)
@@ -43,9 +43,6 @@
 
 package org.eclipse.jgit.transport;
 
-import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
-
-import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -70,6 +67,7 @@ import org.eclipse.jgit.revwalk.RevFlagSet;
 import org.eclipse.jgit.revwalk.RevObject;
 import org.eclipse.jgit.revwalk.RevTag;
 import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
 import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser;
 import org.eclipse.jgit.util.io.InterruptTimer;
 import org.eclipse.jgit.util.io.TimeoutInputStream;
@@ -556,13 +554,12 @@ public class UploadPack {
                        int bufsz = SideBandOutputStream.SMALL_BUF;
                        if (options.contains(OPTION_SIDE_BAND_64K))
                                bufsz = SideBandOutputStream.MAX_BUF;
-                       bufsz -= SideBandOutputStream.HDR_SIZE;
-
-                       packOut = new BufferedOutputStream(new SideBandOutputStream(
-                                       SideBandOutputStream.CH_DATA, pckOut), bufsz);
 
+                       packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA,
+                                       bufsz, rawOut);
                        if (progress)
-                               pm = new SideBandProgressMonitor(pckOut);
+                               pm = new SideBandProgressMonitor(new SideBandOutputStream(
+                                               SideBandOutputStream.CH_PROGRESS, bufsz, rawOut));
                }
 
                final PackWriter pw;