You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

FanoutServiceTest.java 4.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. /*
  2. * Copyright 2013 gitblit.com.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package com.gitblit.tests;
  17. import java.text.MessageFormat;
  18. import java.util.Date;
  19. import java.util.Map;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. import java.util.concurrent.atomic.AtomicInteger;
  22. import org.junit.Test;
  23. import com.gitblit.fanout.FanoutClient;
  24. import com.gitblit.fanout.FanoutClient.FanoutAdapter;
  25. import com.gitblit.fanout.FanoutNioService;
  26. import com.gitblit.fanout.FanoutService;
  27. import com.gitblit.fanout.FanoutSocketService;
  28. public class FanoutServiceTest extends GitblitUnitTest {
  29. int fanoutPort = FanoutService.DEFAULT_PORT;
  30. @Test
  31. public void testNioPubSub() throws Exception {
  32. testPubSub(new FanoutNioService(fanoutPort));
  33. }
  34. @Test
  35. public void testSocketPubSub() throws Exception {
  36. testPubSub(new FanoutSocketService(fanoutPort));
  37. }
  38. @Test
  39. public void testNioDisruptionAndRecovery() throws Exception {
  40. testDisruption(new FanoutNioService(fanoutPort));
  41. }
  42. @Test
  43. public void testSocketDisruptionAndRecovery() throws Exception {
  44. testDisruption(new FanoutSocketService(fanoutPort));
  45. }
  46. protected void testPubSub(FanoutService service) throws Exception {
  47. System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
  48. service.startSynchronously();
  49. final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
  50. FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
  51. clientA.addListener(new FanoutAdapter() {
  52. @Override
  53. public void announcement(String channel, String message) {
  54. announcementsA.put(channel, message);
  55. }
  56. });
  57. clientA.startSynchronously();
  58. final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
  59. FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
  60. clientB.addListener(new FanoutAdapter() {
  61. @Override
  62. public void announcement(String channel, String message) {
  63. announcementsB.put(channel, message);
  64. }
  65. });
  66. clientB.startSynchronously();
  67. // subscribe clients A and B to the channels
  68. clientA.subscribe("a");
  69. clientA.subscribe("b");
  70. clientA.subscribe("c");
  71. clientB.subscribe("a");
  72. clientB.subscribe("b");
  73. clientB.subscribe("c");
  74. // give async messages a chance to be delivered
  75. Thread.sleep(1000);
  76. clientA.announce("a", "apple");
  77. clientA.announce("b", "banana");
  78. clientA.announce("c", "cantelope");
  79. clientB.announce("a", "avocado");
  80. clientB.announce("b", "beet");
  81. clientB.announce("c", "carrot");
  82. // give async messages a chance to be delivered
  83. Thread.sleep(2000);
  84. // confirm that client B received client A's announcements
  85. assertEquals("apple", announcementsB.get("a"));
  86. assertEquals("banana", announcementsB.get("b"));
  87. assertEquals("cantelope", announcementsB.get("c"));
  88. // confirm that client A received client B's announcements
  89. assertEquals("avocado", announcementsA.get("a"));
  90. assertEquals("beet", announcementsA.get("b"));
  91. assertEquals("carrot", announcementsA.get("c"));
  92. clientA.stop();
  93. clientB.stop();
  94. service.stop();
  95. }
  96. protected void testDisruption(FanoutService service) throws Exception {
  97. System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
  98. service.startSynchronously();
  99. final AtomicInteger pongCount = new AtomicInteger(0);
  100. FanoutClient client = new FanoutClient("localhost", fanoutPort);
  101. client.addListener(new FanoutAdapter() {
  102. @Override
  103. public void pong(Date timestamp) {
  104. pongCount.incrementAndGet();
  105. }
  106. });
  107. client.startSynchronously();
  108. // ping and wait for pong
  109. client.ping();
  110. Thread.sleep(500);
  111. // restart client
  112. client.stop();
  113. Thread.sleep(1000);
  114. client.startSynchronously();
  115. // ping and wait for pong
  116. client.ping();
  117. Thread.sleep(500);
  118. assertEquals(2, pongCount.get());
  119. // now disrupt service
  120. service.stop();
  121. Thread.sleep(2000);
  122. service.startSynchronously();
  123. // wait for reconnect
  124. Thread.sleep(2000);
  125. // ping and wait for pong
  126. client.ping();
  127. Thread.sleep(500);
  128. // kill all
  129. client.stop();
  130. service.stop();
  131. // confirm expected pong count
  132. assertEquals(3, pongCount.get());
  133. }
  134. }