import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.elasticsearch.action.DocWriteRequest;
}
private final class BulkProcessorListener implements Listener {
- private final Profiler profiler = Profiler.createIfTrace(EsClient.LOGGER);
+ // a map containing per each request the associated profiler
+ private final Map<BulkRequest, Profiler> profilerByRequest = new ConcurrentHashMap<>();
@Override
public void beforeBulk(long executionId, BulkRequest request) {
+ final Profiler profiler = Profiler.createIfTrace(EsClient.LOGGER);
profiler.start();
+ profilerByRequest.put(request, profiler);
}
@Override
}
private void stopProfiler(BulkRequest request) {
- if (profiler.isTraceEnabled()) {
+ final Profiler profiler = profilerByRequest.get(request);
+ if (Objects.nonNull(profiler) && profiler.isTraceEnabled()) {
profiler.stopTrace(toString(request));
}
+ profilerByRequest.remove(request);
}
private String toString(BulkRequest bulkRequest) {