summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJames Moger <james.moger@gitblit.com>2013-01-11 23:50:59 -0500
committerJames Moger <james.moger@gitblit.com>2013-01-11 23:50:59 -0500
commit5316d20e861640867d10405b25cfe75aeca0a34c (patch)
treeeeb9cd87d77171c958c6df55db104a6bac5918f7 /src
parent1f82620a088efa2ba3254df1805e229266690673 (diff)
downloadgitblit-5316d20e861640867d10405b25cfe75aeca0a34c.tar.gz
gitblit-5316d20e861640867d10405b25cfe75aeca0a34c.zip
Fanout service for Sparkleshare clients
Diffstat (limited to 'src')
-rw-r--r--src/com/gitblit/GitBlit.java34
-rw-r--r--src/com/gitblit/fanout/FanoutClient.java413
-rw-r--r--src/com/gitblit/fanout/FanoutConstants.java36
-rw-r--r--src/com/gitblit/fanout/FanoutNioService.java332
-rw-r--r--src/com/gitblit/fanout/FanoutService.java563
-rw-r--r--src/com/gitblit/fanout/FanoutServiceConnection.java105
-rw-r--r--src/com/gitblit/fanout/FanoutSocketService.java234
-rw-r--r--src/com/gitblit/fanout/FanoutStats.java98
8 files changed, 1815 insertions, 0 deletions
diff --git a/src/com/gitblit/GitBlit.java b/src/com/gitblit/GitBlit.java
index 3eb246b8..489ba63c 100644
--- a/src/com/gitblit/GitBlit.java
+++ b/src/com/gitblit/GitBlit.java
@@ -85,6 +85,9 @@ import com.gitblit.Constants.FederationStrategy;
import com.gitblit.Constants.FederationToken;
import com.gitblit.Constants.PermissionType;
import com.gitblit.Constants.RegistrantType;
+import com.gitblit.fanout.FanoutNioService;
+import com.gitblit.fanout.FanoutService;
+import com.gitblit.fanout.FanoutSocketService;
import com.gitblit.models.FederationModel;
import com.gitblit.models.FederationProposal;
import com.gitblit.models.FederationSet;
@@ -180,6 +183,8 @@ public class GitBlit implements ServletContextListener {
private TimeZone timezone;
private FileBasedConfig projectConfigs;
+
+ private FanoutService fanoutService;
public GitBlit() {
if (gitblit == null) {
@@ -3133,6 +3138,32 @@ public class GitBlit implements ServletContextListener {
}
ContainerUtils.CVE_2007_0450.test();
+
+ // startup Fanout PubSub service
+ if (settings.getInteger(Keys.fanout.port, 0) > 0) {
+ String bindInterface = settings.getString(Keys.fanout.bindInterface, null);
+ int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);
+ boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);
+ int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);
+
+ if (useNio) {
+ if (StringUtils.isEmpty(bindInterface)) {
+ fanoutService = new FanoutNioService(port);
+ } else {
+ fanoutService = new FanoutNioService(bindInterface, port);
+ }
+ } else {
+ if (StringUtils.isEmpty(bindInterface)) {
+ fanoutService = new FanoutSocketService(port);
+ } else {
+ fanoutService = new FanoutSocketService(bindInterface, port);
+ }
+ }
+
+ fanoutService.setConcurrentConnectionLimit(limit);
+ fanoutService.setAllowAllChannelAnnouncements(false);
+ fanoutService.start();
+ }
}
private void logTimezone(String type, TimeZone zone) {
@@ -3206,6 +3237,9 @@ public class GitBlit implements ServletContextListener {
scheduledExecutor.shutdownNow();
luceneExecutor.close();
gcExecutor.close();
+ if (fanoutService != null) {
+ fanoutService.stop();
+ }
}
/**
diff --git a/src/com/gitblit/fanout/FanoutClient.java b/src/com/gitblit/fanout/FanoutClient.java
new file mode 100644
index 00000000..b9ace4be
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutClient.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fanout client class.
+ *
+ * @author James Moger
+ *
+ */
+public class FanoutClient implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
+
+ private final int clientTimeout = 500;
+ private final int reconnectTimeout = 2000;
+ private final String host;
+ private final int port;
+ private final List<FanoutListener> listeners;
+
+ private String id;
+ private volatile Selector selector;
+ private volatile SocketChannel socketCh;
+ private Thread clientThread;
+
+ private final AtomicBoolean isConnected;
+ private final AtomicBoolean isRunning;
+ private final AtomicBoolean isAutomaticReconnect;
+ private final ByteBuffer writeBuffer;
+ private final ByteBuffer readBuffer;
+ private final CharsetDecoder decoder;
+
+ private final Set<String> subscriptions;
+ private boolean resubscribe;
+
+ public interface FanoutListener {
+ public void pong(Date timestamp);
+ public void announcement(String channel, String message);
+ }
+
+ public static class FanoutAdapter implements FanoutListener {
+ public void pong(Date timestamp) { }
+ public void announcement(String channel, String message) { }
+ }
+
+ public static void main(String args[]) throws Exception {
+ FanoutClient client = new FanoutClient("localhost", 2000);
+ client.addListener(new FanoutAdapter() {
+
+ @Override
+ public void pong(Date timestamp) {
+ System.out.println("Pong. " + timestamp);
+ }
+
+ @Override
+ public void announcement(String channel, String message) {
+ System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
+ }
+ });
+ client.start();
+
+ Thread.sleep(5000);
+ client.ping();
+ client.subscribe("james");
+ client.announce("james", "12345");
+ client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
+
+ while (true) {
+ Thread.sleep(10000);
+ client.ping();
+ }
+ }
+
+ public FanoutClient(String host, int port) {
+ this.host = host;
+ this.port = port;
+ readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
+ writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
+ decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
+ listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
+ subscriptions = new LinkedHashSet<String>();
+ isRunning = new AtomicBoolean(false);
+ isConnected = new AtomicBoolean(false);
+ isAutomaticReconnect = new AtomicBoolean(true);
+ }
+
+ public void addListener(FanoutListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(FanoutListener listener) {
+ listeners.remove(listener);
+ }
+
+ public boolean isAutomaticReconnect() {
+ return isAutomaticReconnect.get();
+ }
+
+ public void setAutomaticReconnect(boolean value) {
+ isAutomaticReconnect.set(value);
+ }
+
+ public void ping() {
+ confirmConnection();
+ write("ping");
+ }
+
+ public void status() {
+ confirmConnection();
+ write("status");
+ }
+
+ public void subscribe(String channel) {
+ confirmConnection();
+ if (subscriptions.add(channel)) {
+ write("subscribe " + channel);
+ }
+ }
+
+ public void unsubscribe(String channel) {
+ confirmConnection();
+ if (subscriptions.remove(channel)) {
+ write("unsubscribe " + channel);
+ }
+ }
+
+ public void announce(String channel, String message) {
+ confirmConnection();
+ write("announce " + channel + " " + message);
+ }
+
+ private void confirmConnection() {
+ if (!isConnected()) {
+ throw new RuntimeException("Fanout client is disconnected!");
+ }
+ }
+
+ public boolean isConnected() {
+ return isRunning.get() && socketCh != null && isConnected.get();
+ }
+
+ /**
+ * Start client connection and return immediately.
+ */
+ public void start() {
+ if (isRunning.get()) {
+ logger.warn("Fanout client is already running");
+ return;
+ }
+ clientThread = new Thread(this, "Fanout client");
+ clientThread.start();
+ }
+
+ /**
+ * Start client connection and wait until it has connected.
+ */
+ public void startSynchronously() {
+ start();
+ while (!isConnected()) {
+ try {
+ Thread.sleep(100);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ /**
+ * Stops client connection. This method returns when the connection has
+ * been completely shutdown.
+ */
+ public void stop() {
+ if (!isRunning.get()) {
+ logger.warn("Fanout client is not running");
+ return;
+ }
+ isRunning.set(false);
+ try {
+ if (clientThread != null) {
+ clientThread.join();
+ clientThread = null;
+ }
+ } catch (InterruptedException e1) {
+ }
+ }
+
+ @Override
+ public void run() {
+ resetState();
+
+ isRunning.set(true);
+ while (isRunning.get()) {
+ // (re)connect
+ if (socketCh == null) {
+ try {
+ InetAddress addr = InetAddress.getByName(host);
+ socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
+ socketCh.configureBlocking(false);
+ selector = Selector.open();
+ id = FanoutConstants.getLocalSocketId(socketCh.socket());
+ socketCh.register(selector, SelectionKey.OP_READ);
+ } catch (Exception e) {
+ logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
+ try {
+ Thread.sleep(reconnectTimeout);
+ } catch (InterruptedException x) {
+ }
+ continue;
+ }
+ }
+
+ // read/write
+ try {
+ selector.select(clientTimeout);
+
+ Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+ while (i.hasNext()) {
+ SelectionKey key = i.next();
+ i.remove();
+
+ if (key.isReadable()) {
+ // read message
+ String content = read();
+ String[] lines = content.split("\n");
+ for (String reply : lines) {
+ logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
+ if (!processReply(reply)) {
+ logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
+ }
+ }
+ } else if (key.isWritable()) {
+ // resubscribe
+ if (resubscribe) {
+ resubscribe = false;
+ logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
+ for (String subscription : subscriptions) {
+ write("subscribe " + subscription);
+ }
+ }
+ socketCh.register(selector, SelectionKey.OP_READ);
+ }
+ }
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
+ closeChannel();
+ if (!isAutomaticReconnect.get()) {
+ isRunning.set(false);
+ continue;
+ }
+ }
+ }
+
+ closeChannel();
+ resetState();
+ }
+
+ protected void resetState() {
+ readBuffer.clear();
+ writeBuffer.clear();
+ isRunning.set(false);
+ isConnected.set(false);
+ }
+
+ private void closeChannel() {
+ try {
+ if (socketCh != null) {
+ socketCh.close();
+ socketCh = null;
+ selector.close();
+ selector = null;
+ isConnected.set(false);
+ }
+ } catch (IOException x) {
+ }
+ }
+
+ protected boolean processReply(String reply) {
+ String[] fields = reply.split("!", 2);
+ if (fields.length == 1) {
+ try {
+ long time = Long.parseLong(fields[0]);
+ Date date = new Date(time);
+ firePong(date);
+ } catch (Exception e) {
+ }
+ return true;
+ } else if (fields.length == 2) {
+ String channel = fields[0];
+ String message = fields[1];
+ if (FanoutConstants.CH_DEBUG.equals(channel)) {
+ // debug messages are for internal use
+ if (FanoutConstants.MSG_CONNECTED.equals(message)) {
+ isConnected.set(true);
+ resubscribe = subscriptions.size() > 0;
+ if (resubscribe) {
+ try {
+ // register for async resubscribe
+ socketCh.register(selector, SelectionKey.OP_WRITE);
+ } catch (Exception e) {
+ logger.error("an error occurred", e);
+ }
+ }
+ }
+ logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
+ } else {
+ fireAnnouncement(channel, message);
+ }
+ return true;
+ } else {
+ // unknown message
+ return false;
+ }
+ }
+
+ protected void firePong(Date timestamp) {
+ logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
+ for (FanoutListener listener : listeners) {
+ try {
+ listener.pong(timestamp);
+ } catch (Throwable t) {
+ logger.error("FanoutListener threw an exception!", t);
+ }
+ }
+ }
+ protected void fireAnnouncement(String channel, String message) {
+ logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
+ for (FanoutListener listener : listeners) {
+ try {
+ listener.announcement(channel, message);
+ } catch (Throwable t) {
+ logger.error("FanoutListener threw an exception!", t);
+ }
+ }
+ }
+
+ protected synchronized String read() throws IOException {
+ readBuffer.clear();
+ long len = socketCh.read(readBuffer);
+
+ if (len == -1) {
+ logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
+ socketCh.close();
+ return null;
+ } else {
+ readBuffer.flip();
+ String content = decoder.decode(readBuffer).toString();
+ readBuffer.clear();
+ return content;
+ }
+ }
+
+ protected synchronized boolean write(String message) {
+ try {
+ logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
+ byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
+ writeBuffer.clear();
+ writeBuffer.put(bytes);
+ if (bytes[bytes.length - 1] != 0xa) {
+ writeBuffer.put((byte) 0xa);
+ }
+ writeBuffer.flip();
+
+ // loop until write buffer has been completely sent
+ long written = 0;
+ long toWrite = writeBuffer.remaining();
+ while (written != toWrite) {
+ written += socketCh.write(writeBuffer);
+ try {
+ Thread.sleep(10);
+ } catch (Exception x) {
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ logger.error("fanout client {0} error: {1}", id, e.getMessage());
+ }
+ return false;
+ }
+} \ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutConstants.java b/src/com/gitblit/fanout/FanoutConstants.java
new file mode 100644
index 00000000..6e6964c9
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.net.Socket;
+
+public class FanoutConstants {
+
+ public final static String CHARSET = "ISO-8859-1";
+ public final static int BUFFER_LENGTH = 512;
+ public final static String CH_ALL = "all";
+ public final static String CH_DEBUG = "debug";
+ public final static String MSG_CONNECTED = "connected...";
+ public final static String MSG_BUSY = "busy";
+
+ public static String getRemoteSocketId(Socket socket) {
+ return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
+ }
+
+ public static String getLocalSocketId(Socket socket) {
+ return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();
+ }
+}
diff --git a/src/com/gitblit/fanout/FanoutNioService.java b/src/com/gitblit/fanout/FanoutNioService.java
new file mode 100644
index 00000000..65d022ab
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutNioService.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single-thread NIO implementation of https://github.com/travisghansen/fanout
+ *
+ * This implementation uses channels and selectors, which are the Java analog of
+ * the Linux epoll mechanism used in the original fanout C code.
+ *
+ * @author James Moger
+ *
+ */
+public class FanoutNioService extends FanoutService {
+
+ private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
+
+ private volatile ServerSocketChannel serviceCh;
+ private volatile Selector selector;
+
+ public static void main(String[] args) throws Exception {
+ FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
+ pubsub.setStrictRequestTermination(false);
+ pubsub.setAllowAllChannelAnnouncements(false);
+ pubsub.start();
+ }
+
+ /**
+ * Create a single-threaded fanout service.
+ *
+ * @param host
+ * @param port
+ * the port for running the fanout PubSub service
+ * @throws IOException
+ */
+ public FanoutNioService(int port) {
+ this(null, port);
+ }
+
+ /**
+ * Create a single-threaded fanout service.
+ *
+ * @param bindInterface
+ * the ip address to bind for the service, may be null
+ * @param port
+ * the port for running the fanout PubSub service
+ * @throws IOException
+ */
+ public FanoutNioService(String bindInterface, int port) {
+ super(bindInterface, port, "Fanout nio service");
+ }
+
+ @Override
+ protected boolean isConnected() {
+ return serviceCh != null;
+ }
+
+ @Override
+ protected boolean connect() {
+ if (serviceCh == null) {
+ try {
+ serviceCh = ServerSocketChannel.open();
+ serviceCh.configureBlocking(false);
+ serviceCh.socket().setReuseAddress(true);
+ serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
+ selector = Selector.open();
+ serviceCh.register(selector, SelectionKey.OP_ACCEPT);
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ name, host == null ? "0.0.0.0" : host, port));
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+ name, name, host == null ? "0.0.0.0" : host, port), e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void disconnect() {
+ try {
+ if (serviceCh != null) {
+ // close all active client connections
+ Map<String, SocketChannel> clients = getCurrentClientSockets();
+ for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
+ closeClientSocket(client.getKey(), client.getValue());
+ }
+
+ // close service socket channel
+ logger.debug(MessageFormat.format("closing {0} socket channel", name));
+ serviceCh.socket().close();
+ serviceCh.close();
+ serviceCh = null;
+ selector.close();
+ selector = null;
+ }
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
+ }
+ }
+
+ @Override
+ protected void listen() throws IOException {
+ while (selector.select(serviceTimeout) > 0) {
+ Set<SelectionKey> keys = selector.selectedKeys();
+ Iterator<SelectionKey> keyItr = keys.iterator();
+ while (keyItr.hasNext()) {
+ SelectionKey key = (SelectionKey) keyItr.next();
+ if (key.isAcceptable()) {
+ // new fanout client connection
+ ServerSocketChannel sch = (ServerSocketChannel) key.channel();
+ try {
+ SocketChannel ch = sch.accept();
+ ch.configureBlocking(false);
+ configureClientSocket(ch.socket());
+
+ FanoutNioConnection connection = new FanoutNioConnection(ch);
+ addConnection(connection);
+
+ // register to send the queued message
+ ch.register(selector, SelectionKey.OP_WRITE, connection);
+ } catch (IOException e) {
+ logger.error("error accepting fanout connection", e);
+ }
+ } else if (key.isReadable()) {
+ // read fanout client request
+ SocketChannel ch = (SocketChannel) key.channel();
+ FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
+ try {
+ connection.read(ch, isStrictRequestTermination());
+ int replies = 0;
+ Iterator<String> reqItr = connection.requestQueue.iterator();
+ while (reqItr.hasNext()) {
+ String req = reqItr.next();
+ String reply = processRequest(connection, req);
+ reqItr.remove();
+ if (reply != null) {
+ replies++;
+ }
+ }
+
+ if (replies > 0) {
+ // register to send the replies to requests
+ ch.register(selector, SelectionKey.OP_WRITE, connection);
+ } else {
+ // re-register for next read
+ ch.register(selector, SelectionKey.OP_READ, connection);
+ }
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
+ removeConnection(connection);
+ closeClientSocket(connection.id, ch);
+ }
+ } else if (key.isWritable()) {
+ // asynchronous reply to fanout client request
+ SocketChannel ch = (SocketChannel) key.channel();
+ FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
+ try {
+ connection.write(ch);
+
+ if (hasConnection(connection)) {
+ // register for next read
+ ch.register(selector, SelectionKey.OP_READ, connection);
+ } else {
+ // Connection was rejected due to load or
+ // some other reason. Close it.
+ closeClientSocket(connection.id, ch);
+ }
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
+ removeConnection(connection);
+ closeClientSocket(connection.id, ch);
+ }
+ }
+ keyItr.remove();
+ }
+ }
+ }
+
+ protected void closeClientSocket(String id, SocketChannel ch) {
+ try {
+ ch.close();
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("fanout connection {0}", id), e);
+ }
+ }
+
+ protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
+ super.broadcast(connections, channel, message);
+
+ // register queued write
+ Map<String, SocketChannel> sockets = getCurrentClientSockets();
+ for (FanoutServiceConnection connection : connections) {
+ SocketChannel ch = sockets.get(connection.id);
+ if (ch == null) {
+ logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
+ removeConnection(connection);
+ continue;
+ }
+ try {
+ ch.register(selector, SelectionKey.OP_WRITE, connection);
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
+ }
+ }
+ }
+
+ protected Map<String, SocketChannel> getCurrentClientSockets() {
+ Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
+ for (SelectionKey key : selector.keys()) {
+ if (key.channel() instanceof SocketChannel) {
+ SocketChannel ch = (SocketChannel) key.channel();
+ String id = FanoutConstants.getRemoteSocketId(ch.socket());
+ sockets.put(id, ch);
+ }
+ }
+ return sockets;
+ }
+
+ /**
+ * FanoutNioConnection handles reading/writing messages from a remote fanout
+ * connection.
+ *
+ * @author James Moger
+ *
+ */
+ static class FanoutNioConnection extends FanoutServiceConnection {
+ final ByteBuffer readBuffer;
+ final ByteBuffer writeBuffer;
+ final List<String> requestQueue;
+ final List<String> replyQueue;
+ final CharsetDecoder decoder;
+
+ FanoutNioConnection(SocketChannel ch) {
+ super(ch.socket());
+ readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
+ writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
+ requestQueue = new ArrayList<String>();
+ replyQueue = new ArrayList<String>();
+ decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
+ }
+
+ protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
+ long bytesRead = 0;
+ readBuffer.clear();
+ bytesRead = ch.read(readBuffer);
+ readBuffer.flip();
+ if (bytesRead == -1) {
+ throw new IOException("lost client connection, end of stream");
+ }
+ if (readBuffer.limit() == 0) {
+ return;
+ }
+ CharBuffer cbuf = decoder.decode(readBuffer);
+ String req = cbuf.toString();
+ String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
+ requestQueue.addAll(Arrays.asList(lines));
+ }
+
+ protected void write(SocketChannel ch) throws IOException {
+ Iterator<String> itr = replyQueue.iterator();
+ while (itr.hasNext()) {
+ String reply = itr.next();
+ writeBuffer.clear();
+ logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));
+ byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);
+ writeBuffer.put(bytes);
+ if (bytes[bytes.length - 1] != 0xa) {
+ writeBuffer.put((byte) 0xa);
+ }
+ writeBuffer.flip();
+
+ // loop until write buffer has been completely sent
+ int written = 0;
+ int toWrite = writeBuffer.remaining();
+ while (written != toWrite) {
+ written += ch.write(writeBuffer);
+ try {
+ Thread.sleep(10);
+ } catch (Exception x) {
+ }
+ }
+ itr.remove();
+ }
+ writeBuffer.clear();
+ }
+
+ @Override
+ protected void reply(String content) throws IOException {
+ // queue the reply
+ // replies are transmitted asynchronously from the requests
+ replyQueue.add(content);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutService.java b/src/com/gitblit/fanout/FanoutService.java
new file mode 100644
index 00000000..cbfd8a24
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutService.java
@@ -0,0 +1,563 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Fanout service implementations.
+ *
+ * Subclass implementations can be used as a Sparkleshare PubSub notification
+ * server. This allows Sparkleshare to be used in conjunction with Gitblit
+ * behind a corporate firewall that restricts or prohibits client internet access
+ * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
+ *
+ * @author James Moger
+ *
+ */
+public abstract class FanoutService implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
+
+ public final static int DEFAULT_PORT = 17000;
+
+ protected final static int serviceTimeout = 5000;
+
+ protected final String host;
+ protected final int port;
+ protected final String name;
+
+ private Thread serviceThread;
+
+ private final Map<String, FanoutServiceConnection> connections;
+ private final Map<String, Set<FanoutServiceConnection>> subscriptions;
+
+ protected final AtomicBoolean isRunning;
+ private final AtomicBoolean strictRequestTermination;
+ private final AtomicBoolean allowAllChannelAnnouncements;
+ private final AtomicInteger concurrentConnectionLimit;
+
+ private final Date bootDate;
+ private final AtomicLong rejectedConnectionCount;
+ private final AtomicInteger peakConnectionCount;
+ private final AtomicLong totalConnections;
+ private final AtomicLong totalAnnouncements;
+ private final AtomicLong totalMessages;
+ private final AtomicLong totalSubscribes;
+ private final AtomicLong totalUnsubscribes;
+ private final AtomicLong totalPings;
+
+ protected FanoutService(String host, int port, String name) {
+ this.host = host;
+ this.port = port;
+ this.name = name;
+
+ connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
+ subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
+ subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
+
+ isRunning = new AtomicBoolean(false);
+ strictRequestTermination = new AtomicBoolean(false);
+ allowAllChannelAnnouncements = new AtomicBoolean(false);
+ concurrentConnectionLimit = new AtomicInteger(0);
+
+ bootDate = new Date();
+ rejectedConnectionCount = new AtomicLong(0);
+ peakConnectionCount = new AtomicInteger(0);
+ totalConnections = new AtomicLong(0);
+ totalAnnouncements = new AtomicLong(0);
+ totalMessages = new AtomicLong(0);
+ totalSubscribes = new AtomicLong(0);
+ totalUnsubscribes = new AtomicLong(0);
+ totalPings = new AtomicLong(0);
+ }
+
+ /*
+ * Abstract methods
+ */
+
+ protected abstract boolean isConnected();
+
+ protected abstract boolean connect();
+
+ protected abstract void listen() throws IOException;
+
+ protected abstract void disconnect();
+
+ /**
+ * Returns true if the service requires \n request termination.
+ *
+ * @return true if request requires \n termination
+ */
+ public boolean isStrictRequestTermination() {
+ return strictRequestTermination.get();
+ }
+
+ /**
+ * Control the termination of fanout requests. If true, fanout requests must
+ * be terminated with \n. If false, fanout requests may be terminated with
+ * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
+ *
+ * @param isStrictTermination
+ */
+ public void setStrictRequestTermination(boolean isStrictTermination) {
+ strictRequestTermination.set(isStrictTermination);
+ }
+
+ /**
+ * Returns the maximum allowable concurrent fanout connections.
+ *
+ * @return the maximum allowable concurrent connection count
+ */
+ public int getConcurrentConnectionLimit() {
+ return concurrentConnectionLimit.get();
+ }
+
+ /**
+ * Sets the maximum allowable concurrent fanout connection count.
+ *
+ * @param value
+ */
+ public void setConcurrentConnectionLimit(int value) {
+ concurrentConnectionLimit.set(value);
+ }
+
+ /**
+ * Returns true if connections are allowed to announce on the all channel.
+ *
+ * @return true if connections are allowed to announce on the all channel
+ */
+ public boolean allowAllChannelAnnouncements() {
+ return allowAllChannelAnnouncements.get();
+ }
+
+ /**
+ * Allows/prohibits connections from announcing on the ALL channel.
+ *
+ * @param value
+ */
+ public void setAllowAllChannelAnnouncements(boolean value) {
+ allowAllChannelAnnouncements.set(value);
+ }
+
+ /**
+ * Returns the current connections
+ *
+ * @param channel
+ * @return map of current connections keyed by their id
+ */
+ public Map<String, FanoutServiceConnection> getCurrentConnections() {
+ return connections;
+ }
+
+ /**
+ * Returns all subscriptions
+ *
+ * @return map of current subscriptions keyed by channel name
+ */
+ public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
+ return subscriptions;
+ }
+
+ /**
+ * Returns the subscriptions for the specified channel
+ *
+ * @param channel
+ * @return set of subscribed connections for the specified channel
+ */
+ public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
+ return subscriptions.get(channel);
+ }
+
+ /**
+ * Returns the runtime statistics object for this service.
+ *
+ * @return stats
+ */
+ public FanoutStats getStatistics() {
+ FanoutStats stats = new FanoutStats();
+
+ // settings
+ stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
+ stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
+ stats.strictRequestTermination = isStrictRequestTermination();
+
+ // runtime stats
+ stats.bootDate = bootDate;
+ stats.rejectedConnectionCount = rejectedConnectionCount.get();
+ stats.peakConnectionCount = peakConnectionCount.get();
+ stats.totalConnections = totalConnections.get();
+ stats.totalAnnouncements = totalAnnouncements.get();
+ stats.totalMessages = totalMessages.get();
+ stats.totalSubscribes = totalSubscribes.get();
+ stats.totalUnsubscribes = totalUnsubscribes.get();
+ stats.totalPings = totalPings.get();
+ stats.currentConnections = connections.size();
+ stats.currentChannels = subscriptions.size();
+ stats.currentSubscriptions = subscriptions.size() * connections.size();
+ return stats;
+ }
+
+ /**
+ * Returns true if the service is ready.
+ *
+ * @return true, if the service is ready
+ */
+ public boolean isReady() {
+ if (isRunning.get()) {
+ return isConnected();
+ }
+ return false;
+ }
+
+ /**
+ * Start the Fanout service thread and immediatel return.
+ *
+ */
+ public void start() {
+ if (isRunning.get()) {
+ logger.warn(MessageFormat.format("{0} is already running", name));
+ return;
+ }
+ serviceThread = new Thread(this);
+ serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
+ serviceThread.start();
+ }
+
+ /**
+ * Start the Fanout service thread and wait until it is accepting connections.
+ *
+ */
+ public void startSynchronously() {
+ start();
+ while (!isReady()) {
+ try {
+ Thread.sleep(100);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ /**
+ * Stop the Fanout service. This method returns when the service has been
+ * completely shutdown.
+ */
+ public void stop() {
+ if (!isRunning.get()) {
+ logger.warn(MessageFormat.format("{0} is not running", name));
+ return;
+ }
+ logger.info(MessageFormat.format("stopping {0}...", name));
+ isRunning.set(false);
+ try {
+ if (serviceThread != null) {
+ serviceThread.join();
+ serviceThread = null;
+ }
+ } catch (InterruptedException e1) {
+ logger.error("", e1);
+ }
+ logger.info(MessageFormat.format("stopped {0}", name));
+ }
+
+ /**
+ * Main execution method of the service
+ */
+ @Override
+ public final void run() {
+ disconnect();
+ resetState();
+ isRunning.set(true);
+ while (isRunning.get()) {
+ if (connect()) {
+ try {
+ listen();
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("error processing {0}", name), e);
+ isRunning.set(false);
+ }
+ } else {
+ try {
+ Thread.sleep(serviceTimeout);
+ } catch (InterruptedException x) {
+ }
+ }
+ }
+ disconnect();
+ resetState();
+ }
+
+ protected void resetState() {
+ // reset state data
+ connections.clear();
+ subscriptions.clear();
+ rejectedConnectionCount.set(0);
+ peakConnectionCount.set(0);
+ totalConnections.set(0);
+ totalAnnouncements.set(0);
+ totalMessages.set(0);
+ totalSubscribes.set(0);
+ totalUnsubscribes.set(0);
+ totalPings.set(0);
+ }
+
+ /**
+ * Configure the client connection socket.
+ *
+ * @param socket
+ * @throws SocketException
+ */
+ protected void configureClientSocket(Socket socket) throws SocketException {
+ socket.setKeepAlive(true);
+ socket.setSoLinger(true, 0); // immediately discard any remaining data
+ }
+
+ /**
+ * Add the connection to the connections map.
+ *
+ * @param connection
+ * @return false if the connection was rejected due to too many concurrent
+ * connections
+ */
+ protected boolean addConnection(FanoutServiceConnection connection) {
+ int limit = getConcurrentConnectionLimit();
+ if (limit > 0 && connections.size() > limit) {
+ logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
+ increment(rejectedConnectionCount);
+ connection.busy();
+ return false;
+ }
+
+ // add the connection to our map
+ connections.put(connection.id, connection);
+
+ // track peak number of concurrent connections
+ if (connections.size() > peakConnectionCount.get()) {
+ peakConnectionCount.set(connections.size());
+ }
+
+ logger.info("fanout new connection " + connection.id);
+ connection.connected();
+ return true;
+ }
+
+ /**
+ * Remove the connection from the connections list and from subscriptions.
+ *
+ * @param connection
+ */
+ protected void removeConnection(FanoutServiceConnection connection) {
+ connections.remove(connection.id);
+ Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
+ Set<FanoutServiceConnection> subscriptions = entry.getValue();
+ subscriptions.remove(connection);
+ if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {
+ if (subscriptions.size() == 0) {
+ itr.remove();
+ logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
+ }
+ }
+ }
+ logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
+ }
+
+ /**
+ * Tests to see if the connection is being monitored by the service.
+ *
+ * @param connection
+ * @return true if the service is monitoring the connection
+ */
+ protected boolean hasConnection(FanoutServiceConnection connection) {
+ return connections.containsKey(connection.id);
+ }
+
+ /**
+ * Reply to a connection on the specified channel.
+ *
+ * @param connection
+ * @param channel
+ * @param message
+ * @return the reply
+ */
+ protected String reply(FanoutServiceConnection connection, String channel, String message) {
+ if (channel != null && channel.length() > 0) {
+ increment(totalMessages);
+ }
+ return connection.reply(channel, message);
+ }
+
+ /**
+ * Service method to broadcast a message to all connections.
+ *
+ * @param message
+ */
+ public void broadcastAll(String message) {
+ broadcast(connections.values(), FanoutConstants.CH_ALL, message);
+ increment(totalAnnouncements);
+ }
+
+ /**
+ * Service method to broadcast a message to connections subscribed to the
+ * channel.
+ *
+ * @param message
+ */
+ public void broadcast(String channel, String message) {
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+ broadcast(connections, channel, message);
+ increment(totalAnnouncements);
+ }
+
+ /**
+ * Broadcast a message to connections subscribed to the specified channel.
+ *
+ * @param connections
+ * @param channel
+ * @param message
+ */
+ protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
+ for (FanoutServiceConnection connection : connections) {
+ reply(connection, channel, message);
+ }
+ }
+
+ /**
+ * Process an incoming Fanout request.
+ *
+ * @param connection
+ * @param req
+ * @return the reply to the request, may be null
+ */
+ protected String processRequest(FanoutServiceConnection connection, String req) {
+ logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
+ String[] fields = req.split(" ", 3);
+ String action = fields[0];
+ String channel = fields.length >= 2 ? fields[1] : null;
+ String message = fields.length >= 3 ? fields[2] : null;
+ try {
+ return processRequest(connection, action, channel, message);
+ } catch (IllegalArgumentException e) {
+ // invalid action
+ logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
+ logger.error(asHexArray(req));
+ }
+ return null;
+ }
+
+ /**
+ * Process the Fanout request.
+ *
+ * @param connection
+ * @param action
+ * @param channel
+ * @param message
+ * @return the reply to the request, may be null
+ * @throws IllegalArgumentException
+ */
+ protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
+ if ("ping".equals(action)) {
+ // ping
+ increment(totalPings);
+ return reply(connection, null, "" + System.currentTimeMillis());
+ } else if ("info".equals(action)) {
+ // info
+ String info = getStatistics().info();
+ return reply(connection, null, info);
+ } else if ("announce".equals(action)) {
+ // announcement
+ if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
+ // prohibiting connection-sourced all announcements
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
+ } else if ("debug".equals(channel)) {
+ // prohibiting connection-sourced debug announcements
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
+ } else {
+ // acceptable announcement
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+ connections.remove(connection); // remove announcer
+ broadcast(connections, channel, message);
+ increment(totalAnnouncements);
+ }
+ } else if ("subscribe".equals(action)) {
+ // subscribe
+ if (!subscriptions.containsKey(channel)) {
+ logger.info(MessageFormat.format("fanout new channel {0}", channel));
+ subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());
+ }
+ subscriptions.get(channel).add(connection);
+ logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
+ increment(totalSubscribes);
+ } else if ("unsubscribe".equals(action)) {
+ // unsubscribe
+ if (subscriptions.containsKey(channel)) {
+ subscriptions.get(channel).remove(connection);
+ if (subscriptions.get(channel).size() == 0) {
+ subscriptions.remove(channel);
+ }
+ increment(totalUnsubscribes);
+ }
+ } else {
+ // invalid action
+ throw new IllegalArgumentException(action);
+ }
+ return null;
+ }
+
+ private String asHexArray(String req) {
+ StringBuilder sb = new StringBuilder();
+ for (char c : req.toCharArray()) {
+ sb.append(Integer.toHexString(c)).append(' ');
+ }
+ return "[ " + sb.toString().trim() + " ]";
+ }
+
+ /**
+ * Increment a long and prevent negative rollover.
+ *
+ * @param counter
+ */
+ private void increment(AtomicLong counter) {
+ long v = counter.incrementAndGet();
+ if (v < 0) {
+ counter.set(0);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+} \ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutServiceConnection.java b/src/com/gitblit/fanout/FanoutServiceConnection.java
new file mode 100644
index 00000000..f7f2c959
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutServiceConnection.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FanoutServiceConnection handles reading/writing messages from a remote fanout
+ * connection.
+ *
+ * @author James Moger
+ *
+ */
+public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {
+
+ private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);
+
+ public final String id;
+
+ protected FanoutServiceConnection(Socket socket) {
+ this.id = FanoutConstants.getRemoteSocketId(socket);
+ }
+
+ protected abstract void reply(String content) throws IOException;
+
+ /**
+ * Send the connection a debug channel connected message.
+ *
+ * @param message
+ */
+ protected void connected() {
+ reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);
+ }
+
+ /**
+ * Send the connection a debug channel busy message.
+ *
+ * @param message
+ */
+ protected void busy() {
+ reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);
+ }
+
+ /**
+ * Send the connection a message for the specified channel.
+ *
+ * @param channel
+ * @param message
+ * @return the reply
+ */
+ protected String reply(String channel, String message) {
+ String content;
+ if (channel != null) {
+ content = channel + "!" + message;
+ } else {
+ content = message;
+ }
+ try {
+ reply(content);
+ } catch (Exception e) {
+ logger.error("failed to reply to fanout connection " + id, e);
+ }
+ return content;
+ }
+
+ @Override
+ public int compareTo(FanoutServiceConnection c) {
+ return id.compareTo(c.id);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof FanoutServiceConnection) {
+ return id.equals(((FanoutServiceConnection) o).id);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+} \ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutSocketService.java b/src/com/gitblit/fanout/FanoutSocketService.java
new file mode 100644
index 00000000..07c18f90
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutSocketService.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.text.MessageFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A multi-threaded socket implementation of https://github.com/travisghansen/fanout
+ *
+ * This implementation creates a master acceptor thread which accepts incoming
+ * fanout connections and then spawns a daemon thread for each accepted connection.
+ * If there are 100 concurrent fanout connections, there are 101 threads.
+ *
+ * @author James Moger
+ *
+ */
+public class FanoutSocketService extends FanoutService {
+
+ private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);
+
+ private volatile ServerSocket serviceSocket;
+
+ public static void main(String[] args) throws Exception {
+ FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);
+ pubsub.setStrictRequestTermination(false);
+ pubsub.setAllowAllChannelAnnouncements(false);
+ pubsub.start();
+ }
+
+ /**
+ * Create a multi-threaded fanout service.
+ *
+ * @param port
+ * the port for running the fanout PubSub service
+ * @throws IOException
+ */
+ public FanoutSocketService(int port) {
+ this(null, port);
+ }
+
+ /**
+ * Create a multi-threaded fanout service.
+ *
+ * @param bindInterface
+ * the ip address to bind for the service, may be null
+ * @param port
+ * the port for running the fanout PubSub service
+ * @throws IOException
+ */
+ public FanoutSocketService(String bindInterface, int port) {
+ super(bindInterface, port, "Fanout socket service");
+ }
+
+ @Override
+ protected boolean isConnected() {
+ return serviceSocket != null;
+ }
+
+ @Override
+ protected boolean connect() {
+ if (serviceSocket == null) {
+ try {
+ serviceSocket = new ServerSocket();
+ serviceSocket.setReuseAddress(true);
+ serviceSocket.setSoTimeout(serviceTimeout);
+ serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+ name, host == null ? "0.0.0.0" : host, port), e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void disconnect() {
+ try {
+ if (serviceSocket != null) {
+ logger.debug(MessageFormat.format("closing {0} server socket", name));
+ serviceSocket.close();
+ serviceSocket = null;
+ }
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
+ }
+ }
+
+ /**
+ * This accepts incoming fanout connections and spawns connection threads.
+ */
+ @Override
+ protected void listen() throws IOException {
+ try {
+ Socket socket;
+ socket = serviceSocket.accept();
+ configureClientSocket(socket);
+
+ FanoutSocketConnection connection = new FanoutSocketConnection(socket);
+
+ if (addConnection(connection)) {
+ // spawn connection daemon thread
+ Thread connectionThread = new Thread(connection);
+ connectionThread.setDaemon(true);
+ connectionThread.setName("Fanout " + connection.id);
+ connectionThread.start();
+ } else {
+ // synchronously close the connection and remove it
+ removeConnection(connection);
+ connection.closeConnection();
+ connection = null;
+ }
+ } catch (SocketTimeoutException e) {
+ // ignore accept timeout exceptions
+ }
+ }
+
+ /**
+ * FanoutSocketConnection handles reading/writing messages from a remote fanout
+ * connection.
+ *
+ * @author James Moger
+ *
+ */
+ class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
+ Socket socket;
+
+ FanoutSocketConnection(Socket socket) {
+ super(socket);
+ this.socket = socket;
+ }
+
+ /**
+ * Connection thread read/write method.
+ */
+ @Override
+ public void run() {
+ try {
+ StringBuilder sb = new StringBuilder();
+ BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
+ byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];
+ int len = 0;
+ while (true) {
+ while (is.available() > 0) {
+ len = is.read(buffer);
+ for (int i = 0; i < len; i++) {
+ byte b = buffer[i];
+ if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {
+ String req = sb.toString();
+ sb.setLength(0);
+ if (req.length() > 0) {
+ // ignore empty request strings
+ processRequest(this, req);
+ }
+ } else {
+ sb.append((char) b);
+ }
+ }
+ }
+
+ if (!isRunning.get()) {
+ // service has stopped, terminate client connection
+ break;
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ } catch (Throwable t) {
+ if (t instanceof SocketException) {
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
+ } else if (t instanceof SocketTimeoutException) {
+ logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
+ } else {
+ logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);
+ }
+ } finally {
+ closeConnection();
+ }
+
+ logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
+ }
+
+ @Override
+ protected void reply(String content) throws IOException {
+ // synchronously send reply
+ logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));
+ OutputStream os = socket.getOutputStream();
+ byte [] bytes = content.getBytes(FanoutConstants.CHARSET);
+ os.write(bytes);
+ if (bytes[bytes.length - 1] != 0xa) {
+ os.write(0xa);
+ }
+ os.flush();
+ }
+
+ protected void closeConnection() {
+ // close the connection socket
+ try {
+ socket.close();
+ } catch (IOException e) {
+ }
+ socket = null;
+
+ // remove this connection from the service
+ removeConnection(this);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutStats.java b/src/com/gitblit/fanout/FanoutStats.java
new file mode 100644
index 00000000..b06884d3
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutStats.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.Date;
+
+/**
+ * Encapsulates the runtime stats of a fanout service.
+ *
+ * @author James Moger
+ *
+ */
+public class FanoutStats implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public long concurrentConnectionLimit;
+ public boolean allowAllChannelAnnouncements;
+ public boolean strictRequestTermination;
+
+ public Date bootDate;
+ public long rejectedConnectionCount;
+ public int peakConnectionCount;
+ public long currentChannels;
+ public long currentSubscriptions;
+ public long currentConnections;
+ public long totalConnections;
+ public long totalAnnouncements;
+ public long totalMessages;
+ public long totalSubscribes;
+ public long totalUnsubscribes;
+ public long totalPings;
+
+ public String info() {
+ int i = 0;
+ StringBuilder sb = new StringBuilder();
+ sb.append(infoStr(i++, "boot date"));
+ sb.append(infoStr(i++, "strict request termination"));
+ sb.append(infoStr(i++, "allow connection \"all\" announcements"));
+ sb.append(infoInt(i++, "concurrent connection limit"));
+ sb.append(infoInt(i++, "concurrent limit rejected connections"));
+ sb.append(infoInt(i++, "peak connections"));
+ sb.append(infoInt(i++, "current connections"));
+ sb.append(infoInt(i++, "current channels"));
+ sb.append(infoInt(i++, "current subscriptions"));
+ sb.append(infoInt(i++, "user-requested subscriptions"));
+ sb.append(infoInt(i++, "total connections"));
+ sb.append(infoInt(i++, "total announcements"));
+ sb.append(infoInt(i++, "total messages"));
+ sb.append(infoInt(i++, "total subscribes"));
+ sb.append(infoInt(i++, "total unsubscribes"));
+ sb.append(infoInt(i++, "total pings"));
+ String template = sb.toString();
+
+ String info = MessageFormat.format(template,
+ bootDate.toString(),
+ Boolean.toString(strictRequestTermination),
+ Boolean.toString(allowAllChannelAnnouncements),
+ concurrentConnectionLimit,
+ rejectedConnectionCount,
+ peakConnectionCount,
+ currentConnections,
+ currentChannels,
+ currentSubscriptions,
+ currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),
+ totalConnections,
+ totalAnnouncements,
+ totalMessages,
+ totalSubscribes,
+ totalUnsubscribes,
+ totalPings);
+ return info;
+ }
+
+ private String infoStr(int index, String label) {
+ return label + ": {" + index + "}\n";
+ }
+
+ private String infoInt(int index, String label) {
+ return label + ": {" + index + ",number,0}\n";
+ }
+
+}