Browse Source

Capture non-progress side band #2 messages and put in result

Any messages received on side band #2 that aren't scraped as a
progress message into our ProgressMonitor are now forwarded to a
buffer which is later included into the OperationResult object.
Application callers can use this buffer to present the additional
messages from the remote peer after the push or fetch operation
has concluded.

The smart push connections using the native send-pack/receive-pack
protocol now request side-band-64k capability if it is available
and forward any messages received through that channel onto this
message buffer.  This makes hook messages available over smart HTTP,
or even over SSH.

The SSH transport was modified to redirect the remote command's
stderr stream into the message buffer, interleaved with any data
received over side band #2.  Due to buffering between these two
different channels in the SSH channel mux itself the order of any
writes between the two cannot be ensured, but it tries to stay close.

The local fork transport was also modified to redirect the local
receive-pack's stderr into the message buffer, rather than going to
the invoking JVM's System.err.  This gives applications a chance
to log the local error messages, rather than needing to redirect
their JVM's stderr before startup.

To keep things simple, the application has to wait for the entire
operation to complete before it can see the messages.  This may
be a downside if the user is trying to debug a remote hook that is
blocking indefinitely, the user would need to abort the connection
before they can inspect the message buffer in any sort of UI built
on top of JGit.

Change-Id: Ibc215f4569e63071da5b7e5c6674ce924ae39e11
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
tags/v0.7.0
Shawn O. Pearce 14 years ago
parent
commit
673b3984bd

+ 174
- 0
org.eclipse.jgit.http.test/tst/org/eclipse/jgit/http/test/HookMessageTest.java View File

@@ -0,0 +1,174 @@
/*
* Copyright (C) 2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.http.test;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import javax.servlet.http.HttpServletRequest;

import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.http.server.GitServlet;
import org.eclipse.jgit.http.server.resolver.DefaultReceivePackFactory;
import org.eclipse.jgit.http.server.resolver.RepositoryResolver;
import org.eclipse.jgit.http.server.resolver.ServiceNotAuthorizedException;
import org.eclipse.jgit.http.server.resolver.ServiceNotEnabledException;
import org.eclipse.jgit.http.test.util.AccessEvent;
import org.eclipse.jgit.http.test.util.HttpTestCase;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryConfig;
import org.eclipse.jgit.revwalk.RevBlob;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.transport.PreReceiveHook;
import org.eclipse.jgit.transport.PushResult;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.transport.ReceivePack;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.URIish;

public class HookMessageTest extends HttpTestCase {
private Repository remoteRepository;

private URIish remoteURI;

protected void setUp() throws Exception {
super.setUp();

final TestRepository src = createTestRepository();
final String srcName = src.getRepository().getDirectory().getName();

ServletContextHandler app = server.addContext("/git");
GitServlet gs = new GitServlet();
gs.setRepositoryResolver(new RepositoryResolver() {
public Repository open(HttpServletRequest req, String name)
throws RepositoryNotFoundException,
ServiceNotEnabledException {
if (!name.equals(srcName))
throw new RepositoryNotFoundException(name);

final Repository db = src.getRepository();
db.incrementOpen();
return db;
}
});
gs.setReceivePackFactory(new DefaultReceivePackFactory() {
public ReceivePack create(HttpServletRequest req, Repository db)
throws ServiceNotEnabledException,
ServiceNotAuthorizedException {
ReceivePack recv = super.create(req, db);
recv.setPreReceiveHook(new PreReceiveHook() {
public void onPreReceive(ReceivePack rp,
Collection<ReceiveCommand> commands) {
rp.sendMessage("message line 1");
rp.sendError("no soup for you!");
rp.sendMessage("come back next year!");
}
});
return recv;
}

});
app.addServlet(new ServletHolder(gs), "/*");

server.setUp();

remoteRepository = src.getRepository();
remoteURI = toURIish(app, srcName);

RepositoryConfig cfg = remoteRepository.getConfig();
cfg.setBoolean("http", null, "receivepack", true);
cfg.save();
}

public void testPush_CreateBranch() throws Exception {
final TestRepository src = createTestRepository();
final RevBlob Q_txt = src.blob("new text");
final RevCommit Q = src.commit().add("Q", Q_txt).create();
final Repository db = src.getRepository();
final String dstName = Constants.R_HEADS + "new.branch";
Transport t;
PushResult result;

t = Transport.open(db, remoteURI);
try {
final String srcExpr = Q.name();
final boolean forceUpdate = false;
final String localName = null;
final ObjectId oldId = null;

RemoteRefUpdate update = new RemoteRefUpdate(src.getRepository(),
srcExpr, dstName, forceUpdate, localName, oldId);
result = t.push(NullProgressMonitor.INSTANCE, Collections
.singleton(update));
} finally {
t.close();
}

assertTrue(remoteRepository.hasObject(Q_txt));
assertNotNull("has " + dstName, remoteRepository.getRef(dstName));
assertEquals(Q, remoteRepository.getRef(dstName).getObjectId());
fsck(remoteRepository, Q);

List<AccessEvent> requests = getRequests();
assertEquals(2, requests.size());

AccessEvent service = requests.get(1);
assertEquals("POST", service.getMethod());
assertEquals(join(remoteURI, "git-receive-pack"), service.getPath());
assertEquals(200, service.getStatus());

assertEquals("message line 1\n" //
+ "error: no soup for you!\n" //
+ "come back next year!\n", //
result.getMessages());
}
}

+ 29
- 1
org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/AbstractFetchCommand.java View File

@@ -1,6 +1,6 @@
/*
* Copyright (C) 2008, Charles O'Farrell <charleso@charleso.org>
* Copyright (C) 2008-2009, Google Inc.
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@@ -79,6 +79,34 @@ abstract class AbstractFetchCommand extends TextBuiltin {
out.format(" %c %-17s %-10s -> %s", type, longType, src, dst);
out.println();
}

showRemoteMessages(r.getMessages());
}

static void showRemoteMessages(String pkt) {
while (0 < pkt.length()) {
final int lf = pkt.indexOf('\n');
final int cr = pkt.indexOf('\r');
final int s;
if (0 <= lf && 0 <= cr)
s = Math.min(lf, cr);
else if (0 <= lf)
s = lf;
else if (0 <= cr)
s = cr;
else {
System.err.println("remote: " + pkt);
break;
}

if (pkt.charAt(s) == '\r')
System.err.print("remote: " + pkt.substring(0, s) + "\r");
else
System.err.println("remote: " + pkt.substring(0, s));

pkt = pkt.substring(s + 1);
}
System.err.flush();
}

private String longTypeOf(final TrackingRefUpdate u) {

+ 1
- 0
org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java View File

@@ -162,6 +162,7 @@ class Push extends TextBuiltin {
printRefUpdateResult(uri, result, rru);
}

AbstractFetchCommand.showRemoteMessages(result.getMessages());
if (everythingUpToDate)
out.println("Everything up-to-date");
}

+ 35
- 2
org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java View File

@@ -1,4 +1,5 @@
/*
* Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -45,6 +46,8 @@

package org.eclipse.jgit.transport;

import java.io.StringWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -58,12 +61,13 @@ import org.eclipse.jgit.lib.Ref;
* @see BasePackConnection
* @see BaseFetchConnection
*/
abstract class BaseConnection implements Connection {

public abstract class BaseConnection implements Connection {
private Map<String, Ref> advertisedRefs = Collections.emptyMap();

private boolean startedOperation;

private Writer messageWriter;

public Map<String, Ref> getRefsMap() {
return advertisedRefs;
}
@@ -76,6 +80,10 @@ abstract class BaseConnection implements Connection {
return advertisedRefs.get(name);
}

public String getMessages() {
return messageWriter != null ? messageWriter.toString() : "";
}

public abstract void close();

/**
@@ -106,4 +114,29 @@ abstract class BaseConnection implements Connection {
"Only one operation call per connection is supported.");
startedOperation = true;
}

/**
* Get the writer that buffers messages from the remote side.
*
* @return writer to store messages from the remote.
*/
protected Writer getMessageWriter() {
if (messageWriter == null)
setMessageWriter(new StringWriter());
return messageWriter;
}

/**
* Set the writer that buffers messages from the remote side.
*
* @param writer
* the writer that messages will be delivered to. The writer's
* {@code toString()} method should be overridden to return the
* complete contents.
*/
protected void setMessageWriter(Writer writer) {
if (messageWriter != null)
throw new IllegalStateException("Writer already initialized");
messageWriter = writer;
}
}

+ 1
- 1
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java View File

@@ -612,7 +612,7 @@ abstract class BasePackFetchConnection extends BasePackConnection implements

InputStream input = in;
if (sideband)
input = new SideBandInputStream(input, monitor);
input = new SideBandInputStream(input, monitor, getMessageWriter());

ip = IndexPack.create(local, input);
ip.setFixThin(thinPack);

+ 26
- 4
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java View File

@@ -94,6 +94,8 @@ class BasePackPushConnection extends BasePackConnection implements

private boolean capableReport;

private boolean capableSideBand;

private boolean capableOfsDelta;

private boolean sentCommand;
@@ -145,8 +147,21 @@ class BasePackPushConnection extends BasePackConnection implements
writeCommands(refUpdates.values(), monitor);
if (writePack)
writePack(refUpdates, monitor);
if (sentCommand && capableReport)
readStatusReport(refUpdates);
if (sentCommand) {
if (capableReport)
readStatusReport(refUpdates);
if (capableSideBand) {
// Ensure the data channel is at EOF, so we know we have
// read all side-band data from all channels and have a
// complete copy of the messages (if any) buffered from
// the other data channels.
//
int b = in.read();
if (0 <= b)
throw new TransportException(uri, "expected EOF;"
+ " received '" + (char) b + "' instead");
}
}
} catch (TransportException e) {
throw e;
} catch (Exception e) {
@@ -158,7 +173,7 @@ class BasePackPushConnection extends BasePackConnection implements

private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
final ProgressMonitor monitor) throws IOException {
final String capabilities = enableCapabilities();
final String capabilities = enableCapabilities(monitor);
for (final RemoteRefUpdate rru : refUpdates) {
if (!capableDeleteRefs && rru.isDelete()) {
rru.setStatus(Status.REJECTED_NODELETE);
@@ -191,11 +206,18 @@ class BasePackPushConnection extends BasePackConnection implements
outNeedsEnd = false;
}

private String enableCapabilities() {
private String enableCapabilities(final ProgressMonitor monitor) {
final StringBuilder line = new StringBuilder();
capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA);

capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K);
if (capableSideBand) {
in = new SideBandInputStream(in, monitor, getMessageWriter());
pckIn = new PacketLineIn(in);
}

if (line.length() > 0)
line.setCharAt(0, '\0');
return line.toString();

+ 20
- 0
org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java View File

@@ -1,4 +1,5 @@
/*
* Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
@@ -104,7 +105,26 @@ public interface Connection {
* must close that network socket, disconnecting the two peers. If the
* remote repository is actually local (same system) this method must close
* any open file handles used to read the "remote" repository.
* <p>
* If additional messages were produced by the remote peer, these should
* still be retained in the connection instance for {@link #getMessages()}.
*/
public void close();

/**
* Get the additional messages, if any, returned by the remote process.
* <p>
* These messages are most likely informational or error messages, sent by
* the remote peer, to help the end-user correct any problems that may have
* prevented the operation from completing successfully. Application UIs
* should try to show these in an appropriate context.
* <p>
* The message buffer is available after {@link #close()} has been called.
* Prior to closing the connection, the message buffer may be empty.
*
* @return the messages returned by the remote, most likely terminated by a
* newline (LF) character. The empty string is returned if the
* remote produced no additional messages.
*/
public String getMessages();
}

+ 4
- 3
org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java View File

@@ -146,7 +146,7 @@ class FetchProcess {
// Connection was used for object transfer. If we
// do another fetch we must open a new connection.
//
closeConnection();
closeConnection(result);
} else {
includedTags = false;
}
@@ -170,7 +170,7 @@ class FetchProcess {
}
}
} finally {
closeConnection();
closeConnection(result);
}

final RevWalk walk = new RevWalk(transport.local);
@@ -210,9 +210,10 @@ class FetchProcess {
"peer did not supply a complete object graph");
}

private void closeConnection() {
private void closeConnection(final FetchResult result) {
if (conn != null) {
conn.close();
result.addMessages(conn.getMessages());
conn = null;
}
}

+ 29
- 0
org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java View File

@@ -1,4 +1,5 @@
/*
* Copyright (C) 2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2007-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -65,6 +66,8 @@ public abstract class OperationResult {

final SortedMap<String, TrackingRefUpdate> updates = new TreeMap<String, TrackingRefUpdate>();

StringBuilder messageBuffer;

/**
* Get the URI this result came from.
* <p>
@@ -136,4 +139,30 @@ public abstract class OperationResult {
void add(final TrackingRefUpdate u) {
updates.put(u.getLocalName(), u);
}

/**
* Get the additional messages, if any, returned by the remote process.
* <p>
* These messages are most likely informational or error messages, sent by
* the remote peer, to help the end-user correct any problems that may have
* prevented the operation from completing successfully. Application UIs
* should try to show these in an appropriate context.
*
* @return the messages returned by the remote, most likely terminated by a
* newline (LF) character. The empty string is returned if the
* remote produced no additional messages.
*/
public String getMessages() {
return messageBuffer != null ? messageBuffer.toString() : "";
}

void addMessages(final String msg) {
if (msg != null && msg.length() > 0) {
if (messageBuffer == null)
messageBuffer = new StringBuilder();
messageBuffer.append(msg);
if (!msg.endsWith("\n"))
messageBuffer.append('\n');
}
}
}

+ 11
- 14
org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java View File

@@ -122,8 +122,12 @@ class PushProcess {
PushResult execute(final ProgressMonitor monitor)
throws NotSupportedException, TransportException {
monitor.beginTask(PROGRESS_OPENING_CONNECTION, ProgressMonitor.UNKNOWN);

final PushResult res = new PushResult();
connection = transport.openPush();
try {
res.setAdvertisedRefs(transport.getURI(), connection.getRefsMap());
res.setRemoteUpdates(toPush);
monitor.endTask();

final Map<String, RemoteRefUpdate> preprocessed = prepareRemoteUpdates();
@@ -133,10 +137,16 @@ class PushProcess {
connection.push(monitor, preprocessed);
} finally {
connection.close();
res.addMessages(connection.getMessages());
}
if (!transport.isDryRun())
updateTrackingRefs();
return prepareOperationResult();
for (final RemoteRefUpdate rru : toPush.values()) {
final TrackingRefUpdate tru = rru.getTrackingRefUpdate();
if (tru != null)
res.add(tru);
}
return res;
}

private Map<String, RemoteRefUpdate> prepareRemoteUpdates()
@@ -226,17 +236,4 @@ class PushProcess {
}
}
}

private PushResult prepareOperationResult() {
final PushResult result = new PushResult();
result.setAdvertisedRefs(transport.getURI(), connection.getRefsMap());
result.setRemoteUpdates(toPush);

for (final RemoteRefUpdate rru : toPush.values()) {
final TrackingRefUpdate tru = rru.getTrackingRefUpdate();
if (tru != null)
result.add(tru);
}
return result;
}
}

+ 15
- 13
org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java View File

@@ -48,6 +48,7 @@ import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;

import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -83,10 +84,10 @@ class SideBandInputStream extends InputStream {
static final int CH_ERROR = 3;

private static Pattern P_UNBOUNDED = Pattern
.compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *$");
.compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *[\r\n]$");

private static Pattern P_BOUNDED = Pattern
.compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *$");
.compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *[\r\n]$");

private final InputStream rawIn;

@@ -94,6 +95,8 @@ class SideBandInputStream extends InputStream {

private final ProgressMonitor monitor;

private final Writer messages;

private String progressBuffer = "";

private String currentTask;
@@ -106,10 +109,12 @@ class SideBandInputStream extends InputStream {

private int available;

SideBandInputStream(final InputStream in, final ProgressMonitor progress) {
SideBandInputStream(final InputStream in, final ProgressMonitor progress,
final Writer messageStream) {
rawIn = in;
pckIn = new PacketLineIn(rawIn);
monitor = progress;
messages = messageStream;
currentTask = "";
}

@@ -170,7 +175,7 @@ class SideBandInputStream extends InputStream {
}
}

private void progress(String pkt) {
private void progress(String pkt) throws IOException {
pkt = progressBuffer + pkt;
for (;;) {
final int lf = pkt.indexOf('\n');
@@ -185,16 +190,13 @@ class SideBandInputStream extends InputStream {
else
break;

final String msg = pkt.substring(0, s);
if (doProgressLine(msg))
pkt = pkt.substring(s + 1);
else
break;
doProgressLine(pkt.substring(0, s + 1));
pkt = pkt.substring(s + 1);
}
progressBuffer = pkt;
}

private boolean doProgressLine(final String msg) {
private void doProgressLine(final String msg) throws IOException {
Matcher matcher;

matcher = P_BOUNDED.matcher(msg);
@@ -208,7 +210,7 @@ class SideBandInputStream extends InputStream {
final int cnt = Integer.parseInt(matcher.group(2));
monitor.update(cnt - lastCnt);
lastCnt = cnt;
return true;
return;
}

matcher = P_UNBOUNDED.matcher(msg);
@@ -222,10 +224,10 @@ class SideBandInputStream extends InputStream {
final int cnt = Integer.parseInt(matcher.group(2));
monitor.update(cnt - lastCnt);
lastCnt = cnt;
return true;
return;
}

return false;
messages.write(msg);
}

private void beginTask(final int totalWorkUnits) {

+ 24
- 115
org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java View File

@@ -1,5 +1,4 @@
/*
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -47,8 +46,6 @@
package org.eclipse.jgit.transport;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -57,6 +54,8 @@ import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.util.QuotedString;
import org.eclipse.jgit.util.io.MessageWriter;
import org.eclipse.jgit.util.io.StreamCopyThread;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
@@ -88,8 +87,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
return false;
}

OutputStream errStream;

TransportGitSsh(final Repository local, final URIish uri) {
super(local, uri);
}
@@ -145,15 +142,15 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
return cmd.toString();
}

ChannelExec exec(final String exe) throws TransportException {
ChannelExec exec(final String exe, final OutputStream err)
throws TransportException {
initSession();

final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0;
try {
final ChannelExec channel = (ChannelExec) sock.openChannel("exec");
channel.setCommand(commandFor(exe));
errStream = createErrorStream();
channel.setErrStream(errStream, true);
channel.setErrStream(err);
channel.connect(tms);
return channel;
} catch (JSchException je) {
@@ -161,9 +158,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
}
}

void checkExecFailure(int status, String exe) throws TransportException {
void checkExecFailure(int status, String exe, String why)
throws TransportException {
if (status == 127) {
String why = errStream.toString();
IOException cause = null;
if (why != null && why.length() > 0)
cause = new IOException(why);
@@ -172,41 +169,8 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
}
}

/**
* @return the error stream for the channel, the stream is used to detect
* specific error reasons for exceptions.
*/
private static OutputStream createErrorStream() {
return new OutputStream() {
private StringBuilder all = new StringBuilder();

private StringBuilder sb = new StringBuilder();

public String toString() {
String r = all.toString();
while (r.endsWith("\n"))
r = r.substring(0, r.length() - 1);
return r;
}

@Override
public void write(final int b) throws IOException {
if (b == '\r') {
return;
}

sb.append((char) b);

if (b == '\n') {
all.append(sb);
sb.setLength(0);
}
}
};
}

NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf) {
String why = errStream.toString();
NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf,
String why) {
if (why == null || why.length() == 0)
return nf;

@@ -235,7 +199,7 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
if (getTimeout() <= 0)
return out;
final PipedInputStream pipeIn = new PipedInputStream();
final CopyThread copyThread = new CopyThread(pipeIn, out);
final StreamCopyThread copyThread = new StreamCopyThread(pipeIn, out);
final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
@Override
public void flush() throws IOException {
@@ -257,65 +221,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
return pipeOut;
}

private static class CopyThread extends Thread {
private final InputStream src;

private final OutputStream dst;

private volatile boolean doFlush;

CopyThread(final InputStream i, final OutputStream o) {
setName(Thread.currentThread().getName() + "-Output");
src = i;
dst = o;
}

void flush() {
if (!doFlush) {
doFlush = true;
interrupt();
}
}

@Override
public void run() {
try {
final byte[] buf = new byte[1024];
for (;;) {
try {
if (doFlush) {
doFlush = false;
dst.flush();
}

final int n;
try {
n = src.read(buf);
} catch (InterruptedIOException wakey) {
continue;
}
if (n < 0)
break;
dst.write(buf, 0, n);
} catch (IOException e) {
break;
}
}
} finally {
try {
src.close();
} catch (IOException e) {
// Ignore IO errors on close
}
try {
dst.close();
} catch (IOException e) {
// Ignore IO errors on close
}
}
}
}

class SshFetchConnection extends BasePackFetchConnection {
private ChannelExec channel;

@@ -324,12 +229,14 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
SshFetchConnection() throws TransportException {
super(TransportGitSsh.this);
try {
channel = exec(getOptionUploadPack());
final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);
channel = exec(getOptionUploadPack(), msg.getRawStream());

if (channel.isConnected())
init(channel.getInputStream(), outputStream(channel));
else
throw new TransportException(uri, errStream.toString());
throw new TransportException(uri, getMessages());

} catch (TransportException err) {
close();
@@ -343,9 +250,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
try {
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
close();
checkExecFailure(exitStatus, getOptionUploadPack());
throw cleanNotFound(notFound);
final String msgs = getMessages();
checkExecFailure(exitStatus, getOptionUploadPack(), msgs);
throw cleanNotFound(notFound, msgs);
}
}

@@ -373,12 +280,14 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
SshPushConnection() throws TransportException {
super(TransportGitSsh.this);
try {
channel = exec(getOptionReceivePack());
final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);
channel = exec(getOptionReceivePack(), msg.getRawStream());

if (channel.isConnected())
init(channel.getInputStream(), outputStream(channel));
else
throw new TransportException(uri, errStream.toString());
throw new TransportException(uri, getMessages());

} catch (TransportException err) {
close();
@@ -392,9 +301,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport {
try {
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
close();
checkExecFailure(exitStatus, getOptionReceivePack());
throw cleanNotFound(notFound);
final String msgs = getMessages();
checkExecFailure(exitStatus, getOptionReceivePack(), msgs);
throw cleanNotFound(notFound, msgs);
}
}


+ 45
- 34
org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java View File

@@ -1,6 +1,6 @@
/*
* Copyright (C) 2007, Dave Watson <dwatson@mimvista.com>
* Copyright (C) 2008-2009, Google Inc.
* Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
@@ -59,6 +59,8 @@ import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.util.FS;
import org.eclipse.jgit.util.io.MessageWriter;
import org.eclipse.jgit.util.io.StreamCopyThread;

/**
* Transport to access a local directory as though it were a remote peer.
@@ -129,11 +131,10 @@ class TransportLocal extends Transport implements PackTransport {
// Resources must be established per-connection.
}

protected Process startProcessWithErrStream(final String cmd)
protected Process spawn(final String cmd)
throws TransportException {
try {
final String[] args;
final Process proc;

if (cmd.startsWith("git-")) {
args = new String[] { "git", cmd.substring(4), PWD };
@@ -148,9 +149,7 @@ class TransportLocal extends Transport implements PackTransport {
}
}

proc = Runtime.getRuntime().exec(args, null, remoteGitDir);
new StreamRewritingThread(cmd, proc.getErrorStream()).start();
return proc;
return Runtime.getRuntime().exec(args, null, remoteGitDir);
} catch (IOException err) {
throw new TransportException(uri, err.getMessage(), err);
}
@@ -246,9 +245,20 @@ class TransportLocal extends Transport implements PackTransport {
class ForkLocalFetchConnection extends BasePackFetchConnection {
private Process uploadPack;

private Thread errorReaderThread;

ForkLocalFetchConnection() throws TransportException {
super(TransportLocal.this);
uploadPack = startProcessWithErrStream(getOptionUploadPack());

final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);

uploadPack = spawn(getOptionUploadPack());

final InputStream upErr = uploadPack.getErrorStream();
errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream());
errorReaderThread.start();

final InputStream upIn = uploadPack.getInputStream();
final OutputStream upOut = uploadPack.getOutputStream();
init(upIn, upOut);
@@ -268,6 +278,16 @@ class TransportLocal extends Transport implements PackTransport {
uploadPack = null;
}
}

if (errorReaderThread != null) {
try {
errorReaderThread.join();
} catch (InterruptedException e) {
// Stop waiting and return anyway.
} finally {
errorReaderThread = null;
}
}
}
}

@@ -351,9 +371,20 @@ class TransportLocal extends Transport implements PackTransport {
class ForkLocalPushConnection extends BasePackPushConnection {
private Process receivePack;

private Thread errorReaderThread;

ForkLocalPushConnection() throws TransportException {
super(TransportLocal.this);
receivePack = startProcessWithErrStream(getOptionReceivePack());

final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);

receivePack = spawn(getOptionReceivePack());

final InputStream rpErr = receivePack.getErrorStream();
errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream());
errorReaderThread.start();

final InputStream rpIn = receivePack.getInputStream();
final OutputStream rpOut = receivePack.getOutputStream();
init(rpIn, rpOut);
@@ -373,34 +404,14 @@ class TransportLocal extends Transport implements PackTransport {
receivePack = null;
}
}
}
}

static class StreamRewritingThread extends Thread {
private final InputStream in;

StreamRewritingThread(final String cmd, final InputStream in) {
super("JGit " + cmd + " Errors");
this.in = in;
}

public void run() {
final byte[] tmp = new byte[512];
try {
for (;;) {
final int n = in.read(tmp);
if (n < 0)
break;
System.err.write(tmp, 0, n);
System.err.flush();
}
} catch (IOException err) {
// Ignore errors reading errors.
} finally {
if (errorReaderThread != null) {
try {
in.close();
} catch (IOException err2) {
// Ignore errors closing the pipe.
errorReaderThread.join();
} catch (InterruptedException e) {
// Stop waiting and return anyway.
} finally {
errorReaderThread = null;
}
}
}

+ 115
- 0
org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java View File

@@ -0,0 +1,115 @@
/*
* Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.util.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.transport.BaseConnection;
import org.eclipse.jgit.util.RawParseUtils;

/**
* Combines messages from an OutputStream (hopefully in UTF-8) and a Writer.
* <p>
* This class is primarily meant for {@link BaseConnection} in contexts where a
* standard error stream from a command execution, as well as messages from a
* side-band channel, need to be combined together into a buffer to represent
* the complete set of messages from a remote repository.
* <p>
* Writes made to the writer are re-encoded as UTF-8 and interleaved into the
* buffer that {@link #getRawStream()} also writes to.
* <p>
* {@link #toString()} returns all written data, after converting it to a String
* under the assumption of UTF-8 encoding.
* <p>
* Internally {@link RawParseUtils#decode(byte[])} is used by {@code toString()}
* tries to work out a reasonably correct character set for the raw data.
*/
public class MessageWriter extends Writer {
private final ByteArrayOutputStream buf;

private final OutputStreamWriter enc;

/** Create an empty writer. */
public MessageWriter() {
buf = new ByteArrayOutputStream();
enc = new OutputStreamWriter(getRawStream(), Constants.CHARSET);
}

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
synchronized (buf) {
enc.write(cbuf, off, len);
enc.flush();
}
}

/**
* @return the underlying byte stream that character writes to this writer
* drop into. Writes to this stream should should be in UTF-8.
*/
public OutputStream getRawStream() {
return buf;
}

@Override
public void close() throws IOException {
// Do nothing, we are buffered with no resources.
}

@Override
public void flush() throws IOException {
// Do nothing, we are buffered with no resources.
}

/** @return string version of all buffered data. */
@Override
public String toString() {
return RawParseUtils.decode(buf.toByteArray());
}
}

+ 128
- 0
org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java View File

@@ -0,0 +1,128 @@
/*
* Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.util.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;

/** Thread to copy from an input stream to an output stream. */
public class StreamCopyThread extends Thread {
private static final int BUFFER_SIZE = 1024;

private final InputStream src;

private final OutputStream dst;

private volatile boolean doFlush;

/**
* Create a thread to copy data from an input stream to an output stream.
*
* @param i
* stream to copy from. The thread terminates when this stream
* reaches EOF. The thread closes this stream before it exits.
* @param o
* stream to copy into. The destination stream is automatically
* closed when the thread terminates.
*/
public StreamCopyThread(final InputStream i, final OutputStream o) {
setName(Thread.currentThread().getName() + "-StreamCopy");
src = i;
dst = o;
}

/**
* Request the thread to flush the output stream as soon as possible.
* <p>
* This is an asynchronous request to the thread. The actual flush will
* happen at some future point in time, when the thread wakes up to process
* the request.
*/
public void flush() {
if (!doFlush) {
doFlush = true;
interrupt();
}
}

@Override
public void run() {
try {
final byte[] buf = new byte[BUFFER_SIZE];
for (;;) {
try {
if (doFlush) {
doFlush = false;
dst.flush();
}

final int n;
try {
n = src.read(buf);
} catch (InterruptedIOException wakey) {
continue;
}
if (n < 0)
break;
dst.write(buf, 0, n);
} catch (IOException e) {
break;
}
}
} finally {
try {
src.close();
} catch (IOException e) {
// Ignore IO errors on close
}
try {
dst.close();
} catch (IOException e) {
// Ignore IO errors on close
}
}
}
}

Loading…
Cancel
Save