/* * Copyright 2013 gitblit.com. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.gitblit.tests; import java.text.MessageFormat; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import com.gitblit.fanout.FanoutClient; import com.gitblit.fanout.FanoutClient.FanoutAdapter; import com.gitblit.fanout.FanoutNioService; import com.gitblit.fanout.FanoutService; import com.gitblit.fanout.FanoutSocketService; public class FanoutServiceTest extends GitblitUnitTest { int fanoutPort = FanoutService.DEFAULT_PORT; @Test public void testNioPubSub() throws Exception { testPubSub(new FanoutNioService(fanoutPort)); } @Test public void testSocketPubSub() throws Exception { testPubSub(new FanoutSocketService(fanoutPort)); } @Test public void testNioDisruptionAndRecovery() throws Exception { testDisruption(new FanoutNioService(fanoutPort)); } @Test public void testSocketDisruptionAndRecovery() throws Exception { testDisruption(new FanoutSocketService(fanoutPort)); } protected void testPubSub(FanoutService service) throws Exception { System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString())); service.startSynchronously(); final Map announcementsA = new ConcurrentHashMap(); FanoutClient clientA = new FanoutClient("localhost", fanoutPort); clientA.addListener(new FanoutAdapter() { @Override public void announcement(String channel, String message) { announcementsA.put(channel, message); } }); clientA.startSynchronously(); final Map announcementsB = new ConcurrentHashMap(); FanoutClient clientB = new FanoutClient("localhost", fanoutPort); clientB.addListener(new FanoutAdapter() { @Override public void announcement(String channel, String message) { announcementsB.put(channel, message); } }); clientB.startSynchronously(); // subscribe clients A and B to the channels clientA.subscribe("a"); clientA.subscribe("b"); clientA.subscribe("c"); clientB.subscribe("a"); clientB.subscribe("b"); clientB.subscribe("c"); // give async messages a chance to be delivered Thread.sleep(1000); clientA.announce("a", "apple"); clientA.announce("b", "banana"); clientA.announce("c", "cantelope"); clientB.announce("a", "avocado"); clientB.announce("b", "beet"); clientB.announce("c", "carrot"); // give async messages a chance to be delivered Thread.sleep(2000); // confirm that client B received client A's announcements assertEquals("apple", announcementsB.get("a")); assertEquals("banana", announcementsB.get("b")); assertEquals("cantelope", announcementsB.get("c")); // confirm that client A received client B's announcements assertEquals("avocado", announcementsA.get("a")); assertEquals("beet", announcementsA.get("b")); assertEquals("carrot", announcementsA.get("c")); clientA.stop(); clientB.stop(); service.stop(); } protected void testDisruption(FanoutService service) throws Exception { System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString())); service.startSynchronously(); final AtomicInteger pongCount = new AtomicInteger(0); FanoutClient client = new FanoutClient("localhost", fanoutPort); client.addListener(new FanoutAdapter() { @Override public void pong(Date timestamp) { pongCount.incrementAndGet(); } }); client.startSynchronously(); // ping and wait for pong client.ping(); Thread.sleep(500); // restart client client.stop(); Thread.sleep(1000); client.startSynchronously(); // ping and wait for pong client.ping(); Thread.sleep(500); assertEquals(2, pongCount.get()); // now disrupt service service.stop(); Thread.sleep(2000); service.startSynchronously(); // wait for reconnect Thread.sleep(2000); // ping and wait for pong client.ping(); Thread.sleep(500); // kill all client.stop(); service.stop(); // confirm expected pong count assertEquals(3, pongCount.get()); } }