*/
package org.sonar.application.process;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
public class EsManagedProcess extends AbstractManagedProcess {
private static final Logger LOG = LoggerFactory.getLogger(EsManagedProcess.class);
private static final int WAIT_FOR_UP_DELAY_IN_MILLIS = 100;
- private static final int WAIT_FOR_UP_TIMEOUT = 10 * 60; /* 1min */
+ private static final Set<String> SUPPRESSED_ERROR_MESSAGES = Set.of("Connection refused", "Timeout connecting");
private volatile boolean nodeOperational = false;
+ private final int waitForUpTimeout;
private final AtomicBoolean firstMasterNotDiscoveredLog = new AtomicBoolean(true);
private final EsConnector esConnector;
public EsManagedProcess(Process process, ProcessId processId, EsConnector esConnector) {
+ this(process, processId, esConnector, 10 * 60);
+ }
+
+ EsManagedProcess(Process process, ProcessId processId, EsConnector esConnector, int waitForUpTimeout) {
super(process, processId);
this.esConnector = esConnector;
+ this.waitForUpTimeout = waitForUpTimeout;
}
@Override
i++;
status = checkStatus();
}
- } while (i < WAIT_FOR_UP_TIMEOUT);
+ } while (i < waitForUpTimeout);
return status == YELLOW || status == GREEN;
}
}
return KO;
} catch (ElasticsearchException e) {
- if (e.status() == RestStatus.INTERNAL_SERVER_ERROR && e.getMessage().contains("Connection refused")) {
+ if (e.status() == RestStatus.INTERNAL_SERVER_ERROR && (SUPPRESSED_ERROR_MESSAGES.stream().anyMatch(s -> e.getMessage().contains(s)))) {
return CONNECTION_REFUSED;
} else {
LOG.error("Failed to check status", e);
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
public class EsManagedProcessTest {
+ private final static int WAIT_FOR_UP_TIMEOUT = 1;
+ private final static int WAIT_FOR_UP_TIMEOUT_LONG = 2;
+
@Test
public void isOperational_should_return_false_if_status_is_unknown() {
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.empty());
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isFalse();
}
public void isOperational_should_return_false_if_Elasticsearch_is_RED() {
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.of(ClusterHealthStatus.RED));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isFalse();
}
public void isOperational_should_return_true_if_Elasticsearch_is_YELLOW() {
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.of(ClusterHealthStatus.YELLOW));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isTrue();
}
public void isOperational_should_return_true_if_Elasticsearch_is_GREEN() {
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.of(ClusterHealthStatus.GREEN));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isTrue();
}
public void isOperational_should_return_true_if_Elasticsearch_was_GREEN_once() {
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.of(ClusterHealthStatus.GREEN));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isTrue();
when(esConnector.getClusterHealthStatus()).thenReturn(Optional.of(ClusterHealthStatus.RED));
when(esConnector.getClusterHealthStatus())
.thenReturn(Optional.empty())
.thenReturn(Optional.of(ClusterHealthStatus.GREEN));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isTrue();
}
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus())
.thenThrow(new RuntimeException("test"));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isFalse();
}
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus())
.thenThrow(new ElasticsearchException("Connection refused"));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
+ assertThat(underTest.isOperational()).isFalse();
+ }
+
+ @Test
+ public void isOperational_should_return_false_if_ElasticsearchException_with_connection_timeout_thrown() {
+ EsConnector esConnector = mock(EsConnector.class);
+ when(esConnector.getClusterHealthStatus())
+ .thenThrow(new ElasticsearchException(new ExecutionException(new ConnectException("Timeout connecting to [/127.0.0.1:9001]"))));
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT_LONG);
assertThat(underTest.isOperational()).isFalse();
}
EsConnector esConnector = mock(EsConnector.class);
when(esConnector.getClusterHealthStatus())
.thenThrow(new ElasticsearchException("test"));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isFalse();
}
when(esConnector.getClusterHealthStatus())
.thenThrow(new ElasticsearchStatusException("foobar[type=master_not_discovered_exception,acme]...", RestStatus.SERVICE_UNAVAILABLE));
- EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector);
+ EsManagedProcess underTest = new EsManagedProcess(mock(Process.class), ProcessId.ELASTICSEARCH, esConnector, WAIT_FOR_UP_TIMEOUT);
assertThat(underTest.isOperational()).isFalse();
assertThat(memoryAppender.events).isNotEmpty();
assertThat(memoryAppender.events)