]> source.dussan.org Git - gitblit.git/commitdiff
Fanout service for Sparkleshare clients
authorJames Moger <james.moger@gitblit.com>
Sat, 12 Jan 2013 04:50:59 +0000 (23:50 -0500)
committerJames Moger <james.moger@gitblit.com>
Sat, 12 Jan 2013 04:50:59 +0000 (23:50 -0500)
13 files changed:
distrib/gitblit.properties
docs/01_features.mkd
docs/04_releases.mkd
src/com/gitblit/GitBlit.java
src/com/gitblit/fanout/FanoutClient.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutConstants.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutNioService.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutService.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutServiceConnection.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutSocketService.java [new file with mode: 0644]
src/com/gitblit/fanout/FanoutStats.java [new file with mode: 0644]
tests/com/gitblit/tests/FanoutServiceTest.java [new file with mode: 0644]
tests/com/gitblit/tests/GitBlitSuite.java

index ce269d2c14e433841634459f8c5ecec397824d51..758137e3253d95ae9f86055d2e6a3639edc3ffbb 100644 (file)
@@ -365,6 +365,53 @@ groovy.postReceiveScripts =
 # SINCE 1.0.0\r
 groovy.customFields = \r
 \r
+#\r
+# Fanout Settings\r
+#\r
+\r
+# Fanout is a PubSub notification service that can be used by Sparkleshare\r
+# to eliminate repository change polling.  The fanout service runs in a separate\r
+# thread on a separate port from the Gitblit http/https application.\r
+# This service is provided so that Sparkleshare may be used with Gitblit in\r
+# firewalled environments or where reliance on Sparkleshare's default notifications\r
+# server (notifications.sparkleshare.org) is unwanted.\r
+#\r
+# This service maintains an open socket connection from the client to the\r
+# Fanout PubSub service. This service may not work properly behind a proxy server.  \r
+\r
+# Specify the interface for Fanout to bind it's service.\r
+# You may specify an ip or an empty value to bind to all interfaces.\r
+# Specifying localhost will result in Gitblit ONLY listening to requests to\r
+# localhost.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.bindInterface = localhost\r
+\r
+# port for serving the Fanout PubSub service.  <= 0 disables this service.\r
+# On Unix/Linux systems, ports < 1024 require root permissions.\r
+# Recommended value: 17000\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.port = 0\r
+\r
+# Use Fanout NIO service.  If false, a multi-threaded socket service will be used.\r
+# Be advised, the socket implementation spawns a thread per connection plus the\r
+# connection acceptor thread.  The NIO implementation is completely single-threaded.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.useNio = true\r
+\r
+# Concurrent connection limit.  <= 0 disables concurrent connection throttling.\r
+# If > 0, only the specified number of concurrent connections will be allowed\r
+# and all other connections will be rejected.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.connectionLimit = 0\r
+\r
 #\r
 # Authentication Settings\r
 #\r
index 038acd06752b0d6eb7f182e9c318357f9a1bcae0..fa3efea4a6998e644fcfc0aca36df1b2ad717f07 100644 (file)
@@ -37,6 +37,7 @@
 - Git-notes display support\r
 - Submodule support\r
 - Push log based on a hidden, orphan branch refs/gitblit/pushes\r
+- Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use\r
 - gh-pages display support (Jekyll is not supported)\r
 - Branch metrics (uses Google Charts)\r
 - HEAD and Branch RSS feeds\r
index 26cbd08b75e35cf9024b3e27aa15595fc321b2ca..d77c732621fcd380adb0f0546a991e93a76167c0 100644 (file)
 \r
 #### additions\r
 \r
-- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)\r
+- Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.<br/>\r
+This service is disabled by default.<br/>\r
+    **New:** *fanout.bindInterface = localhost*<br/>\r
+       **New:** *fanout.port = 0*<br/>\r
+       **New:** *fanout.useNio = true*<br/>\r
+       **New:** *fanout.connectionLimit = 0*\r
+- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)<br/>\r
+The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release.\r
 - Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183)\r
 - Added Dutch translation (github/kwoot)\r
 \r
index 3eb246b82fa57a06b036316293a6eb9171f8671c..489ba63ca60c59b9ae6b90db9794651c4aa529bb 100644 (file)
@@ -85,6 +85,9 @@ import com.gitblit.Constants.FederationStrategy;
 import com.gitblit.Constants.FederationToken;\r
 import com.gitblit.Constants.PermissionType;\r
 import com.gitblit.Constants.RegistrantType;\r
+import com.gitblit.fanout.FanoutNioService;\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutSocketService;\r
 import com.gitblit.models.FederationModel;\r
 import com.gitblit.models.FederationProposal;\r
 import com.gitblit.models.FederationSet;\r
@@ -180,6 +183,8 @@ public class GitBlit implements ServletContextListener {
        private TimeZone timezone;\r
        \r
        private FileBasedConfig projectConfigs;\r
+       \r
+       private FanoutService fanoutService;\r
 \r
        public GitBlit() {\r
                if (gitblit == null) {\r
@@ -3133,6 +3138,32 @@ public class GitBlit implements ServletContextListener {
                }\r
 \r
                ContainerUtils.CVE_2007_0450.test();\r
+               \r
+               // startup Fanout PubSub service\r
+               if (settings.getInteger(Keys.fanout.port, 0) > 0) {\r
+                       String bindInterface = settings.getString(Keys.fanout.bindInterface, null);\r
+                       int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);\r
+                       boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);\r
+                       int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);\r
+                       \r
+                       if (useNio) {\r
+                               if (StringUtils.isEmpty(bindInterface)) {\r
+                                       fanoutService = new FanoutNioService(port);\r
+                               } else {\r
+                                       fanoutService = new FanoutNioService(bindInterface, port);\r
+                               }\r
+                       } else {\r
+                               if (StringUtils.isEmpty(bindInterface)) {\r
+                                       fanoutService = new FanoutSocketService(port);\r
+                               } else {\r
+                                       fanoutService = new FanoutSocketService(bindInterface, port);\r
+                               }\r
+                       }\r
+                       \r
+                       fanoutService.setConcurrentConnectionLimit(limit);\r
+                       fanoutService.setAllowAllChannelAnnouncements(false);\r
+                       fanoutService.start();\r
+               }\r
        }\r
        \r
        private void logTimezone(String type, TimeZone zone) {\r
@@ -3206,6 +3237,9 @@ public class GitBlit implements ServletContextListener {
                scheduledExecutor.shutdownNow();\r
                luceneExecutor.close();\r
                gcExecutor.close();\r
+               if (fanoutService != null) {\r
+                       fanoutService.stop();\r
+               }\r
        }\r
        \r
        /**\r
diff --git a/src/com/gitblit/fanout/FanoutClient.java b/src/com/gitblit/fanout/FanoutClient.java
new file mode 100644 (file)
index 0000000..b9ace4b
--- /dev/null
@@ -0,0 +1,413 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.InetAddress;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.SocketChannel;\r
+import java.nio.charset.Charset;\r
+import java.nio.charset.CharsetDecoder;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.Date;\r
+import java.util.Iterator;\r
+import java.util.LinkedHashSet;\r
+import java.util.List;\r
+import java.util.Set;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * Fanout client class.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutClient implements Runnable {\r
+\r
+       private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);\r
+\r
+       private final int clientTimeout = 500;\r
+       private final int reconnectTimeout = 2000;\r
+       private final String host;\r
+       private final int port;\r
+       private final List<FanoutListener> listeners;\r
+\r
+       private String id;\r
+       private volatile Selector selector;\r
+       private volatile SocketChannel socketCh;\r
+       private Thread clientThread;\r
+       \r
+       private final AtomicBoolean isConnected;\r
+       private final AtomicBoolean isRunning;\r
+       private final AtomicBoolean isAutomaticReconnect;\r
+       private final ByteBuffer writeBuffer;\r
+       private final ByteBuffer readBuffer;\r
+       private final CharsetDecoder decoder;\r
+       \r
+       private final Set<String> subscriptions;\r
+       private boolean resubscribe;\r
+       \r
+       public interface FanoutListener {\r
+               public void pong(Date timestamp);\r
+               public void announcement(String channel, String message);\r
+       }\r
+       \r
+       public static class FanoutAdapter implements FanoutListener {\r
+               public void pong(Date timestamp) { }\r
+               public void announcement(String channel, String message) { }\r
+       }\r
+\r
+       public static void main(String args[]) throws Exception {\r
+               FanoutClient client = new FanoutClient("localhost", 2000);\r
+               client.addListener(new FanoutAdapter() {\r
+\r
+                       @Override\r
+                       public void pong(Date timestamp) {\r
+                               System.out.println("Pong. " + timestamp);\r
+                       }\r
+                       \r
+                       @Override\r
+                       public void announcement(String channel, String message) {\r
+                               System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));\r
+                       }\r
+               });\r
+               client.start();\r
+               \r
+               Thread.sleep(5000);\r
+               client.ping();\r
+               client.subscribe("james");\r
+               client.announce("james", "12345");              \r
+               client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");\r
+               \r
+               while (true) {\r
+                       Thread.sleep(10000);\r
+                       client.ping();\r
+               }\r
+       }\r
+\r
+       public FanoutClient(String host, int port) {\r
+               this.host = host;\r
+               this.port = port;\r
+               readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);\r
+               writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);\r
+               decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();\r
+               listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());\r
+               subscriptions = new LinkedHashSet<String>();\r
+               isRunning = new AtomicBoolean(false);\r
+               isConnected = new AtomicBoolean(false);\r
+               isAutomaticReconnect = new AtomicBoolean(true);\r
+       }\r
+\r
+       public void addListener(FanoutListener listener) {\r
+               listeners.add(listener);\r
+       }\r
+\r
+       public void removeListener(FanoutListener listener) {\r
+               listeners.remove(listener);\r
+       }\r
+       \r
+       public boolean isAutomaticReconnect() {\r
+               return isAutomaticReconnect.get();\r
+       }\r
+       \r
+       public void setAutomaticReconnect(boolean value) {\r
+               isAutomaticReconnect.set(value);\r
+       }\r
+\r
+       public void ping() {\r
+               confirmConnection();\r
+               write("ping");\r
+       }\r
+\r
+       public void status() {\r
+               confirmConnection();\r
+               write("status");\r
+       }\r
+       \r
+       public void subscribe(String channel) {\r
+               confirmConnection();\r
+               if (subscriptions.add(channel)) {\r
+                       write("subscribe " + channel);\r
+               }\r
+       }\r
+       \r
+       public void unsubscribe(String channel) {\r
+               confirmConnection();\r
+               if (subscriptions.remove(channel)) {\r
+                       write("unsubscribe " + channel);\r
+               }\r
+       }\r
+       \r
+       public void announce(String channel, String message) {\r
+               confirmConnection();\r
+               write("announce " + channel + " " + message);\r
+       }\r
+\r
+       private void confirmConnection() {\r
+               if (!isConnected()) {\r
+                       throw new RuntimeException("Fanout client is disconnected!");\r
+               }\r
+       }\r
+       \r
+       public boolean isConnected() {\r
+               return isRunning.get() && socketCh != null && isConnected.get();\r
+       }\r
+       \r
+       /**\r
+        * Start client connection and return immediately.\r
+        */\r
+       public void start() {\r
+               if (isRunning.get()) {\r
+                       logger.warn("Fanout client is already running");\r
+                       return;\r
+               }\r
+               clientThread = new Thread(this, "Fanout client");\r
+               clientThread.start();\r
+       }\r
+       \r
+       /**\r
+        * Start client connection and wait until it has connected.\r
+        */\r
+       public void startSynchronously() {\r
+               start();\r
+               while (!isConnected()) {                        \r
+                       try {\r
+                               Thread.sleep(100);\r
+                       } catch (Exception e) {\r
+                       }\r
+               }\r
+       }\r
+\r
+       /**\r
+        * Stops client connection.  This method returns when the connection has\r
+        * been completely shutdown.\r
+        */\r
+       public void stop() {\r
+               if (!isRunning.get()) {\r
+                       logger.warn("Fanout client is not running");\r
+                       return;\r
+               }\r
+               isRunning.set(false);\r
+               try {\r
+                       if (clientThread != null) {\r
+                               clientThread.join();\r
+                               clientThread = null;\r
+                       }\r
+               } catch (InterruptedException e1) {\r
+               }\r
+       }\r
+\r
+       @Override\r
+       public void run() {\r
+               resetState();\r
+               \r
+               isRunning.set(true);            \r
+               while (isRunning.get()) {\r
+                       // (re)connect\r
+                       if (socketCh == null) {\r
+                               try {\r
+                                       InetAddress addr = InetAddress.getByName(host);\r
+                                       socketCh = SocketChannel.open(new InetSocketAddress(addr, port));\r
+                                       socketCh.configureBlocking(false);\r
+                                       selector = Selector.open();\r
+                                       id = FanoutConstants.getLocalSocketId(socketCh.socket());                                                                       \r
+                                       socketCh.register(selector, SelectionKey.OP_READ);\r
+                               } catch (Exception e) {\r
+                                       logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);\r
+                                       try {\r
+                                               Thread.sleep(reconnectTimeout);\r
+                                       } catch (InterruptedException x) {\r
+                                       }\r
+                                       continue;\r
+                               }\r
+                       }\r
+                       \r
+                       // read/write\r
+                       try {\r
+                               selector.select(clientTimeout);\r
+\r
+                               Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
+                               while (i.hasNext()) {\r
+                                       SelectionKey key = i.next();\r
+                                       i.remove();\r
+                                       \r
+                                       if (key.isReadable()) {\r
+                                               // read message\r
+                                               String content = read();\r
+                                               String[] lines = content.split("\n");\r
+                                               for (String reply : lines) {\r
+                                                       logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));\r
+                                                       if (!processReply(reply)) {\r
+                                                               logger.error(MessageFormat.format("fanout client {0} received unknown message", id));\r
+                                                       }\r
+                                               }\r
+                                       } else if (key.isWritable()) {\r
+                                               // resubscribe\r
+                                               if (resubscribe) {\r
+                                                       resubscribe = false;\r
+                                                       logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));                                                \r
+                                                       for (String subscription : subscriptions) {\r
+                                                               write("subscribe " + subscription);\r
+                                                       }\r
+                                               }\r
+                                               socketCh.register(selector, SelectionKey.OP_READ);\r
+                                       }\r
+                               }\r
+                       } catch (IOException e) {\r
+                               logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));\r
+                               closeChannel();                         \r
+                               if (!isAutomaticReconnect.get()) {\r
+                                       isRunning.set(false);\r
+                                       continue;\r
+                               }\r
+                       }\r
+               }\r
+               \r
+               closeChannel();\r
+               resetState();\r
+       }\r
+       \r
+       protected void resetState() {\r
+               readBuffer.clear();\r
+               writeBuffer.clear();\r
+               isRunning.set(false);\r
+               isConnected.set(false);\r
+       }\r
+       \r
+       private void closeChannel() {\r
+               try {\r
+                       if (socketCh != null) {\r
+                               socketCh.close();\r
+                               socketCh = null;\r
+                               selector.close();\r
+                               selector = null;\r
+                               isConnected.set(false);\r
+                       }\r
+               } catch (IOException x) {\r
+               }\r
+       }\r
+\r
+       protected boolean processReply(String reply) {\r
+               String[] fields = reply.split("!", 2);\r
+               if (fields.length == 1) {\r
+                       try {\r
+                               long time = Long.parseLong(fields[0]);\r
+                               Date date = new Date(time);\r
+                               firePong(date);\r
+                       } catch (Exception e) {                         \r
+                       }\r
+                       return true;\r
+               } else if (fields.length == 2) {\r
+                       String channel = fields[0];\r
+                       String message = fields[1];\r
+                       if (FanoutConstants.CH_DEBUG.equals(channel)) {\r
+                               // debug messages are for internal use\r
+                               if (FanoutConstants.MSG_CONNECTED.equals(message)) {\r
+                                       isConnected.set(true);\r
+                                       resubscribe = subscriptions.size() > 0;\r
+                                       if (resubscribe) {\r
+                                               try {\r
+                                                       // register for async resubscribe\r
+                                                       socketCh.register(selector, SelectionKey.OP_WRITE);\r
+                                               } catch (Exception e) {\r
+                                                       logger.error("an error occurred", e);\r
+                                               }\r
+                                       }\r
+                               }\r
+                               logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));\r
+                       } else {\r
+                               fireAnnouncement(channel, message);\r
+                       }\r
+                       return true;\r
+               } else {\r
+                       // unknown message\r
+                       return false;\r
+               }\r
+       }\r
+\r
+       protected void firePong(Date timestamp) {\r
+               logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));\r
+               for (FanoutListener listener : listeners) {\r
+                       try {\r
+                               listener.pong(timestamp);\r
+                       } catch (Throwable t) {\r
+                               logger.error("FanoutListener threw an exception!", t);\r
+                       }\r
+               }\r
+       }\r
+       protected void fireAnnouncement(String channel, String message) {\r
+               logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));\r
+               for (FanoutListener listener : listeners) {\r
+                       try {\r
+                               listener.announcement(channel, message);\r
+                       } catch (Throwable t) {\r
+                               logger.error("FanoutListener threw an exception!", t);\r
+                       }\r
+               }\r
+       }\r
+       \r
+       protected synchronized String read() throws IOException {\r
+               readBuffer.clear();\r
+               long len = socketCh.read(readBuffer);\r
+\r
+               if (len == -1) {\r
+                       logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));\r
+                       socketCh.close();\r
+                       return null;\r
+               } else {\r
+                       readBuffer.flip();\r
+                       String content = decoder.decode(readBuffer).toString();\r
+                       readBuffer.clear();\r
+                       return content;\r
+               }\r
+       }\r
+       \r
+       protected synchronized boolean write(String message) {\r
+               try {\r
+                       logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));\r
+                       byte [] bytes = message.getBytes(FanoutConstants.CHARSET);\r
+                       writeBuffer.clear();\r
+                       writeBuffer.put(bytes);\r
+                       if (bytes[bytes.length - 1] != 0xa) {\r
+                               writeBuffer.put((byte) 0xa);\r
+                       }\r
+                       writeBuffer.flip();\r
+\r
+                       // loop until write buffer has been completely sent\r
+                       long written = 0;\r
+                       long toWrite = writeBuffer.remaining();\r
+                       while (written != toWrite) {\r
+                               written += socketCh.write(writeBuffer);\r
+                               try {\r
+                                       Thread.sleep(10);\r
+                               } catch (Exception x) {\r
+                               }\r
+                       }\r
+                       return true;\r
+               } catch (IOException e) {\r
+                       logger.error("fanout client {0} error: {1}", id, e.getMessage());\r
+               }\r
+               return false;\r
+       }\r
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutConstants.java b/src/com/gitblit/fanout/FanoutConstants.java
new file mode 100644 (file)
index 0000000..6e6964c
--- /dev/null
@@ -0,0 +1,36 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.net.Socket;\r
+\r
+public class FanoutConstants {\r
+\r
+       public final static String CHARSET = "ISO-8859-1";\r
+       public final static int BUFFER_LENGTH = 512;\r
+       public final static String CH_ALL = "all";\r
+       public final static String CH_DEBUG = "debug";\r
+       public final static String MSG_CONNECTED = "connected...";\r
+       public final static String MSG_BUSY = "busy";\r
+       \r
+       public static String getRemoteSocketId(Socket socket) {\r
+               return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();\r
+       }\r
+       \r
+       public static String getLocalSocketId(Socket socket) {\r
+               return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();\r
+       }\r
+}\r
diff --git a/src/com/gitblit/fanout/FanoutNioService.java b/src/com/gitblit/fanout/FanoutNioService.java
new file mode 100644 (file)
index 0000000..65d022a
--- /dev/null
@@ -0,0 +1,332 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.CharBuffer;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.ServerSocketChannel;\r
+import java.nio.channels.SocketChannel;\r
+import java.nio.charset.CharacterCodingException;\r
+import java.nio.charset.Charset;\r
+import java.nio.charset.CharsetDecoder;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.HashMap;\r
+import java.util.Iterator;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * A single-thread NIO implementation of https://github.com/travisghansen/fanout\r
+ *\r
+ * This implementation uses channels and selectors, which are the Java analog of\r
+ * the Linux epoll mechanism used in the original fanout C code.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutNioService extends FanoutService {\r
+\r
+       private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);\r
+\r
+       private volatile ServerSocketChannel serviceCh;\r
+       private volatile Selector selector;\r
+       \r
+       public static void main(String[] args) throws Exception {\r
+               FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);\r
+               pubsub.setStrictRequestTermination(false);\r
+               pubsub.setAllowAllChannelAnnouncements(false);\r
+               pubsub.start();\r
+       }\r
+\r
+       /**\r
+        * Create a single-threaded fanout service.\r
+        * \r
+        * @param host\r
+        * @param port\r
+        *            the port for running the fanout PubSub service\r
+        * @throws IOException\r
+        */\r
+       public FanoutNioService(int port) {\r
+               this(null, port);\r
+       }\r
+       \r
+       /**\r
+        * Create a single-threaded fanout service.\r
+        * \r
+        * @param bindInterface\r
+        *            the ip address to bind for the service, may be null\r
+        * @param port\r
+        *            the port for running the fanout PubSub service\r
+        * @throws IOException\r
+        */\r
+       public FanoutNioService(String bindInterface, int port) {\r
+               super(bindInterface, port, "Fanout nio service");\r
+       }\r
+       \r
+       @Override\r
+       protected boolean isConnected() {\r
+               return serviceCh != null;\r
+       }\r
+\r
+       @Override\r
+       protected boolean connect() {\r
+               if (serviceCh == null) {\r
+                       try {\r
+                               serviceCh = ServerSocketChannel.open();\r
+                               serviceCh.configureBlocking(false);\r
+                               serviceCh.socket().setReuseAddress(true);\r
+                               serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));\r
+                               selector = Selector.open();\r
+                               serviceCh.register(selector, SelectionKey.OP_ACCEPT);\r
+                               logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", \r
+                                               name, host == null ? "0.0.0.0" : host, port));\r
+                       } catch (IOException e) {\r
+                               logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", \r
+                                               name, name, host == null ? "0.0.0.0" : host, port), e);\r
+                               return false;\r
+                       }\r
+               }\r
+               return true;\r
+       }\r
+\r
+       @Override\r
+       protected void disconnect() {\r
+               try {\r
+                       if (serviceCh != null) {\r
+                               // close all active client connections\r
+                               Map<String, SocketChannel> clients = getCurrentClientSockets();\r
+                               for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {\r
+                                       closeClientSocket(client.getKey(), client.getValue());\r
+                               }\r
+                               \r
+                               // close service socket channel\r
+                               logger.debug(MessageFormat.format("closing {0} socket channel", name));\r
+                               serviceCh.socket().close();\r
+                               serviceCh.close();                              \r
+                               serviceCh = null;\r
+                               selector.close();\r
+                               selector = null;\r
+                       }\r
+               } catch (IOException e) {\r
+                       logger.error(MessageFormat.format("failed to disconnect {0}", name), e);\r
+               }\r
+       }\r
+\r
+       @Override\r
+       protected void listen() throws IOException {\r
+               while (selector.select(serviceTimeout) > 0) {\r
+                       Set<SelectionKey> keys = selector.selectedKeys();\r
+                       Iterator<SelectionKey> keyItr = keys.iterator();\r
+                       while (keyItr.hasNext()) {\r
+                               SelectionKey key = (SelectionKey) keyItr.next();\r
+                               if (key.isAcceptable()) {\r
+                                       // new fanout client connection\r
+                                       ServerSocketChannel sch = (ServerSocketChannel) key.channel();\r
+                                       try {\r
+                                               SocketChannel ch = sch.accept();\r
+                                               ch.configureBlocking(false);\r
+                                               configureClientSocket(ch.socket());\r
+\r
+                                               FanoutNioConnection connection = new FanoutNioConnection(ch);\r
+                                               addConnection(connection);\r
+\r
+                                               // register to send the queued message\r
+                                               ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+                                       } catch (IOException e) {\r
+                                               logger.error("error accepting fanout connection", e);\r
+                                       }\r
+                               } else if (key.isReadable()) {\r
+                                       // read fanout client request\r
+                                       SocketChannel ch = (SocketChannel) key.channel();\r
+                                       FanoutNioConnection connection = (FanoutNioConnection) key.attachment();\r
+                                       try {\r
+                                               connection.read(ch, isStrictRequestTermination());\r
+                                               int replies = 0;\r
+                                               Iterator<String> reqItr = connection.requestQueue.iterator();\r
+                                               while (reqItr.hasNext()) {\r
+                                                       String req = reqItr.next();\r
+                                                       String reply = processRequest(connection, req);\r
+                                                       reqItr.remove();\r
+                                                       if (reply != null) {\r
+                                                               replies++;\r
+                                                       }\r
+                                               }\r
+\r
+                                               if (replies > 0) {\r
+                                                       // register to send the replies to requests\r
+                                                       ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+                                               } else {\r
+                                                       // re-register for next read\r
+                                                       ch.register(selector, SelectionKey.OP_READ, connection);\r
+                                               }\r
+                                       } catch (IOException e) {\r
+                                               logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));\r
+                                               removeConnection(connection);\r
+                                               closeClientSocket(connection.id, ch);\r
+                                       }\r
+                               } else if (key.isWritable()) {\r
+                                       // asynchronous reply to fanout client request\r
+                                       SocketChannel ch = (SocketChannel) key.channel();\r
+                                       FanoutNioConnection connection = (FanoutNioConnection) key.attachment();\r
+                                       try {\r
+                                               connection.write(ch);\r
+\r
+                                               if (hasConnection(connection)) {\r
+                                                       // register for next read\r
+                                                       ch.register(selector, SelectionKey.OP_READ, connection);\r
+                                               } else {\r
+                                                       // Connection was rejected due to load or\r
+                                                       // some other reason. Close it.\r
+                                                       closeClientSocket(connection.id, ch);\r
+                                               }\r
+                                       } catch (IOException e) {\r
+                                               logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));\r
+                                               removeConnection(connection);\r
+                                               closeClientSocket(connection.id, ch);\r
+                                       }\r
+                               }\r
+                               keyItr.remove();\r
+                       }\r
+               }\r
+       }\r
+       \r
+       protected void closeClientSocket(String id, SocketChannel ch) {\r
+               try {\r
+                       ch.close();\r
+               } catch (IOException e) {\r
+                       logger.error(MessageFormat.format("fanout connection {0}", id), e);\r
+               }\r
+       }\r
+       \r
+       protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {\r
+               super.broadcast(connections, channel, message);\r
+               \r
+               // register queued write\r
+               Map<String, SocketChannel> sockets = getCurrentClientSockets();\r
+               for (FanoutServiceConnection connection : connections) {\r
+                       SocketChannel ch = sockets.get(connection.id);\r
+                       if (ch == null) {\r
+                               logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));\r
+                               removeConnection(connection);\r
+                               continue;\r
+                       }\r
+                       try {\r
+                               ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+                       } catch (IOException e) {\r
+                               logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));\r
+                       }\r
+               }\r
+       }\r
+       \r
+       protected Map<String, SocketChannel> getCurrentClientSockets() {\r
+               Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();\r
+               for (SelectionKey key : selector.keys()) {\r
+                       if (key.channel() instanceof SocketChannel) {\r
+                               SocketChannel ch = (SocketChannel) key.channel();\r
+                               String id = FanoutConstants.getRemoteSocketId(ch.socket());\r
+                               sockets.put(id, ch);\r
+                       }\r
+               }\r
+               return sockets;\r
+       }\r
+       \r
+       /**\r
+        * FanoutNioConnection handles reading/writing messages from a remote fanout\r
+        * connection.\r
+        * \r
+        * @author James Moger\r
+        *\r
+        */\r
+       static class FanoutNioConnection extends FanoutServiceConnection {\r
+               final ByteBuffer readBuffer;\r
+               final ByteBuffer writeBuffer;\r
+               final List<String> requestQueue;\r
+               final List<String> replyQueue;\r
+               final CharsetDecoder decoder;\r
+\r
+               FanoutNioConnection(SocketChannel ch) {\r
+                       super(ch.socket());\r
+                       readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);\r
+                       writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);\r
+                       requestQueue = new ArrayList<String>();\r
+                       replyQueue = new ArrayList<String>();\r
+                       decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();\r
+               }\r
+               \r
+               protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {\r
+                       long bytesRead = 0;\r
+                       readBuffer.clear();\r
+                       bytesRead = ch.read(readBuffer);\r
+                       readBuffer.flip();\r
+                       if (bytesRead == -1) {\r
+                               throw new IOException("lost client connection, end of stream");\r
+                       }\r
+                       if (readBuffer.limit() == 0) {\r
+                               return;\r
+                       }\r
+                       CharBuffer cbuf = decoder.decode(readBuffer);\r
+                       String req = cbuf.toString();\r
+                       String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");\r
+                       requestQueue.addAll(Arrays.asList(lines));\r
+               }\r
+               \r
+               protected void write(SocketChannel ch) throws IOException {\r
+                       Iterator<String> itr = replyQueue.iterator();\r
+                       while (itr.hasNext()) {\r
+                               String reply = itr.next();\r
+                               writeBuffer.clear();\r
+                               logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));\r
+                               byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);\r
+                               writeBuffer.put(bytes);\r
+                               if (bytes[bytes.length - 1] != 0xa) {\r
+                                       writeBuffer.put((byte) 0xa);\r
+                               }\r
+                               writeBuffer.flip();\r
+                               \r
+                               // loop until write buffer has been completely sent\r
+                               int written = 0;\r
+                               int toWrite = writeBuffer.remaining();\r
+                               while (written != toWrite) {\r
+                                       written += ch.write(writeBuffer);\r
+                                       try {\r
+                                               Thread.sleep(10);\r
+                                       } catch (Exception x) {\r
+                                       }\r
+                               }                               \r
+                               itr.remove();\r
+                       }\r
+                       writeBuffer.clear();\r
+               }\r
+\r
+               @Override\r
+               protected void reply(String content) throws IOException {\r
+                       // queue the reply\r
+                       // replies are transmitted asynchronously from the requests\r
+                       replyQueue.add(content);\r
+               }\r
+       }\r
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutService.java b/src/com/gitblit/fanout/FanoutService.java
new file mode 100644 (file)
index 0000000..cbfd8a2
--- /dev/null
@@ -0,0 +1,563 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.Socket;\r
+import java.net.SocketException;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.Date;\r
+import java.util.Iterator;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.ConcurrentSkipListSet;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.atomic.AtomicLong;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * Base class for Fanout service implementations.\r
+ * \r
+ * Subclass implementations can be used as a Sparkleshare PubSub notification\r
+ * server.  This allows Sparkleshare to be used in conjunction with Gitblit\r
+ * behind a corporate firewall that restricts or prohibits client internet access\r
+ * to the default Sparkleshare PubSub server: notifications.sparkleshare.org\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public abstract class FanoutService implements Runnable {\r
+\r
+       private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);\r
+       \r
+       public final static int DEFAULT_PORT = 17000;\r
+       \r
+       protected final static int serviceTimeout = 5000;\r
+\r
+       protected final String host;\r
+       protected final int port;\r
+       protected final String name; \r
+       \r
+       private Thread serviceThread;\r
+       \r
+       private final Map<String, FanoutServiceConnection> connections;\r
+       private final Map<String, Set<FanoutServiceConnection>> subscriptions;\r
+\r
+       protected final AtomicBoolean isRunning;\r
+       private final AtomicBoolean strictRequestTermination;\r
+       private final AtomicBoolean allowAllChannelAnnouncements;\r
+       private final AtomicInteger concurrentConnectionLimit;\r
+       \r
+       private final Date bootDate;\r
+       private final AtomicLong rejectedConnectionCount;\r
+       private final AtomicInteger peakConnectionCount;\r
+       private final AtomicLong totalConnections;\r
+       private final AtomicLong totalAnnouncements;\r
+       private final AtomicLong totalMessages;\r
+       private final AtomicLong totalSubscribes;\r
+       private final AtomicLong totalUnsubscribes;\r
+       private final AtomicLong totalPings;\r
+\r
+       protected FanoutService(String host, int port, String name) {\r
+               this.host = host;\r
+               this.port = port;\r
+               this.name = name;\r
+               \r
+               connections = new ConcurrentHashMap<String, FanoutServiceConnection>();\r
+               subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();\r
+               subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());\r
+               \r
+               isRunning = new AtomicBoolean(false);\r
+               strictRequestTermination = new AtomicBoolean(false);\r
+               allowAllChannelAnnouncements = new AtomicBoolean(false);\r
+               concurrentConnectionLimit = new AtomicInteger(0);\r
+               \r
+               bootDate = new Date();\r
+               rejectedConnectionCount = new AtomicLong(0);\r
+               peakConnectionCount = new AtomicInteger(0);\r
+               totalConnections = new AtomicLong(0);\r
+               totalAnnouncements = new AtomicLong(0);\r
+               totalMessages = new AtomicLong(0);\r
+               totalSubscribes = new AtomicLong(0);\r
+               totalUnsubscribes = new AtomicLong(0);\r
+               totalPings = new AtomicLong(0);\r
+       }\r
+\r
+       /*\r
+        * Abstract methods\r
+        */\r
+       \r
+       protected abstract boolean isConnected();\r
+       \r
+       protected abstract boolean connect();\r
+       \r
+       protected abstract void listen() throws IOException;\r
+       \r
+       protected abstract void disconnect();\r
+       \r
+       /**\r
+        * Returns true if the service requires \n request termination.\r
+        * \r
+        * @return true if request requires \n termination\r
+        */\r
+       public boolean isStrictRequestTermination() {\r
+               return strictRequestTermination.get();\r
+       }\r
+\r
+       /**\r
+        * Control the termination of fanout requests. If true, fanout requests must\r
+        * be terminated with \n. If false, fanout requests may be terminated with\r
+        * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.\r
+        * \r
+        * @param isStrictTermination\r
+        */\r
+       public void setStrictRequestTermination(boolean isStrictTermination) {\r
+               strictRequestTermination.set(isStrictTermination);\r
+       }\r
+       \r
+       /**\r
+        * Returns the maximum allowable concurrent fanout connections.\r
+        *  \r
+        * @return the maximum allowable concurrent connection count\r
+        */\r
+       public int getConcurrentConnectionLimit() {\r
+               return concurrentConnectionLimit.get();\r
+       }\r
+       \r
+       /**\r
+        * Sets the maximum allowable concurrent fanout connection count.\r
+        * \r
+        * @param value\r
+        */\r
+       public void setConcurrentConnectionLimit(int value) {\r
+               concurrentConnectionLimit.set(value);\r
+       }\r
+       \r
+       /**\r
+        * Returns true if connections are allowed to announce on the all channel.\r
+        *  \r
+        * @return true if connections are allowed to announce on the all channel\r
+        */\r
+       public boolean allowAllChannelAnnouncements() {\r
+               return allowAllChannelAnnouncements.get();\r
+       }\r
+       \r
+       /**\r
+        * Allows/prohibits connections from announcing on the ALL channel.\r
+        * \r
+        * @param value\r
+        */\r
+       public void setAllowAllChannelAnnouncements(boolean value) {\r
+               allowAllChannelAnnouncements.set(value);\r
+       }\r
+       \r
+       /**\r
+        * Returns the current connections\r
+        * \r
+        * @param channel\r
+        * @return map of current connections keyed by their id\r
+        */\r
+       public Map<String, FanoutServiceConnection> getCurrentConnections() {\r
+               return connections;\r
+       }\r
+       \r
+       /**\r
+        * Returns all subscriptions\r
+        * \r
+        * @return map of current subscriptions keyed by channel name\r
+        */\r
+       public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {\r
+               return subscriptions;\r
+       }\r
+\r
+       /**\r
+        * Returns the subscriptions for the specified channel\r
+        * \r
+        * @param channel\r
+        * @return set of subscribed connections for the specified channel\r
+        */\r
+       public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {\r
+               return subscriptions.get(channel);\r
+       }\r
+\r
+       /**\r
+        * Returns the runtime statistics object for this service.\r
+        * \r
+        * @return stats\r
+        */\r
+       public FanoutStats getStatistics() {\r
+               FanoutStats stats = new FanoutStats();\r
+               \r
+               // settings\r
+               stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();\r
+               stats.concurrentConnectionLimit = getConcurrentConnectionLimit();\r
+               stats.strictRequestTermination = isStrictRequestTermination();\r
+               \r
+               // runtime stats\r
+               stats.bootDate = bootDate;\r
+               stats.rejectedConnectionCount = rejectedConnectionCount.get();\r
+               stats.peakConnectionCount = peakConnectionCount.get();\r
+               stats.totalConnections = totalConnections.get();\r
+               stats.totalAnnouncements = totalAnnouncements.get();\r
+               stats.totalMessages = totalMessages.get();\r
+               stats.totalSubscribes = totalSubscribes.get();\r
+               stats.totalUnsubscribes = totalUnsubscribes.get();\r
+               stats.totalPings = totalPings.get();            \r
+               stats.currentConnections = connections.size();\r
+               stats.currentChannels = subscriptions.size();\r
+               stats.currentSubscriptions = subscriptions.size() * connections.size();\r
+               return stats;\r
+       }\r
+       \r
+       /**\r
+        * Returns true if the service is ready.\r
+        * \r
+        * @return true, if the service is ready\r
+        */\r
+       public boolean isReady() {\r
+               if (isRunning.get()) {\r
+                       return isConnected();\r
+               }\r
+               return false;\r
+       }\r
+\r
+       /**\r
+        * Start the Fanout service thread and immediatel return.\r
+        * \r
+        */\r
+       public void start() {\r
+               if (isRunning.get()) {\r
+                       logger.warn(MessageFormat.format("{0} is already running", name));\r
+                       return;\r
+               }\r
+               serviceThread = new Thread(this);\r
+               serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));\r
+               serviceThread.start();\r
+       }\r
+       \r
+       /**\r
+        * Start the Fanout service thread and wait until it is accepting connections.\r
+        * \r
+        */\r
+       public void startSynchronously() {\r
+               start();\r
+               while (!isReady()) {\r
+                       try {\r
+                               Thread.sleep(100);\r
+                       } catch (Exception e) {\r
+                       }\r
+               }\r
+       }\r
+               \r
+       /**\r
+        * Stop the Fanout service.  This method returns when the service has been\r
+        * completely shutdown.\r
+        */\r
+       public void stop() {\r
+               if (!isRunning.get()) {\r
+                       logger.warn(MessageFormat.format("{0} is not running", name));\r
+                       return;\r
+               }\r
+               logger.info(MessageFormat.format("stopping {0}...", name));\r
+               isRunning.set(false);\r
+               try {\r
+                       if (serviceThread != null) {\r
+                               serviceThread.join();\r
+                               serviceThread = null;\r
+                       }\r
+               } catch (InterruptedException e1) {\r
+                       logger.error("", e1);\r
+               }\r
+               logger.info(MessageFormat.format("stopped {0}", name));\r
+       }\r
+       \r
+       /**\r
+        * Main execution method of the service\r
+        */\r
+       @Override\r
+       public final void run() {\r
+               disconnect();\r
+               resetState();\r
+               isRunning.set(true);\r
+               while (isRunning.get()) {\r
+                       if (connect()) {\r
+                               try {\r
+                                       listen();\r
+                               } catch (IOException e) {\r
+                                       logger.error(MessageFormat.format("error processing {0}", name), e);\r
+                                       isRunning.set(false);\r
+                               }\r
+                       } else {\r
+                               try {\r
+                                       Thread.sleep(serviceTimeout);\r
+                               } catch (InterruptedException x) {\r
+                               }\r
+                       }\r
+               }\r
+               disconnect();           \r
+               resetState();\r
+       }\r
+       \r
+       protected void resetState() {\r
+               // reset state data\r
+               connections.clear();\r
+               subscriptions.clear();\r
+               rejectedConnectionCount.set(0);\r
+               peakConnectionCount.set(0);\r
+               totalConnections.set(0);\r
+               totalAnnouncements.set(0);\r
+               totalMessages.set(0);\r
+               totalSubscribes.set(0);\r
+               totalUnsubscribes.set(0);\r
+               totalPings.set(0);\r
+       }\r
+\r
+       /**\r
+        * Configure the client connection socket.\r
+        * \r
+        * @param socket\r
+        * @throws SocketException\r
+        */\r
+       protected void configureClientSocket(Socket socket) throws SocketException {\r
+               socket.setKeepAlive(true);                              \r
+               socket.setSoLinger(true, 0); // immediately discard any remaining data\r
+       }\r
+       \r
+       /**\r
+        * Add the connection to the connections map.\r
+        * \r
+        * @param connection\r
+        * @return false if the connection was rejected due to too many concurrent\r
+        *         connections\r
+        */\r
+       protected boolean addConnection(FanoutServiceConnection connection) {           \r
+               int limit = getConcurrentConnectionLimit();\r
+               if (limit > 0 && connections.size() > limit) {\r
+                       logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));\r
+                       increment(rejectedConnectionCount);\r
+                       connection.busy();\r
+                       return false;\r
+               }\r
+               \r
+               // add the connection to our map\r
+               connections.put(connection.id, connection);\r
+\r
+               // track peak number of concurrent connections\r
+               if (connections.size() > peakConnectionCount.get()) {\r
+                       peakConnectionCount.set(connections.size());\r
+               }\r
+\r
+               logger.info("fanout new connection " + connection.id);\r
+               connection.connected();\r
+               return true;\r
+       }\r
+       \r
+       /**\r
+        * Remove the connection from the connections list and from subscriptions.\r
+        * \r
+        * @param connection\r
+        */\r
+       protected void removeConnection(FanoutServiceConnection connection) {\r
+               connections.remove(connection.id);\r
+               Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();\r
+               while (itr.hasNext()) {\r
+                       Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();\r
+                       Set<FanoutServiceConnection> subscriptions = entry.getValue();\r
+                       subscriptions.remove(connection);\r
+                       if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {\r
+                               if (subscriptions.size() == 0) {\r
+                                       itr.remove();\r
+                                       logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));\r
+                               }\r
+                       }\r
+               }\r
+               logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));\r
+       }\r
+       \r
+       /**\r
+        * Tests to see if the connection is being monitored by the service.\r
+        * \r
+        * @param connection\r
+        * @return true if the service is monitoring the connection\r
+        */\r
+       protected boolean hasConnection(FanoutServiceConnection connection) {\r
+               return connections.containsKey(connection.id);\r
+       }\r
+       \r
+       /**\r
+        * Reply to a connection on the specified channel.\r
+        * \r
+        * @param connection\r
+        * @param channel\r
+        * @param message\r
+        * @return the reply\r
+        */\r
+       protected String reply(FanoutServiceConnection connection, String channel, String message) {            \r
+               if (channel != null && channel.length() > 0) {\r
+                       increment(totalMessages);\r
+               }\r
+               return connection.reply(channel, message);\r
+       }\r
+       \r
+       /**\r
+        * Service method to broadcast a message to all connections.\r
+        * \r
+        * @param message\r
+        */\r
+       public void broadcastAll(String message) {\r
+               broadcast(connections.values(), FanoutConstants.CH_ALL, message);\r
+               increment(totalAnnouncements);\r
+       }\r
+       \r
+       /**\r
+        * Service method to broadcast a message to connections subscribed to the\r
+        * channel.\r
+        * \r
+        * @param message\r
+        */\r
+       public void broadcast(String channel, String message) {\r
+               List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));\r
+               broadcast(connections, channel, message);\r
+               increment(totalAnnouncements);\r
+       }\r
+       \r
+       /**\r
+        * Broadcast a message to connections subscribed to the specified channel.\r
+        * \r
+        * @param connections\r
+        * @param channel\r
+        * @param message\r
+        */\r
+       protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {\r
+               for (FanoutServiceConnection connection : connections) {\r
+                       reply(connection, channel, message);\r
+               }\r
+       }\r
+       \r
+       /**\r
+        * Process an incoming Fanout request.\r
+        * \r
+        * @param connection\r
+        * @param req\r
+        * @return the reply to the request, may be null\r
+        */\r
+       protected String processRequest(FanoutServiceConnection connection, String req) {\r
+               logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));\r
+               String[] fields = req.split(" ", 3);\r
+               String action = fields[0];\r
+               String channel = fields.length >= 2 ? fields[1] : null;\r
+               String message = fields.length >= 3 ? fields[2] : null;\r
+               try {\r
+                       return processRequest(connection, action, channel, message);\r
+               } catch (IllegalArgumentException e) {\r
+                       // invalid action\r
+                       logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));\r
+                       logger.error(asHexArray(req));\r
+               }\r
+               return null;\r
+       }\r
+       \r
+       /**\r
+        * Process the Fanout request.\r
+        * \r
+        * @param connection\r
+        * @param action\r
+        * @param channel\r
+        * @param message\r
+        * @return the reply to the request, may be null\r
+        * @throws IllegalArgumentException\r
+        */\r
+       protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {\r
+               if ("ping".equals(action)) {\r
+                       // ping\r
+                       increment(totalPings);\r
+                       return reply(connection, null, "" + System.currentTimeMillis());\r
+               } else if ("info".equals(action)) {\r
+                       // info\r
+                       String info = getStatistics().info();\r
+                       return reply(connection, null, info);\r
+               } else if ("announce".equals(action)) {\r
+                       // announcement\r
+                       if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {\r
+                               // prohibiting connection-sourced all announcements\r
+                               logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));\r
+                       } else if ("debug".equals(channel)) {\r
+                               // prohibiting connection-sourced debug announcements\r
+                               logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));\r
+                       } else {\r
+                               // acceptable announcement\r
+                               List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));\r
+                               connections.remove(connection); // remove announcer\r
+                               broadcast(connections, channel, message);\r
+                               increment(totalAnnouncements);\r
+                       }\r
+               } else if ("subscribe".equals(action)) {\r
+                       // subscribe\r
+                       if (!subscriptions.containsKey(channel)) {\r
+                               logger.info(MessageFormat.format("fanout new channel {0}", channel));\r
+                               subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());\r
+                       }\r
+                       subscriptions.get(channel).add(connection);\r
+                       logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));\r
+                       increment(totalSubscribes);\r
+               } else if ("unsubscribe".equals(action)) {\r
+                       // unsubscribe\r
+                       if (subscriptions.containsKey(channel)) {\r
+                               subscriptions.get(channel).remove(connection);\r
+                               if (subscriptions.get(channel).size() == 0) {\r
+                                       subscriptions.remove(channel);\r
+                               }\r
+                               increment(totalUnsubscribes);\r
+                       }\r
+               } else {\r
+                       // invalid action\r
+                       throw new IllegalArgumentException(action);\r
+               }\r
+               return null;\r
+       }\r
+       \r
+       private String asHexArray(String req) {\r
+               StringBuilder sb = new StringBuilder();\r
+               for (char c : req.toCharArray()) {\r
+                       sb.append(Integer.toHexString(c)).append(' ');\r
+               }\r
+               return "[ " + sb.toString().trim() + " ]";\r
+       }\r
+       \r
+       /**\r
+        * Increment a long and prevent negative rollover.\r
+        * \r
+        * @param counter\r
+        */\r
+       private void increment(AtomicLong counter) {\r
+               long v = counter.incrementAndGet();\r
+               if (v < 0) {\r
+                       counter.set(0);\r
+               }\r
+       }\r
+       \r
+       @Override\r
+       public String toString() {\r
+               return name;\r
+       }\r
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutServiceConnection.java b/src/com/gitblit/fanout/FanoutServiceConnection.java
new file mode 100644 (file)
index 0000000..f7f2c95
--- /dev/null
@@ -0,0 +1,105 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.Socket;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * FanoutServiceConnection handles reading/writing messages from a remote fanout\r
+ * connection.\r
+ * \r
+ * @author James Moger\r
+ * \r
+ */\r
+public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {\r
+       \r
+       private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);\r
+       \r
+       public final String id;\r
+\r
+       protected FanoutServiceConnection(Socket socket) {\r
+               this.id = FanoutConstants.getRemoteSocketId(socket);\r
+       }\r
+\r
+       protected abstract void reply(String content) throws IOException;\r
+\r
+       /**\r
+        * Send the connection a debug channel connected message.\r
+        * \r
+        * @param message\r
+        */\r
+       protected void connected() {\r
+               reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);\r
+       }\r
+       \r
+       /**\r
+        * Send the connection a debug channel busy message.\r
+        * \r
+        * @param message\r
+        */\r
+       protected void busy() {\r
+               reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);\r
+       }\r
+       \r
+       /**\r
+        * Send the connection a message for the specified channel.\r
+        * \r
+        * @param channel\r
+        * @param message\r
+        * @return the reply\r
+        */\r
+       protected String reply(String channel, String message) {\r
+               String content;\r
+               if (channel != null) {\r
+                       content = channel + "!" + message;\r
+               } else {\r
+                       content = message;\r
+               }\r
+               try {\r
+                       reply(content);\r
+               } catch (Exception e) {\r
+                       logger.error("failed to reply to fanout connection " + id, e);\r
+               }\r
+               return content;\r
+       }\r
+\r
+       @Override\r
+       public int compareTo(FanoutServiceConnection c) {\r
+               return id.compareTo(c.id);\r
+       }\r
+\r
+       @Override\r
+       public boolean equals(Object o) {\r
+               if (o instanceof FanoutServiceConnection) {\r
+                       return id.equals(((FanoutServiceConnection) o).id);\r
+               }\r
+               return false;\r
+       }\r
+\r
+       @Override\r
+       public int hashCode() {\r
+               return id.hashCode();\r
+       }\r
+\r
+       @Override\r
+       public String toString() {\r
+               return id;\r
+       }\r
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutSocketService.java b/src/com/gitblit/fanout/FanoutSocketService.java
new file mode 100644 (file)
index 0000000..07c18f9
--- /dev/null
@@ -0,0 +1,234 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.BufferedInputStream;\r
+import java.io.IOException;\r
+import java.io.OutputStream;\r
+import java.net.InetSocketAddress;\r
+import java.net.ServerSocket;\r
+import java.net.Socket;\r
+import java.net.SocketException;\r
+import java.net.SocketTimeoutException;\r
+import java.text.MessageFormat;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * A multi-threaded socket implementation of https://github.com/travisghansen/fanout\r
+ *\r
+ * This implementation creates a master acceptor thread which accepts incoming\r
+ * fanout connections and then spawns a daemon thread for each accepted connection.\r
+ * If there are 100 concurrent fanout connections, there are 101 threads.\r
+ *   \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutSocketService extends FanoutService {\r
+\r
+       private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);\r
+\r
+       private volatile ServerSocket serviceSocket;\r
+\r
+       public static void main(String[] args) throws Exception {\r
+               FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);\r
+               pubsub.setStrictRequestTermination(false);\r
+               pubsub.setAllowAllChannelAnnouncements(false);\r
+               pubsub.start();\r
+       }\r
+       \r
+       /**\r
+        * Create a multi-threaded fanout service.\r
+        * \r
+        * @param port\r
+        *            the port for running the fanout PubSub service\r
+        * @throws IOException\r
+        */\r
+       public FanoutSocketService(int port) {\r
+               this(null, port);\r
+       }\r
+\r
+       /**\r
+        * Create a multi-threaded fanout service.\r
+        * \r
+        * @param bindInterface\r
+        *            the ip address to bind for the service, may be null\r
+        * @param port\r
+        *            the port for running the fanout PubSub service\r
+        * @throws IOException\r
+        */\r
+       public FanoutSocketService(String bindInterface, int port) {\r
+               super(bindInterface, port, "Fanout socket service");\r
+       }\r
+       \r
+       @Override\r
+       protected boolean isConnected() {\r
+               return serviceSocket != null;\r
+       }\r
+               \r
+       @Override\r
+       protected boolean connect() {\r
+               if (serviceSocket == null) {\r
+                       try {\r
+                               serviceSocket = new ServerSocket();\r
+                               serviceSocket.setReuseAddress(true);\r
+                               serviceSocket.setSoTimeout(serviceTimeout);\r
+                               serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));\r
+                               logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", \r
+                                               name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));\r
+                       } catch (IOException e) {\r
+                               logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",\r
+                                               name, host == null ? "0.0.0.0" : host, port), e);\r
+                               return false;\r
+                       }\r
+               }\r
+               return true;\r
+       }\r
+\r
+       @Override\r
+       protected void disconnect() {\r
+               try {\r
+                       if (serviceSocket != null) {\r
+                               logger.debug(MessageFormat.format("closing {0} server socket", name));\r
+                               serviceSocket.close();\r
+                               serviceSocket = null;\r
+                       }\r
+               } catch (IOException e) {\r
+                       logger.error(MessageFormat.format("failed to disconnect {0}", name), e);\r
+               }\r
+       }\r
+\r
+       /**\r
+        * This accepts incoming fanout connections and spawns connection threads.\r
+        */\r
+       @Override\r
+       protected void listen() throws IOException {\r
+               try {\r
+                       Socket socket;\r
+                       socket = serviceSocket.accept();\r
+                       configureClientSocket(socket);\r
+\r
+                       FanoutSocketConnection connection = new FanoutSocketConnection(socket);\r
+\r
+                       if (addConnection(connection)) {\r
+                               // spawn connection daemon thread\r
+                               Thread connectionThread = new Thread(connection);\r
+                               connectionThread.setDaemon(true);\r
+                               connectionThread.setName("Fanout " + connection.id);\r
+                               connectionThread.start();\r
+                       } else {\r
+                               // synchronously close the connection and remove it\r
+                               removeConnection(connection);\r
+                               connection.closeConnection();\r
+                               connection = null;\r
+                       }\r
+               } catch (SocketTimeoutException e) {\r
+                       // ignore accept timeout exceptions\r
+               }\r
+       }\r
+       \r
+       /**\r
+        * FanoutSocketConnection handles reading/writing messages from a remote fanout\r
+        * connection.\r
+        * \r
+        * @author James Moger\r
+        *\r
+        */\r
+       class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {\r
+               Socket socket;\r
+               \r
+               FanoutSocketConnection(Socket socket) {\r
+                       super(socket);\r
+                       this.socket = socket;\r
+               }\r
+\r
+               /**\r
+                * Connection thread read/write method.\r
+                */\r
+               @Override\r
+               public void run() {\r
+                       try {\r
+                               StringBuilder sb = new StringBuilder();\r
+                               BufferedInputStream is = new BufferedInputStream(socket.getInputStream());\r
+                               byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];\r
+                               int len = 0;\r
+                               while (true) {\r
+                                       while (is.available() > 0) {\r
+                                               len = is.read(buffer);\r
+                                               for (int i = 0; i < len; i++) {\r
+                                                       byte b = buffer[i];\r
+                                                       if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {\r
+                                                               String req = sb.toString();\r
+                                                               sb.setLength(0);\r
+                                                               if (req.length() > 0) {\r
+                                                                       // ignore empty request strings\r
+                                                                       processRequest(this, req);\r
+                                                               }\r
+                                                       } else {\r
+                                                               sb.append((char) b);\r
+                                                       }\r
+                                               }\r
+                                       }\r
+\r
+                                       if (!isRunning.get()) {\r
+                                               // service has stopped, terminate client connection\r
+                                               break;\r
+                                       } else {\r
+                                               Thread.sleep(500);\r
+                                       }\r
+                               }\r
+                       } catch (Throwable t) {\r
+                               if (t instanceof SocketException) {\r
+                                       logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));\r
+                               } else if (t instanceof SocketTimeoutException) {\r
+                                       logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));\r
+                               } else {\r
+                                       logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);\r
+                               }\r
+                       } finally {\r
+                               closeConnection();\r
+                       }\r
+\r
+                       logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));\r
+               }\r
+                               \r
+               @Override\r
+               protected void reply(String content) throws IOException {\r
+                       // synchronously send reply\r
+                       logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));\r
+                       OutputStream os = socket.getOutputStream();\r
+                       byte [] bytes = content.getBytes(FanoutConstants.CHARSET);\r
+                       os.write(bytes);\r
+                       if (bytes[bytes.length - 1] != 0xa) {\r
+                               os.write(0xa);\r
+                       }\r
+                       os.flush();\r
+               }\r
+               \r
+               protected void closeConnection() {\r
+                       // close the connection socket\r
+                       try {\r
+                               socket.close();\r
+                       } catch (IOException e) {\r
+                       }\r
+                       socket = null;\r
+                       \r
+                       // remove this connection from the service\r
+                       removeConnection(this);\r
+               }\r
+       }\r
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutStats.java b/src/com/gitblit/fanout/FanoutStats.java
new file mode 100644 (file)
index 0000000..b06884d
--- /dev/null
@@ -0,0 +1,98 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.Serializable;\r
+import java.text.MessageFormat;\r
+import java.util.Date;\r
+\r
+/**\r
+ * Encapsulates the runtime stats of a fanout service.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutStats implements Serializable {\r
+\r
+       private static final long serialVersionUID = 1L;\r
+       \r
+       public long concurrentConnectionLimit;\r
+       public boolean allowAllChannelAnnouncements;\r
+       public boolean strictRequestTermination;\r
+       \r
+       public Date bootDate;\r
+       public long rejectedConnectionCount;\r
+       public int peakConnectionCount;\r
+       public long currentChannels;\r
+       public long currentSubscriptions;\r
+       public long currentConnections;\r
+       public long totalConnections;\r
+       public long totalAnnouncements;\r
+       public long totalMessages;\r
+       public long totalSubscribes;\r
+       public long totalUnsubscribes;\r
+       public long totalPings;\r
+       \r
+       public String info() {\r
+               int i = 0;\r
+               StringBuilder sb = new StringBuilder();\r
+               sb.append(infoStr(i++, "boot date"));\r
+               sb.append(infoStr(i++, "strict request termination"));\r
+               sb.append(infoStr(i++, "allow connection \"all\" announcements"));\r
+               sb.append(infoInt(i++, "concurrent connection limit"));\r
+               sb.append(infoInt(i++, "concurrent limit rejected connections"));\r
+               sb.append(infoInt(i++, "peak connections"));\r
+               sb.append(infoInt(i++, "current connections"));\r
+               sb.append(infoInt(i++, "current channels"));\r
+               sb.append(infoInt(i++, "current subscriptions"));\r
+               sb.append(infoInt(i++, "user-requested subscriptions"));\r
+               sb.append(infoInt(i++, "total connections"));\r
+               sb.append(infoInt(i++, "total announcements"));\r
+               sb.append(infoInt(i++, "total messages"));\r
+               sb.append(infoInt(i++, "total subscribes"));\r
+               sb.append(infoInt(i++, "total unsubscribes"));\r
+               sb.append(infoInt(i++, "total pings"));\r
+               String template = sb.toString();\r
+\r
+               String info = MessageFormat.format(template, \r
+                               bootDate.toString(),\r
+                               Boolean.toString(strictRequestTermination),\r
+                               Boolean.toString(allowAllChannelAnnouncements),\r
+                               concurrentConnectionLimit,\r
+                               rejectedConnectionCount,\r
+                               peakConnectionCount,\r
+                               currentConnections, \r
+                               currentChannels,\r
+                               currentSubscriptions,\r
+                               currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),\r
+                                               totalConnections,\r
+                                               totalAnnouncements,\r
+                                               totalMessages,\r
+                                               totalSubscribes,\r
+                                               totalUnsubscribes,\r
+                                               totalPings);\r
+               return info;\r
+       }\r
+       \r
+       private String infoStr(int index, String label) {\r
+               return label + ": {" + index + "}\n";\r
+       }\r
+       \r
+       private String infoInt(int index, String label) {\r
+               return label + ": {" + index + ",number,0}\n";\r
+       }\r
+\r
+}\r
diff --git a/tests/com/gitblit/tests/FanoutServiceTest.java b/tests/com/gitblit/tests/FanoutServiceTest.java
new file mode 100644 (file)
index 0000000..28e5d82
--- /dev/null
@@ -0,0 +1,172 @@
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.tests;\r
+\r
+import static org.junit.Assert.assertEquals;\r
+\r
+import java.text.MessageFormat;\r
+import java.util.Date;\r
+import java.util.Map;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+\r
+import org.junit.Test;\r
+\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutClient;\r
+import com.gitblit.fanout.FanoutClient.FanoutAdapter;\r
+import com.gitblit.fanout.FanoutNioService;\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutSocketService;\r
+\r
+public class FanoutServiceTest {\r
+       \r
+       int fanoutPort = FanoutService.DEFAULT_PORT;\r
+       \r
+       @Test\r
+       public void testNioPubSub() throws Exception {\r
+               testPubSub(new FanoutNioService(fanoutPort));\r
+       }\r
+\r
+       @Test\r
+       public void testSocketPubSub() throws Exception {\r
+               testPubSub(new FanoutSocketService(fanoutPort));\r
+       }\r
+       \r
+       @Test\r
+       public void testNioDisruptionAndRecovery() throws Exception {\r
+               testDisruption(new FanoutNioService(fanoutPort));\r
+       }\r
+\r
+       @Test\r
+       public void testSocketDisruptionAndRecovery() throws Exception {\r
+               testDisruption(new FanoutSocketService(fanoutPort));\r
+       }\r
+       \r
+       protected void testPubSub(FanoutService service) throws Exception {\r
+               System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));\r
+               service.startSynchronously();\r
+               \r
+               final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();\r
+               FanoutClient clientA = new FanoutClient("localhost", fanoutPort);\r
+               clientA.addListener(new FanoutAdapter() {\r
+                       \r
+                       @Override\r
+                       public void announcement(String channel, String message) {\r
+                               announcementsA.put(channel, message);\r
+                       }\r
+               });\r
+               \r
+               clientA.startSynchronously();\r
+\r
+               final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();\r
+               FanoutClient clientB = new FanoutClient("localhost", fanoutPort);\r
+               clientB.addListener(new FanoutAdapter() {\r
+                       @Override\r
+                       public void announcement(String channel, String message) {\r
+                               announcementsB.put(channel, message);\r
+                       }\r
+               });\r
+               clientB.startSynchronously();\r
+\r
+               \r
+               // subscribe clients A and B to the channels\r
+               clientA.subscribe("a");\r
+               clientA.subscribe("b");\r
+               clientA.subscribe("c");\r
+               \r
+               clientB.subscribe("a");\r
+               clientB.subscribe("b");\r
+               clientB.subscribe("c");\r
+               \r
+               // give async messages a chance to be delivered\r
+               Thread.sleep(1000);\r
+               \r
+               clientA.announce("a", "apple");\r
+               clientA.announce("b", "banana");\r
+               clientA.announce("c", "cantelope");\r
+               \r
+               clientB.announce("a", "avocado");\r
+               clientB.announce("b", "beet");\r
+               clientB.announce("c", "carrot");\r
+\r
+               // give async messages a chance to be delivered\r
+               Thread.sleep(2000);\r
+\r
+               // confirm that client B received client A's announcements\r
+               assertEquals("apple", announcementsB.get("a"));\r
+               assertEquals("banana", announcementsB.get("b"));\r
+               assertEquals("cantelope", announcementsB.get("c"));\r
+\r
+               // confirm that client A received client B's announcements\r
+               assertEquals("avocado", announcementsA.get("a"));\r
+               assertEquals("beet", announcementsA.get("b"));\r
+               assertEquals("carrot", announcementsA.get("c"));\r
+               \r
+               clientA.stop();\r
+               clientB.stop();\r
+               service.stop();         \r
+       }\r
+       \r
+       protected void testDisruption(FanoutService service) throws Exception  {\r
+               System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));\r
+               service.startSynchronously();\r
+               \r
+               final AtomicInteger pongCount = new AtomicInteger(0);\r
+               FanoutClient client = new FanoutClient("localhost", fanoutPort);\r
+               client.addListener(new FanoutAdapter() {\r
+                       @Override\r
+                       public void pong(Date timestamp) {\r
+                               pongCount.incrementAndGet();\r
+                       }\r
+               });\r
+               client.startSynchronously();\r
+               \r
+               // ping and wait for pong\r
+               client.ping();  \r
+               Thread.sleep(500);\r
+               \r
+               // restart client\r
+               client.stop();\r
+               Thread.sleep(1000);\r
+               client.startSynchronously();            \r
+               \r
+               // ping and wait for pong\r
+               client.ping();  \r
+               Thread.sleep(500);\r
+                               \r
+               assertEquals(2, pongCount.get());\r
+               \r
+               // now disrupt service\r
+               service.stop();         \r
+               Thread.sleep(2000);\r
+               service.startSynchronously();\r
+               \r
+               // wait for reconnect\r
+               Thread.sleep(2000);\r
+\r
+               // ping and wait for pong\r
+               client.ping();\r
+               Thread.sleep(500);\r
+\r
+               // kill all\r
+               client.stop();\r
+               service.stop();\r
+               \r
+               // confirm expected pong count\r
+               assertEquals(3, pongCount.get());\r
+       }\r
+}
\ No newline at end of file
index bb734eb77555542f420f772e902d8e0e548206b3..5220a6a3c44cd4406bc3eb4a78d2b4b57eff8e5c 100644 (file)
@@ -59,7 +59,8 @@ import com.gitblit.utils.JGitUtils;
                MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class,\r
                DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class,\r
                GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class,\r
-               GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class })\r
+               GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class,\r
+               FanoutServiceTest.class })\r
 public class GitBlitSuite {\r
 \r
        public static final File REPOSITORIES = new File("git");\r