diff options
Diffstat (limited to 'src/test/java/com/gitblit/tests/FanoutServiceTest.java')
-rw-r--r-- | src/test/java/com/gitblit/tests/FanoutServiceTest.java | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/src/test/java/com/gitblit/tests/FanoutServiceTest.java b/src/test/java/com/gitblit/tests/FanoutServiceTest.java index cd094dac..056a6c34 100644 --- a/src/test/java/com/gitblit/tests/FanoutServiceTest.java +++ b/src/test/java/com/gitblit/tests/FanoutServiceTest.java @@ -32,9 +32,9 @@ import com.gitblit.fanout.FanoutService; import com.gitblit.fanout.FanoutSocketService;
public class FanoutServiceTest {
-
+
int fanoutPort = FanoutService.DEFAULT_PORT;
-
+
@Test
public void testNioPubSub() throws Exception {
testPubSub(new FanoutNioService(fanoutPort));
@@ -44,7 +44,7 @@ public class FanoutServiceTest { public void testSocketPubSub() throws Exception {
testPubSub(new FanoutSocketService(fanoutPort));
}
-
+
@Test
public void testNioDisruptionAndRecovery() throws Exception {
testDisruption(new FanoutNioService(fanoutPort));
@@ -54,21 +54,21 @@ public class FanoutServiceTest { public void testSocketDisruptionAndRecovery() throws Exception {
testDisruption(new FanoutSocketService(fanoutPort));
}
-
+
protected void testPubSub(FanoutService service) throws Exception {
System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
service.startSynchronously();
-
+
final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
clientA.addListener(new FanoutAdapter() {
-
+
@Override
public void announcement(String channel, String message) {
announcementsA.put(channel, message);
}
});
-
+
clientA.startSynchronously();
final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
@@ -81,23 +81,23 @@ public class FanoutServiceTest { });
clientB.startSynchronously();
-
+
// subscribe clients A and B to the channels
clientA.subscribe("a");
clientA.subscribe("b");
clientA.subscribe("c");
-
+
clientB.subscribe("a");
clientB.subscribe("b");
clientB.subscribe("c");
-
+
// give async messages a chance to be delivered
Thread.sleep(1000);
-
+
clientA.announce("a", "apple");
clientA.announce("b", "banana");
clientA.announce("c", "cantelope");
-
+
clientB.announce("a", "avocado");
clientB.announce("b", "beet");
clientB.announce("c", "carrot");
@@ -114,16 +114,16 @@ public class FanoutServiceTest { assertEquals("avocado", announcementsA.get("a"));
assertEquals("beet", announcementsA.get("b"));
assertEquals("carrot", announcementsA.get("c"));
-
+
clientA.stop();
clientB.stop();
- service.stop();
+ service.stop();
}
-
+
protected void testDisruption(FanoutService service) throws Exception {
System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
service.startSynchronously();
-
+
final AtomicInteger pongCount = new AtomicInteger(0);
FanoutClient client = new FanoutClient("localhost", fanoutPort);
client.addListener(new FanoutAdapter() {
@@ -133,27 +133,27 @@ public class FanoutServiceTest { }
});
client.startSynchronously();
-
+
// ping and wait for pong
- client.ping();
+ client.ping();
Thread.sleep(500);
-
+
// restart client
client.stop();
Thread.sleep(1000);
- client.startSynchronously();
-
+ client.startSynchronously();
+
// ping and wait for pong
- client.ping();
+ client.ping();
Thread.sleep(500);
-
+
assertEquals(2, pongCount.get());
-
+
// now disrupt service
- service.stop();
+ service.stop();
Thread.sleep(2000);
service.startSynchronously();
-
+
// wait for reconnect
Thread.sleep(2000);
@@ -164,7 +164,7 @@ public class FanoutServiceTest { // kill all
client.stop();
service.stop();
-
+
// confirm expected pong count
assertEquals(3, pongCount.get());
}
|