The message delaying for the initial SSH messages (client identification followed by the initial key exchange request) was broken. sendKexInit() is _also_ called when a new key exchange is requested. We inadvertently also re-sent the client identification at that point, which is wrong and makes the server terminate the connection. Re-keying occurs from time to time during an SSH connection depending on time, the number of messages (packets/blocks) exchanged, or the amount of data exchanged. The net result was that for large repositories data-intensive operations failed on the first re-keying. Change the initial message delay such that the two messages for the client identification and the initial key exchange can be buffered individually while the proxy protocol is still in progress. The AbstractClientProxyConnector can now buffer several commands, which should also resolve bug 544715. Bug: 545920 Change-Id: If09ee963a439b39098a0f52a1510237b428df8dd Signed-off-by: Thomas Wolf <thomas.wolf@paranor.ch>tags/v5.3.1.201904271842-r
@@ -149,10 +149,27 @@ public class JGitClientSession extends ClientSessionImpl { | |||
@Override | |||
protected IoWriteFuture sendIdentification(String ident) | |||
throws IOException { | |||
// Nothing; we do this below together with the KEX init in | |||
// sendStartSsh(). Called only from the ClientSessionImpl constructor, | |||
// where the return value is ignored. | |||
return null; | |||
StatefulProxyConnector proxy = proxyHandler; | |||
if (proxy != null) { | |||
try { | |||
// We must not block here; the framework starts reading messages | |||
// from the peer only once the initial sendKexInit() following | |||
// this call to sendIdentification() has returned! | |||
proxy.runWhenDone(() -> { | |||
JGitClientSession.super.sendIdentification(ident); | |||
return null; | |||
}); | |||
// Called only from the ClientSessionImpl constructor, where the | |||
// return value is ignored. | |||
return null; | |||
} catch (IOException e) { | |||
throw e; | |||
} catch (Exception other) { | |||
throw new IOException(other.getLocalizedMessage(), other); | |||
} | |||
} else { | |||
return super.sendIdentification(ident); | |||
} | |||
} | |||
@Override | |||
@@ -161,12 +178,13 @@ public class JGitClientSession extends ClientSessionImpl { | |||
if (proxy != null) { | |||
try { | |||
// We must not block here; the framework starts reading messages | |||
// from the peer only once sendKexInit() has returned! | |||
// from the peer only once the initial sendKexInit() has | |||
// returned! | |||
proxy.runWhenDone(() -> { | |||
sendStartSsh(); | |||
JGitClientSession.super.sendKexInit(); | |||
return null; | |||
}); | |||
// sendKexInit() is called only from the ClientSessionImpl | |||
// This is called only from the ClientSessionImpl | |||
// constructor, where the return value is ignored. | |||
return null; | |||
} catch (IOException e) { | |||
@@ -175,23 +193,10 @@ public class JGitClientSession extends ClientSessionImpl { | |||
throw new IOException(other.getLocalizedMessage(), other); | |||
} | |||
} else { | |||
return sendStartSsh(); | |||
return super.sendKexInit(); | |||
} | |||
} | |||
/** | |||
* Sends the initial messages starting the ssh setup: the client | |||
* identification and the KEX init message. | |||
* | |||
* @return the client's KEX seed | |||
* @throws IOException | |||
* if something goes wrong | |||
*/ | |||
private byte[] sendStartSsh() throws IOException { | |||
super.sendIdentification(clientVersion); | |||
return super.sendKexInit(); | |||
} | |||
/** | |||
* {@inheritDoc} | |||
* |
@@ -43,7 +43,9 @@ | |||
package org.eclipse.jgit.internal.transport.sshd.proxy; | |||
import java.net.InetSocketAddress; | |||
import java.util.ArrayList; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.concurrent.Callable; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
@@ -61,12 +63,12 @@ public abstract class AbstractClientProxyConnector | |||
private static final long DEFAULT_PROXY_TIMEOUT_MILLIS = TimeUnit.SECONDS | |||
.toMillis(30L); | |||
/** Guards {@link #done} and {@link #startSsh}. */ | |||
/** Guards {@link #done} and {@link #bufferedCommands}. */ | |||
private Object lock = new Object(); | |||
private boolean done; | |||
private Callable<Void> startSsh; | |||
private List<Callable<Void>> bufferedCommands = new ArrayList<>(); | |||
private AtomicReference<Runnable> unregister = new AtomicReference<>(); | |||
@@ -173,18 +175,20 @@ public abstract class AbstractClientProxyConnector | |||
* if starting ssh fails | |||
*/ | |||
protected void setDone(boolean success) throws Exception { | |||
Callable<Void> starter; | |||
List<Callable<Void>> buffered; | |||
Runnable unset = unregister.getAndSet(null); | |||
if (unset != null) { | |||
unset.run(); | |||
} | |||
synchronized (lock) { | |||
done = true; | |||
starter = startSsh; | |||
startSsh = null; | |||
buffered = bufferedCommands; | |||
bufferedCommands = null; | |||
} | |||
if (success && starter != null) { | |||
starter.call(); | |||
if (success && buffered != null) { | |||
for (Callable<Void> starter : buffered) { | |||
starter.call(); | |||
} | |||
} | |||
} | |||
@@ -192,7 +196,7 @@ public abstract class AbstractClientProxyConnector | |||
public void runWhenDone(Callable<Void> starter) throws Exception { | |||
synchronized (lock) { | |||
if (!done) { | |||
this.startSsh = starter; | |||
bufferedCommands.add(starter); | |||
return; | |||
} | |||
} |
@@ -78,12 +78,15 @@ public interface StatefulProxyConnector extends ClientProxyConnector { | |||
void messageReceived(IoSession session, Readable buffer) throws Exception; | |||
/** | |||
* Runs {@code startSsh} once the proxy connection is established. | |||
* Runs {@code command} once the proxy connection is established. May be | |||
* called multiple times; commands are run sequentially. If the proxy | |||
* connection is already established, {@code command} is executed directly | |||
* synchronously. | |||
* | |||
* @param startSsh | |||
* @param command | |||
* operation to run | |||
* @throws Exception | |||
* if the operation is run synchronously and throws an exception | |||
*/ | |||
void runWhenDone(Callable<Void> startSsh) throws Exception; | |||
void runWhenDone(Callable<Void> command) throws Exception; | |||
} |