From e67d59df3fb8ae1efe94a54efce36784f7cc83e8 Mon Sep 17 00:00:00 2001 From: Jonathan Nieder Date: Fri, 4 Nov 2016 11:48:38 -0700 Subject: [PATCH] StreamCopyThread: Do not let flush interrupt a write flush calls interrupt() to interrupt a pending read and trigger a flush. Unfortunately that interrupt() call can also interrupt a pending write, putting Jsch in a bad state and triggering "Short read of block" errors. Add locking to ensure the flush only interrupts reads as intended. Change-Id: Ib105d9e107ae43549ced7e6da29c22ee41cde9d8 --- .../jgit/util/io/StreamCopyThread.java | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java index eabe72b252..3ee1a2d6fa 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java @@ -61,6 +61,9 @@ public class StreamCopyThread extends Thread { private final AtomicInteger flushCount = new AtomicInteger(0); + /** Lock held by flush to avoid interrupting a write. */ + private final Object writeLock; + /** * Create a thread to copy data from an input stream to an output stream. * @@ -75,6 +78,7 @@ public class StreamCopyThread extends Thread { setName(Thread.currentThread().getName() + "-StreamCopy"); //$NON-NLS-1$ src = i; dst = o; + writeLock = new Object(); } /** @@ -86,7 +90,9 @@ public class StreamCopyThread extends Thread { */ public void flush() { flushCount.incrementAndGet(); - interrupt(); + synchronized (writeLock) { + interrupt(); + } } /** @@ -118,12 +124,12 @@ public class StreamCopyThread extends Thread { for (;;) { try { if (readInterrupted) { - try { + synchronized (writeLock) { + boolean interruptedAgain = Thread.interrupted(); dst.flush(); - } catch (InterruptedIOException e) { - // There was a new flush() call during flush previous bytes - // need continue read/write/flush for the new bytes - continue; + if (interruptedAgain) { + interrupt(); + } } readInterrupted = false; if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) { @@ -148,20 +154,26 @@ public class StreamCopyThread extends Thread { if (n < 0) break; - boolean writeInterrupted = false; - for (;;) { - try { - dst.write(buf, 0, n); - } catch (InterruptedIOException wakey) { - writeInterrupted = true; + synchronized (writeLock) { + if (isInterrupted()) { continue; } - // set interrupt status, which will be checked - // when we block in src.read - if (writeInterrupted || flushCount.get() > 0) - interrupt(); - break; + boolean writeInterrupted = false; + for (;;) { + try { + dst.write(buf, 0, n); + } catch (InterruptedIOException wakey) { + writeInterrupted = true; + continue; + } + + // set interrupt status, which will be checked + // when we block in src.read + if (writeInterrupted || flushCount.get() > 0) + interrupt(); + break; + } } } catch (IOException e) { break; -- 2.39.5