private final AtomicInteger flushCount = new AtomicInteger(0);
+ /** Lock held by flush to avoid interrupting a write. */
+ private final Object writeLock;
+
/**
* Create a thread to copy data from an input stream to an output stream.
*
setName(Thread.currentThread().getName() + "-StreamCopy"); //$NON-NLS-1$
src = i;
dst = o;
+ writeLock = new Object();
}
/**
*/
public void flush() {
flushCount.incrementAndGet();
- interrupt();
+ synchronized (writeLock) {
+ interrupt();
+ }
}
/**
for (;;) {
try {
if (readInterrupted) {
- try {
+ synchronized (writeLock) {
+ boolean interruptedAgain = Thread.interrupted();
dst.flush();
- } catch (InterruptedIOException e) {
- // There was a new flush() call during flush previous bytes
- // need continue read/write/flush for the new bytes
- continue;
+ if (interruptedAgain) {
+ interrupt();
+ }
}
readInterrupted = false;
if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) {
if (n < 0)
break;
- boolean writeInterrupted = false;
- for (;;) {
- try {
- dst.write(buf, 0, n);
- } catch (InterruptedIOException wakey) {
- writeInterrupted = true;
+ synchronized (writeLock) {
+ if (isInterrupted()) {
continue;
}
- // set interrupt status, which will be checked
- // when we block in src.read
- if (writeInterrupted || flushCount.get() > 0)
- interrupt();
- break;
+ boolean writeInterrupted = false;
+ for (;;) {
+ try {
+ dst.write(buf, 0, n);
+ } catch (InterruptedIOException wakey) {
+ writeInterrupted = true;
+ continue;
+ }
+
+ // set interrupt status, which will be checked
+ // when we block in src.read
+ if (writeInterrupted || flushCount.get() > 0)
+ interrupt();
+ break;
+ }
}
} catch (IOException e) {
break;