summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonathan Nieder <jrn@google.com>2016-11-04 11:48:38 -0700
committerJonathan Nieder <jrn@google.com>2016-11-04 13:00:08 -0700
commite67d59df3fb8ae1efe94a54efce36784f7cc83e8 (patch)
tree2d19c344531527a1abbf40047967e78f3dd9efc7
parentfeefcb02b0215b6dea9be5efbb9a3f4048d8c0ff (diff)
downloadjgit-e67d59df3fb8ae1efe94a54efce36784f7cc83e8.tar.gz
jgit-e67d59df3fb8ae1efe94a54efce36784f7cc83e8.zip
StreamCopyThread: Do not let flush interrupt a write
flush calls interrupt() to interrupt a pending read and trigger a flush. Unfortunately that interrupt() call can also interrupt a pending write, putting Jsch in a bad state and triggering "Short read of block" errors. Add locking to ensure the flush only interrupts reads as intended. Change-Id: Ib105d9e107ae43549ced7e6da29c22ee41cde9d8
-rw-r--r--org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java46
1 files changed, 29 insertions, 17 deletions
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
index eabe72b252..3ee1a2d6fa 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java
@@ -61,6 +61,9 @@ public class StreamCopyThread extends Thread {
private final AtomicInteger flushCount = new AtomicInteger(0);
+ /** Lock held by flush to avoid interrupting a write. */
+ private final Object writeLock;
+
/**
* Create a thread to copy data from an input stream to an output stream.
*
@@ -75,6 +78,7 @@ public class StreamCopyThread extends Thread {
setName(Thread.currentThread().getName() + "-StreamCopy"); //$NON-NLS-1$
src = i;
dst = o;
+ writeLock = new Object();
}
/**
@@ -86,7 +90,9 @@ public class StreamCopyThread extends Thread {
*/
public void flush() {
flushCount.incrementAndGet();
- interrupt();
+ synchronized (writeLock) {
+ interrupt();
+ }
}
/**
@@ -118,12 +124,12 @@ public class StreamCopyThread extends Thread {
for (;;) {
try {
if (readInterrupted) {
- try {
+ synchronized (writeLock) {
+ boolean interruptedAgain = Thread.interrupted();
dst.flush();
- } catch (InterruptedIOException e) {
- // There was a new flush() call during flush previous bytes
- // need continue read/write/flush for the new bytes
- continue;
+ if (interruptedAgain) {
+ interrupt();
+ }
}
readInterrupted = false;
if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
@@ -148,20 +154,26 @@ public class StreamCopyThread extends Thread {
if (n < 0)
break;
- boolean writeInterrupted = false;
- for (;;) {
- try {
- dst.write(buf, 0, n);
- } catch (InterruptedIOException wakey) {
- writeInterrupted = true;
+ synchronized (writeLock) {
+ if (isInterrupted()) {
continue;
}
- // set interrupt status, which will be checked
- // when we block in src.read
- if (writeInterrupted || flushCount.get() > 0)
- interrupt();
- break;
+ boolean writeInterrupted = false;
+ for (;;) {
+ try {
+ dst.write(buf, 0, n);
+ } catch (InterruptedIOException wakey) {
+ writeInterrupted = true;
+ continue;
+ }
+
+ // set interrupt status, which will be checked
+ // when we block in src.read
+ if (writeInterrupted || flushCount.get() > 0)
+ interrupt();
+ break;
+ }
}
} catch (IOException e) {
break;