diff options
author | Shawn Pearce <spearce@spearce.org> | 2016-11-13 16:13:02 -0500 |
---|---|---|
committer | Gerrit Code Review @ Eclipse.org <gerrit@eclipse.org> | 2016-11-13 16:13:04 -0500 |
commit | 6aa126ec427c1beba49143aceb712e04b3f2aea6 (patch) | |
tree | 387c284d25cf5bcc536add79289676f5ac1c62c4 /org.eclipse.jgit/src | |
parent | 92eab1867dd10e3d1acd6eb2fffa04473c2df5ce (diff) | |
parent | 659cd813a98b5a203a21419e58a7dc6de3f40e56 (diff) | |
download | jgit-6aa126ec427c1beba49143aceb712e04b3f2aea6.tar.gz jgit-6aa126ec427c1beba49143aceb712e04b3f2aea6.zip |
Merge "Switch JSchSession to simple isolated OutputStream"
Diffstat (limited to 'org.eclipse.jgit/src')
4 files changed, 248 insertions, 27 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java index 29b319fe65..212cb7fdef 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/JGitText.java @@ -175,6 +175,7 @@ public class JGitText extends TranslationBundle { /***/ public String checkoutUnexpectedResult; /***/ public String classCastNotA; /***/ public String cloneNonEmptyDirectory; + /***/ public String closed; /***/ public String collisionOn; /***/ public String commandRejectedByHook; /***/ public String commandWasCalledInTheWrongState; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java index 1dfe5d9797..fa27bfce5f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/JschSession.java @@ -48,15 +48,14 @@ package org.eclipse.jgit.transport; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.internal.JGitText; -import org.eclipse.jgit.util.io.StreamCopyThread; +import org.eclipse.jgit.util.io.IsolatedOutputStream; import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; @@ -178,33 +177,12 @@ public class JschSession implements RemoteSession { // that we spawn a background thread to shuttle data through a pipe, // as we can issue an interrupted write out of that. Its slower, so // we only use this route if there is a timeout. - final OutputStream out = channel.getOutputStream(); + OutputStream out = channel.getOutputStream(); if (timeout <= 0) { outputStream = out; } else { - final PipedInputStream pipeIn = new PipedInputStream(); - final StreamCopyThread copier = new StreamCopyThread(pipeIn, - out); - final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) { - @Override - public void flush() throws IOException { - super.flush(); - copier.flush(); - } - - @Override - public void close() throws IOException { - super.close(); - try { - copier.join(timeout * 1000); - } catch (InterruptedException e) { - // Just wake early, the thread will terminate - // anyway. - } - } - }; - copier.start(); - outputStream = pipeOut; + IsolatedOutputStream i = new IsolatedOutputStream(out); + outputStream = new BufferedOutputStream(i, 16 * 1024); } errStream = channel.getErrStream(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java new file mode 100644 index 0000000000..cdc4a4d863 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/IsolatedOutputStream.java @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2016, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.util.io; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jgit.internal.JGitText; + +/** + * OutputStream isolated from interrupts. + * <p> + * Wraps an OutputStream to prevent interrupts during writes from being made + * visible to that stream instance. This works around buggy or difficult + * OutputStream implementations like JSch that cannot gracefully handle an + * interrupt during write. + * <p> + * Every write (or flush) requires a context switch to another thread. Callers + * should wrap this stream with {@code BufferedOutputStream} using a suitable + * buffer size to amortize the cost of context switches. + */ +public class IsolatedOutputStream extends OutputStream { + private final OutputStream dst; + private final ExecutorService copier; + private Future<Void> pending; + + /** + * Wraps an OutputStream. + * + * @param out + * stream to send all writes to. + */ + public IsolatedOutputStream(OutputStream out) { + dst = out; + copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory()); + } + + @Override + public void write(int ch) throws IOException { + write(new byte[] { (byte) ch }, 0, 1); + } + + @Override + public void write(final byte[] buf, final int pos, final int cnt) + throws IOException { + checkClosed(); + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.write(buf, pos, cnt); + return null; + } + }); + } + + @Override + public void flush() throws IOException { + checkClosed(); + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.flush(); + return null; + } + }); + } + + @Override + public void close() throws IOException { + if (!copier.isShutdown()) { + try { + if (pending == null || tryCleanClose()) { + cleanClose(); + } else { + dirtyClose(); + } + } finally { + copier.shutdown(); + } + } + } + + private boolean tryCleanClose() { + /* + * If the caller stopped waiting for a prior write or flush, they could + * be trying to close a stream that is still in-use. Check if the prior + * operation ended in a predictable way. + */ + try { + pending.get(0, TimeUnit.MILLISECONDS); + pending = null; + return true; + } catch (TimeoutException | InterruptedException e) { + return false; + } catch (ExecutionException e) { + pending = null; + return true; + } + } + + private void cleanClose() throws IOException { + execute(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.close(); + return null; + } + }); + } + + private void dirtyClose() throws IOException { + /* + * Interrupt any still pending write or flush operation. This may cause + * massive failures inside of the stream, but its going to be closed as + * the next step. + */ + pending.cancel(true); + + Future<Void> close; + try { + close = copier.submit(new Callable<Void>() { + @Override + public Void call() throws IOException { + dst.close(); + return null; + } + }); + } catch (RejectedExecutionException e) { + throw new IOException(e); + } + try { + close.get(200, TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException e) { + close.cancel(true); + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + private void checkClosed() throws IOException { + if (copier.isShutdown()) { + throw new IOException(JGitText.get().closed); + } + } + + private void execute(Callable<Void> task) throws IOException { + if (pending != null) { + // Check (and rethrow) any prior failed operation. + checkedGet(pending); + } + try { + pending = copier.submit(task); + } catch (RejectedExecutionException e) { + throw new IOException(e); + } + checkedGet(pending); + pending = null; + } + + private static void checkedGet(Future<Void> future) throws IOException { + try { + future.get(); + } catch (InterruptedException e) { + throw interrupted(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + private static InterruptedIOException interrupted(InterruptedException c) { + InterruptedIOException e = new InterruptedIOException(); + e.initCause(c); + return e; + } + + private static class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger cnt = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + int n = cnt.incrementAndGet(); + String name = IsolatedOutputStream.class.getSimpleName() + '-' + n; + return new Thread(r, name); + } + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java index ae760e3b8e..506330eb08 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java @@ -88,6 +88,7 @@ public class StreamCopyThread extends Thread { * happen at some future point in time, when the thread wakes up to process * the request. */ + @Deprecated public void flush() { flushCount.incrementAndGet(); synchronized (writeLock) { |