aboutsummaryrefslogtreecommitdiffstats
path: root/org.eclipse.jgit
diff options
context:
space:
mode:
authorShawn O. Pearce <spearce@spearce.org>2010-03-12 17:00:50 -0800
committerShawn O. Pearce <spearce@spearce.org>2010-03-12 17:00:54 -0800
commit23bd331cb2ce9c387eac377b244a7d9285cf3d25 (patch)
treed2d9edfefbd06e17aee9674a39be4c863bca3047 /org.eclipse.jgit
parentd42603578cd1f656ea8bb2cb949a5e5d3ad8e037 (diff)
parent89cdc3b713c214a8f7142ef0d0df714027ad9876 (diff)
downloadjgit-23bd331cb2ce9c387eac377b244a7d9285cf3d25.tar.gz
jgit-23bd331cb2ce9c387eac377b244a7d9285cf3d25.zip
Merge branch 'push-sideband' into stable-0.7
* push-sideband: Reuse the line buffer between strings in PacketLineIn http.server: Use TemporaryBuffer and compress some responses Reduce multi-level buffered streams in transport code Fix smart HTTP client buffer alignment Use "ERR message" for early ReceivePack problems Catch and report "ERR message" during remote advertisements Wait for EOF on stderr before finishing SSH channel Capture non-progress side band #2 messages and put in result ReceivePack: Enable side-band-64k capability for status reports Use more restrictive patterns for sideband progress scraping Prefix remote progress tasks with "remote: " Decode side-band channel number as unsigned integer Refactor SideBandInputStream construction Refactor SideBandOutputStream to be buffered Change-Id: Ic9689e64e8c87971f2fd402cb619082309d5587f
Diffstat (limited to 'org.eclipse.jgit')
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/errors/RemoteRepositoryException.java70
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java12
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java37
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java44
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java7
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java33
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java12
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java20
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java7
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java29
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineIn.java26
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java14
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java25
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java111
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java71
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java98
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java12
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java20
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java173
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportHttp.java47
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java97
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java20
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java3
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java115
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java128
25 files changed, 865 insertions, 366 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/errors/RemoteRepositoryException.java b/org.eclipse.jgit/src/org/eclipse/jgit/errors/RemoteRepositoryException.java
new file mode 100644
index 0000000000..ab094c9466
--- /dev/null
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/errors/RemoteRepositoryException.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.errors;
+
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * Contains a message from the remote repository indicating a problem.
+ * <p>
+ * Some remote repositories may send customized error messages describing why
+ * they cannot be accessed. These messages are wrapped up in this exception and
+ * thrown to the caller of the transport operation.
+ */
+public class RemoteRepositoryException extends TransportException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a RemoteRepositoryException for a message.
+ *
+ * @param uri
+ * URI used for transport
+ * @param message
+ * message, exactly as supplied by the remote repository. May
+ * contain LFs (newlines) if the remote formatted it that way.
+ */
+ public RemoteRepositoryException(URIish uri, String message) {
+ super(uri, message);
+ }
+}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java
index 6162deab7f..b30e5f7c23 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* and other copyright owners as documented in the project's IP log.
*
@@ -44,7 +44,6 @@
package org.eclipse.jgit.lib;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.MessageDigest;
@@ -97,7 +96,6 @@ import org.eclipse.jgit.util.NB;
* undefined behavior.
* </p>
*/
-
public class PackWriter {
/**
* Title of {@link ProgressMonitor} task used during counting objects to
@@ -578,9 +576,8 @@ public class PackWriter {
* </p>
*
* @param packStream
- * output stream of pack data. If the stream is not buffered it
- * will be buffered by the writer. Caller is responsible for
- * closing the stream.
+ * output stream of pack data. The stream should be buffered by
+ * the caller. The caller is responsible for closing the stream.
* @throws IOException
* an error occurred reading a local object's data to include in
* the pack, or writing compressed object data to the output
@@ -590,8 +587,6 @@ public class PackWriter {
if (reuseDeltas || reuseObjects)
searchForReuse();
- if (!(packStream instanceof BufferedOutputStream))
- packStream = new BufferedOutputStream(packStream);
out = new PackOutputStream(packStream);
writeMonitor.beginTask(WRITING_OBJECTS_PROGRESS, getObjectsNumber());
@@ -599,7 +594,6 @@ public class PackWriter {
writeObjects();
writeChecksum();
- out.flush();
windowCursor.release();
writeMonitor.endTask();
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java
index 14be1700c8..1339b86913 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -45,6 +46,8 @@
package org.eclipse.jgit.transport;
+import java.io.StringWriter;
+import java.io.Writer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -58,12 +61,13 @@ import org.eclipse.jgit.lib.Ref;
* @see BasePackConnection
* @see BaseFetchConnection
*/
-abstract class BaseConnection implements Connection {
-
+public abstract class BaseConnection implements Connection {
private Map<String, Ref> advertisedRefs = Collections.emptyMap();
private boolean startedOperation;
+ private Writer messageWriter;
+
public Map<String, Ref> getRefsMap() {
return advertisedRefs;
}
@@ -76,6 +80,10 @@ abstract class BaseConnection implements Connection {
return advertisedRefs.get(name);
}
+ public String getMessages() {
+ return messageWriter != null ? messageWriter.toString() : "";
+ }
+
public abstract void close();
/**
@@ -106,4 +114,29 @@ abstract class BaseConnection implements Connection {
"Only one operation call per connection is supported.");
startedOperation = true;
}
+
+ /**
+ * Get the writer that buffers messages from the remote side.
+ *
+ * @return writer to store messages from the remote.
+ */
+ protected Writer getMessageWriter() {
+ if (messageWriter == null)
+ setMessageWriter(new StringWriter());
+ return messageWriter;
+ }
+
+ /**
+ * Set the writer that buffers messages from the remote side.
+ *
+ * @param writer
+ * the writer that messages will be delivered to. The writer's
+ * {@code toString()} method should be overridden to return the
+ * complete contents.
+ */
+ protected void setMessageWriter(Writer writer) {
+ if (messageWriter != null)
+ throw new IllegalStateException("Writer already initialized");
+ messageWriter = writer;
+ }
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java
index 74c27a7cda..a2c572c601 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java
@@ -1,5 +1,4 @@
/*
- * Copyright (C) 2009, Constantine Plotnikov <constantine.plotnikov@gmail.com>
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
@@ -47,8 +46,6 @@
package org.eclipse.jgit.transport;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -59,6 +56,7 @@ import java.util.Set;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.PackProtocolException;
+import org.eclipse.jgit.errors.RemoteRepositoryException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdRef;
@@ -96,10 +94,10 @@ abstract class BasePackConnection extends BaseConnection {
/** Timer to manage {@link #timeoutIn} and {@link #timeoutOut}. */
private InterruptTimer myTimer;
- /** Buffered input stream reading from the remote. */
+ /** Input stream reading from the remote. */
protected InputStream in;
- /** Buffered output stream sending to the remote. */
+ /** Output stream sending to the remote. */
protected OutputStream out;
/** Packet line decoder around {@link #in}. */
@@ -126,6 +124,17 @@ abstract class BasePackConnection extends BaseConnection {
uri = transport.uri;
}
+ /**
+ * Configure this connection with the directional pipes.
+ *
+ * @param myIn
+ * input stream to receive data from the peer. Caller must ensure
+ * the input is buffered, otherwise read performance may suffer.
+ * @param myOut
+ * output stream to transmit data to the peer. Caller must ensure
+ * the output is buffered, otherwise write performance may
+ * suffer.
+ */
protected final void init(InputStream myIn, OutputStream myOut) {
final int timeout = transport.getTimeout();
if (timeout > 0) {
@@ -139,16 +148,27 @@ abstract class BasePackConnection extends BaseConnection {
myOut = timeoutOut;
}
- in = myIn instanceof BufferedInputStream ? myIn
- : new BufferedInputStream(myIn, IndexPack.BUFFER_SIZE);
- out = myOut instanceof BufferedOutputStream ? myOut
- : new BufferedOutputStream(myOut);
+ in = myIn;
+ out = myOut;
pckIn = new PacketLineIn(in);
pckOut = new PacketLineOut(out);
outNeedsEnd = true;
}
+ /**
+ * Reads the advertised references through the initialized stream.
+ * <p>
+ * Subclass implementations may call this method only after setting up the
+ * input and output streams with {@link #init(InputStream, OutputStream)}.
+ * <p>
+ * If any errors occur, this connection is automatically closed by invoking
+ * {@link #close()} and the exception is wrapped (if necessary) and thrown
+ * as a {@link TransportException}.
+ *
+ * @throws TransportException
+ * the reference list could not be scanned.
+ */
protected void readAdvertisedRefs() throws TransportException {
try {
readAdvertisedRefsImpl();
@@ -179,6 +199,12 @@ abstract class BasePackConnection extends BaseConnection {
if (line == PacketLineIn.END)
break;
+ if (line.startsWith("ERR ")) {
+ // This is a customized remote service error.
+ // Users should be informed about it.
+ throw new RemoteRepositoryException(uri, line.substring(4));
+ }
+
if (avail.isEmpty()) {
final int nul = line.indexOf('\0');
if (nul >= 0) {
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java
index 84e55b61ea..7b90ec199f 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java
@@ -46,6 +46,7 @@
package org.eclipse.jgit.transport;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -609,7 +610,11 @@ abstract class BasePackFetchConnection extends BasePackConnection implements
private void receivePack(final ProgressMonitor monitor) throws IOException {
final IndexPack ip;
- ip = IndexPack.create(local, sideband ? pckIn.sideband(monitor) : in);
+ InputStream input = in;
+ if (sideband)
+ input = new SideBandInputStream(input, monitor, getMessageWriter());
+
+ ip = IndexPack.create(local, input);
ip.setFixThin(thinPack);
ip.setObjectChecking(transport.isCheckFetchedObjects());
ip.index(monitor);
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java
index 2603ca2879..e10cefd3ab 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java
@@ -86,12 +86,16 @@ class BasePackPushConnection extends BasePackConnection implements
static final String CAPABILITY_OFS_DELTA = "ofs-delta";
+ static final String CAPABILITY_SIDE_BAND_64K = "side-band-64k";
+
private final boolean thinPack;
private boolean capableDeleteRefs;
private boolean capableReport;
+ private boolean capableSideBand;
+
private boolean capableOfsDelta;
private boolean sentCommand;
@@ -143,8 +147,21 @@ class BasePackPushConnection extends BasePackConnection implements
writeCommands(refUpdates.values(), monitor);
if (writePack)
writePack(refUpdates, monitor);
- if (sentCommand && capableReport)
- readStatusReport(refUpdates);
+ if (sentCommand) {
+ if (capableReport)
+ readStatusReport(refUpdates);
+ if (capableSideBand) {
+ // Ensure the data channel is at EOF, so we know we have
+ // read all side-band data from all channels and have a
+ // complete copy of the messages (if any) buffered from
+ // the other data channels.
+ //
+ int b = in.read();
+ if (0 <= b)
+ throw new TransportException(uri, "expected EOF;"
+ + " received '" + (char) b + "' instead");
+ }
+ }
} catch (TransportException e) {
throw e;
} catch (Exception e) {
@@ -156,7 +173,7 @@ class BasePackPushConnection extends BasePackConnection implements
private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
final ProgressMonitor monitor) throws IOException {
- final String capabilities = enableCapabilities();
+ final String capabilities = enableCapabilities(monitor);
for (final RemoteRefUpdate rru : refUpdates) {
if (!capableDeleteRefs && rru.isDelete()) {
rru.setStatus(Status.REJECTED_NODELETE);
@@ -189,11 +206,18 @@ class BasePackPushConnection extends BasePackConnection implements
outNeedsEnd = false;
}
- private String enableCapabilities() {
+ private String enableCapabilities(final ProgressMonitor monitor) {
final StringBuilder line = new StringBuilder();
capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA);
+
+ capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K);
+ if (capableSideBand) {
+ in = new SideBandInputStream(in, monitor, getMessageWriter());
+ pckIn = new PacketLineIn(in);
+ }
+
if (line.length() > 0)
line.setCharAt(0, '\0');
return line.toString();
@@ -220,6 +244,7 @@ class BasePackPushConnection extends BasePackConnection implements
writer.preparePack(newObjects, remoteObjects);
final long start = System.currentTimeMillis();
writer.writePack(out);
+ out.flush();
packTransferTime = System.currentTimeMillis() - start;
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java
index db1312ca30..7b0a5eec45 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -43,7 +43,6 @@
package org.eclipse.jgit.transport;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -155,18 +154,15 @@ public class BundleWriter {
* This method can only be called once per BundleWriter instance.
*
* @param os
- * the stream the bundle is written to. If the stream is not
- * buffered it will be buffered by the writer. Caller is
- * responsible for closing the stream.
+ * the stream the bundle is written to. The stream should be
+ * buffered by the caller. The caller is responsible for closing
+ * the stream.
* @throws IOException
* an error occurred reading a local object's data to include in
* the bundle, or writing compressed object data to the output
* stream.
*/
public void writeBundle(OutputStream os) throws IOException {
- if (!(os instanceof BufferedOutputStream))
- os = new BufferedOutputStream(os);
-
final HashSet<ObjectId> inc = new HashSet<ObjectId>();
final HashSet<ObjectId> exc = new HashSet<ObjectId>();
inc.addAll(include.values());
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java
index 3f0c3d8e5b..e386c26c1f 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@@ -104,7 +105,26 @@ public interface Connection {
* must close that network socket, disconnecting the two peers. If the
* remote repository is actually local (same system) this method must close
* any open file handles used to read the "remote" repository.
+ * <p>
+ * If additional messages were produced by the remote peer, these should
+ * still be retained in the connection instance for {@link #getMessages()}.
*/
public void close();
+ /**
+ * Get the additional messages, if any, returned by the remote process.
+ * <p>
+ * These messages are most likely informational or error messages, sent by
+ * the remote peer, to help the end-user correct any problems that may have
+ * prevented the operation from completing successfully. Application UIs
+ * should try to show these in an appropriate context.
+ * <p>
+ * The message buffer is available after {@link #close()} has been called.
+ * Prior to closing the connection, the message buffer may be empty.
+ *
+ * @return the messages returned by the remote, most likely terminated by a
+ * newline (LF) character. The empty string is returned if the
+ * remote produced no additional messages.
+ */
+ public String getMessages();
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java
index 65a5b1769e..b86f86d2f9 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java
@@ -146,7 +146,7 @@ class FetchProcess {
// Connection was used for object transfer. If we
// do another fetch we must open a new connection.
//
- closeConnection();
+ closeConnection(result);
} else {
includedTags = false;
}
@@ -170,7 +170,7 @@ class FetchProcess {
}
}
} finally {
- closeConnection();
+ closeConnection(result);
}
final RevWalk walk = new RevWalk(transport.local);
@@ -210,9 +210,10 @@ class FetchProcess {
"peer did not supply a complete object graph");
}
- private void closeConnection() {
+ private void closeConnection(final FetchResult result) {
if (conn != null) {
conn.close();
+ result.addMessages(conn.getMessages());
conn = null;
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java
index e93f7f7600..115cfbcc81 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java
@@ -1,4 +1,5 @@
/*
+ * Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2007-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -65,6 +66,8 @@ public abstract class OperationResult {
final SortedMap<String, TrackingRefUpdate> updates = new TreeMap<String, TrackingRefUpdate>();
+ StringBuilder messageBuffer;
+
/**
* Get the URI this result came from.
* <p>
@@ -136,4 +139,30 @@ public abstract class OperationResult {
void add(final TrackingRefUpdate u) {
updates.put(u.getLocalName(), u);
}
+
+ /**
+ * Get the additional messages, if any, returned by the remote process.
+ * <p>
+ * These messages are most likely informational or error messages, sent by
+ * the remote peer, to help the end-user correct any problems that may have
+ * prevented the operation from completing successfully. Application UIs
+ * should try to show these in an appropriate context.
+ *
+ * @return the messages returned by the remote, most likely terminated by a
+ * newline (LF) character. The empty string is returned if the
+ * remote produced no additional messages.
+ */
+ public String getMessages() {
+ return messageBuffer != null ? messageBuffer.toString() : "";
+ }
+
+ void addMessages(final String msg) {
+ if (msg != null && msg.length() > 0) {
+ if (messageBuffer == null)
+ messageBuffer = new StringBuilder();
+ messageBuffer.append(msg);
+ if (!msg.endsWith("\n"))
+ messageBuffer.append('\n');
+ }
+ }
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineIn.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineIn.java
index db6abef1ac..170e4ddbe0 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineIn.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineIn.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@@ -51,7 +51,6 @@ import java.io.InputStream;
import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.MutableObjectId;
-import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.util.IO;
import org.eclipse.jgit.util.RawParseUtils;
@@ -73,15 +72,11 @@ class PacketLineIn {
private final InputStream in;
- private final byte[] lenbuffer;
+ private final byte[] lineBuffer;
PacketLineIn(final InputStream i) {
in = i;
- lenbuffer = new byte[4];
- }
-
- InputStream sideband(final ProgressMonitor pm) {
- return new SideBandInputStream(this, in, pm);
+ lineBuffer = new byte[SideBandOutputStream.SMALL_BUF];
}
AckNackResult readACK(final MutableObjectId returnedId) throws IOException {
@@ -129,22 +124,27 @@ class PacketLineIn {
len -= 4; // length header (4 bytes)
- final byte[] raw = new byte[len];
+ byte[] raw;
+ if (len <= lineBuffer.length)
+ raw = lineBuffer;
+ else
+ raw = new byte[len];
+
IO.readFully(in, raw, 0, len);
return RawParseUtils.decode(Constants.CHARSET, raw, 0, len);
}
int readLength() throws IOException {
- IO.readFully(in, lenbuffer, 0, 4);
+ IO.readFully(in, lineBuffer, 0, 4);
try {
- final int len = RawParseUtils.parseHexInt16(lenbuffer, 0);
+ final int len = RawParseUtils.parseHexInt16(lineBuffer, 0);
if (len != 0 && len < 4)
throw new ArrayIndexOutOfBoundsException();
return len;
} catch (ArrayIndexOutOfBoundsException err) {
throw new IOException("Invalid packet line header: "
- + (char) lenbuffer[0] + (char) lenbuffer[1]
- + (char) lenbuffer[2] + (char) lenbuffer[3]);
+ + (char) lineBuffer[0] + (char) lineBuffer[1]
+ + (char) lineBuffer[2] + (char) lineBuffer[3]);
}
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java
index 81dd4f6a15..51506b20aa 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@@ -105,14 +105,6 @@ public class PacketLineOut {
out.write(packet);
}
- void writeChannelPacket(final int channel, final byte[] buf, int off,
- int len) throws IOException {
- formatLength(len + 5);
- lenbuffer[4] = (byte) channel;
- out.write(lenbuffer, 0, 5);
- out.write(buf, off, len);
- }
-
/**
* Write a packet end marker, sometimes referred to as a flush command.
* <p>
@@ -149,6 +141,10 @@ public class PacketLineOut {
'7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private void formatLength(int w) {
+ formatLength(lenbuffer, w);
+ }
+
+ static void formatLength(byte[] lenbuffer, int w) {
int o = 3;
while (o >= 0 && w != 0) {
lenbuffer[o--] = hexchar[w & 0xf];
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java
index 17e1dfc77b..03b783427a 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java
@@ -122,8 +122,12 @@ class PushProcess {
PushResult execute(final ProgressMonitor monitor)
throws NotSupportedException, TransportException {
monitor.beginTask(PROGRESS_OPENING_CONNECTION, ProgressMonitor.UNKNOWN);
+
+ final PushResult res = new PushResult();
connection = transport.openPush();
try {
+ res.setAdvertisedRefs(transport.getURI(), connection.getRefsMap());
+ res.setRemoteUpdates(toPush);
monitor.endTask();
final Map<String, RemoteRefUpdate> preprocessed = prepareRemoteUpdates();
@@ -133,10 +137,16 @@ class PushProcess {
connection.push(monitor, preprocessed);
} finally {
connection.close();
+ res.addMessages(connection.getMessages());
}
if (!transport.isDryRun())
updateTrackingRefs();
- return prepareOperationResult();
+ for (final RemoteRefUpdate rru : toPush.values()) {
+ final TrackingRefUpdate tru = rru.getTrackingRefUpdate();
+ if (tru != null)
+ res.add(tru);
+ }
+ return res;
}
private Map<String, RemoteRefUpdate> prepareRemoteUpdates()
@@ -226,17 +236,4 @@ class PushProcess {
}
}
}
-
- private PushResult prepareOperationResult() {
- final PushResult result = new PushResult();
- result.setAdvertisedRefs(transport.getURI(), connection.getRefsMap());
- result.setRemoteUpdates(toPush);
-
- for (final RemoteRefUpdate rru : toPush.values()) {
- final TrackingRefUpdate tru = rru.getTrackingRefUpdate();
- if (tru != null)
- result.add(tru);
- }
- return result;
- }
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java
index 8d75f3cb92..dae28ab271 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java
@@ -43,13 +43,20 @@
package org.eclipse.jgit.transport;
-import java.io.BufferedWriter;
+import static org.eclipse.jgit.transport.BasePackPushConnection.CAPABILITY_DELETE_REFS;
+import static org.eclipse.jgit.transport.BasePackPushConnection.CAPABILITY_OFS_DELTA;
+import static org.eclipse.jgit.transport.BasePackPushConnection.CAPABILITY_REPORT_STATUS;
+import static org.eclipse.jgit.transport.BasePackPushConnection.CAPABILITY_SIDE_BAND_64K;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA;
+import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS;
+import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -84,12 +91,6 @@ import org.eclipse.jgit.util.io.TimeoutOutputStream;
* Implements the server side of a push connection, receiving objects.
*/
public class ReceivePack {
- static final String CAPABILITY_REPORT_STATUS = BasePackPushConnection.CAPABILITY_REPORT_STATUS;
-
- static final String CAPABILITY_DELETE_REFS = BasePackPushConnection.CAPABILITY_DELETE_REFS;
-
- static final String CAPABILITY_OFS_DELTA = BasePackPushConnection.CAPABILITY_OFS_DELTA;
-
/** Database we write the stored objects into. */
private final Repository db;
@@ -151,7 +152,7 @@ public class ReceivePack {
private PacketLineOut pckOut;
- private PrintWriter msgs;
+ private Writer msgs;
private IndexPack ip;
@@ -164,12 +165,18 @@ public class ReceivePack {
/** Commands to execute, as received by the client. */
private List<ReceiveCommand> commands;
+ /** Error to display instead of advertising the references. */
+ private StringBuilder advertiseError;
+
/** An exception caught while unpacking and fsck'ing the objects. */
private Throwable unpackError;
- /** if {@link #enabledCapablities} has {@link #CAPABILITY_REPORT_STATUS} */
+ /** If {@link BasePackPushConnection#CAPABILITY_REPORT_STATUS} is enabled. */
private boolean reportStatus;
+ /** If {@link BasePackPushConnection#CAPABILITY_SIDE_BAND_64K} is enabled. */
+ private boolean sideBand;
+
/** Lock around the received pack file, while updating refs. */
private PackLock packLock;
@@ -469,10 +476,17 @@ public class ReceivePack {
}
/**
- * Send an error message to the client, if it supports receiving them.
+ * Send an error message to the client.
* <p>
- * If the client doesn't support receiving messages, the message will be
- * discarded, with no other indication to the caller or to the client.
+ * If any error messages are sent before the references are advertised to
+ * the client, the errors will be sent instead of the advertisement and the
+ * receive operation will be aborted. All clients should receive and display
+ * such early stage errors.
+ * <p>
+ * If the reference advertisements have already been sent, messages are sent
+ * in a side channel. If the client doesn't support receiving messages, the
+ * message will be discarded, with no other indication to the caller or to
+ * the client.
* <p>
* {@link PreReceiveHook}s should always try to use
* {@link ReceiveCommand#setResult(Result, String)} with a result status of
@@ -485,7 +499,18 @@ public class ReceivePack {
* string must not end with an LF, and must not contain an LF.
*/
public void sendError(final String what) {
- sendMessage("error", what);
+ if (refs == null) {
+ if (advertiseError == null)
+ advertiseError = new StringBuilder();
+ advertiseError.append(what).append('\n');
+ } else {
+ try {
+ if (msgs != null)
+ msgs.write("error: " + what + "\n");
+ } catch (IOException e) {
+ // Ignore write failures.
+ }
+ }
}
/**
@@ -499,12 +524,12 @@ public class ReceivePack {
* string must not end with an LF, and must not contain an LF.
*/
public void sendMessage(final String what) {
- sendMessage("remote", what);
- }
-
- private void sendMessage(final String type, final String what) {
- if (msgs != null)
- msgs.println(type + ": " + what);
+ try {
+ if (msgs != null)
+ msgs.write(what + "\n");
+ } catch (IOException e) {
+ // Ignore write failures.
+ }
}
/**
@@ -544,16 +569,8 @@ public class ReceivePack {
pckIn = new PacketLineIn(rawIn);
pckOut = new PacketLineOut(rawOut);
- if (messages != null) {
- msgs = new PrintWriter(new BufferedWriter(
- new OutputStreamWriter(messages, Constants.CHARSET),
- 8192)) {
- @Override
- public void println() {
- print('\n');
- }
- };
- }
+ if (messages != null)
+ msgs = new OutputStreamWriter(messages, Constants.CHARSET);
enabledCapablities = new HashSet<String>();
commands = new ArrayList<ReceiveCommand>();
@@ -561,8 +578,19 @@ public class ReceivePack {
service();
} finally {
try {
- if (msgs != null) {
+ if (pckOut != null)
+ pckOut.flush();
+ if (msgs != null)
msgs.flush();
+
+ if (sideBand) {
+ // If we are using side band, we need to send a final
+ // flush-pkt to tell the remote peer the side band is
+ // complete and it should stop decoding. We need to
+ // use the original output stream as rawOut is now the
+ // side band data channel.
+ //
+ new PacketLineOut(output).end();
}
} finally {
unlockPack();
@@ -591,6 +619,8 @@ public class ReceivePack {
sendAdvertisedRefs(new PacketLineOutRefAdvertiser(pckOut));
else
refs = refFilter.filter(db.getAllRefs());
+ if (advertiseError != null)
+ return;
recvCommands();
if (!commands.isEmpty()) {
enableCapabilities();
@@ -626,10 +656,9 @@ public class ReceivePack {
} else if (msgs != null) {
sendStatusReport(false, new Reporter() {
void sendString(final String s) throws IOException {
- msgs.println(s);
+ msgs.write(s + "\n");
}
});
- msgs.flush();
}
postReceive.onPostReceive(this, filterCommands(Result.OK));
@@ -652,8 +681,14 @@ public class ReceivePack {
* the formatter failed to write an advertisement.
*/
public void sendAdvertisedRefs(final RefAdvertiser adv) throws IOException {
+ if (advertiseError != null) {
+ adv.writeOne("ERR " + advertiseError);
+ return;
+ }
+
final RevFlag advertised = walk.newFlag("ADVERTISED");
adv.init(walk, advertised);
+ adv.advertiseCapability(CAPABILITY_SIDE_BAND_64K);
adv.advertiseCapability(CAPABILITY_DELETE_REFS);
adv.advertiseCapability(CAPABILITY_REPORT_STATUS);
if (allowOfsDelta)
@@ -712,6 +747,16 @@ public class ReceivePack {
private void enableCapabilities() {
reportStatus = enabledCapablities.contains(CAPABILITY_REPORT_STATUS);
+
+ sideBand = enabledCapablities.contains(CAPABILITY_SIDE_BAND_64K);
+ if (sideBand) {
+ OutputStream out = rawOut;
+
+ rawOut = new SideBandOutputStream(CH_DATA, MAX_BUF, out);
+ pckOut = new PacketLineOut(rawOut);
+ msgs = new OutputStreamWriter(new SideBandOutputStream(CH_PROGRESS,
+ MAX_BUF, out), Constants.CHARSET);
+ }
}
private boolean needPack() {
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java
index 40a6808d25..796cb745a1 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java
@@ -44,8 +44,11 @@
package org.eclipse.jgit.transport;
+import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;
+
import java.io.IOException;
import java.io.InputStream;
+import java.io.Writer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -69,27 +72,31 @@ import org.eclipse.jgit.util.RawParseUtils;
* Channel 3 results in an exception being thrown, as the remote side has issued
* an unrecoverable error.
*
- * @see PacketLineIn#sideband(ProgressMonitor)
+ * @see SideBandOutputStream
*/
class SideBandInputStream extends InputStream {
+ private static final String PFX_REMOTE = "remote: ";
+
static final int CH_DATA = 1;
static final int CH_PROGRESS = 2;
static final int CH_ERROR = 3;
- private static Pattern P_UNBOUNDED = Pattern.compile(
- "^([\\w ]+): (\\d+)( |, done)?.*", Pattern.DOTALL);
+ private static Pattern P_UNBOUNDED = Pattern
+ .compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *[\r\n]$");
- private static Pattern P_BOUNDED = Pattern.compile(
- "^([\\w ]+):.*\\((\\d+)/(\\d+)\\).*", Pattern.DOTALL);
+ private static Pattern P_BOUNDED = Pattern
+ .compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *[\r\n]$");
- private final PacketLineIn pckIn;
+ private final InputStream rawIn;
- private final InputStream in;
+ private final PacketLineIn pckIn;
private final ProgressMonitor monitor;
+ private final Writer messages;
+
private String progressBuffer = "";
private String currentTask;
@@ -102,11 +109,12 @@ class SideBandInputStream extends InputStream {
private int available;
- SideBandInputStream(final PacketLineIn aPckIn, final InputStream aIn,
- final ProgressMonitor aProgress) {
- pckIn = aPckIn;
- in = aIn;
- monitor = aProgress;
+ SideBandInputStream(final InputStream in, final ProgressMonitor progress,
+ final Writer messageStream) {
+ rawIn = in;
+ pckIn = new PacketLineIn(rawIn);
+ monitor = progress;
+ messages = messageStream;
currentTask = "";
}
@@ -116,7 +124,7 @@ class SideBandInputStream extends InputStream {
if (eof)
return -1;
available--;
- return in.read();
+ return rawIn.read();
}
@Override
@@ -126,7 +134,7 @@ class SideBandInputStream extends InputStream {
needDataPacket();
if (eof)
break;
- final int n = in.read(b, off, Math.min(len, available));
+ final int n = rawIn.read(b, off, Math.min(len, available));
if (n < 0)
break;
r += n;
@@ -147,8 +155,8 @@ class SideBandInputStream extends InputStream {
return;
}
- channel = in.read();
- available -= 5; // length header plus channel indicator
+ channel = rawIn.read() & 0xff;
+ available -= HDR_SIZE; // length header plus channel indicator
if (available == 0)
continue;
@@ -157,18 +165,17 @@ class SideBandInputStream extends InputStream {
return;
case CH_PROGRESS:
progress(readString(available));
-
continue;
case CH_ERROR:
eof = true;
- throw new TransportException("remote: " + readString(available));
+ throw new TransportException(PFX_REMOTE + readString(available));
default:
throw new PackProtocolException("Invalid channel " + channel);
}
}
}
- private void progress(String pkt) {
+ private void progress(String pkt) throws IOException {
pkt = progressBuffer + pkt;
for (;;) {
final int lf = pkt.indexOf('\n');
@@ -183,16 +190,13 @@ class SideBandInputStream extends InputStream {
else
break;
- final String msg = pkt.substring(0, s);
- if (doProgressLine(msg))
- pkt = pkt.substring(s + 1);
- else
- break;
+ doProgressLine(pkt.substring(0, s + 1));
+ pkt = pkt.substring(s + 1);
}
progressBuffer = pkt;
}
- private boolean doProgressLine(final String msg) {
+ private void doProgressLine(final String msg) throws IOException {
Matcher matcher;
matcher = P_BOUNDED.matcher(msg);
@@ -201,13 +205,12 @@ class SideBandInputStream extends InputStream {
if (!currentTask.equals(taskname)) {
currentTask = taskname;
lastCnt = 0;
- final int tot = Integer.parseInt(matcher.group(3));
- monitor.beginTask(currentTask, tot);
+ beginTask(Integer.parseInt(matcher.group(3)));
}
final int cnt = Integer.parseInt(matcher.group(2));
monitor.update(cnt - lastCnt);
lastCnt = cnt;
- return true;
+ return;
}
matcher = P_UNBOUNDED.matcher(msg);
@@ -216,20 +219,24 @@ class SideBandInputStream extends InputStream {
if (!currentTask.equals(taskname)) {
currentTask = taskname;
lastCnt = 0;
- monitor.beginTask(currentTask, ProgressMonitor.UNKNOWN);
+ beginTask(ProgressMonitor.UNKNOWN);
}
final int cnt = Integer.parseInt(matcher.group(2));
monitor.update(cnt - lastCnt);
lastCnt = cnt;
- return true;
+ return;
}
- return false;
+ messages.write(msg);
+ }
+
+ private void beginTask(final int totalWorkUnits) {
+ monitor.beginTask(PFX_REMOTE + currentTask, totalWorkUnits);
}
private String readString(final int len) throws IOException {
final byte[] raw = new byte[len];
- IO.readFully(in, raw, 0, len);
+ IO.readFully(rawIn, raw, 0, len);
return RawParseUtils.decode(Constants.CHARSET, raw, 0, len);
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java
index 5e50fd89b3..6e0a52627e 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -47,11 +47,10 @@ import java.io.IOException;
import java.io.OutputStream;
/**
- * Multiplexes data and progress messages
+ * Multiplexes data and progress messages.
* <p>
- * To correctly use this class you must wrap it in a BufferedOutputStream with a
- * buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF},
- * minus {@link #HDR_SIZE}.
+ * This stream is buffered at packet sizes, so the caller doesn't need to wrap
+ * it in yet another buffered stream.
*/
class SideBandOutputStream extends OutputStream {
static final int CH_DATA = SideBandInputStream.CH_DATA;
@@ -66,34 +65,93 @@ class SideBandOutputStream extends OutputStream {
static final int HDR_SIZE = 5;
- private final int channel;
+ private final OutputStream out;
- private final PacketLineOut pckOut;
+ private final byte[] buffer;
- private byte[] singleByteBuffer;
+ /**
+ * Number of bytes in {@link #buffer} that are valid data.
+ * <p>
+ * Initialized to {@link #HDR_SIZE} if there is no application data in the
+ * buffer, as the packet header always appears at the start of the buffer.
+ */
+ private int cnt;
- SideBandOutputStream(final int chan, final PacketLineOut out) {
- channel = chan;
- pckOut = out;
+ /**
+ * Create a new stream to write side band packets.
+ *
+ * @param chan
+ * channel number to prefix all packets with, so the remote side
+ * can demultiplex the stream and get back the original data.
+ * Must be in the range [0, 255].
+ * @param sz
+ * maximum size of a data packet within the stream. The remote
+ * side needs to agree to the packet size to prevent buffer
+ * overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF).
+ * @param os
+ * stream that the packets are written onto. This stream should
+ * be attached to a SideBandInputStream on the remote side.
+ */
+ SideBandOutputStream(final int chan, final int sz, final OutputStream os) {
+ if (chan <= 0 || chan > 255)
+ throw new IllegalArgumentException("channel " + chan
+ + " must be in range [0, 255]");
+ if (sz <= HDR_SIZE)
+ throw new IllegalArgumentException("packet size " + sz
+ + " must be >= " + HDR_SIZE);
+ else if (MAX_BUF < sz)
+ throw new IllegalArgumentException("packet size " + sz
+ + " must be <= " + MAX_BUF);
+
+ out = os;
+ buffer = new byte[sz];
+ buffer[4] = (byte) chan;
+ cnt = HDR_SIZE;
}
@Override
public void flush() throws IOException {
- if (channel != CH_DATA)
- pckOut.flush();
+ if (HDR_SIZE < cnt)
+ writeBuffer();
+ out.flush();
}
@Override
- public void write(final byte[] b, final int off, final int len)
- throws IOException {
- pckOut.writeChannelPacket(channel, b, off, len);
+ public void write(final byte[] b, int off, int len) throws IOException {
+ while (0 < len) {
+ int capacity = buffer.length - cnt;
+ if (cnt == HDR_SIZE && capacity < len) {
+ // Our block to write is bigger than the packet size,
+ // stream it out as-is to avoid unnecessary copies.
+ PacketLineOut.formatLength(buffer, buffer.length);
+ out.write(buffer, 0, HDR_SIZE);
+ out.write(b, off, capacity);
+ off += capacity;
+ len -= capacity;
+
+ } else {
+ if (capacity == 0)
+ writeBuffer();
+
+ int n = Math.min(len, capacity);
+ System.arraycopy(b, off, buffer, cnt, n);
+ cnt += n;
+ off += n;
+ len -= n;
+ }
+ }
}
@Override
public void write(final int b) throws IOException {
- if (singleByteBuffer == null)
- singleByteBuffer = new byte[1];
- singleByteBuffer[0] = (byte) b;
- write(singleByteBuffer);
+ if (cnt == buffer.length)
+ writeBuffer();
+ buffer[cnt++] = (byte) b;
+ }
+
+ private void writeBuffer() throws IOException {
+ PacketLineOut.formatLength(buffer, cnt);
+ out.write(buffer, 0, cnt);
+ cnt = HDR_SIZE;
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java
index 89d338c897..efce7b1da7 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2008, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
@@ -43,7 +43,7 @@
package org.eclipse.jgit.transport;
-import java.io.BufferedOutputStream;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@@ -66,12 +66,8 @@ class SideBandProgressMonitor implements ProgressMonitor {
private int totalWork;
- SideBandProgressMonitor(final PacketLineOut pckOut) {
- final int bufsz = SideBandOutputStream.SMALL_BUF
- - SideBandOutputStream.HDR_SIZE;
- out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream(
- new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
- pckOut), bufsz), Constants.CHARSET));
+ SideBandProgressMonitor(final OutputStream os) {
+ out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET));
}
public void start(final int totalTasks) {
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java
index a127ff50ab..8a0b4357cd 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java
@@ -45,7 +45,11 @@
package org.eclipse.jgit.transport;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -136,7 +140,13 @@ class TransportGitAnon extends TcpTransport implements PackTransport {
super(TransportGitAnon.this);
sock = openConnection();
try {
- init(sock.getInputStream(), sock.getOutputStream());
+ InputStream sIn = sock.getInputStream();
+ OutputStream sOut = sock.getOutputStream();
+
+ sIn = new BufferedInputStream(sIn);
+ sOut = new BufferedOutputStream(sOut);
+
+ init(sIn, sOut);
service("git-upload-pack", pckOut);
} catch (IOException err) {
close();
@@ -169,7 +179,13 @@ class TransportGitAnon extends TcpTransport implements PackTransport {
super(TransportGitAnon.this);
sock = openConnection();
try {
- init(sock.getInputStream(), sock.getOutputStream());
+ InputStream sIn = sock.getInputStream();
+ OutputStream sOut = sock.getOutputStream();
+
+ sIn = new BufferedInputStream(sIn);
+ sOut = new BufferedOutputStream(sOut);
+
+ init(sIn, sOut);
service("git-receive-pack", pckOut);
} catch (IOException err) {
close();
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java
index 5ee7887f6a..d4d4f5412f 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java
@@ -48,7 +48,6 @@ package org.eclipse.jgit.transport;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -57,6 +56,8 @@ import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.util.QuotedString;
+import org.eclipse.jgit.util.io.MessageWriter;
+import org.eclipse.jgit.util.io.StreamCopyThread;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
@@ -88,8 +89,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
return false;
}
- OutputStream errStream;
-
TransportGitSsh(final Repository local, final URIish uri) {
super(local, uri);
}
@@ -152,8 +151,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
try {
final ChannelExec channel = (ChannelExec) sock.openChannel("exec");
channel.setCommand(commandFor(exe));
- errStream = createErrorStream();
- channel.setErrStream(errStream, true);
channel.connect(tms);
return channel;
} catch (JSchException je) {
@@ -161,9 +158,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
}
}
- void checkExecFailure(int status, String exe) throws TransportException {
+ void checkExecFailure(int status, String exe, String why)
+ throws TransportException {
if (status == 127) {
- String why = errStream.toString();
IOException cause = null;
if (why != null && why.length() > 0)
cause = new IOException(why);
@@ -172,41 +169,8 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
}
}
- /**
- * @return the error stream for the channel, the stream is used to detect
- * specific error reasons for exceptions.
- */
- private static OutputStream createErrorStream() {
- return new OutputStream() {
- private StringBuilder all = new StringBuilder();
-
- private StringBuilder sb = new StringBuilder();
-
- public String toString() {
- String r = all.toString();
- while (r.endsWith("\n"))
- r = r.substring(0, r.length() - 1);
- return r;
- }
-
- @Override
- public void write(final int b) throws IOException {
- if (b == '\r') {
- return;
- }
-
- sb.append((char) b);
-
- if (b == '\n') {
- all.append(sb);
- sb.setLength(0);
- }
- }
- };
- }
-
- NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf) {
- String why = errStream.toString();
+ NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf,
+ String why) {
if (why == null || why.length() == 0)
return nf;
@@ -235,7 +199,7 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
if (getTimeout() <= 0)
return out;
final PipedInputStream pipeIn = new PipedInputStream();
- final CopyThread copyThread = new CopyThread(pipeIn, out);
+ final StreamCopyThread copyThread = new StreamCopyThread(pipeIn, out);
final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
@Override
public void flush() throws IOException {
@@ -257,79 +221,28 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
return pipeOut;
}
- private static class CopyThread extends Thread {
- private final InputStream src;
-
- private final OutputStream dst;
-
- private volatile boolean doFlush;
-
- CopyThread(final InputStream i, final OutputStream o) {
- setName(Thread.currentThread().getName() + "-Output");
- src = i;
- dst = o;
- }
-
- void flush() {
- if (!doFlush) {
- doFlush = true;
- interrupt();
- }
- }
-
- @Override
- public void run() {
- try {
- final byte[] buf = new byte[1024];
- for (;;) {
- try {
- if (doFlush) {
- doFlush = false;
- dst.flush();
- }
-
- final int n;
- try {
- n = src.read(buf);
- } catch (InterruptedIOException wakey) {
- continue;
- }
- if (n < 0)
- break;
- dst.write(buf, 0, n);
- } catch (IOException e) {
- break;
- }
- }
- } finally {
- try {
- src.close();
- } catch (IOException e) {
- // Ignore IO errors on close
- }
- try {
- dst.close();
- } catch (IOException e) {
- // Ignore IO errors on close
- }
- }
- }
- }
-
class SshFetchConnection extends BasePackFetchConnection {
private ChannelExec channel;
+ private Thread errorThread;
+
private int exitStatus;
SshFetchConnection() throws TransportException {
super(TransportGitSsh.this);
try {
+ final MessageWriter msg = new MessageWriter();
+ setMessageWriter(msg);
+
channel = exec(getOptionUploadPack());
+ if (!channel.isConnected())
+ throw new TransportException(uri, "connection failed");
- if (channel.isConnected())
- init(channel.getInputStream(), outputStream(channel));
- else
- throw new TransportException(uri, errStream.toString());
+ final InputStream upErr = channel.getErrStream();
+ errorThread = new StreamCopyThread(upErr, msg.getRawStream());
+ errorThread.start();
+
+ init(channel.getInputStream(), outputStream(channel));
} catch (TransportException err) {
close();
@@ -343,14 +256,24 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
try {
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
- close();
- checkExecFailure(exitStatus, getOptionUploadPack());
- throw cleanNotFound(notFound);
+ final String msgs = getMessages();
+ checkExecFailure(exitStatus, getOptionUploadPack(), msgs);
+ throw cleanNotFound(notFound, msgs);
}
}
@Override
public void close() {
+ if (errorThread != null) {
+ try {
+ errorThread.join();
+ } catch (InterruptedException e) {
+ // Stop waiting and return anyway.
+ } finally {
+ errorThread = null;
+ }
+ }
+
super.close();
if (channel != null) {
@@ -368,17 +291,25 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
class SshPushConnection extends BasePackPushConnection {
private ChannelExec channel;
+ private Thread errorThread;
+
private int exitStatus;
SshPushConnection() throws TransportException {
super(TransportGitSsh.this);
try {
+ final MessageWriter msg = new MessageWriter();
+ setMessageWriter(msg);
+
channel = exec(getOptionReceivePack());
+ if (!channel.isConnected())
+ throw new TransportException(uri, "connection failed");
+
+ final InputStream rpErr = channel.getErrStream();
+ errorThread = new StreamCopyThread(rpErr, msg.getRawStream());
+ errorThread.start();
- if (channel.isConnected())
- init(channel.getInputStream(), outputStream(channel));
- else
- throw new TransportException(uri, errStream.toString());
+ init(channel.getInputStream(), outputStream(channel));
} catch (TransportException err) {
close();
@@ -392,14 +323,24 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
try {
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
- close();
- checkExecFailure(exitStatus, getOptionReceivePack());
- throw cleanNotFound(notFound);
+ final String msgs = getMessages();
+ checkExecFailure(exitStatus, getOptionReceivePack(), msgs);
+ throw cleanNotFound(notFound, msgs);
}
}
@Override
public void close() {
+ if (errorThread != null) {
+ try {
+ errorThread.join();
+ } catch (InterruptedException e) {
+ // Stop waiting and return anyway.
+ } finally {
+ errorThread = null;
+ }
+ }
+
super.close();
if (channel != null) {
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportHttp.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportHttp.java
index 8de16c13d0..f49828bf2d 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportHttp.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportHttp.java
@@ -632,9 +632,9 @@ public class TransportHttp extends HttpTransport implements WalkTransport,
private final String responseType;
- private final UnionInputStream httpIn;
+ private final HttpExecuteStream execute;
- final HttpInputStream in;
+ final UnionInputStream in;
final HttpOutputStream out;
@@ -645,8 +645,8 @@ public class TransportHttp extends HttpTransport implements WalkTransport,
this.requestType = "application/x-" + serviceName + "-request";
this.responseType = "application/x-" + serviceName + "-result";
- this.httpIn = new UnionInputStream();
- this.in = new HttpInputStream(httpIn);
+ this.execute = new HttpExecuteStream();
+ this.in = new UnionInputStream(execute);
this.out = new HttpOutputStream();
}
@@ -712,7 +712,8 @@ public class TransportHttp extends HttpTransport implements WalkTransport,
throw wrongContentType(responseType, contentType);
}
- httpIn.add(openInputStream(conn));
+ in.add(openInputStream(conn));
+ in.add(execute);
conn = null;
}
@@ -729,43 +730,25 @@ public class TransportHttp extends HttpTransport implements WalkTransport,
}
}
- class HttpInputStream extends InputStream {
- private final UnionInputStream src;
-
- HttpInputStream(UnionInputStream httpIn) {
- this.src = httpIn;
- }
-
- private InputStream self() throws IOException {
- if (src.isEmpty()) {
- // If we have no InputStreams available it means we must
- // have written data previously to the service, but have
- // not yet finished the HTTP request in order to get the
- // response from the service. Ensure we get it now.
- //
- execute();
- }
- return src;
- }
-
+ class HttpExecuteStream extends InputStream {
public int available() throws IOException {
- return self().available();
+ execute();
+ return 0;
}
public int read() throws IOException {
- return self().read();
+ execute();
+ return -1;
}
public int read(byte[] b, int off, int len) throws IOException {
- return self().read(b, off, len);
+ execute();
+ return -1;
}
public long skip(long n) throws IOException {
- return self().skip(n);
- }
-
- public void close() throws IOException {
- src.close();
+ execute();
+ return 0;
}
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java
index a99a9b4131..b9b9dbd001 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2007, Dave Watson <dwatson@mimvista.com>
- * Copyright (C) 2008-2009, Google Inc.
+ * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -47,6 +47,8 @@
package org.eclipse.jgit.transport;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -59,6 +61,8 @@ import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.util.FS;
+import org.eclipse.jgit.util.io.MessageWriter;
+import org.eclipse.jgit.util.io.StreamCopyThread;
/**
* Transport to access a local directory as though it were a remote peer.
@@ -129,11 +133,10 @@ class TransportLocal extends Transport implements PackTransport {
// Resources must be established per-connection.
}
- protected Process startProcessWithErrStream(final String cmd)
+ protected Process spawn(final String cmd)
throws TransportException {
try {
final String[] args;
- final Process proc;
if (cmd.startsWith("git-")) {
args = new String[] { "git", cmd.substring(4), PWD };
@@ -148,9 +151,7 @@ class TransportLocal extends Transport implements PackTransport {
}
}
- proc = Runtime.getRuntime().exec(args, null, remoteGitDir);
- new StreamRewritingThread(cmd, proc.getErrorStream()).start();
- return proc;
+ return Runtime.getRuntime().exec(args, null, remoteGitDir);
} catch (IOException err) {
throw new TransportException(uri, err.getMessage(), err);
}
@@ -246,11 +247,26 @@ class TransportLocal extends Transport implements PackTransport {
class ForkLocalFetchConnection extends BasePackFetchConnection {
private Process uploadPack;
+ private Thread errorReaderThread;
+
ForkLocalFetchConnection() throws TransportException {
super(TransportLocal.this);
- uploadPack = startProcessWithErrStream(getOptionUploadPack());
- final InputStream upIn = uploadPack.getInputStream();
- final OutputStream upOut = uploadPack.getOutputStream();
+
+ final MessageWriter msg = new MessageWriter();
+ setMessageWriter(msg);
+
+ uploadPack = spawn(getOptionUploadPack());
+
+ final InputStream upErr = uploadPack.getErrorStream();
+ errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream());
+ errorReaderThread.start();
+
+ InputStream upIn = uploadPack.getInputStream();
+ OutputStream upOut = uploadPack.getOutputStream();
+
+ upIn = new BufferedInputStream(upIn);
+ upOut = new BufferedOutputStream(upOut);
+
init(upIn, upOut);
readAdvertisedRefs();
}
@@ -268,6 +284,16 @@ class TransportLocal extends Transport implements PackTransport {
uploadPack = null;
}
}
+
+ if (errorReaderThread != null) {
+ try {
+ errorReaderThread.join();
+ } catch (InterruptedException e) {
+ // Stop waiting and return anyway.
+ } finally {
+ errorReaderThread = null;
+ }
+ }
}
}
@@ -351,11 +377,26 @@ class TransportLocal extends Transport implements PackTransport {
class ForkLocalPushConnection extends BasePackPushConnection {
private Process receivePack;
+ private Thread errorReaderThread;
+
ForkLocalPushConnection() throws TransportException {
super(TransportLocal.this);
- receivePack = startProcessWithErrStream(getOptionReceivePack());
- final InputStream rpIn = receivePack.getInputStream();
- final OutputStream rpOut = receivePack.getOutputStream();
+
+ final MessageWriter msg = new MessageWriter();
+ setMessageWriter(msg);
+
+ receivePack = spawn(getOptionReceivePack());
+
+ final InputStream rpErr = receivePack.getErrorStream();
+ errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream());
+ errorReaderThread.start();
+
+ InputStream rpIn = receivePack.getInputStream();
+ OutputStream rpOut = receivePack.getOutputStream();
+
+ rpIn = new BufferedInputStream(rpIn);
+ rpOut = new BufferedOutputStream(rpOut);
+
init(rpIn, rpOut);
readAdvertisedRefs();
}
@@ -373,34 +414,14 @@ class TransportLocal extends Transport implements PackTransport {
receivePack = null;
}
}
- }
- }
-
- static class StreamRewritingThread extends Thread {
- private final InputStream in;
- StreamRewritingThread(final String cmd, final InputStream in) {
- super("JGit " + cmd + " Errors");
- this.in = in;
- }
-
- public void run() {
- final byte[] tmp = new byte[512];
- try {
- for (;;) {
- final int n = in.read(tmp);
- if (n < 0)
- break;
- System.err.write(tmp, 0, n);
- System.err.flush();
- }
- } catch (IOException err) {
- // Ignore errors reading errors.
- } finally {
+ if (errorReaderThread != null) {
try {
- in.close();
- } catch (IOException err2) {
- // Ignore errors closing the pipe.
+ errorReaderThread.join();
+ } catch (InterruptedException e) {
+ // Stop waiting and return anyway.
+ } finally {
+ errorReaderThread = null;
}
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java
index b76b22b77e..3d5abd34bd 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java
@@ -43,9 +43,6 @@
package org.eclipse.jgit.transport;
-import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
-
-import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -70,6 +67,7 @@ import org.eclipse.jgit.revwalk.RevFlagSet;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTag;
import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser;
import org.eclipse.jgit.util.io.InterruptTimer;
import org.eclipse.jgit.util.io.TimeoutInputStream;
@@ -556,13 +554,12 @@ public class UploadPack {
int bufsz = SideBandOutputStream.SMALL_BUF;
if (options.contains(OPTION_SIDE_BAND_64K))
bufsz = SideBandOutputStream.MAX_BUF;
- bufsz -= SideBandOutputStream.HDR_SIZE;
-
- packOut = new BufferedOutputStream(new SideBandOutputStream(
- SideBandOutputStream.CH_DATA, pckOut), bufsz);
+ packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA,
+ bufsz, rawOut);
if (progress)
- pm = new SideBandProgressMonitor(pckOut);
+ pm = new SideBandProgressMonitor(new SideBandOutputStream(
+ SideBandOutputStream.CH_PROGRESS, bufsz, rawOut));
}
final PackWriter pw;
@@ -586,12 +583,9 @@ public class UploadPack {
}
}
pw.writePack(packOut);
+ packOut.flush();
- if (sideband) {
- packOut.flush();
+ if (sideband)
pckOut.end();
- } else {
- rawOut.flush();
- }
}
}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java
index 88b7ca438b..f977915bb3 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java
@@ -45,6 +45,7 @@ package org.eclipse.jgit.transport;
import static org.eclipse.jgit.transport.WalkRemoteObjectDatabase.ROOT_DIR;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -251,6 +252,7 @@ class WalkPushConnection extends BaseConnection implements PushConnection {
final String wt = "Put " + base.substring(0, 12);
OutputStream os = dest.writeFile(pathPack, monitor, wt + "..pack");
try {
+ os = new BufferedOutputStream(os);
pw.writePack(os);
} finally {
os.close();
@@ -258,6 +260,7 @@ class WalkPushConnection extends BaseConnection implements PushConnection {
os = dest.writeFile(pathIdx, monitor, wt + "..idx");
try {
+ os = new BufferedOutputStream(os);
pw.writeIndex(os);
} finally {
os.close();
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java
new file mode 100644
index 0000000000..22c3ce94ed
--- /dev/null
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (C) 2009-2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.util.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.transport.BaseConnection;
+import org.eclipse.jgit.util.RawParseUtils;
+
+/**
+ * Combines messages from an OutputStream (hopefully in UTF-8) and a Writer.
+ * <p>
+ * This class is primarily meant for {@link BaseConnection} in contexts where a
+ * standard error stream from a command execution, as well as messages from a
+ * side-band channel, need to be combined together into a buffer to represent
+ * the complete set of messages from a remote repository.
+ * <p>
+ * Writes made to the writer are re-encoded as UTF-8 and interleaved into the
+ * buffer that {@link #getRawStream()} also writes to.
+ * <p>
+ * {@link #toString()} returns all written data, after converting it to a String
+ * under the assumption of UTF-8 encoding.
+ * <p>
+ * Internally {@link RawParseUtils#decode(byte[])} is used by {@code toString()}
+ * tries to work out a reasonably correct character set for the raw data.
+ */
+public class MessageWriter extends Writer {
+ private final ByteArrayOutputStream buf;
+
+ private final OutputStreamWriter enc;
+
+ /** Create an empty writer. */
+ public MessageWriter() {
+ buf = new ByteArrayOutputStream();
+ enc = new OutputStreamWriter(getRawStream(), Constants.CHARSET);
+ }
+
+ @Override
+ public void write(char[] cbuf, int off, int len) throws IOException {
+ synchronized (buf) {
+ enc.write(cbuf, off, len);
+ enc.flush();
+ }
+ }
+
+ /**
+ * @return the underlying byte stream that character writes to this writer
+ * drop into. Writes to this stream should should be in UTF-8.
+ */
+ public OutputStream getRawStream() {
+ return buf;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Do nothing, we are buffered with no resources.
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // Do nothing, we are buffered with no resources.
+ }
+
+ /** @return string version of all buffered data. */
+ @Override
+ public String toString() {
+ return RawParseUtils.decode(buf.toByteArray());
+ }
+}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
new file mode 100644
index 0000000000..a2b9540170
--- /dev/null
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2009-2010, Google Inc.
+ * and other copyright owners as documented in the project's IP log.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Distribution License v1.0 which
+ * accompanies this distribution, is reproduced below, and is
+ * available at http://www.eclipse.org/org/documents/edl-v10.php
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * - Neither the name of the Eclipse Foundation, Inc. nor the
+ * names of its contributors may be used to endorse or promote
+ * products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.eclipse.jgit.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+/** Thread to copy from an input stream to an output stream. */
+public class StreamCopyThread extends Thread {
+ private static final int BUFFER_SIZE = 1024;
+
+ private final InputStream src;
+
+ private final OutputStream dst;
+
+ private volatile boolean doFlush;
+
+ /**
+ * Create a thread to copy data from an input stream to an output stream.
+ *
+ * @param i
+ * stream to copy from. The thread terminates when this stream
+ * reaches EOF. The thread closes this stream before it exits.
+ * @param o
+ * stream to copy into. The destination stream is automatically
+ * closed when the thread terminates.
+ */
+ public StreamCopyThread(final InputStream i, final OutputStream o) {
+ setName(Thread.currentThread().getName() + "-StreamCopy");
+ src = i;
+ dst = o;
+ }
+
+ /**
+ * Request the thread to flush the output stream as soon as possible.
+ * <p>
+ * This is an asynchronous request to the thread. The actual flush will
+ * happen at some future point in time, when the thread wakes up to process
+ * the request.
+ */
+ public void flush() {
+ if (!doFlush) {
+ doFlush = true;
+ interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ final byte[] buf = new byte[BUFFER_SIZE];
+ for (;;) {
+ try {
+ if (doFlush) {
+ doFlush = false;
+ dst.flush();
+ }
+
+ final int n;
+ try {
+ n = src.read(buf);
+ } catch (InterruptedIOException wakey) {
+ continue;
+ }
+ if (n < 0)
+ break;
+ dst.write(buf, 0, n);
+ } catch (IOException e) {
+ break;
+ }
+ }
+ } finally {
+ try {
+ src.close();
+ } catch (IOException e) {
+ // Ignore IO errors on close
+ }
+ try {
+ dst.close();
+ } catch (IOException e) {
+ // Ignore IO errors on close
+ }
+ }
+ }
+}