@Override
public FetchConnection openFetch() throws TransportException {
- return new SshFetchConnection();
+ return new SshFetchConnection(newConnection());
}
@Override
public PushConnection openPush() throws TransportException {
- return new SshPushConnection();
+ return new SshPushConnection(newConnection());
+ }
+
+ private Connection newConnection() {
+ return new JschConnection();
}
private static void sqMinimal(final StringBuilder cmd, final String val) {
cmd.append(QuotedString.BOURNE.quote(val));
}
- private String commandFor(final String exe) {
+ String commandFor(final String exe) {
String path = uri.getPath();
if (uri.getScheme() != null && uri.getPath().startsWith("/~"))
path = (uri.getPath().substring(1));
return cmd.toString();
}
- ChannelExec exec(final String exe) throws TransportException {
- initSession();
-
- try {
- final ChannelExec channel = (ChannelExec) sock.openChannel("exec");
- channel.setCommand(commandFor(exe));
- return channel;
- } catch (JSchException je) {
- throw new TransportException(uri, je.getMessage(), je);
- }
- }
-
- private void connect(ChannelExec channel) throws TransportException {
- try {
- channel.connect(getTimeout() > 0 ? getTimeout() * 1000 : 0);
- if (!channel.isConnected())
- throw new TransportException(uri, "connection failed");
- } catch (JSchException e) {
- throw new TransportException(uri, e.getMessage(), e);
- }
- }
-
void checkExecFailure(int status, String exe, String why)
throws TransportException {
if (status == 127) {
return new NoRemoteRepositoryException(uri, why);
}
- // JSch won't let us interrupt writes when we use our InterruptTimer to
- // break out of a long-running write operation. To work around that we
- // spawn a background thread to shuttle data through a pipe, as we can
- // issue an interrupted write out of that. Its slower, so we only use
- // this route if there is a timeout.
- //
- private OutputStream outputStream(ChannelExec channel) throws IOException {
- final OutputStream out = channel.getOutputStream();
- if (getTimeout() <= 0)
- return out;
- final PipedInputStream pipeIn = new PipedInputStream();
- final StreamCopyThread copyThread = new StreamCopyThread(pipeIn, out);
- final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
- @Override
- public void flush() throws IOException {
- super.flush();
- copyThread.flush();
+ private abstract class Connection {
+ abstract void exec(String commandName) throws TransportException;
+
+ abstract void connect() throws TransportException;
+
+ abstract InputStream getInputStream() throws IOException;
+
+ abstract OutputStream getOutputStream() throws IOException;
+
+ abstract InputStream getErrorStream() throws IOException;
+
+ abstract int getExitStatus();
+
+ abstract void close();
+ }
+
+ private class JschConnection extends Connection {
+ private ChannelExec channel;
+
+ private int exitStatus;
+
+ @Override
+ void exec(String commandName) throws TransportException {
+ initSession();
+ try {
+ channel = (ChannelExec) sock.openChannel("exec");
+ channel.setCommand(commandFor(commandName));
+ } catch (JSchException je) {
+ throw new TransportException(uri, je.getMessage(), je);
+ }
+ }
+
+ @Override
+ void connect() throws TransportException {
+ try {
+ channel.connect(getTimeout() > 0 ? getTimeout() * 1000 : 0);
+ if (!channel.isConnected())
+ throw new TransportException(uri, "connection failed");
+ } catch (JSchException e) {
+ throw new TransportException(uri, e.getMessage(), e);
}
+ }
+
+ @Override
+ InputStream getInputStream() throws IOException {
+ return channel.getInputStream();
+ }
+
+ @Override
+ OutputStream getOutputStream() throws IOException {
+ // JSch won't let us interrupt writes when we use our InterruptTimer
+ // to break out of a long-running write operation. To work around
+ // that we spawn a background thread to shuttle data through a pipe,
+ // as we can issue an interrupted write out of that. Its slower, so
+ // we only use this route if there is a timeout.
+ //
+ final OutputStream out = channel.getOutputStream();
+ if (getTimeout() <= 0)
+ return out;
+ final PipedInputStream pipeIn = new PipedInputStream();
+ final StreamCopyThread copier = new StreamCopyThread(pipeIn, out);
+ final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) {
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ copier.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ try {
+ copier.join(getTimeout() * 1000);
+ } catch (InterruptedException e) {
+ // Just wake early, the thread will terminate anyway.
+ }
+ }
+ };
+ copier.start();
+ return pipeOut;
+ }
+
+ @Override
+ InputStream getErrorStream() throws IOException {
+ return channel.getErrStream();
+ }
+
+ @Override
+ int getExitStatus() {
+ return exitStatus;
+ }
- @Override
- public void close() throws IOException {
- super.close();
+ @Override
+ void close() {
+ if (channel != null) {
try {
- copyThread.join(getTimeout() * 1000);
- } catch (InterruptedException e) {
- // Just wake early, the thread will terminate anyway.
+ exitStatus = channel.getExitStatus();
+ if (channel.isConnected())
+ channel.disconnect();
+ } finally {
+ channel = null;
}
}
- };
- copyThread.start();
- return pipeOut;
+ }
}
class SshFetchConnection extends BasePackFetchConnection {
- private ChannelExec channel;
+ private Connection conn;
private StreamCopyThread errorThread;
- private int exitStatus;
-
- SshFetchConnection() throws TransportException {
+ SshFetchConnection(Connection conn) throws TransportException {
super(TransportGitSsh.this);
+ this.conn = conn;
try {
final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);
- channel = exec(getOptionUploadPack());
+ conn.exec(getOptionUploadPack());
- final InputStream upErr = channel.getErrStream();
+ final InputStream upErr = conn.getErrorStream();
errorThread = new StreamCopyThread(upErr, msg.getRawStream());
errorThread.start();
- init(channel.getInputStream(), outputStream(channel));
- connect(channel);
+ init(conn.getInputStream(), conn.getOutputStream());
+ conn.connect();
} catch (TransportException err) {
close();
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
final String msgs = getMessages();
- checkExecFailure(exitStatus, getOptionUploadPack(), msgs);
+ checkExecFailure(conn.getExitStatus(), getOptionUploadPack(),
+ msgs);
throw cleanNotFound(notFound, msgs);
}
}
}
super.close();
-
- if (channel != null) {
- try {
- exitStatus = channel.getExitStatus();
- if (channel.isConnected())
- channel.disconnect();
- } finally {
- channel = null;
- }
- }
+ conn.close();
}
}
class SshPushConnection extends BasePackPushConnection {
- private ChannelExec channel;
+ private Connection conn;
private StreamCopyThread errorThread;
- private int exitStatus;
-
- SshPushConnection() throws TransportException {
+ SshPushConnection(Connection conn) throws TransportException {
super(TransportGitSsh.this);
+ this.conn = conn;
try {
final MessageWriter msg = new MessageWriter();
setMessageWriter(msg);
- channel = exec(getOptionReceivePack());
+ conn.exec(getOptionReceivePack());
- final InputStream rpErr = channel.getErrStream();
+ final InputStream rpErr = conn.getErrorStream();
errorThread = new StreamCopyThread(rpErr, msg.getRawStream());
errorThread.start();
- init(channel.getInputStream(), outputStream(channel));
- connect(channel);
+ init(conn.getInputStream(), conn.getOutputStream());
+ conn.connect();
} catch (TransportException err) {
close();
readAdvertisedRefs();
} catch (NoRemoteRepositoryException notFound) {
final String msgs = getMessages();
- checkExecFailure(exitStatus, getOptionReceivePack(), msgs);
+ checkExecFailure(conn.getExitStatus(), getOptionReceivePack(),
+ msgs);
throw cleanNotFound(notFound, msgs);
}
}
}
super.close();
-
- if (channel != null) {
- try {
- exitStatus = channel.getExitStatus();
- if (channel.isConnected())
- channel.disconnect();
- } finally {
- channel = null;
- }
- }
+ conn.close();
}
}
}