@@ -21,7 +21,6 @@ package org.sonar.server.pushapi.scheduler.polling; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.ScheduledExecutorService; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl; | |||
import static java.lang.Thread.MIN_PRIORITY; |
@@ -21,7 +21,10 @@ package org.sonar.server.pushapi.scheduler.polling; | |||
import java.util.Collection; | |||
import java.util.Deque; | |||
import java.util.LinkedList; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Optional; | |||
import java.util.Set; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.stream.Collectors; | |||
@@ -38,6 +41,7 @@ import org.sonar.db.project.ProjectDto; | |||
import org.sonar.db.pushevent.PushEventDto; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintClient; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintClientsRegistry; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintPushEvent; | |||
@ServerSide | |||
public class PushEventPollScheduler implements Startable { | |||
@@ -93,13 +97,14 @@ public class PushEventPollScheduler implements Startable { | |||
var projectKeys = getClientsProjectKeys(clients); | |||
try (DbSession dbSession = dbClient.openSession(false)) { | |||
var projectUuids = getProjectUuids(projectKeys, dbSession); | |||
Deque<PushEventDto> events = getPushEvents(dbSession, projectUuids); | |||
var projectKeysByUuids = getProjectKeysByUuids(dbSession, projectKeys); | |||
Deque<PushEventDto> events = getPushEvents(dbSession, projectKeysByUuids.keySet()); | |||
LOG.debug("Received {} push events, attempting to broadcast to {} registered clients.", events.size(), | |||
clients.size()); | |||
events.forEach(clientsRegistry::broadcastMessage); | |||
events.forEach(pushEventDto -> mapToSonarLintPushEvent(pushEventDto, projectKeysByUuids) | |||
.ifPresent(clientsRegistry::broadcastMessage)); | |||
if (!events.isEmpty()) { | |||
var last = events.getLast(); | |||
@@ -109,6 +114,15 @@ public class PushEventPollScheduler implements Startable { | |||
} | |||
} | |||
private static Optional<SonarLintPushEvent> mapToSonarLintPushEvent(PushEventDto pushEventDto, Map<String, String> projectKeysByUuids) { | |||
var resolvedProjectKey = projectKeysByUuids.get(pushEventDto.getProjectUuid()); | |||
if (resolvedProjectKey == null) { | |||
LOG.debug("Could not find key for project with uuid [{}]", pushEventDto.getProjectUuid()); | |||
return Optional.empty(); | |||
} | |||
return Optional.of(new SonarLintPushEvent(pushEventDto.getName(), pushEventDto.getPayload(), resolvedProjectKey)); | |||
} | |||
private static Set<String> getClientsProjectKeys(List<SonarLintClient> clients) { | |||
return clients.stream() | |||
.map(SonarLintClient::getClientProjectKeys) | |||
@@ -117,14 +131,17 @@ public class PushEventPollScheduler implements Startable { | |||
} | |||
private Deque<PushEventDto> getPushEvents(DbSession dbSession, Set<String> projectUuids) { | |||
if (projectUuids.isEmpty()) { | |||
return new LinkedList<>(); | |||
} | |||
return dbClient.pushEventDao().selectChunkByProjectUuids(dbSession, projectUuids, lastPullTimestamp, lastSeenUuid, getPageSize()); | |||
} | |||
@NotNull | |||
private Set<String> getProjectUuids(Set<String> projectKeys, DbSession dbSession) { | |||
private Map<String, String> getProjectKeysByUuids(DbSession dbSession, Set<String> projectKeys) { | |||
return dbClient.projectDao().selectProjectsByKeys(dbSession, projectKeys) | |||
.stream().map(ProjectDto::getUuid) | |||
.collect(Collectors.toSet()); | |||
.stream() | |||
.collect(Collectors.toMap(ProjectDto::getUuid, ProjectDto::getKey)); | |||
} | |||
public long getInitialDelay() { |
@@ -34,7 +34,6 @@ import org.sonar.core.util.issue.IssueChangeListener; | |||
import org.sonar.core.util.issue.IssueChangedEvent; | |||
import org.sonar.core.util.rule.RuleActivationListener; | |||
import org.sonar.core.util.rule.RuleSetChangedEvent; | |||
import org.sonar.db.pushevent.PushEventDto; | |||
import org.sonar.server.exceptions.ForbiddenException; | |||
import org.sonar.server.pushapi.issues.IssueChangeBroadcastUtils; | |||
import org.sonar.server.pushapi.issues.IssueChangeEventsDistributor; | |||
@@ -105,9 +104,22 @@ public class SonarLintClientsRegistry implements RuleActivationListener, IssueCh | |||
broadcastMessage(issueChangedEvent, IssueChangeBroadcastUtils.getFilterForEvent(issueChangedEvent)); | |||
} | |||
public void broadcastMessage(PushEventDto event) { | |||
// TODO:: different task for broadcasting event | |||
LOG.info("received event: ({}, {}) ", event.getUuid(), event.getName()); | |||
public void broadcastMessage(SonarLintPushEvent event) { | |||
clients.stream().filter(client -> client.getClientProjectKeys().contains(event.getProjectKey())) | |||
.forEach(c -> { | |||
Set<String> clientProjectKeys = new HashSet<>(c.getClientProjectKeys()); | |||
clientProjectKeys.retainAll(Set.of(event.getProjectKey())); | |||
try { | |||
sonarLintClientPermissionsValidator.validateUserCanReceivePushEventForProjects(c.getUserUuid(), clientProjectKeys); | |||
c.writeAndFlush(event.serialize()); | |||
} catch (ForbiddenException forbiddenException) { | |||
logClientUnauthenticated(forbiddenException); | |||
unregisterClient(c); | |||
} catch (IllegalStateException | IOException e) { | |||
logUnexpectedError(e); | |||
unregisterClient(c); | |||
} | |||
}); | |||
} | |||
public void broadcastMessage(RuleSetChangedEvent event, Predicate<SonarLintClient> filter) { | |||
@@ -121,10 +133,10 @@ public class SonarLintClientsRegistry implements RuleActivationListener, IssueCh | |||
String message = RuleSetChangeBroadcastUtils.getMessage(personalizedEvent); | |||
c.writeAndFlush(message); | |||
} catch (ForbiddenException forbiddenException) { | |||
LOG.debug("Client is no longer authenticated: " + forbiddenException.getMessage()); | |||
logClientUnauthenticated(forbiddenException); | |||
unregisterClient(c); | |||
} catch (IllegalStateException | IOException e) { | |||
LOG.error("Unable to send message to a client: " + e.getMessage()); | |||
logUnexpectedError(e); | |||
unregisterClient(c); | |||
} | |||
}); | |||
@@ -139,15 +151,23 @@ public class SonarLintClientsRegistry implements RuleActivationListener, IssueCh | |||
String message = IssueChangeBroadcastUtils.getMessage(event); | |||
c.writeAndFlush(message); | |||
} catch (ForbiddenException forbiddenException) { | |||
LOG.debug("Client is no longer authenticated: " + forbiddenException.getMessage()); | |||
logClientUnauthenticated(forbiddenException); | |||
unregisterClient(c); | |||
} catch (IllegalStateException | IOException e) { | |||
LOG.error("Unable to send message to a client: " + e.getMessage()); | |||
logUnexpectedError(e); | |||
unregisterClient(c); | |||
} | |||
}); | |||
} | |||
private static void logUnexpectedError(Exception e) { | |||
LOG.error("Unable to send message to a client: " + e.getMessage()); | |||
} | |||
private static void logClientUnauthenticated(ForbiddenException forbiddenException) { | |||
LOG.debug("Client is no longer authenticated: " + forbiddenException.getMessage()); | |||
} | |||
class SonarLintClientEventsListener implements AsyncListener { | |||
private final SonarLintClient client; | |||
@@ -0,0 +1,53 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2022 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.pushapi.sonarlint; | |||
import static java.nio.charset.StandardCharsets.UTF_8; | |||
public class SonarLintPushEvent { | |||
private final String name; | |||
private final byte[] data; | |||
private final String projectKey; | |||
public SonarLintPushEvent(String name, byte[] data, String projectKey) { | |||
this.name = name; | |||
this.data = data; | |||
this.projectKey = projectKey; | |||
} | |||
public String getProjectKey() { | |||
return projectKey; | |||
} | |||
public String getName() { | |||
return name; | |||
} | |||
public byte[] getData() { | |||
return data; | |||
} | |||
public String serialize() { | |||
return "event: " + this.name + "\n" | |||
+ "data: " + new String(this.data, UTF_8); | |||
} | |||
} |
@@ -34,6 +34,7 @@ import org.sonar.db.DbTester; | |||
import org.sonar.db.pushevent.PushEventDto; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintClient; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintClientsRegistry; | |||
import org.sonar.server.pushapi.sonarlint.SonarLintPushEvent; | |||
import org.sonar.server.util.AbstractStoppableExecutorService; | |||
import static java.nio.charset.StandardCharsets.UTF_8; | |||
@@ -68,7 +69,7 @@ public class PushEventPollSchedulerTest { | |||
assertThatCode(executorService::runCommand) | |||
.doesNotThrowAnyException(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
@Test | |||
@@ -80,7 +81,7 @@ public class PushEventPollSchedulerTest { | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
@Test | |||
@@ -96,7 +97,29 @@ public class PushEventPollSchedulerTest { | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
@Test | |||
public void nothing_to_broadcast_if_project_key_does_not_exist() { | |||
var project = db.components().insertPrivateProject(); | |||
system2.setNow(1L); | |||
var sonarLintClient = mock(SonarLintClient.class); | |||
when(sonarLintClient.getClientProjectKeys()).thenReturn(Set.of("not-existing-project-key")); | |||
when(clientsRegistry.getClients()).thenReturn(List.of(sonarLintClient)); | |||
var underTest = new PushEventPollScheduler(executorService, clientsRegistry, db.getDbClient(), system2, config); | |||
underTest.start(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
system2.tick(); // tick=2 | |||
generatePushEvent(project.uuid()); | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
@Test | |||
@@ -112,7 +135,7 @@ public class PushEventPollSchedulerTest { | |||
underTest.start(); | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
system2.tick(); // tick=2 | |||
generatePushEvent(project.uuid()); | |||
@@ -124,7 +147,7 @@ public class PushEventPollSchedulerTest { | |||
underTest.start(); | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(3)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(3)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
system2.tick(); // tick=4 | |||
generatePushEvent(project.uuid()); | |||
@@ -132,7 +155,7 @@ public class PushEventPollSchedulerTest { | |||
underTest.start(); | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(5)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(5)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
@Test | |||
@@ -148,7 +171,7 @@ public class PushEventPollSchedulerTest { | |||
underTest.start(); | |||
executorService.runCommand(); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
system2.tick(); // tick=2 | |||
generatePushEvent(project.uuid()); | |||
@@ -157,7 +180,7 @@ public class PushEventPollSchedulerTest { | |||
executorService.runCommand(); | |||
// all clients have been unregistered, nothing to broadcast | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(PushEventDto.class)); | |||
verify(clientsRegistry, times(0)).broadcastMessage(any(SonarLintPushEvent.class)); | |||
} | |||
private PushEventDto generatePushEvent(String projectUuid) { |
@@ -20,6 +20,7 @@ | |||
package org.sonar.server.pushapi.sonarlint; | |||
import java.io.IOException; | |||
import java.nio.charset.StandardCharsets; | |||
import java.util.Set; | |||
import javax.servlet.AsyncContext; | |||
import javax.servlet.ServletOutputStream; | |||
@@ -28,9 +29,9 @@ import org.junit.Before; | |||
import org.junit.Test; | |||
import org.mockito.ArgumentCaptor; | |||
import org.sonar.api.rule.Severity; | |||
import org.sonar.core.util.ParamChange; | |||
import org.sonar.core.util.issue.Issue; | |||
import org.sonar.core.util.issue.IssueChangedEvent; | |||
import org.sonar.core.util.ParamChange; | |||
import org.sonar.core.util.rule.RuleChange; | |||
import org.sonar.core.util.rule.RuleSetChangedEvent; | |||
import org.sonar.server.exceptions.ForbiddenException; | |||
@@ -176,7 +177,7 @@ public class SonarLintClientsRegistryTest { | |||
@Test | |||
public void listen_givenUserNotPermittedToReceiveIssueChangeEvent_closeConnection() { | |||
Issue[] issues = new Issue[]{ new Issue("issue-1", "branch-1")}; | |||
Issue[] issues = new Issue[] {new Issue("issue-1", "branch-1")}; | |||
IssueChangedEvent issueChangedEvent = new IssueChangedEvent("project1", issues, true, "BLOCKER", "BUG"); | |||
SonarLintClient sonarLintClient = createSampleSLClient(); | |||
@@ -208,6 +209,64 @@ public class SonarLintClientsRegistryTest { | |||
verify(sonarLintClient, times(2)).close(); | |||
} | |||
@Test | |||
public void broadcast_push_event_to_clients() throws IOException { | |||
SonarLintPushEvent event = new SonarLintPushEvent("event", "data".getBytes(StandardCharsets.UTF_8), "project2"); | |||
SonarLintClient sonarLintClient = createSampleSLClient(); | |||
underTest.registerClient(sonarLintClient); | |||
underTest.broadcastMessage(event); | |||
verify(permissionsValidator, times(1)).validateUserCanReceivePushEventForProjects(anyString(), anySet()); | |||
verify(sonarLintClient, times(1)).writeAndFlush(anyString()); | |||
} | |||
@Test | |||
public void broadcast_skips_push_if_event_project_does_not_match_with_client() throws IOException { | |||
SonarLintPushEvent event = new SonarLintPushEvent("event", "data".getBytes(StandardCharsets.UTF_8), "project4"); | |||
SonarLintClient sonarLintClient = createSampleSLClient(); | |||
underTest.registerClient(sonarLintClient); | |||
underTest.broadcastMessage(event); | |||
verify(permissionsValidator, times(0)).validateUserCanReceivePushEventForProjects(anyString(), anySet()); | |||
verify(sonarLintClient, times(0)).close(); | |||
verify(sonarLintClient, times(0)).writeAndFlush(anyString()); | |||
} | |||
@Test | |||
public void broadcast_givenUserNotPermittedToReceiveSonarLintPushEvent_closeConnection() { | |||
SonarLintPushEvent event = new SonarLintPushEvent("event", "data".getBytes(StandardCharsets.UTF_8), "project1"); | |||
SonarLintClient sonarLintClient = createSampleSLClient(); | |||
underTest.registerClient(sonarLintClient); | |||
doThrow(new ForbiddenException("Access forbidden")).when(permissionsValidator).validateUserCanReceivePushEventForProjects(anyString(), anySet()); | |||
underTest.broadcastMessage(event); | |||
verify(sonarLintClient).close(); | |||
} | |||
@Test | |||
public void broadcast_givenUnregisteredClient_closeConnection() throws IOException { | |||
SonarLintPushEvent event = new SonarLintPushEvent("event", "data".getBytes(StandardCharsets.UTF_8), "project1"); | |||
SonarLintClient sonarLintClient = createSampleSLClient(); | |||
underTest.registerClient(sonarLintClient); | |||
doThrow(new IOException("Broken pipe")).when(sonarLintClient).writeAndFlush(anyString()); | |||
underTest.broadcastMessage(event); | |||
underTest.registerClient(sonarLintClient); | |||
doThrow(new IllegalStateException("Things went wrong")).when(sonarLintClient).writeAndFlush(anyString()); | |||
underTest.broadcastMessage(event); | |||
verify(sonarLintClient, times(2)).close(); | |||
} | |||
@Test | |||
public void registerClient_whenCalledFirstTime_registerAlsoToListenToEvents() { | |||
underTest.registerClient(createSampleSLClient()); | |||
@@ -258,7 +317,7 @@ public class SonarLintClientsRegistryTest { | |||
private RuleChange createRuleChange() { | |||
RuleChange javaRule = new RuleChange(); | |||
javaRule.setLanguage("java"); | |||
javaRule.setParams(new ParamChange[]{new ParamChange("param-key", "param-value")}); | |||
javaRule.setParams(new ParamChange[] {new ParamChange("param-key", "param-value")}); | |||
javaRule.setTemplateKey("repo:template-key"); | |||
javaRule.setSeverity(Severity.CRITICAL); | |||
javaRule.setKey("repo:rule-key"); |