]> source.dussan.org Git - jgit.git/commitdiff
StreamCopyThread: Do not let flush interrupt a write 97/84497/2
authorJonathan Nieder <jrn@google.com>
Fri, 4 Nov 2016 18:48:38 +0000 (11:48 -0700)
committerJonathan Nieder <jrn@google.com>
Fri, 4 Nov 2016 20:00:08 +0000 (13:00 -0700)
flush calls interrupt() to interrupt a pending read and trigger a
flush.  Unfortunately that interrupt() call can also interrupt a
pending write, putting Jsch in a bad state and triggering "Short read
of block" errors.  Add locking to ensure the flush only interrupts
reads as intended.

Change-Id: Ib105d9e107ae43549ced7e6da29c22ee41cde9d8

org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java

index eabe72b252edf482f087d9400e7e4056392aa345..3ee1a2d6fa9149409b7f4a418eef1a84b7086613 100644 (file)
@@ -61,6 +61,9 @@ public class StreamCopyThread extends Thread {
 
        private final AtomicInteger flushCount = new AtomicInteger(0);
 
+       /** Lock held by flush to avoid interrupting a write. */
+       private final Object writeLock;
+
        /**
         * Create a thread to copy data from an input stream to an output stream.
         *
@@ -75,6 +78,7 @@ public class StreamCopyThread extends Thread {
                setName(Thread.currentThread().getName() + "-StreamCopy"); //$NON-NLS-1$
                src = i;
                dst = o;
+               writeLock = new Object();
        }
 
        /**
@@ -86,7 +90,9 @@ public class StreamCopyThread extends Thread {
         */
        public void flush() {
                flushCount.incrementAndGet();
-               interrupt();
+               synchronized (writeLock) {
+                       interrupt();
+               }
        }
 
        /**
@@ -118,12 +124,12 @@ public class StreamCopyThread extends Thread {
                        for (;;) {
                                try {
                                        if (readInterrupted) {
-                                               try {
+                                               synchronized (writeLock) {
+                                                       boolean interruptedAgain = Thread.interrupted();
                                                        dst.flush();
-                                               } catch (InterruptedIOException e) {
-                                                       // There was a new flush() call during flush previous bytes
-                                                       // need continue read/write/flush for the new bytes
-                                                       continue;
+                                                       if (interruptedAgain) {
+                                                               interrupt();
+                                                       }
                                                }
                                                readInterrupted = false;
                                                if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
@@ -148,20 +154,26 @@ public class StreamCopyThread extends Thread {
                                        if (n < 0)
                                                break;
 
-                                       boolean writeInterrupted = false;
-                                       for (;;) {
-                                               try {
-                                                       dst.write(buf, 0, n);
-                                               } catch (InterruptedIOException wakey) {
-                                                       writeInterrupted = true;
+                                       synchronized (writeLock) {
+                                               if (isInterrupted()) {
                                                        continue;
                                                }
 
-                                               // set interrupt status, which will be checked
-                                               // when we block in src.read
-                                               if (writeInterrupted || flushCount.get() > 0)
-                                                       interrupt();
-                                               break;
+                                               boolean writeInterrupted = false;
+                                               for (;;) {
+                                                       try {
+                                                               dst.write(buf, 0, n);
+                                                       } catch (InterruptedIOException wakey) {
+                                                               writeInterrupted = true;
+                                                               continue;
+                                                       }
+
+                                                       // set interrupt status, which will be checked
+                                                       // when we block in src.read
+                                                       if (writeInterrupted || flushCount.get() > 0)
+                                                               interrupt();
+                                                       break;
+                                               }
                                        }
                                } catch (IOException e) {
                                        break;