import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
/** Thread to copy from an input stream to an output stream. */
public class StreamCopyThread extends Thread {
private final OutputStream dst;
- private final AtomicInteger flushCounter = new AtomicInteger(0);
-
private volatile boolean done;
/**
* the request.
*/
public void flush() {
- flushCounter.incrementAndGet();
interrupt();
}
public void run() {
try {
final byte[] buf = new byte[BUFFER_SIZE];
+ int interruptCounter = 0;
for (;;) {
try {
- if (needFlush())
+ if (interruptCounter > 0) {
dst.flush();
+ interruptCounter--;
+ }
if (done)
break;
try {
n = src.read(buf);
} catch (InterruptedIOException wakey) {
+ interruptCounter++;
continue;
}
if (n < 0)
break;
+ boolean writeInterrupted = false;
for (;;) {
try {
dst.write(buf, 0, n);
} catch (InterruptedIOException wakey) {
+ writeInterrupted = true;
continue;
}
+
+ // set interrupt status, which will be checked
+ // when we block in src.read
+ if (writeInterrupted)
+ interrupt();
break;
}
} catch (IOException e) {
}
}
}
-
- private boolean needFlush() {
- int i = flushCounter.get();
- if (i > 0) {
- flushCounter.decrementAndGet();
- return true;
- }
- return false;
- }
}