# SINCE 1.0.0\r
groovy.customFields = \r
\r
+#\r
+# Fanout Settings\r
+#\r
+\r
+# Fanout is a PubSub notification service that can be used by Sparkleshare\r
+# to eliminate repository change polling. The fanout service runs in a separate\r
+# thread on a separate port from the Gitblit http/https application.\r
+# This service is provided so that Sparkleshare may be used with Gitblit in\r
+# firewalled environments or where reliance on Sparkleshare's default notifications\r
+# server (notifications.sparkleshare.org) is unwanted.\r
+#\r
+# This service maintains an open socket connection from the client to the\r
+# Fanout PubSub service. This service may not work properly behind a proxy server. \r
+\r
+# Specify the interface for Fanout to bind it's service.\r
+# You may specify an ip or an empty value to bind to all interfaces.\r
+# Specifying localhost will result in Gitblit ONLY listening to requests to\r
+# localhost.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.bindInterface = localhost\r
+\r
+# port for serving the Fanout PubSub service. <= 0 disables this service.\r
+# On Unix/Linux systems, ports < 1024 require root permissions.\r
+# Recommended value: 17000\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.port = 0\r
+\r
+# Use Fanout NIO service. If false, a multi-threaded socket service will be used.\r
+# Be advised, the socket implementation spawns a thread per connection plus the\r
+# connection acceptor thread. The NIO implementation is completely single-threaded.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.useNio = true\r
+\r
+# Concurrent connection limit. <= 0 disables concurrent connection throttling.\r
+# If > 0, only the specified number of concurrent connections will be allowed\r
+# and all other connections will be rejected.\r
+#\r
+# SINCE 1.2.1\r
+# RESTART REQUIRED\r
+fanout.connectionLimit = 0\r
+\r
#\r
# Authentication Settings\r
#\r
- Git-notes display support\r
- Submodule support\r
- Push log based on a hidden, orphan branch refs/gitblit/pushes\r
+- Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use\r
- gh-pages display support (Jekyll is not supported)\r
- Branch metrics (uses Google Charts)\r
- HEAD and Branch RSS feeds\r
\r
#### additions\r
\r
-- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)\r
+- Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.<br/>\r
+This service is disabled by default.<br/>\r
+ **New:** *fanout.bindInterface = localhost*<br/>\r
+ **New:** *fanout.port = 0*<br/>\r
+ **New:** *fanout.useNio = true*<br/>\r
+ **New:** *fanout.connectionLimit = 0*\r
+- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)<br/>\r
+The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release.\r
- Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183)\r
- Added Dutch translation (github/kwoot)\r
\r
import com.gitblit.Constants.FederationToken;\r
import com.gitblit.Constants.PermissionType;\r
import com.gitblit.Constants.RegistrantType;\r
+import com.gitblit.fanout.FanoutNioService;\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutSocketService;\r
import com.gitblit.models.FederationModel;\r
import com.gitblit.models.FederationProposal;\r
import com.gitblit.models.FederationSet;\r
private TimeZone timezone;\r
\r
private FileBasedConfig projectConfigs;\r
+ \r
+ private FanoutService fanoutService;\r
\r
public GitBlit() {\r
if (gitblit == null) {\r
}\r
\r
ContainerUtils.CVE_2007_0450.test();\r
+ \r
+ // startup Fanout PubSub service\r
+ if (settings.getInteger(Keys.fanout.port, 0) > 0) {\r
+ String bindInterface = settings.getString(Keys.fanout.bindInterface, null);\r
+ int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);\r
+ boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);\r
+ int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);\r
+ \r
+ if (useNio) {\r
+ if (StringUtils.isEmpty(bindInterface)) {\r
+ fanoutService = new FanoutNioService(port);\r
+ } else {\r
+ fanoutService = new FanoutNioService(bindInterface, port);\r
+ }\r
+ } else {\r
+ if (StringUtils.isEmpty(bindInterface)) {\r
+ fanoutService = new FanoutSocketService(port);\r
+ } else {\r
+ fanoutService = new FanoutSocketService(bindInterface, port);\r
+ }\r
+ }\r
+ \r
+ fanoutService.setConcurrentConnectionLimit(limit);\r
+ fanoutService.setAllowAllChannelAnnouncements(false);\r
+ fanoutService.start();\r
+ }\r
}\r
\r
private void logTimezone(String type, TimeZone zone) {\r
scheduledExecutor.shutdownNow();\r
luceneExecutor.close();\r
gcExecutor.close();\r
+ if (fanoutService != null) {\r
+ fanoutService.stop();\r
+ }\r
}\r
\r
/**\r
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.InetAddress;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.SocketChannel;\r
+import java.nio.charset.Charset;\r
+import java.nio.charset.CharsetDecoder;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.Date;\r
+import java.util.Iterator;\r
+import java.util.LinkedHashSet;\r
+import java.util.List;\r
+import java.util.Set;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * Fanout client class.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutClient implements Runnable {\r
+\r
+ private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);\r
+\r
+ private final int clientTimeout = 500;\r
+ private final int reconnectTimeout = 2000;\r
+ private final String host;\r
+ private final int port;\r
+ private final List<FanoutListener> listeners;\r
+\r
+ private String id;\r
+ private volatile Selector selector;\r
+ private volatile SocketChannel socketCh;\r
+ private Thread clientThread;\r
+ \r
+ private final AtomicBoolean isConnected;\r
+ private final AtomicBoolean isRunning;\r
+ private final AtomicBoolean isAutomaticReconnect;\r
+ private final ByteBuffer writeBuffer;\r
+ private final ByteBuffer readBuffer;\r
+ private final CharsetDecoder decoder;\r
+ \r
+ private final Set<String> subscriptions;\r
+ private boolean resubscribe;\r
+ \r
+ public interface FanoutListener {\r
+ public void pong(Date timestamp);\r
+ public void announcement(String channel, String message);\r
+ }\r
+ \r
+ public static class FanoutAdapter implements FanoutListener {\r
+ public void pong(Date timestamp) { }\r
+ public void announcement(String channel, String message) { }\r
+ }\r
+\r
+ public static void main(String args[]) throws Exception {\r
+ FanoutClient client = new FanoutClient("localhost", 2000);\r
+ client.addListener(new FanoutAdapter() {\r
+\r
+ @Override\r
+ public void pong(Date timestamp) {\r
+ System.out.println("Pong. " + timestamp);\r
+ }\r
+ \r
+ @Override\r
+ public void announcement(String channel, String message) {\r
+ System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));\r
+ }\r
+ });\r
+ client.start();\r
+ \r
+ Thread.sleep(5000);\r
+ client.ping();\r
+ client.subscribe("james");\r
+ client.announce("james", "12345"); \r
+ client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");\r
+ \r
+ while (true) {\r
+ Thread.sleep(10000);\r
+ client.ping();\r
+ }\r
+ }\r
+\r
+ public FanoutClient(String host, int port) {\r
+ this.host = host;\r
+ this.port = port;\r
+ readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);\r
+ writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);\r
+ decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();\r
+ listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());\r
+ subscriptions = new LinkedHashSet<String>();\r
+ isRunning = new AtomicBoolean(false);\r
+ isConnected = new AtomicBoolean(false);\r
+ isAutomaticReconnect = new AtomicBoolean(true);\r
+ }\r
+\r
+ public void addListener(FanoutListener listener) {\r
+ listeners.add(listener);\r
+ }\r
+\r
+ public void removeListener(FanoutListener listener) {\r
+ listeners.remove(listener);\r
+ }\r
+ \r
+ public boolean isAutomaticReconnect() {\r
+ return isAutomaticReconnect.get();\r
+ }\r
+ \r
+ public void setAutomaticReconnect(boolean value) {\r
+ isAutomaticReconnect.set(value);\r
+ }\r
+\r
+ public void ping() {\r
+ confirmConnection();\r
+ write("ping");\r
+ }\r
+\r
+ public void status() {\r
+ confirmConnection();\r
+ write("status");\r
+ }\r
+ \r
+ public void subscribe(String channel) {\r
+ confirmConnection();\r
+ if (subscriptions.add(channel)) {\r
+ write("subscribe " + channel);\r
+ }\r
+ }\r
+ \r
+ public void unsubscribe(String channel) {\r
+ confirmConnection();\r
+ if (subscriptions.remove(channel)) {\r
+ write("unsubscribe " + channel);\r
+ }\r
+ }\r
+ \r
+ public void announce(String channel, String message) {\r
+ confirmConnection();\r
+ write("announce " + channel + " " + message);\r
+ }\r
+\r
+ private void confirmConnection() {\r
+ if (!isConnected()) {\r
+ throw new RuntimeException("Fanout client is disconnected!");\r
+ }\r
+ }\r
+ \r
+ public boolean isConnected() {\r
+ return isRunning.get() && socketCh != null && isConnected.get();\r
+ }\r
+ \r
+ /**\r
+ * Start client connection and return immediately.\r
+ */\r
+ public void start() {\r
+ if (isRunning.get()) {\r
+ logger.warn("Fanout client is already running");\r
+ return;\r
+ }\r
+ clientThread = new Thread(this, "Fanout client");\r
+ clientThread.start();\r
+ }\r
+ \r
+ /**\r
+ * Start client connection and wait until it has connected.\r
+ */\r
+ public void startSynchronously() {\r
+ start();\r
+ while (!isConnected()) { \r
+ try {\r
+ Thread.sleep(100);\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Stops client connection. This method returns when the connection has\r
+ * been completely shutdown.\r
+ */\r
+ public void stop() {\r
+ if (!isRunning.get()) {\r
+ logger.warn("Fanout client is not running");\r
+ return;\r
+ }\r
+ isRunning.set(false);\r
+ try {\r
+ if (clientThread != null) {\r
+ clientThread.join();\r
+ clientThread = null;\r
+ }\r
+ } catch (InterruptedException e1) {\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void run() {\r
+ resetState();\r
+ \r
+ isRunning.set(true); \r
+ while (isRunning.get()) {\r
+ // (re)connect\r
+ if (socketCh == null) {\r
+ try {\r
+ InetAddress addr = InetAddress.getByName(host);\r
+ socketCh = SocketChannel.open(new InetSocketAddress(addr, port));\r
+ socketCh.configureBlocking(false);\r
+ selector = Selector.open();\r
+ id = FanoutConstants.getLocalSocketId(socketCh.socket()); \r
+ socketCh.register(selector, SelectionKey.OP_READ);\r
+ } catch (Exception e) {\r
+ logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);\r
+ try {\r
+ Thread.sleep(reconnectTimeout);\r
+ } catch (InterruptedException x) {\r
+ }\r
+ continue;\r
+ }\r
+ }\r
+ \r
+ // read/write\r
+ try {\r
+ selector.select(clientTimeout);\r
+\r
+ Iterator<SelectionKey> i = selector.selectedKeys().iterator();\r
+ while (i.hasNext()) {\r
+ SelectionKey key = i.next();\r
+ i.remove();\r
+ \r
+ if (key.isReadable()) {\r
+ // read message\r
+ String content = read();\r
+ String[] lines = content.split("\n");\r
+ for (String reply : lines) {\r
+ logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));\r
+ if (!processReply(reply)) {\r
+ logger.error(MessageFormat.format("fanout client {0} received unknown message", id));\r
+ }\r
+ }\r
+ } else if (key.isWritable()) {\r
+ // resubscribe\r
+ if (resubscribe) {\r
+ resubscribe = false;\r
+ logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); \r
+ for (String subscription : subscriptions) {\r
+ write("subscribe " + subscription);\r
+ }\r
+ }\r
+ socketCh.register(selector, SelectionKey.OP_READ);\r
+ }\r
+ }\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));\r
+ closeChannel(); \r
+ if (!isAutomaticReconnect.get()) {\r
+ isRunning.set(false);\r
+ continue;\r
+ }\r
+ }\r
+ }\r
+ \r
+ closeChannel();\r
+ resetState();\r
+ }\r
+ \r
+ protected void resetState() {\r
+ readBuffer.clear();\r
+ writeBuffer.clear();\r
+ isRunning.set(false);\r
+ isConnected.set(false);\r
+ }\r
+ \r
+ private void closeChannel() {\r
+ try {\r
+ if (socketCh != null) {\r
+ socketCh.close();\r
+ socketCh = null;\r
+ selector.close();\r
+ selector = null;\r
+ isConnected.set(false);\r
+ }\r
+ } catch (IOException x) {\r
+ }\r
+ }\r
+\r
+ protected boolean processReply(String reply) {\r
+ String[] fields = reply.split("!", 2);\r
+ if (fields.length == 1) {\r
+ try {\r
+ long time = Long.parseLong(fields[0]);\r
+ Date date = new Date(time);\r
+ firePong(date);\r
+ } catch (Exception e) { \r
+ }\r
+ return true;\r
+ } else if (fields.length == 2) {\r
+ String channel = fields[0];\r
+ String message = fields[1];\r
+ if (FanoutConstants.CH_DEBUG.equals(channel)) {\r
+ // debug messages are for internal use\r
+ if (FanoutConstants.MSG_CONNECTED.equals(message)) {\r
+ isConnected.set(true);\r
+ resubscribe = subscriptions.size() > 0;\r
+ if (resubscribe) {\r
+ try {\r
+ // register for async resubscribe\r
+ socketCh.register(selector, SelectionKey.OP_WRITE);\r
+ } catch (Exception e) {\r
+ logger.error("an error occurred", e);\r
+ }\r
+ }\r
+ }\r
+ logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));\r
+ } else {\r
+ fireAnnouncement(channel, message);\r
+ }\r
+ return true;\r
+ } else {\r
+ // unknown message\r
+ return false;\r
+ }\r
+ }\r
+\r
+ protected void firePong(Date timestamp) {\r
+ logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));\r
+ for (FanoutListener listener : listeners) {\r
+ try {\r
+ listener.pong(timestamp);\r
+ } catch (Throwable t) {\r
+ logger.error("FanoutListener threw an exception!", t);\r
+ }\r
+ }\r
+ }\r
+ protected void fireAnnouncement(String channel, String message) {\r
+ logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));\r
+ for (FanoutListener listener : listeners) {\r
+ try {\r
+ listener.announcement(channel, message);\r
+ } catch (Throwable t) {\r
+ logger.error("FanoutListener threw an exception!", t);\r
+ }\r
+ }\r
+ }\r
+ \r
+ protected synchronized String read() throws IOException {\r
+ readBuffer.clear();\r
+ long len = socketCh.read(readBuffer);\r
+\r
+ if (len == -1) {\r
+ logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));\r
+ socketCh.close();\r
+ return null;\r
+ } else {\r
+ readBuffer.flip();\r
+ String content = decoder.decode(readBuffer).toString();\r
+ readBuffer.clear();\r
+ return content;\r
+ }\r
+ }\r
+ \r
+ protected synchronized boolean write(String message) {\r
+ try {\r
+ logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));\r
+ byte [] bytes = message.getBytes(FanoutConstants.CHARSET);\r
+ writeBuffer.clear();\r
+ writeBuffer.put(bytes);\r
+ if (bytes[bytes.length - 1] != 0xa) {\r
+ writeBuffer.put((byte) 0xa);\r
+ }\r
+ writeBuffer.flip();\r
+\r
+ // loop until write buffer has been completely sent\r
+ long written = 0;\r
+ long toWrite = writeBuffer.remaining();\r
+ while (written != toWrite) {\r
+ written += socketCh.write(writeBuffer);\r
+ try {\r
+ Thread.sleep(10);\r
+ } catch (Exception x) {\r
+ }\r
+ }\r
+ return true;\r
+ } catch (IOException e) {\r
+ logger.error("fanout client {0} error: {1}", id, e.getMessage());\r
+ }\r
+ return false;\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.net.Socket;\r
+\r
+public class FanoutConstants {\r
+\r
+ public final static String CHARSET = "ISO-8859-1";\r
+ public final static int BUFFER_LENGTH = 512;\r
+ public final static String CH_ALL = "all";\r
+ public final static String CH_DEBUG = "debug";\r
+ public final static String MSG_CONNECTED = "connected...";\r
+ public final static String MSG_BUSY = "busy";\r
+ \r
+ public static String getRemoteSocketId(Socket socket) {\r
+ return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();\r
+ }\r
+ \r
+ public static String getLocalSocketId(Socket socket) {\r
+ return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();\r
+ }\r
+}\r
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.InetSocketAddress;\r
+import java.nio.ByteBuffer;\r
+import java.nio.CharBuffer;\r
+import java.nio.channels.SelectionKey;\r
+import java.nio.channels.Selector;\r
+import java.nio.channels.ServerSocketChannel;\r
+import java.nio.channels.SocketChannel;\r
+import java.nio.charset.CharacterCodingException;\r
+import java.nio.charset.Charset;\r
+import java.nio.charset.CharsetDecoder;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.HashMap;\r
+import java.util.Iterator;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * A single-thread NIO implementation of https://github.com/travisghansen/fanout\r
+ *\r
+ * This implementation uses channels and selectors, which are the Java analog of\r
+ * the Linux epoll mechanism used in the original fanout C code.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutNioService extends FanoutService {\r
+\r
+ private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);\r
+\r
+ private volatile ServerSocketChannel serviceCh;\r
+ private volatile Selector selector;\r
+ \r
+ public static void main(String[] args) throws Exception {\r
+ FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);\r
+ pubsub.setStrictRequestTermination(false);\r
+ pubsub.setAllowAllChannelAnnouncements(false);\r
+ pubsub.start();\r
+ }\r
+\r
+ /**\r
+ * Create a single-threaded fanout service.\r
+ * \r
+ * @param host\r
+ * @param port\r
+ * the port for running the fanout PubSub service\r
+ * @throws IOException\r
+ */\r
+ public FanoutNioService(int port) {\r
+ this(null, port);\r
+ }\r
+ \r
+ /**\r
+ * Create a single-threaded fanout service.\r
+ * \r
+ * @param bindInterface\r
+ * the ip address to bind for the service, may be null\r
+ * @param port\r
+ * the port for running the fanout PubSub service\r
+ * @throws IOException\r
+ */\r
+ public FanoutNioService(String bindInterface, int port) {\r
+ super(bindInterface, port, "Fanout nio service");\r
+ }\r
+ \r
+ @Override\r
+ protected boolean isConnected() {\r
+ return serviceCh != null;\r
+ }\r
+\r
+ @Override\r
+ protected boolean connect() {\r
+ if (serviceCh == null) {\r
+ try {\r
+ serviceCh = ServerSocketChannel.open();\r
+ serviceCh.configureBlocking(false);\r
+ serviceCh.socket().setReuseAddress(true);\r
+ serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));\r
+ selector = Selector.open();\r
+ serviceCh.register(selector, SelectionKey.OP_ACCEPT);\r
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", \r
+ name, host == null ? "0.0.0.0" : host, port));\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", \r
+ name, name, host == null ? "0.0.0.0" : host, port), e);\r
+ return false;\r
+ }\r
+ }\r
+ return true;\r
+ }\r
+\r
+ @Override\r
+ protected void disconnect() {\r
+ try {\r
+ if (serviceCh != null) {\r
+ // close all active client connections\r
+ Map<String, SocketChannel> clients = getCurrentClientSockets();\r
+ for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {\r
+ closeClientSocket(client.getKey(), client.getValue());\r
+ }\r
+ \r
+ // close service socket channel\r
+ logger.debug(MessageFormat.format("closing {0} socket channel", name));\r
+ serviceCh.socket().close();\r
+ serviceCh.close(); \r
+ serviceCh = null;\r
+ selector.close();\r
+ selector = null;\r
+ }\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("failed to disconnect {0}", name), e);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ protected void listen() throws IOException {\r
+ while (selector.select(serviceTimeout) > 0) {\r
+ Set<SelectionKey> keys = selector.selectedKeys();\r
+ Iterator<SelectionKey> keyItr = keys.iterator();\r
+ while (keyItr.hasNext()) {\r
+ SelectionKey key = (SelectionKey) keyItr.next();\r
+ if (key.isAcceptable()) {\r
+ // new fanout client connection\r
+ ServerSocketChannel sch = (ServerSocketChannel) key.channel();\r
+ try {\r
+ SocketChannel ch = sch.accept();\r
+ ch.configureBlocking(false);\r
+ configureClientSocket(ch.socket());\r
+\r
+ FanoutNioConnection connection = new FanoutNioConnection(ch);\r
+ addConnection(connection);\r
+\r
+ // register to send the queued message\r
+ ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+ } catch (IOException e) {\r
+ logger.error("error accepting fanout connection", e);\r
+ }\r
+ } else if (key.isReadable()) {\r
+ // read fanout client request\r
+ SocketChannel ch = (SocketChannel) key.channel();\r
+ FanoutNioConnection connection = (FanoutNioConnection) key.attachment();\r
+ try {\r
+ connection.read(ch, isStrictRequestTermination());\r
+ int replies = 0;\r
+ Iterator<String> reqItr = connection.requestQueue.iterator();\r
+ while (reqItr.hasNext()) {\r
+ String req = reqItr.next();\r
+ String reply = processRequest(connection, req);\r
+ reqItr.remove();\r
+ if (reply != null) {\r
+ replies++;\r
+ }\r
+ }\r
+\r
+ if (replies > 0) {\r
+ // register to send the replies to requests\r
+ ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+ } else {\r
+ // re-register for next read\r
+ ch.register(selector, SelectionKey.OP_READ, connection);\r
+ }\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));\r
+ removeConnection(connection);\r
+ closeClientSocket(connection.id, ch);\r
+ }\r
+ } else if (key.isWritable()) {\r
+ // asynchronous reply to fanout client request\r
+ SocketChannel ch = (SocketChannel) key.channel();\r
+ FanoutNioConnection connection = (FanoutNioConnection) key.attachment();\r
+ try {\r
+ connection.write(ch);\r
+\r
+ if (hasConnection(connection)) {\r
+ // register for next read\r
+ ch.register(selector, SelectionKey.OP_READ, connection);\r
+ } else {\r
+ // Connection was rejected due to load or\r
+ // some other reason. Close it.\r
+ closeClientSocket(connection.id, ch);\r
+ }\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));\r
+ removeConnection(connection);\r
+ closeClientSocket(connection.id, ch);\r
+ }\r
+ }\r
+ keyItr.remove();\r
+ }\r
+ }\r
+ }\r
+ \r
+ protected void closeClientSocket(String id, SocketChannel ch) {\r
+ try {\r
+ ch.close();\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("fanout connection {0}", id), e);\r
+ }\r
+ }\r
+ \r
+ protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {\r
+ super.broadcast(connections, channel, message);\r
+ \r
+ // register queued write\r
+ Map<String, SocketChannel> sockets = getCurrentClientSockets();\r
+ for (FanoutServiceConnection connection : connections) {\r
+ SocketChannel ch = sockets.get(connection.id);\r
+ if (ch == null) {\r
+ logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));\r
+ removeConnection(connection);\r
+ continue;\r
+ }\r
+ try {\r
+ ch.register(selector, SelectionKey.OP_WRITE, connection);\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));\r
+ }\r
+ }\r
+ }\r
+ \r
+ protected Map<String, SocketChannel> getCurrentClientSockets() {\r
+ Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();\r
+ for (SelectionKey key : selector.keys()) {\r
+ if (key.channel() instanceof SocketChannel) {\r
+ SocketChannel ch = (SocketChannel) key.channel();\r
+ String id = FanoutConstants.getRemoteSocketId(ch.socket());\r
+ sockets.put(id, ch);\r
+ }\r
+ }\r
+ return sockets;\r
+ }\r
+ \r
+ /**\r
+ * FanoutNioConnection handles reading/writing messages from a remote fanout\r
+ * connection.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+ static class FanoutNioConnection extends FanoutServiceConnection {\r
+ final ByteBuffer readBuffer;\r
+ final ByteBuffer writeBuffer;\r
+ final List<String> requestQueue;\r
+ final List<String> replyQueue;\r
+ final CharsetDecoder decoder;\r
+\r
+ FanoutNioConnection(SocketChannel ch) {\r
+ super(ch.socket());\r
+ readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);\r
+ writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);\r
+ requestQueue = new ArrayList<String>();\r
+ replyQueue = new ArrayList<String>();\r
+ decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();\r
+ }\r
+ \r
+ protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {\r
+ long bytesRead = 0;\r
+ readBuffer.clear();\r
+ bytesRead = ch.read(readBuffer);\r
+ readBuffer.flip();\r
+ if (bytesRead == -1) {\r
+ throw new IOException("lost client connection, end of stream");\r
+ }\r
+ if (readBuffer.limit() == 0) {\r
+ return;\r
+ }\r
+ CharBuffer cbuf = decoder.decode(readBuffer);\r
+ String req = cbuf.toString();\r
+ String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");\r
+ requestQueue.addAll(Arrays.asList(lines));\r
+ }\r
+ \r
+ protected void write(SocketChannel ch) throws IOException {\r
+ Iterator<String> itr = replyQueue.iterator();\r
+ while (itr.hasNext()) {\r
+ String reply = itr.next();\r
+ writeBuffer.clear();\r
+ logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));\r
+ byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);\r
+ writeBuffer.put(bytes);\r
+ if (bytes[bytes.length - 1] != 0xa) {\r
+ writeBuffer.put((byte) 0xa);\r
+ }\r
+ writeBuffer.flip();\r
+ \r
+ // loop until write buffer has been completely sent\r
+ int written = 0;\r
+ int toWrite = writeBuffer.remaining();\r
+ while (written != toWrite) {\r
+ written += ch.write(writeBuffer);\r
+ try {\r
+ Thread.sleep(10);\r
+ } catch (Exception x) {\r
+ }\r
+ } \r
+ itr.remove();\r
+ }\r
+ writeBuffer.clear();\r
+ }\r
+\r
+ @Override\r
+ protected void reply(String content) throws IOException {\r
+ // queue the reply\r
+ // replies are transmitted asynchronously from the requests\r
+ replyQueue.add(content);\r
+ }\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.Socket;\r
+import java.net.SocketException;\r
+import java.text.MessageFormat;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.Date;\r
+import java.util.Iterator;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.ConcurrentSkipListSet;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.atomic.AtomicLong;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * Base class for Fanout service implementations.\r
+ * \r
+ * Subclass implementations can be used as a Sparkleshare PubSub notification\r
+ * server. This allows Sparkleshare to be used in conjunction with Gitblit\r
+ * behind a corporate firewall that restricts or prohibits client internet access\r
+ * to the default Sparkleshare PubSub server: notifications.sparkleshare.org\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public abstract class FanoutService implements Runnable {\r
+\r
+ private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);\r
+ \r
+ public final static int DEFAULT_PORT = 17000;\r
+ \r
+ protected final static int serviceTimeout = 5000;\r
+\r
+ protected final String host;\r
+ protected final int port;\r
+ protected final String name; \r
+ \r
+ private Thread serviceThread;\r
+ \r
+ private final Map<String, FanoutServiceConnection> connections;\r
+ private final Map<String, Set<FanoutServiceConnection>> subscriptions;\r
+\r
+ protected final AtomicBoolean isRunning;\r
+ private final AtomicBoolean strictRequestTermination;\r
+ private final AtomicBoolean allowAllChannelAnnouncements;\r
+ private final AtomicInteger concurrentConnectionLimit;\r
+ \r
+ private final Date bootDate;\r
+ private final AtomicLong rejectedConnectionCount;\r
+ private final AtomicInteger peakConnectionCount;\r
+ private final AtomicLong totalConnections;\r
+ private final AtomicLong totalAnnouncements;\r
+ private final AtomicLong totalMessages;\r
+ private final AtomicLong totalSubscribes;\r
+ private final AtomicLong totalUnsubscribes;\r
+ private final AtomicLong totalPings;\r
+\r
+ protected FanoutService(String host, int port, String name) {\r
+ this.host = host;\r
+ this.port = port;\r
+ this.name = name;\r
+ \r
+ connections = new ConcurrentHashMap<String, FanoutServiceConnection>();\r
+ subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();\r
+ subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());\r
+ \r
+ isRunning = new AtomicBoolean(false);\r
+ strictRequestTermination = new AtomicBoolean(false);\r
+ allowAllChannelAnnouncements = new AtomicBoolean(false);\r
+ concurrentConnectionLimit = new AtomicInteger(0);\r
+ \r
+ bootDate = new Date();\r
+ rejectedConnectionCount = new AtomicLong(0);\r
+ peakConnectionCount = new AtomicInteger(0);\r
+ totalConnections = new AtomicLong(0);\r
+ totalAnnouncements = new AtomicLong(0);\r
+ totalMessages = new AtomicLong(0);\r
+ totalSubscribes = new AtomicLong(0);\r
+ totalUnsubscribes = new AtomicLong(0);\r
+ totalPings = new AtomicLong(0);\r
+ }\r
+\r
+ /*\r
+ * Abstract methods\r
+ */\r
+ \r
+ protected abstract boolean isConnected();\r
+ \r
+ protected abstract boolean connect();\r
+ \r
+ protected abstract void listen() throws IOException;\r
+ \r
+ protected abstract void disconnect();\r
+ \r
+ /**\r
+ * Returns true if the service requires \n request termination.\r
+ * \r
+ * @return true if request requires \n termination\r
+ */\r
+ public boolean isStrictRequestTermination() {\r
+ return strictRequestTermination.get();\r
+ }\r
+\r
+ /**\r
+ * Control the termination of fanout requests. If true, fanout requests must\r
+ * be terminated with \n. If false, fanout requests may be terminated with\r
+ * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.\r
+ * \r
+ * @param isStrictTermination\r
+ */\r
+ public void setStrictRequestTermination(boolean isStrictTermination) {\r
+ strictRequestTermination.set(isStrictTermination);\r
+ }\r
+ \r
+ /**\r
+ * Returns the maximum allowable concurrent fanout connections.\r
+ * \r
+ * @return the maximum allowable concurrent connection count\r
+ */\r
+ public int getConcurrentConnectionLimit() {\r
+ return concurrentConnectionLimit.get();\r
+ }\r
+ \r
+ /**\r
+ * Sets the maximum allowable concurrent fanout connection count.\r
+ * \r
+ * @param value\r
+ */\r
+ public void setConcurrentConnectionLimit(int value) {\r
+ concurrentConnectionLimit.set(value);\r
+ }\r
+ \r
+ /**\r
+ * Returns true if connections are allowed to announce on the all channel.\r
+ * \r
+ * @return true if connections are allowed to announce on the all channel\r
+ */\r
+ public boolean allowAllChannelAnnouncements() {\r
+ return allowAllChannelAnnouncements.get();\r
+ }\r
+ \r
+ /**\r
+ * Allows/prohibits connections from announcing on the ALL channel.\r
+ * \r
+ * @param value\r
+ */\r
+ public void setAllowAllChannelAnnouncements(boolean value) {\r
+ allowAllChannelAnnouncements.set(value);\r
+ }\r
+ \r
+ /**\r
+ * Returns the current connections\r
+ * \r
+ * @param channel\r
+ * @return map of current connections keyed by their id\r
+ */\r
+ public Map<String, FanoutServiceConnection> getCurrentConnections() {\r
+ return connections;\r
+ }\r
+ \r
+ /**\r
+ * Returns all subscriptions\r
+ * \r
+ * @return map of current subscriptions keyed by channel name\r
+ */\r
+ public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {\r
+ return subscriptions;\r
+ }\r
+\r
+ /**\r
+ * Returns the subscriptions for the specified channel\r
+ * \r
+ * @param channel\r
+ * @return set of subscribed connections for the specified channel\r
+ */\r
+ public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {\r
+ return subscriptions.get(channel);\r
+ }\r
+\r
+ /**\r
+ * Returns the runtime statistics object for this service.\r
+ * \r
+ * @return stats\r
+ */\r
+ public FanoutStats getStatistics() {\r
+ FanoutStats stats = new FanoutStats();\r
+ \r
+ // settings\r
+ stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();\r
+ stats.concurrentConnectionLimit = getConcurrentConnectionLimit();\r
+ stats.strictRequestTermination = isStrictRequestTermination();\r
+ \r
+ // runtime stats\r
+ stats.bootDate = bootDate;\r
+ stats.rejectedConnectionCount = rejectedConnectionCount.get();\r
+ stats.peakConnectionCount = peakConnectionCount.get();\r
+ stats.totalConnections = totalConnections.get();\r
+ stats.totalAnnouncements = totalAnnouncements.get();\r
+ stats.totalMessages = totalMessages.get();\r
+ stats.totalSubscribes = totalSubscribes.get();\r
+ stats.totalUnsubscribes = totalUnsubscribes.get();\r
+ stats.totalPings = totalPings.get(); \r
+ stats.currentConnections = connections.size();\r
+ stats.currentChannels = subscriptions.size();\r
+ stats.currentSubscriptions = subscriptions.size() * connections.size();\r
+ return stats;\r
+ }\r
+ \r
+ /**\r
+ * Returns true if the service is ready.\r
+ * \r
+ * @return true, if the service is ready\r
+ */\r
+ public boolean isReady() {\r
+ if (isRunning.get()) {\r
+ return isConnected();\r
+ }\r
+ return false;\r
+ }\r
+\r
+ /**\r
+ * Start the Fanout service thread and immediatel return.\r
+ * \r
+ */\r
+ public void start() {\r
+ if (isRunning.get()) {\r
+ logger.warn(MessageFormat.format("{0} is already running", name));\r
+ return;\r
+ }\r
+ serviceThread = new Thread(this);\r
+ serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));\r
+ serviceThread.start();\r
+ }\r
+ \r
+ /**\r
+ * Start the Fanout service thread and wait until it is accepting connections.\r
+ * \r
+ */\r
+ public void startSynchronously() {\r
+ start();\r
+ while (!isReady()) {\r
+ try {\r
+ Thread.sleep(100);\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ }\r
+ \r
+ /**\r
+ * Stop the Fanout service. This method returns when the service has been\r
+ * completely shutdown.\r
+ */\r
+ public void stop() {\r
+ if (!isRunning.get()) {\r
+ logger.warn(MessageFormat.format("{0} is not running", name));\r
+ return;\r
+ }\r
+ logger.info(MessageFormat.format("stopping {0}...", name));\r
+ isRunning.set(false);\r
+ try {\r
+ if (serviceThread != null) {\r
+ serviceThread.join();\r
+ serviceThread = null;\r
+ }\r
+ } catch (InterruptedException e1) {\r
+ logger.error("", e1);\r
+ }\r
+ logger.info(MessageFormat.format("stopped {0}", name));\r
+ }\r
+ \r
+ /**\r
+ * Main execution method of the service\r
+ */\r
+ @Override\r
+ public final void run() {\r
+ disconnect();\r
+ resetState();\r
+ isRunning.set(true);\r
+ while (isRunning.get()) {\r
+ if (connect()) {\r
+ try {\r
+ listen();\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("error processing {0}", name), e);\r
+ isRunning.set(false);\r
+ }\r
+ } else {\r
+ try {\r
+ Thread.sleep(serviceTimeout);\r
+ } catch (InterruptedException x) {\r
+ }\r
+ }\r
+ }\r
+ disconnect(); \r
+ resetState();\r
+ }\r
+ \r
+ protected void resetState() {\r
+ // reset state data\r
+ connections.clear();\r
+ subscriptions.clear();\r
+ rejectedConnectionCount.set(0);\r
+ peakConnectionCount.set(0);\r
+ totalConnections.set(0);\r
+ totalAnnouncements.set(0);\r
+ totalMessages.set(0);\r
+ totalSubscribes.set(0);\r
+ totalUnsubscribes.set(0);\r
+ totalPings.set(0);\r
+ }\r
+\r
+ /**\r
+ * Configure the client connection socket.\r
+ * \r
+ * @param socket\r
+ * @throws SocketException\r
+ */\r
+ protected void configureClientSocket(Socket socket) throws SocketException {\r
+ socket.setKeepAlive(true); \r
+ socket.setSoLinger(true, 0); // immediately discard any remaining data\r
+ }\r
+ \r
+ /**\r
+ * Add the connection to the connections map.\r
+ * \r
+ * @param connection\r
+ * @return false if the connection was rejected due to too many concurrent\r
+ * connections\r
+ */\r
+ protected boolean addConnection(FanoutServiceConnection connection) { \r
+ int limit = getConcurrentConnectionLimit();\r
+ if (limit > 0 && connections.size() > limit) {\r
+ logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));\r
+ increment(rejectedConnectionCount);\r
+ connection.busy();\r
+ return false;\r
+ }\r
+ \r
+ // add the connection to our map\r
+ connections.put(connection.id, connection);\r
+\r
+ // track peak number of concurrent connections\r
+ if (connections.size() > peakConnectionCount.get()) {\r
+ peakConnectionCount.set(connections.size());\r
+ }\r
+\r
+ logger.info("fanout new connection " + connection.id);\r
+ connection.connected();\r
+ return true;\r
+ }\r
+ \r
+ /**\r
+ * Remove the connection from the connections list and from subscriptions.\r
+ * \r
+ * @param connection\r
+ */\r
+ protected void removeConnection(FanoutServiceConnection connection) {\r
+ connections.remove(connection.id);\r
+ Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();\r
+ while (itr.hasNext()) {\r
+ Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();\r
+ Set<FanoutServiceConnection> subscriptions = entry.getValue();\r
+ subscriptions.remove(connection);\r
+ if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {\r
+ if (subscriptions.size() == 0) {\r
+ itr.remove();\r
+ logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));\r
+ }\r
+ }\r
+ }\r
+ logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));\r
+ }\r
+ \r
+ /**\r
+ * Tests to see if the connection is being monitored by the service.\r
+ * \r
+ * @param connection\r
+ * @return true if the service is monitoring the connection\r
+ */\r
+ protected boolean hasConnection(FanoutServiceConnection connection) {\r
+ return connections.containsKey(connection.id);\r
+ }\r
+ \r
+ /**\r
+ * Reply to a connection on the specified channel.\r
+ * \r
+ * @param connection\r
+ * @param channel\r
+ * @param message\r
+ * @return the reply\r
+ */\r
+ protected String reply(FanoutServiceConnection connection, String channel, String message) { \r
+ if (channel != null && channel.length() > 0) {\r
+ increment(totalMessages);\r
+ }\r
+ return connection.reply(channel, message);\r
+ }\r
+ \r
+ /**\r
+ * Service method to broadcast a message to all connections.\r
+ * \r
+ * @param message\r
+ */\r
+ public void broadcastAll(String message) {\r
+ broadcast(connections.values(), FanoutConstants.CH_ALL, message);\r
+ increment(totalAnnouncements);\r
+ }\r
+ \r
+ /**\r
+ * Service method to broadcast a message to connections subscribed to the\r
+ * channel.\r
+ * \r
+ * @param message\r
+ */\r
+ public void broadcast(String channel, String message) {\r
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));\r
+ broadcast(connections, channel, message);\r
+ increment(totalAnnouncements);\r
+ }\r
+ \r
+ /**\r
+ * Broadcast a message to connections subscribed to the specified channel.\r
+ * \r
+ * @param connections\r
+ * @param channel\r
+ * @param message\r
+ */\r
+ protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {\r
+ for (FanoutServiceConnection connection : connections) {\r
+ reply(connection, channel, message);\r
+ }\r
+ }\r
+ \r
+ /**\r
+ * Process an incoming Fanout request.\r
+ * \r
+ * @param connection\r
+ * @param req\r
+ * @return the reply to the request, may be null\r
+ */\r
+ protected String processRequest(FanoutServiceConnection connection, String req) {\r
+ logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));\r
+ String[] fields = req.split(" ", 3);\r
+ String action = fields[0];\r
+ String channel = fields.length >= 2 ? fields[1] : null;\r
+ String message = fields.length >= 3 ? fields[2] : null;\r
+ try {\r
+ return processRequest(connection, action, channel, message);\r
+ } catch (IllegalArgumentException e) {\r
+ // invalid action\r
+ logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));\r
+ logger.error(asHexArray(req));\r
+ }\r
+ return null;\r
+ }\r
+ \r
+ /**\r
+ * Process the Fanout request.\r
+ * \r
+ * @param connection\r
+ * @param action\r
+ * @param channel\r
+ * @param message\r
+ * @return the reply to the request, may be null\r
+ * @throws IllegalArgumentException\r
+ */\r
+ protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {\r
+ if ("ping".equals(action)) {\r
+ // ping\r
+ increment(totalPings);\r
+ return reply(connection, null, "" + System.currentTimeMillis());\r
+ } else if ("info".equals(action)) {\r
+ // info\r
+ String info = getStatistics().info();\r
+ return reply(connection, null, info);\r
+ } else if ("announce".equals(action)) {\r
+ // announcement\r
+ if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {\r
+ // prohibiting connection-sourced all announcements\r
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));\r
+ } else if ("debug".equals(channel)) {\r
+ // prohibiting connection-sourced debug announcements\r
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));\r
+ } else {\r
+ // acceptable announcement\r
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));\r
+ connections.remove(connection); // remove announcer\r
+ broadcast(connections, channel, message);\r
+ increment(totalAnnouncements);\r
+ }\r
+ } else if ("subscribe".equals(action)) {\r
+ // subscribe\r
+ if (!subscriptions.containsKey(channel)) {\r
+ logger.info(MessageFormat.format("fanout new channel {0}", channel));\r
+ subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());\r
+ }\r
+ subscriptions.get(channel).add(connection);\r
+ logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));\r
+ increment(totalSubscribes);\r
+ } else if ("unsubscribe".equals(action)) {\r
+ // unsubscribe\r
+ if (subscriptions.containsKey(channel)) {\r
+ subscriptions.get(channel).remove(connection);\r
+ if (subscriptions.get(channel).size() == 0) {\r
+ subscriptions.remove(channel);\r
+ }\r
+ increment(totalUnsubscribes);\r
+ }\r
+ } else {\r
+ // invalid action\r
+ throw new IllegalArgumentException(action);\r
+ }\r
+ return null;\r
+ }\r
+ \r
+ private String asHexArray(String req) {\r
+ StringBuilder sb = new StringBuilder();\r
+ for (char c : req.toCharArray()) {\r
+ sb.append(Integer.toHexString(c)).append(' ');\r
+ }\r
+ return "[ " + sb.toString().trim() + " ]";\r
+ }\r
+ \r
+ /**\r
+ * Increment a long and prevent negative rollover.\r
+ * \r
+ * @param counter\r
+ */\r
+ private void increment(AtomicLong counter) {\r
+ long v = counter.incrementAndGet();\r
+ if (v < 0) {\r
+ counter.set(0);\r
+ }\r
+ }\r
+ \r
+ @Override\r
+ public String toString() {\r
+ return name;\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.IOException;\r
+import java.net.Socket;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * FanoutServiceConnection handles reading/writing messages from a remote fanout\r
+ * connection.\r
+ * \r
+ * @author James Moger\r
+ * \r
+ */\r
+public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {\r
+ \r
+ private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);\r
+ \r
+ public final String id;\r
+\r
+ protected FanoutServiceConnection(Socket socket) {\r
+ this.id = FanoutConstants.getRemoteSocketId(socket);\r
+ }\r
+\r
+ protected abstract void reply(String content) throws IOException;\r
+\r
+ /**\r
+ * Send the connection a debug channel connected message.\r
+ * \r
+ * @param message\r
+ */\r
+ protected void connected() {\r
+ reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);\r
+ }\r
+ \r
+ /**\r
+ * Send the connection a debug channel busy message.\r
+ * \r
+ * @param message\r
+ */\r
+ protected void busy() {\r
+ reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);\r
+ }\r
+ \r
+ /**\r
+ * Send the connection a message for the specified channel.\r
+ * \r
+ * @param channel\r
+ * @param message\r
+ * @return the reply\r
+ */\r
+ protected String reply(String channel, String message) {\r
+ String content;\r
+ if (channel != null) {\r
+ content = channel + "!" + message;\r
+ } else {\r
+ content = message;\r
+ }\r
+ try {\r
+ reply(content);\r
+ } catch (Exception e) {\r
+ logger.error("failed to reply to fanout connection " + id, e);\r
+ }\r
+ return content;\r
+ }\r
+\r
+ @Override\r
+ public int compareTo(FanoutServiceConnection c) {\r
+ return id.compareTo(c.id);\r
+ }\r
+\r
+ @Override\r
+ public boolean equals(Object o) {\r
+ if (o instanceof FanoutServiceConnection) {\r
+ return id.equals(((FanoutServiceConnection) o).id);\r
+ }\r
+ return false;\r
+ }\r
+\r
+ @Override\r
+ public int hashCode() {\r
+ return id.hashCode();\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return id;\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.BufferedInputStream;\r
+import java.io.IOException;\r
+import java.io.OutputStream;\r
+import java.net.InetSocketAddress;\r
+import java.net.ServerSocket;\r
+import java.net.Socket;\r
+import java.net.SocketException;\r
+import java.net.SocketTimeoutException;\r
+import java.text.MessageFormat;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+/**\r
+ * A multi-threaded socket implementation of https://github.com/travisghansen/fanout\r
+ *\r
+ * This implementation creates a master acceptor thread which accepts incoming\r
+ * fanout connections and then spawns a daemon thread for each accepted connection.\r
+ * If there are 100 concurrent fanout connections, there are 101 threads.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutSocketService extends FanoutService {\r
+\r
+ private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);\r
+\r
+ private volatile ServerSocket serviceSocket;\r
+\r
+ public static void main(String[] args) throws Exception {\r
+ FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);\r
+ pubsub.setStrictRequestTermination(false);\r
+ pubsub.setAllowAllChannelAnnouncements(false);\r
+ pubsub.start();\r
+ }\r
+ \r
+ /**\r
+ * Create a multi-threaded fanout service.\r
+ * \r
+ * @param port\r
+ * the port for running the fanout PubSub service\r
+ * @throws IOException\r
+ */\r
+ public FanoutSocketService(int port) {\r
+ this(null, port);\r
+ }\r
+\r
+ /**\r
+ * Create a multi-threaded fanout service.\r
+ * \r
+ * @param bindInterface\r
+ * the ip address to bind for the service, may be null\r
+ * @param port\r
+ * the port for running the fanout PubSub service\r
+ * @throws IOException\r
+ */\r
+ public FanoutSocketService(String bindInterface, int port) {\r
+ super(bindInterface, port, "Fanout socket service");\r
+ }\r
+ \r
+ @Override\r
+ protected boolean isConnected() {\r
+ return serviceSocket != null;\r
+ }\r
+ \r
+ @Override\r
+ protected boolean connect() {\r
+ if (serviceSocket == null) {\r
+ try {\r
+ serviceSocket = new ServerSocket();\r
+ serviceSocket.setReuseAddress(true);\r
+ serviceSocket.setSoTimeout(serviceTimeout);\r
+ serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));\r
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", \r
+ name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",\r
+ name, host == null ? "0.0.0.0" : host, port), e);\r
+ return false;\r
+ }\r
+ }\r
+ return true;\r
+ }\r
+\r
+ @Override\r
+ protected void disconnect() {\r
+ try {\r
+ if (serviceSocket != null) {\r
+ logger.debug(MessageFormat.format("closing {0} server socket", name));\r
+ serviceSocket.close();\r
+ serviceSocket = null;\r
+ }\r
+ } catch (IOException e) {\r
+ logger.error(MessageFormat.format("failed to disconnect {0}", name), e);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * This accepts incoming fanout connections and spawns connection threads.\r
+ */\r
+ @Override\r
+ protected void listen() throws IOException {\r
+ try {\r
+ Socket socket;\r
+ socket = serviceSocket.accept();\r
+ configureClientSocket(socket);\r
+\r
+ FanoutSocketConnection connection = new FanoutSocketConnection(socket);\r
+\r
+ if (addConnection(connection)) {\r
+ // spawn connection daemon thread\r
+ Thread connectionThread = new Thread(connection);\r
+ connectionThread.setDaemon(true);\r
+ connectionThread.setName("Fanout " + connection.id);\r
+ connectionThread.start();\r
+ } else {\r
+ // synchronously close the connection and remove it\r
+ removeConnection(connection);\r
+ connection.closeConnection();\r
+ connection = null;\r
+ }\r
+ } catch (SocketTimeoutException e) {\r
+ // ignore accept timeout exceptions\r
+ }\r
+ }\r
+ \r
+ /**\r
+ * FanoutSocketConnection handles reading/writing messages from a remote fanout\r
+ * connection.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+ class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {\r
+ Socket socket;\r
+ \r
+ FanoutSocketConnection(Socket socket) {\r
+ super(socket);\r
+ this.socket = socket;\r
+ }\r
+\r
+ /**\r
+ * Connection thread read/write method.\r
+ */\r
+ @Override\r
+ public void run() {\r
+ try {\r
+ StringBuilder sb = new StringBuilder();\r
+ BufferedInputStream is = new BufferedInputStream(socket.getInputStream());\r
+ byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];\r
+ int len = 0;\r
+ while (true) {\r
+ while (is.available() > 0) {\r
+ len = is.read(buffer);\r
+ for (int i = 0; i < len; i++) {\r
+ byte b = buffer[i];\r
+ if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {\r
+ String req = sb.toString();\r
+ sb.setLength(0);\r
+ if (req.length() > 0) {\r
+ // ignore empty request strings\r
+ processRequest(this, req);\r
+ }\r
+ } else {\r
+ sb.append((char) b);\r
+ }\r
+ }\r
+ }\r
+\r
+ if (!isRunning.get()) {\r
+ // service has stopped, terminate client connection\r
+ break;\r
+ } else {\r
+ Thread.sleep(500);\r
+ }\r
+ }\r
+ } catch (Throwable t) {\r
+ if (t instanceof SocketException) {\r
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));\r
+ } else if (t instanceof SocketTimeoutException) {\r
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));\r
+ } else {\r
+ logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);\r
+ }\r
+ } finally {\r
+ closeConnection();\r
+ }\r
+\r
+ logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));\r
+ }\r
+ \r
+ @Override\r
+ protected void reply(String content) throws IOException {\r
+ // synchronously send reply\r
+ logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));\r
+ OutputStream os = socket.getOutputStream();\r
+ byte [] bytes = content.getBytes(FanoutConstants.CHARSET);\r
+ os.write(bytes);\r
+ if (bytes[bytes.length - 1] != 0xa) {\r
+ os.write(0xa);\r
+ }\r
+ os.flush();\r
+ }\r
+ \r
+ protected void closeConnection() {\r
+ // close the connection socket\r
+ try {\r
+ socket.close();\r
+ } catch (IOException e) {\r
+ }\r
+ socket = null;\r
+ \r
+ // remove this connection from the service\r
+ removeConnection(this);\r
+ }\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.fanout;\r
+\r
+import java.io.Serializable;\r
+import java.text.MessageFormat;\r
+import java.util.Date;\r
+\r
+/**\r
+ * Encapsulates the runtime stats of a fanout service.\r
+ * \r
+ * @author James Moger\r
+ *\r
+ */\r
+public class FanoutStats implements Serializable {\r
+\r
+ private static final long serialVersionUID = 1L;\r
+ \r
+ public long concurrentConnectionLimit;\r
+ public boolean allowAllChannelAnnouncements;\r
+ public boolean strictRequestTermination;\r
+ \r
+ public Date bootDate;\r
+ public long rejectedConnectionCount;\r
+ public int peakConnectionCount;\r
+ public long currentChannels;\r
+ public long currentSubscriptions;\r
+ public long currentConnections;\r
+ public long totalConnections;\r
+ public long totalAnnouncements;\r
+ public long totalMessages;\r
+ public long totalSubscribes;\r
+ public long totalUnsubscribes;\r
+ public long totalPings;\r
+ \r
+ public String info() {\r
+ int i = 0;\r
+ StringBuilder sb = new StringBuilder();\r
+ sb.append(infoStr(i++, "boot date"));\r
+ sb.append(infoStr(i++, "strict request termination"));\r
+ sb.append(infoStr(i++, "allow connection \"all\" announcements"));\r
+ sb.append(infoInt(i++, "concurrent connection limit"));\r
+ sb.append(infoInt(i++, "concurrent limit rejected connections"));\r
+ sb.append(infoInt(i++, "peak connections"));\r
+ sb.append(infoInt(i++, "current connections"));\r
+ sb.append(infoInt(i++, "current channels"));\r
+ sb.append(infoInt(i++, "current subscriptions"));\r
+ sb.append(infoInt(i++, "user-requested subscriptions"));\r
+ sb.append(infoInt(i++, "total connections"));\r
+ sb.append(infoInt(i++, "total announcements"));\r
+ sb.append(infoInt(i++, "total messages"));\r
+ sb.append(infoInt(i++, "total subscribes"));\r
+ sb.append(infoInt(i++, "total unsubscribes"));\r
+ sb.append(infoInt(i++, "total pings"));\r
+ String template = sb.toString();\r
+\r
+ String info = MessageFormat.format(template, \r
+ bootDate.toString(),\r
+ Boolean.toString(strictRequestTermination),\r
+ Boolean.toString(allowAllChannelAnnouncements),\r
+ concurrentConnectionLimit,\r
+ rejectedConnectionCount,\r
+ peakConnectionCount,\r
+ currentConnections, \r
+ currentChannels,\r
+ currentSubscriptions,\r
+ currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),\r
+ totalConnections,\r
+ totalAnnouncements,\r
+ totalMessages,\r
+ totalSubscribes,\r
+ totalUnsubscribes,\r
+ totalPings);\r
+ return info;\r
+ }\r
+ \r
+ private String infoStr(int index, String label) {\r
+ return label + ": {" + index + "}\n";\r
+ }\r
+ \r
+ private String infoInt(int index, String label) {\r
+ return label + ": {" + index + ",number,0}\n";\r
+ }\r
+\r
+}\r
--- /dev/null
+/*\r
+ * Copyright 2013 gitblit.com.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package com.gitblit.tests;\r
+\r
+import static org.junit.Assert.assertEquals;\r
+\r
+import java.text.MessageFormat;\r
+import java.util.Date;\r
+import java.util.Map;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+\r
+import org.junit.Test;\r
+\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutClient;\r
+import com.gitblit.fanout.FanoutClient.FanoutAdapter;\r
+import com.gitblit.fanout.FanoutNioService;\r
+import com.gitblit.fanout.FanoutService;\r
+import com.gitblit.fanout.FanoutSocketService;\r
+\r
+public class FanoutServiceTest {\r
+ \r
+ int fanoutPort = FanoutService.DEFAULT_PORT;\r
+ \r
+ @Test\r
+ public void testNioPubSub() throws Exception {\r
+ testPubSub(new FanoutNioService(fanoutPort));\r
+ }\r
+\r
+ @Test\r
+ public void testSocketPubSub() throws Exception {\r
+ testPubSub(new FanoutSocketService(fanoutPort));\r
+ }\r
+ \r
+ @Test\r
+ public void testNioDisruptionAndRecovery() throws Exception {\r
+ testDisruption(new FanoutNioService(fanoutPort));\r
+ }\r
+\r
+ @Test\r
+ public void testSocketDisruptionAndRecovery() throws Exception {\r
+ testDisruption(new FanoutSocketService(fanoutPort));\r
+ }\r
+ \r
+ protected void testPubSub(FanoutService service) throws Exception {\r
+ System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));\r
+ service.startSynchronously();\r
+ \r
+ final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();\r
+ FanoutClient clientA = new FanoutClient("localhost", fanoutPort);\r
+ clientA.addListener(new FanoutAdapter() {\r
+ \r
+ @Override\r
+ public void announcement(String channel, String message) {\r
+ announcementsA.put(channel, message);\r
+ }\r
+ });\r
+ \r
+ clientA.startSynchronously();\r
+\r
+ final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();\r
+ FanoutClient clientB = new FanoutClient("localhost", fanoutPort);\r
+ clientB.addListener(new FanoutAdapter() {\r
+ @Override\r
+ public void announcement(String channel, String message) {\r
+ announcementsB.put(channel, message);\r
+ }\r
+ });\r
+ clientB.startSynchronously();\r
+\r
+ \r
+ // subscribe clients A and B to the channels\r
+ clientA.subscribe("a");\r
+ clientA.subscribe("b");\r
+ clientA.subscribe("c");\r
+ \r
+ clientB.subscribe("a");\r
+ clientB.subscribe("b");\r
+ clientB.subscribe("c");\r
+ \r
+ // give async messages a chance to be delivered\r
+ Thread.sleep(1000);\r
+ \r
+ clientA.announce("a", "apple");\r
+ clientA.announce("b", "banana");\r
+ clientA.announce("c", "cantelope");\r
+ \r
+ clientB.announce("a", "avocado");\r
+ clientB.announce("b", "beet");\r
+ clientB.announce("c", "carrot");\r
+\r
+ // give async messages a chance to be delivered\r
+ Thread.sleep(2000);\r
+\r
+ // confirm that client B received client A's announcements\r
+ assertEquals("apple", announcementsB.get("a"));\r
+ assertEquals("banana", announcementsB.get("b"));\r
+ assertEquals("cantelope", announcementsB.get("c"));\r
+\r
+ // confirm that client A received client B's announcements\r
+ assertEquals("avocado", announcementsA.get("a"));\r
+ assertEquals("beet", announcementsA.get("b"));\r
+ assertEquals("carrot", announcementsA.get("c"));\r
+ \r
+ clientA.stop();\r
+ clientB.stop();\r
+ service.stop(); \r
+ }\r
+ \r
+ protected void testDisruption(FanoutService service) throws Exception {\r
+ System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));\r
+ service.startSynchronously();\r
+ \r
+ final AtomicInteger pongCount = new AtomicInteger(0);\r
+ FanoutClient client = new FanoutClient("localhost", fanoutPort);\r
+ client.addListener(new FanoutAdapter() {\r
+ @Override\r
+ public void pong(Date timestamp) {\r
+ pongCount.incrementAndGet();\r
+ }\r
+ });\r
+ client.startSynchronously();\r
+ \r
+ // ping and wait for pong\r
+ client.ping(); \r
+ Thread.sleep(500);\r
+ \r
+ // restart client\r
+ client.stop();\r
+ Thread.sleep(1000);\r
+ client.startSynchronously(); \r
+ \r
+ // ping and wait for pong\r
+ client.ping(); \r
+ Thread.sleep(500);\r
+ \r
+ assertEquals(2, pongCount.get());\r
+ \r
+ // now disrupt service\r
+ service.stop(); \r
+ Thread.sleep(2000);\r
+ service.startSynchronously();\r
+ \r
+ // wait for reconnect\r
+ Thread.sleep(2000);\r
+\r
+ // ping and wait for pong\r
+ client.ping();\r
+ Thread.sleep(500);\r
+\r
+ // kill all\r
+ client.stop();\r
+ service.stop();\r
+ \r
+ // confirm expected pong count\r
+ assertEquals(3, pongCount.get());\r
+ }\r
+}
\ No newline at end of file
MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class,\r
DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class,\r
GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class,\r
- GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class })\r
+ GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class,\r
+ FanoutServiceTest.class })\r
public class GitBlitSuite {\r
\r
public static final File REPOSITORIES = new File("git");\r