/*
- * Copyright (C) 2009, Google Inc.
+ * Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
}
public void testWritePacket3() throws IOException {
- final int buflen = SideBandOutputStream.MAX_BUF
- - SideBandOutputStream.HDR_SIZE;
+ final int buflen = SideBandOutputStream.MAX_BUF - 5;
final byte[] buf = new byte[buflen];
for (int i = 0; i < buf.length; i++) {
buf[i] = (byte) i;
}
}
- // writeChannelPacket
-
- public void testWriteChannelPacket1() throws IOException {
- out.writeChannelPacket(1, new byte[] { 'a' }, 0, 1);
- assertBuffer("0006\001a");
- }
-
- public void testWriteChannelPacket2() throws IOException {
- out.writeChannelPacket(2, new byte[] { 'b' }, 0, 1);
- assertBuffer("0006\002b");
- }
-
- public void testWriteChannelPacket3() throws IOException {
- out.writeChannelPacket(3, new byte[] { 'c' }, 0, 1);
- assertBuffer("0006\003c");
- }
-
// flush
public void testFlush() throws IOException {
/*
- * Copyright (C) 2009, Google Inc.
+ * Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
package org.eclipse.jgit.transport;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_ERROR;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS;
+import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;
+import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF;
+import static org.eclipse.jgit.transport.SideBandOutputStream.SMALL_BUF;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class SideBandOutputStreamTest extends TestCase {
private ByteArrayOutputStream rawOut;
- private PacketLineOut pckOut;
-
protected void setUp() throws Exception {
super.setUp();
rawOut = new ByteArrayOutputStream();
- pckOut = new PacketLineOut(rawOut);
}
public void testWrite_CH_DATA() throws IOException {
final SideBandOutputStream out;
- out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+ out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' });
+ out.flush();
assertBuffer("0008\001abc");
}
public void testWrite_CH_PROGRESS() throws IOException {
final SideBandOutputStream out;
- out = new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, pckOut);
+ out = new SideBandOutputStream(CH_PROGRESS, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' });
+ out.flush();
assertBuffer("0008\002abc");
}
public void testWrite_CH_ERROR() throws IOException {
final SideBandOutputStream out;
- out = new SideBandOutputStream(SideBandOutputStream.CH_ERROR, pckOut);
+ out = new SideBandOutputStream(CH_ERROR, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' });
+ out.flush();
assertBuffer("0008\003abc");
}
public void testWrite_Small() throws IOException {
final SideBandOutputStream out;
- out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+ out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
+ out.write('a');
+ out.write('b');
+ out.write('c');
+ out.flush();
+ assertBuffer("0008\001abc");
+ }
+
+ public void testWrite_SmallBlocks1() throws IOException {
+ final SideBandOutputStream out;
+ out = new SideBandOutputStream(CH_DATA, 6, rawOut);
out.write('a');
out.write('b');
out.write('c');
+ out.flush();
assertBuffer("0006\001a0006\001b0006\001c");
}
+ public void testWrite_SmallBlocks2() throws IOException {
+ final SideBandOutputStream out;
+ out = new SideBandOutputStream(CH_DATA, 6, rawOut);
+ out.write(new byte[] { 'a', 'b', 'c' });
+ out.flush();
+ assertBuffer("0006\001a0006\001b0006\001c");
+ }
+
+ public void testWrite_SmallBlocks3() throws IOException {
+ final SideBandOutputStream out;
+ out = new SideBandOutputStream(CH_DATA, 7, rawOut);
+ out.write('a');
+ out.write(new byte[] { 'b', 'c' });
+ out.flush();
+ assertBuffer("0007\001ab0006\001c");
+ }
+
public void testWrite_Large() throws IOException {
- final int buflen = SideBandOutputStream.MAX_BUF
- - SideBandOutputStream.HDR_SIZE;
+ final int buflen = MAX_BUF - HDR_SIZE;
final byte[] buf = new byte[buflen];
for (int i = 0; i < buf.length; i++) {
buf[i] = (byte) i;
}
final SideBandOutputStream out;
- out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut);
+ out = new SideBandOutputStream(CH_DATA, MAX_BUF, rawOut);
out.write(buf);
+ out.flush();
final byte[] act = rawOut.toByteArray();
- final String explen = Integer.toString(buf.length + 5, 16);
- assertEquals(5 + buf.length, act.length);
+ final String explen = Integer.toString(buf.length + HDR_SIZE, 16);
+ assertEquals(HDR_SIZE + buf.length, act.length);
assertEquals(new String(act, 0, 4, "UTF-8"), explen);
assertEquals(1, act[4]);
- for (int i = 0, j = 5; i < buf.length; i++, j++) {
+ for (int i = 0, j = HDR_SIZE; i < buf.length; i++, j++) {
assertEquals(buf[i], act[j]);
}
}
}
};
- new SideBandOutputStream(SideBandOutputStream.CH_DATA,
- new PacketLineOut(mockout)).flush();
- assertEquals(0, flushCnt[0]);
-
- new SideBandOutputStream(SideBandOutputStream.CH_ERROR,
- new PacketLineOut(mockout)).flush();
+ new SideBandOutputStream(CH_DATA, SMALL_BUF, mockout).flush();
assertEquals(1, flushCnt[0]);
+ }
+
+ public void testConstructor_RejectsBadChannel() {
+ try {
+ new SideBandOutputStream(-1, MAX_BUF, rawOut);
+ fail("Accepted -1 channel number");
+ } catch (IllegalArgumentException e) {
+ assertEquals("channel -1 must be in range [0, 255]", e.getMessage());
+ }
- new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
- new PacketLineOut(mockout)).flush();
- assertEquals(2, flushCnt[0]);
+ try {
+ new SideBandOutputStream(0, MAX_BUF, rawOut);
+ fail("Accepted 0 channel number");
+ } catch (IllegalArgumentException e) {
+ assertEquals("channel 0 must be in range [0, 255]", e.getMessage());
+ }
+
+ try {
+ new SideBandOutputStream(256, MAX_BUF, rawOut);
+ fail("Accepted 256 channel number");
+ } catch (IllegalArgumentException e) {
+ assertEquals("channel 256 must be in range [0, 255]", e
+ .getMessage());
+ }
+ }
+
+ public void testConstructor_RejectsBadBufferSize() {
+ try {
+ new SideBandOutputStream(CH_DATA, -1, rawOut);
+ fail("Accepted -1 for buffer size");
+ } catch (IllegalArgumentException e) {
+ assertEquals("packet size -1 must be >= 5", e.getMessage());
+ }
+
+ try {
+ new SideBandOutputStream(CH_DATA, 0, rawOut);
+ fail("Accepted 0 for buffer size");
+ } catch (IllegalArgumentException e) {
+ assertEquals("packet size 0 must be >= 5", e.getMessage());
+ }
+
+ try {
+ new SideBandOutputStream(CH_DATA, 1, rawOut);
+ fail("Accepted 1 for buffer size");
+ } catch (IllegalArgumentException e) {
+ assertEquals("packet size 1 must be >= 5", e.getMessage());
+ }
+
+ try {
+ new SideBandOutputStream(CH_DATA, Integer.MAX_VALUE, rawOut);
+ fail("Accepted " + Integer.MAX_VALUE + " for buffer size");
+ } catch (IllegalArgumentException e) {
+ assertEquals("packet size " + Integer.MAX_VALUE
+ + " must be <= 65520", e.getMessage());
+ }
}
private void assertBuffer(final String exp) throws IOException {
/*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
out.write(packet);
}
- void writeChannelPacket(final int channel, final byte[] buf, int off,
- int len) throws IOException {
- formatLength(len + 5);
- lenbuffer[4] = (byte) channel;
- out.write(lenbuffer, 0, 5);
- out.write(buf, off, len);
- }
-
/**
* Write a packet end marker, sometimes referred to as a flush command.
* <p>
'7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private void formatLength(int w) {
+ formatLength(lenbuffer, w);
+ }
+
+ static void formatLength(byte[] lenbuffer, int w) {
int o = 3;
while (o >= 0 && w != 0) {
lenbuffer[o--] = hexchar[w & 0xf];
/*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
import java.io.OutputStream;
/**
- * Multiplexes data and progress messages
+ * Multiplexes data and progress messages.
* <p>
- * To correctly use this class you must wrap it in a BufferedOutputStream with a
- * buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF},
- * minus {@link #HDR_SIZE}.
+ * This stream is buffered at packet sizes, so the caller doesn't need to wrap
+ * it in yet another buffered stream.
*/
class SideBandOutputStream extends OutputStream {
static final int CH_DATA = SideBandInputStream.CH_DATA;
static final int HDR_SIZE = 5;
- private final int channel;
+ private final OutputStream out;
- private final PacketLineOut pckOut;
+ private final byte[] buffer;
- private byte[] singleByteBuffer;
+ /**
+ * Number of bytes in {@link #buffer} that are valid data.
+ * <p>
+ * Initialized to {@link #HDR_SIZE} if there is no application data in the
+ * buffer, as the packet header always appears at the start of the buffer.
+ */
+ private int cnt;
- SideBandOutputStream(final int chan, final PacketLineOut out) {
- channel = chan;
- pckOut = out;
+ /**
+ * Create a new stream to write side band packets.
+ *
+ * @param chan
+ * channel number to prefix all packets with, so the remote side
+ * can demultiplex the stream and get back the original data.
+ * Must be in the range [0, 255].
+ * @param sz
+ * maximum size of a data packet within the stream. The remote
+ * side needs to agree to the packet size to prevent buffer
+ * overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF).
+ * @param os
+ * stream that the packets are written onto. This stream should
+ * be attached to a SideBandInputStream on the remote side.
+ */
+ SideBandOutputStream(final int chan, final int sz, final OutputStream os) {
+ if (chan <= 0 || chan > 255)
+ throw new IllegalArgumentException("channel " + chan
+ + " must be in range [0, 255]");
+ if (sz <= HDR_SIZE)
+ throw new IllegalArgumentException("packet size " + sz
+ + " must be >= " + HDR_SIZE);
+ else if (MAX_BUF < sz)
+ throw new IllegalArgumentException("packet size " + sz
+ + " must be <= " + MAX_BUF);
+
+ out = os;
+ buffer = new byte[sz];
+ buffer[4] = (byte) chan;
+ cnt = HDR_SIZE;
}
@Override
public void flush() throws IOException {
- if (channel != CH_DATA)
- pckOut.flush();
+ if (HDR_SIZE < cnt)
+ writeBuffer();
+ out.flush();
}
@Override
- public void write(final byte[] b, final int off, final int len)
- throws IOException {
- pckOut.writeChannelPacket(channel, b, off, len);
+ public void write(final byte[] b, int off, int len) throws IOException {
+ while (0 < len) {
+ int capacity = buffer.length - cnt;
+ if (cnt == HDR_SIZE && capacity < len) {
+ // Our block to write is bigger than the packet size,
+ // stream it out as-is to avoid unnecessary copies.
+ PacketLineOut.formatLength(buffer, buffer.length);
+ out.write(buffer, 0, HDR_SIZE);
+ out.write(b, off, capacity);
+ off += capacity;
+ len -= capacity;
+
+ } else {
+ if (capacity == 0)
+ writeBuffer();
+
+ int n = Math.min(len, capacity);
+ System.arraycopy(b, off, buffer, cnt, n);
+ cnt += n;
+ off += n;
+ len -= n;
+ }
+ }
}
@Override
public void write(final int b) throws IOException {
- if (singleByteBuffer == null)
- singleByteBuffer = new byte[1];
- singleByteBuffer[0] = (byte) b;
- write(singleByteBuffer);
+ if (cnt == buffer.length)
+ writeBuffer();
+ buffer[cnt++] = (byte) b;
+ }
+
+ private void writeBuffer() throws IOException {
+ PacketLineOut.formatLength(buffer, cnt);
+ out.write(buffer, 0, cnt);
+ cnt = HDR_SIZE;
}
}
/*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
package org.eclipse.jgit.transport;
-import java.io.BufferedOutputStream;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
private int totalWork;
- SideBandProgressMonitor(final PacketLineOut pckOut) {
- final int bufsz = SideBandOutputStream.SMALL_BUF
- - SideBandOutputStream.HDR_SIZE;
- out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream(
- new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
- pckOut), bufsz), Constants.CHARSET));
+ SideBandProgressMonitor(final OutputStream os) {
+ out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET));
}
public void start(final int totalTasks) {
package org.eclipse.jgit.transport;
-import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
-
-import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTag;
import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser;
import org.eclipse.jgit.util.io.InterruptTimer;
import org.eclipse.jgit.util.io.TimeoutInputStream;
int bufsz = SideBandOutputStream.SMALL_BUF;
if (options.contains(OPTION_SIDE_BAND_64K))
bufsz = SideBandOutputStream.MAX_BUF;
- bufsz -= SideBandOutputStream.HDR_SIZE;
-
- packOut = new BufferedOutputStream(new SideBandOutputStream(
- SideBandOutputStream.CH_DATA, pckOut), bufsz);
+ packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA,
+ bufsz, rawOut);
if (progress)
- pm = new SideBandProgressMonitor(pckOut);
+ pm = new SideBandProgressMonitor(new SideBandOutputStream(
+ SideBandOutputStream.CH_PROGRESS, bufsz, rawOut));
}
final PackWriter pw;