private final List<Future<?>> running;
+ private final Object runningLock;
+
private final Semaphore spaceAvailable;
private int queuedCount;
this.executor = executor;
this.bufferSize = bufferSize;
this.running = new LinkedList<Future<?>>();
+ this.runningLock = new Object();
this.spaceAvailable = new Semaphore(bufferSize);
}
}
}
- checkRunningTasks(true);
+ synchronized (runningLock) {
+ checkRunningTasks(true);
+ }
} finally {
flushing = false;
}
}
public void abort() throws DhtException {
- checkRunningTasks(true);
+ synchronized (runningLock) {
+ checkRunningTasks(true);
+ }
}
private void acquireSpace(int sz) throws DhtException {
return;
}
- if (!flushing)
- checkRunningTasks(false);
- running.add(executor.submit(op));
+ synchronized (runningLock) {
+ if (!flushing)
+ checkRunningTasks(false);
+ running.add(executor.submit(op));
+ }
}
/**
int size) throws DhtException {
int permits = permitsForSize(size);
WrappedCallback<T> op = new WrappedCallback<T>(callback, permits);
- checkRunningTasks(false);
- running.add(op);
+ synchronized (runningLock) {
+ checkRunningTasks(false);
+ running.add(op);
+ }
return op;
}