From 5316d20e861640867d10405b25cfe75aeca0a34c Mon Sep 17 00:00:00 2001 From: James Moger Date: Fri, 11 Jan 2013 23:50:59 -0500 Subject: [PATCH] Fanout service for Sparkleshare clients --- distrib/gitblit.properties | 47 ++ docs/01_features.mkd | 1 + docs/04_releases.mkd | 9 +- src/com/gitblit/GitBlit.java | 34 ++ src/com/gitblit/fanout/FanoutClient.java | 413 +++++++++++++ src/com/gitblit/fanout/FanoutConstants.java | 36 ++ src/com/gitblit/fanout/FanoutNioService.java | 332 +++++++++++ src/com/gitblit/fanout/FanoutService.java | 563 ++++++++++++++++++ .../fanout/FanoutServiceConnection.java | 105 ++++ .../gitblit/fanout/FanoutSocketService.java | 234 ++++++++ src/com/gitblit/fanout/FanoutStats.java | 98 +++ .../com/gitblit/tests/FanoutServiceTest.java | 172 ++++++ tests/com/gitblit/tests/GitBlitSuite.java | 3 +- 13 files changed, 2045 insertions(+), 2 deletions(-) create mode 100644 src/com/gitblit/fanout/FanoutClient.java create mode 100644 src/com/gitblit/fanout/FanoutConstants.java create mode 100644 src/com/gitblit/fanout/FanoutNioService.java create mode 100644 src/com/gitblit/fanout/FanoutService.java create mode 100644 src/com/gitblit/fanout/FanoutServiceConnection.java create mode 100644 src/com/gitblit/fanout/FanoutSocketService.java create mode 100644 src/com/gitblit/fanout/FanoutStats.java create mode 100644 tests/com/gitblit/tests/FanoutServiceTest.java diff --git a/distrib/gitblit.properties b/distrib/gitblit.properties index ce269d2c..758137e3 100644 --- a/distrib/gitblit.properties +++ b/distrib/gitblit.properties @@ -365,6 +365,53 @@ groovy.postReceiveScripts = # SINCE 1.0.0 groovy.customFields = +# +# Fanout Settings +# + +# Fanout is a PubSub notification service that can be used by Sparkleshare +# to eliminate repository change polling. The fanout service runs in a separate +# thread on a separate port from the Gitblit http/https application. +# This service is provided so that Sparkleshare may be used with Gitblit in +# firewalled environments or where reliance on Sparkleshare's default notifications +# server (notifications.sparkleshare.org) is unwanted. +# +# This service maintains an open socket connection from the client to the +# Fanout PubSub service. This service may not work properly behind a proxy server. + +# Specify the interface for Fanout to bind it's service. +# You may specify an ip or an empty value to bind to all interfaces. +# Specifying localhost will result in Gitblit ONLY listening to requests to +# localhost. +# +# SINCE 1.2.1 +# RESTART REQUIRED +fanout.bindInterface = localhost + +# port for serving the Fanout PubSub service. <= 0 disables this service. +# On Unix/Linux systems, ports < 1024 require root permissions. +# Recommended value: 17000 +# +# SINCE 1.2.1 +# RESTART REQUIRED +fanout.port = 0 + +# Use Fanout NIO service. If false, a multi-threaded socket service will be used. +# Be advised, the socket implementation spawns a thread per connection plus the +# connection acceptor thread. The NIO implementation is completely single-threaded. +# +# SINCE 1.2.1 +# RESTART REQUIRED +fanout.useNio = true + +# Concurrent connection limit. <= 0 disables concurrent connection throttling. +# If > 0, only the specified number of concurrent connections will be allowed +# and all other connections will be rejected. +# +# SINCE 1.2.1 +# RESTART REQUIRED +fanout.connectionLimit = 0 + # # Authentication Settings # diff --git a/docs/01_features.mkd b/docs/01_features.mkd index 038acd06..fa3efea4 100644 --- a/docs/01_features.mkd +++ b/docs/01_features.mkd @@ -37,6 +37,7 @@ - Git-notes display support - Submodule support - Push log based on a hidden, orphan branch refs/gitblit/pushes +- Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use - gh-pages display support (Jekyll is not supported) - Branch metrics (uses Google Charts) - HEAD and Branch RSS feeds diff --git a/docs/04_releases.mkd b/docs/04_releases.mkd index 26cbd08b..d77c7326 100644 --- a/docs/04_releases.mkd +++ b/docs/04_releases.mkd @@ -14,7 +14,14 @@ #### additions -- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177) +- Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.
+This service is disabled by default.
+ **New:** *fanout.bindInterface = localhost*
+ **New:** *fanout.port = 0*
+ **New:** *fanout.useNio = true*
+ **New:** *fanout.connectionLimit = 0* +- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)
+The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release. - Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183) - Added Dutch translation (github/kwoot) diff --git a/src/com/gitblit/GitBlit.java b/src/com/gitblit/GitBlit.java index 3eb246b8..489ba63c 100644 --- a/src/com/gitblit/GitBlit.java +++ b/src/com/gitblit/GitBlit.java @@ -85,6 +85,9 @@ import com.gitblit.Constants.FederationStrategy; import com.gitblit.Constants.FederationToken; import com.gitblit.Constants.PermissionType; import com.gitblit.Constants.RegistrantType; +import com.gitblit.fanout.FanoutNioService; +import com.gitblit.fanout.FanoutService; +import com.gitblit.fanout.FanoutSocketService; import com.gitblit.models.FederationModel; import com.gitblit.models.FederationProposal; import com.gitblit.models.FederationSet; @@ -180,6 +183,8 @@ public class GitBlit implements ServletContextListener { private TimeZone timezone; private FileBasedConfig projectConfigs; + + private FanoutService fanoutService; public GitBlit() { if (gitblit == null) { @@ -3133,6 +3138,32 @@ public class GitBlit implements ServletContextListener { } ContainerUtils.CVE_2007_0450.test(); + + // startup Fanout PubSub service + if (settings.getInteger(Keys.fanout.port, 0) > 0) { + String bindInterface = settings.getString(Keys.fanout.bindInterface, null); + int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT); + boolean useNio = settings.getBoolean(Keys.fanout.useNio, true); + int limit = settings.getInteger(Keys.fanout.connectionLimit, 0); + + if (useNio) { + if (StringUtils.isEmpty(bindInterface)) { + fanoutService = new FanoutNioService(port); + } else { + fanoutService = new FanoutNioService(bindInterface, port); + } + } else { + if (StringUtils.isEmpty(bindInterface)) { + fanoutService = new FanoutSocketService(port); + } else { + fanoutService = new FanoutSocketService(bindInterface, port); + } + } + + fanoutService.setConcurrentConnectionLimit(limit); + fanoutService.setAllowAllChannelAnnouncements(false); + fanoutService.start(); + } } private void logTimezone(String type, TimeZone zone) { @@ -3206,6 +3237,9 @@ public class GitBlit implements ServletContextListener { scheduledExecutor.shutdownNow(); luceneExecutor.close(); gcExecutor.close(); + if (fanoutService != null) { + fanoutService.stop(); + } } /** diff --git a/src/com/gitblit/fanout/FanoutClient.java b/src/com/gitblit/fanout/FanoutClient.java new file mode 100644 index 00000000..b9ace4be --- /dev/null +++ b/src/com/gitblit/fanout/FanoutClient.java @@ -0,0 +1,413 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Fanout client class. + * + * @author James Moger + * + */ +public class FanoutClient implements Runnable { + + private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class); + + private final int clientTimeout = 500; + private final int reconnectTimeout = 2000; + private final String host; + private final int port; + private final List listeners; + + private String id; + private volatile Selector selector; + private volatile SocketChannel socketCh; + private Thread clientThread; + + private final AtomicBoolean isConnected; + private final AtomicBoolean isRunning; + private final AtomicBoolean isAutomaticReconnect; + private final ByteBuffer writeBuffer; + private final ByteBuffer readBuffer; + private final CharsetDecoder decoder; + + private final Set subscriptions; + private boolean resubscribe; + + public interface FanoutListener { + public void pong(Date timestamp); + public void announcement(String channel, String message); + } + + public static class FanoutAdapter implements FanoutListener { + public void pong(Date timestamp) { } + public void announcement(String channel, String message) { } + } + + public static void main(String args[]) throws Exception { + FanoutClient client = new FanoutClient("localhost", 2000); + client.addListener(new FanoutAdapter() { + + @Override + public void pong(Date timestamp) { + System.out.println("Pong. " + timestamp); + } + + @Override + public void announcement(String channel, String message) { + System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message)); + } + }); + client.start(); + + Thread.sleep(5000); + client.ping(); + client.subscribe("james"); + client.announce("james", "12345"); + client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5"); + + while (true) { + Thread.sleep(10000); + client.ping(); + } + } + + public FanoutClient(String host, int port) { + this.host = host; + this.port = port; + readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH); + writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH); + decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder(); + listeners = Collections.synchronizedList(new ArrayList()); + subscriptions = new LinkedHashSet(); + isRunning = new AtomicBoolean(false); + isConnected = new AtomicBoolean(false); + isAutomaticReconnect = new AtomicBoolean(true); + } + + public void addListener(FanoutListener listener) { + listeners.add(listener); + } + + public void removeListener(FanoutListener listener) { + listeners.remove(listener); + } + + public boolean isAutomaticReconnect() { + return isAutomaticReconnect.get(); + } + + public void setAutomaticReconnect(boolean value) { + isAutomaticReconnect.set(value); + } + + public void ping() { + confirmConnection(); + write("ping"); + } + + public void status() { + confirmConnection(); + write("status"); + } + + public void subscribe(String channel) { + confirmConnection(); + if (subscriptions.add(channel)) { + write("subscribe " + channel); + } + } + + public void unsubscribe(String channel) { + confirmConnection(); + if (subscriptions.remove(channel)) { + write("unsubscribe " + channel); + } + } + + public void announce(String channel, String message) { + confirmConnection(); + write("announce " + channel + " " + message); + } + + private void confirmConnection() { + if (!isConnected()) { + throw new RuntimeException("Fanout client is disconnected!"); + } + } + + public boolean isConnected() { + return isRunning.get() && socketCh != null && isConnected.get(); + } + + /** + * Start client connection and return immediately. + */ + public void start() { + if (isRunning.get()) { + logger.warn("Fanout client is already running"); + return; + } + clientThread = new Thread(this, "Fanout client"); + clientThread.start(); + } + + /** + * Start client connection and wait until it has connected. + */ + public void startSynchronously() { + start(); + while (!isConnected()) { + try { + Thread.sleep(100); + } catch (Exception e) { + } + } + } + + /** + * Stops client connection. This method returns when the connection has + * been completely shutdown. + */ + public void stop() { + if (!isRunning.get()) { + logger.warn("Fanout client is not running"); + return; + } + isRunning.set(false); + try { + if (clientThread != null) { + clientThread.join(); + clientThread = null; + } + } catch (InterruptedException e1) { + } + } + + @Override + public void run() { + resetState(); + + isRunning.set(true); + while (isRunning.get()) { + // (re)connect + if (socketCh == null) { + try { + InetAddress addr = InetAddress.getByName(host); + socketCh = SocketChannel.open(new InetSocketAddress(addr, port)); + socketCh.configureBlocking(false); + selector = Selector.open(); + id = FanoutConstants.getLocalSocketId(socketCh.socket()); + socketCh.register(selector, SelectionKey.OP_READ); + } catch (Exception e) { + logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e); + try { + Thread.sleep(reconnectTimeout); + } catch (InterruptedException x) { + } + continue; + } + } + + // read/write + try { + selector.select(clientTimeout); + + Iterator i = selector.selectedKeys().iterator(); + while (i.hasNext()) { + SelectionKey key = i.next(); + i.remove(); + + if (key.isReadable()) { + // read message + String content = read(); + String[] lines = content.split("\n"); + for (String reply : lines) { + logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply)); + if (!processReply(reply)) { + logger.error(MessageFormat.format("fanout client {0} received unknown message", id)); + } + } + } else if (key.isWritable()) { + // resubscribe + if (resubscribe) { + resubscribe = false; + logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); + for (String subscription : subscriptions) { + write("subscribe " + subscription); + } + } + socketCh.register(selector, SelectionKey.OP_READ); + } + } + } catch (IOException e) { + logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage())); + closeChannel(); + if (!isAutomaticReconnect.get()) { + isRunning.set(false); + continue; + } + } + } + + closeChannel(); + resetState(); + } + + protected void resetState() { + readBuffer.clear(); + writeBuffer.clear(); + isRunning.set(false); + isConnected.set(false); + } + + private void closeChannel() { + try { + if (socketCh != null) { + socketCh.close(); + socketCh = null; + selector.close(); + selector = null; + isConnected.set(false); + } + } catch (IOException x) { + } + } + + protected boolean processReply(String reply) { + String[] fields = reply.split("!", 2); + if (fields.length == 1) { + try { + long time = Long.parseLong(fields[0]); + Date date = new Date(time); + firePong(date); + } catch (Exception e) { + } + return true; + } else if (fields.length == 2) { + String channel = fields[0]; + String message = fields[1]; + if (FanoutConstants.CH_DEBUG.equals(channel)) { + // debug messages are for internal use + if (FanoutConstants.MSG_CONNECTED.equals(message)) { + isConnected.set(true); + resubscribe = subscriptions.size() > 0; + if (resubscribe) { + try { + // register for async resubscribe + socketCh.register(selector, SelectionKey.OP_WRITE); + } catch (Exception e) { + logger.error("an error occurred", e); + } + } + } + logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply)); + } else { + fireAnnouncement(channel, message); + } + return true; + } else { + // unknown message + return false; + } + } + + protected void firePong(Date timestamp) { + logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp)); + for (FanoutListener listener : listeners) { + try { + listener.pong(timestamp); + } catch (Throwable t) { + logger.error("FanoutListener threw an exception!", t); + } + } + } + protected void fireAnnouncement(String channel, String message) { + logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message)); + for (FanoutListener listener : listeners) { + try { + listener.announcement(channel, message); + } catch (Throwable t) { + logger.error("FanoutListener threw an exception!", t); + } + } + } + + protected synchronized String read() throws IOException { + readBuffer.clear(); + long len = socketCh.read(readBuffer); + + if (len == -1) { + logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port)); + socketCh.close(); + return null; + } else { + readBuffer.flip(); + String content = decoder.decode(readBuffer).toString(); + readBuffer.clear(); + return content; + } + } + + protected synchronized boolean write(String message) { + try { + logger.info(MessageFormat.format("fanout client {0} > {1}", id, message)); + byte [] bytes = message.getBytes(FanoutConstants.CHARSET); + writeBuffer.clear(); + writeBuffer.put(bytes); + if (bytes[bytes.length - 1] != 0xa) { + writeBuffer.put((byte) 0xa); + } + writeBuffer.flip(); + + // loop until write buffer has been completely sent + long written = 0; + long toWrite = writeBuffer.remaining(); + while (written != toWrite) { + written += socketCh.write(writeBuffer); + try { + Thread.sleep(10); + } catch (Exception x) { + } + } + return true; + } catch (IOException e) { + logger.error("fanout client {0} error: {1}", id, e.getMessage()); + } + return false; + } +} \ No newline at end of file diff --git a/src/com/gitblit/fanout/FanoutConstants.java b/src/com/gitblit/fanout/FanoutConstants.java new file mode 100644 index 00000000..6e6964c9 --- /dev/null +++ b/src/com/gitblit/fanout/FanoutConstants.java @@ -0,0 +1,36 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.net.Socket; + +public class FanoutConstants { + + public final static String CHARSET = "ISO-8859-1"; + public final static int BUFFER_LENGTH = 512; + public final static String CH_ALL = "all"; + public final static String CH_DEBUG = "debug"; + public final static String MSG_CONNECTED = "connected..."; + public final static String MSG_BUSY = "busy"; + + public static String getRemoteSocketId(Socket socket) { + return socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); + } + + public static String getLocalSocketId(Socket socket) { + return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort(); + } +} diff --git a/src/com/gitblit/fanout/FanoutNioService.java b/src/com/gitblit/fanout/FanoutNioService.java new file mode 100644 index 00000000..65d022ab --- /dev/null +++ b/src/com/gitblit/fanout/FanoutNioService.java @@ -0,0 +1,332 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A single-thread NIO implementation of https://github.com/travisghansen/fanout + * + * This implementation uses channels and selectors, which are the Java analog of + * the Linux epoll mechanism used in the original fanout C code. + * + * @author James Moger + * + */ +public class FanoutNioService extends FanoutService { + + private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class); + + private volatile ServerSocketChannel serviceCh; + private volatile Selector selector; + + public static void main(String[] args) throws Exception { + FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT); + pubsub.setStrictRequestTermination(false); + pubsub.setAllowAllChannelAnnouncements(false); + pubsub.start(); + } + + /** + * Create a single-threaded fanout service. + * + * @param host + * @param port + * the port for running the fanout PubSub service + * @throws IOException + */ + public FanoutNioService(int port) { + this(null, port); + } + + /** + * Create a single-threaded fanout service. + * + * @param bindInterface + * the ip address to bind for the service, may be null + * @param port + * the port for running the fanout PubSub service + * @throws IOException + */ + public FanoutNioService(String bindInterface, int port) { + super(bindInterface, port, "Fanout nio service"); + } + + @Override + protected boolean isConnected() { + return serviceCh != null; + } + + @Override + protected boolean connect() { + if (serviceCh == null) { + try { + serviceCh = ServerSocketChannel.open(); + serviceCh.configureBlocking(false); + serviceCh.socket().setReuseAddress(true); + serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); + selector = Selector.open(); + serviceCh.register(selector, SelectionKey.OP_ACCEPT); + logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", + name, host == null ? "0.0.0.0" : host, port)); + } catch (IOException e) { + logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", + name, name, host == null ? "0.0.0.0" : host, port), e); + return false; + } + } + return true; + } + + @Override + protected void disconnect() { + try { + if (serviceCh != null) { + // close all active client connections + Map clients = getCurrentClientSockets(); + for (Map.Entry client : clients.entrySet()) { + closeClientSocket(client.getKey(), client.getValue()); + } + + // close service socket channel + logger.debug(MessageFormat.format("closing {0} socket channel", name)); + serviceCh.socket().close(); + serviceCh.close(); + serviceCh = null; + selector.close(); + selector = null; + } + } catch (IOException e) { + logger.error(MessageFormat.format("failed to disconnect {0}", name), e); + } + } + + @Override + protected void listen() throws IOException { + while (selector.select(serviceTimeout) > 0) { + Set keys = selector.selectedKeys(); + Iterator keyItr = keys.iterator(); + while (keyItr.hasNext()) { + SelectionKey key = (SelectionKey) keyItr.next(); + if (key.isAcceptable()) { + // new fanout client connection + ServerSocketChannel sch = (ServerSocketChannel) key.channel(); + try { + SocketChannel ch = sch.accept(); + ch.configureBlocking(false); + configureClientSocket(ch.socket()); + + FanoutNioConnection connection = new FanoutNioConnection(ch); + addConnection(connection); + + // register to send the queued message + ch.register(selector, SelectionKey.OP_WRITE, connection); + } catch (IOException e) { + logger.error("error accepting fanout connection", e); + } + } else if (key.isReadable()) { + // read fanout client request + SocketChannel ch = (SocketChannel) key.channel(); + FanoutNioConnection connection = (FanoutNioConnection) key.attachment(); + try { + connection.read(ch, isStrictRequestTermination()); + int replies = 0; + Iterator reqItr = connection.requestQueue.iterator(); + while (reqItr.hasNext()) { + String req = reqItr.next(); + String reply = processRequest(connection, req); + reqItr.remove(); + if (reply != null) { + replies++; + } + } + + if (replies > 0) { + // register to send the replies to requests + ch.register(selector, SelectionKey.OP_WRITE, connection); + } else { + // re-register for next read + ch.register(selector, SelectionKey.OP_READ, connection); + } + } catch (IOException e) { + logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage())); + removeConnection(connection); + closeClientSocket(connection.id, ch); + } + } else if (key.isWritable()) { + // asynchronous reply to fanout client request + SocketChannel ch = (SocketChannel) key.channel(); + FanoutNioConnection connection = (FanoutNioConnection) key.attachment(); + try { + connection.write(ch); + + if (hasConnection(connection)) { + // register for next read + ch.register(selector, SelectionKey.OP_READ, connection); + } else { + // Connection was rejected due to load or + // some other reason. Close it. + closeClientSocket(connection.id, ch); + } + } catch (IOException e) { + logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage())); + removeConnection(connection); + closeClientSocket(connection.id, ch); + } + } + keyItr.remove(); + } + } + } + + protected void closeClientSocket(String id, SocketChannel ch) { + try { + ch.close(); + } catch (IOException e) { + logger.error(MessageFormat.format("fanout connection {0}", id), e); + } + } + + protected void broadcast(Collection connections, String channel, String message) { + super.broadcast(connections, channel, message); + + // register queued write + Map sockets = getCurrentClientSockets(); + for (FanoutServiceConnection connection : connections) { + SocketChannel ch = sockets.get(connection.id); + if (ch == null) { + logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id)); + removeConnection(connection); + continue; + } + try { + ch.register(selector, SelectionKey.OP_WRITE, connection); + } catch (IOException e) { + logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id)); + } + } + } + + protected Map getCurrentClientSockets() { + Map sockets = new HashMap(); + for (SelectionKey key : selector.keys()) { + if (key.channel() instanceof SocketChannel) { + SocketChannel ch = (SocketChannel) key.channel(); + String id = FanoutConstants.getRemoteSocketId(ch.socket()); + sockets.put(id, ch); + } + } + return sockets; + } + + /** + * FanoutNioConnection handles reading/writing messages from a remote fanout + * connection. + * + * @author James Moger + * + */ + static class FanoutNioConnection extends FanoutServiceConnection { + final ByteBuffer readBuffer; + final ByteBuffer writeBuffer; + final List requestQueue; + final List replyQueue; + final CharsetDecoder decoder; + + FanoutNioConnection(SocketChannel ch) { + super(ch.socket()); + readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH); + writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH); + requestQueue = new ArrayList(); + replyQueue = new ArrayList(); + decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder(); + } + + protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException { + long bytesRead = 0; + readBuffer.clear(); + bytesRead = ch.read(readBuffer); + readBuffer.flip(); + if (bytesRead == -1) { + throw new IOException("lost client connection, end of stream"); + } + if (readBuffer.limit() == 0) { + return; + } + CharBuffer cbuf = decoder.decode(readBuffer); + String req = cbuf.toString(); + String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r"); + requestQueue.addAll(Arrays.asList(lines)); + } + + protected void write(SocketChannel ch) throws IOException { + Iterator itr = replyQueue.iterator(); + while (itr.hasNext()) { + String reply = itr.next(); + writeBuffer.clear(); + logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply)); + byte [] bytes = reply.getBytes(FanoutConstants.CHARSET); + writeBuffer.put(bytes); + if (bytes[bytes.length - 1] != 0xa) { + writeBuffer.put((byte) 0xa); + } + writeBuffer.flip(); + + // loop until write buffer has been completely sent + int written = 0; + int toWrite = writeBuffer.remaining(); + while (written != toWrite) { + written += ch.write(writeBuffer); + try { + Thread.sleep(10); + } catch (Exception x) { + } + } + itr.remove(); + } + writeBuffer.clear(); + } + + @Override + protected void reply(String content) throws IOException { + // queue the reply + // replies are transmitted asynchronously from the requests + replyQueue.add(content); + } + } +} \ No newline at end of file diff --git a/src/com/gitblit/fanout/FanoutService.java b/src/com/gitblit/fanout/FanoutService.java new file mode 100644 index 00000000..cbfd8a24 --- /dev/null +++ b/src/com/gitblit/fanout/FanoutService.java @@ -0,0 +1,563 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for Fanout service implementations. + * + * Subclass implementations can be used as a Sparkleshare PubSub notification + * server. This allows Sparkleshare to be used in conjunction with Gitblit + * behind a corporate firewall that restricts or prohibits client internet access + * to the default Sparkleshare PubSub server: notifications.sparkleshare.org + * + * @author James Moger + * + */ +public abstract class FanoutService implements Runnable { + + private final static Logger logger = LoggerFactory.getLogger(FanoutService.class); + + public final static int DEFAULT_PORT = 17000; + + protected final static int serviceTimeout = 5000; + + protected final String host; + protected final int port; + protected final String name; + + private Thread serviceThread; + + private final Map connections; + private final Map> subscriptions; + + protected final AtomicBoolean isRunning; + private final AtomicBoolean strictRequestTermination; + private final AtomicBoolean allowAllChannelAnnouncements; + private final AtomicInteger concurrentConnectionLimit; + + private final Date bootDate; + private final AtomicLong rejectedConnectionCount; + private final AtomicInteger peakConnectionCount; + private final AtomicLong totalConnections; + private final AtomicLong totalAnnouncements; + private final AtomicLong totalMessages; + private final AtomicLong totalSubscribes; + private final AtomicLong totalUnsubscribes; + private final AtomicLong totalPings; + + protected FanoutService(String host, int port, String name) { + this.host = host; + this.port = port; + this.name = name; + + connections = new ConcurrentHashMap(); + subscriptions = new ConcurrentHashMap>(); + subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet()); + + isRunning = new AtomicBoolean(false); + strictRequestTermination = new AtomicBoolean(false); + allowAllChannelAnnouncements = new AtomicBoolean(false); + concurrentConnectionLimit = new AtomicInteger(0); + + bootDate = new Date(); + rejectedConnectionCount = new AtomicLong(0); + peakConnectionCount = new AtomicInteger(0); + totalConnections = new AtomicLong(0); + totalAnnouncements = new AtomicLong(0); + totalMessages = new AtomicLong(0); + totalSubscribes = new AtomicLong(0); + totalUnsubscribes = new AtomicLong(0); + totalPings = new AtomicLong(0); + } + + /* + * Abstract methods + */ + + protected abstract boolean isConnected(); + + protected abstract boolean connect(); + + protected abstract void listen() throws IOException; + + protected abstract void disconnect(); + + /** + * Returns true if the service requires \n request termination. + * + * @return true if request requires \n termination + */ + public boolean isStrictRequestTermination() { + return strictRequestTermination.get(); + } + + /** + * Control the termination of fanout requests. If true, fanout requests must + * be terminated with \n. If false, fanout requests may be terminated with + * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client. + * + * @param isStrictTermination + */ + public void setStrictRequestTermination(boolean isStrictTermination) { + strictRequestTermination.set(isStrictTermination); + } + + /** + * Returns the maximum allowable concurrent fanout connections. + * + * @return the maximum allowable concurrent connection count + */ + public int getConcurrentConnectionLimit() { + return concurrentConnectionLimit.get(); + } + + /** + * Sets the maximum allowable concurrent fanout connection count. + * + * @param value + */ + public void setConcurrentConnectionLimit(int value) { + concurrentConnectionLimit.set(value); + } + + /** + * Returns true if connections are allowed to announce on the all channel. + * + * @return true if connections are allowed to announce on the all channel + */ + public boolean allowAllChannelAnnouncements() { + return allowAllChannelAnnouncements.get(); + } + + /** + * Allows/prohibits connections from announcing on the ALL channel. + * + * @param value + */ + public void setAllowAllChannelAnnouncements(boolean value) { + allowAllChannelAnnouncements.set(value); + } + + /** + * Returns the current connections + * + * @param channel + * @return map of current connections keyed by their id + */ + public Map getCurrentConnections() { + return connections; + } + + /** + * Returns all subscriptions + * + * @return map of current subscriptions keyed by channel name + */ + public Map> getCurrentSubscriptions() { + return subscriptions; + } + + /** + * Returns the subscriptions for the specified channel + * + * @param channel + * @return set of subscribed connections for the specified channel + */ + public Set getCurrentSubscriptions(String channel) { + return subscriptions.get(channel); + } + + /** + * Returns the runtime statistics object for this service. + * + * @return stats + */ + public FanoutStats getStatistics() { + FanoutStats stats = new FanoutStats(); + + // settings + stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements(); + stats.concurrentConnectionLimit = getConcurrentConnectionLimit(); + stats.strictRequestTermination = isStrictRequestTermination(); + + // runtime stats + stats.bootDate = bootDate; + stats.rejectedConnectionCount = rejectedConnectionCount.get(); + stats.peakConnectionCount = peakConnectionCount.get(); + stats.totalConnections = totalConnections.get(); + stats.totalAnnouncements = totalAnnouncements.get(); + stats.totalMessages = totalMessages.get(); + stats.totalSubscribes = totalSubscribes.get(); + stats.totalUnsubscribes = totalUnsubscribes.get(); + stats.totalPings = totalPings.get(); + stats.currentConnections = connections.size(); + stats.currentChannels = subscriptions.size(); + stats.currentSubscriptions = subscriptions.size() * connections.size(); + return stats; + } + + /** + * Returns true if the service is ready. + * + * @return true, if the service is ready + */ + public boolean isReady() { + if (isRunning.get()) { + return isConnected(); + } + return false; + } + + /** + * Start the Fanout service thread and immediatel return. + * + */ + public void start() { + if (isRunning.get()) { + logger.warn(MessageFormat.format("{0} is already running", name)); + return; + } + serviceThread = new Thread(this); + serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port)); + serviceThread.start(); + } + + /** + * Start the Fanout service thread and wait until it is accepting connections. + * + */ + public void startSynchronously() { + start(); + while (!isReady()) { + try { + Thread.sleep(100); + } catch (Exception e) { + } + } + } + + /** + * Stop the Fanout service. This method returns when the service has been + * completely shutdown. + */ + public void stop() { + if (!isRunning.get()) { + logger.warn(MessageFormat.format("{0} is not running", name)); + return; + } + logger.info(MessageFormat.format("stopping {0}...", name)); + isRunning.set(false); + try { + if (serviceThread != null) { + serviceThread.join(); + serviceThread = null; + } + } catch (InterruptedException e1) { + logger.error("", e1); + } + logger.info(MessageFormat.format("stopped {0}", name)); + } + + /** + * Main execution method of the service + */ + @Override + public final void run() { + disconnect(); + resetState(); + isRunning.set(true); + while (isRunning.get()) { + if (connect()) { + try { + listen(); + } catch (IOException e) { + logger.error(MessageFormat.format("error processing {0}", name), e); + isRunning.set(false); + } + } else { + try { + Thread.sleep(serviceTimeout); + } catch (InterruptedException x) { + } + } + } + disconnect(); + resetState(); + } + + protected void resetState() { + // reset state data + connections.clear(); + subscriptions.clear(); + rejectedConnectionCount.set(0); + peakConnectionCount.set(0); + totalConnections.set(0); + totalAnnouncements.set(0); + totalMessages.set(0); + totalSubscribes.set(0); + totalUnsubscribes.set(0); + totalPings.set(0); + } + + /** + * Configure the client connection socket. + * + * @param socket + * @throws SocketException + */ + protected void configureClientSocket(Socket socket) throws SocketException { + socket.setKeepAlive(true); + socket.setSoLinger(true, 0); // immediately discard any remaining data + } + + /** + * Add the connection to the connections map. + * + * @param connection + * @return false if the connection was rejected due to too many concurrent + * connections + */ + protected boolean addConnection(FanoutServiceConnection connection) { + int limit = getConcurrentConnectionLimit(); + if (limit > 0 && connections.size() > limit) { + logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit)); + increment(rejectedConnectionCount); + connection.busy(); + return false; + } + + // add the connection to our map + connections.put(connection.id, connection); + + // track peak number of concurrent connections + if (connections.size() > peakConnectionCount.get()) { + peakConnectionCount.set(connections.size()); + } + + logger.info("fanout new connection " + connection.id); + connection.connected(); + return true; + } + + /** + * Remove the connection from the connections list and from subscriptions. + * + * @param connection + */ + protected void removeConnection(FanoutServiceConnection connection) { + connections.remove(connection.id); + Iterator>> itr = subscriptions.entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry> entry = itr.next(); + Set subscriptions = entry.getValue(); + subscriptions.remove(connection); + if (!FanoutConstants.CH_ALL.equals(entry.getKey())) { + if (subscriptions.size() == 0) { + itr.remove(); + logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey())); + } + } + } + logger.info(MessageFormat.format("fanout connection {0} removed", connection.id)); + } + + /** + * Tests to see if the connection is being monitored by the service. + * + * @param connection + * @return true if the service is monitoring the connection + */ + protected boolean hasConnection(FanoutServiceConnection connection) { + return connections.containsKey(connection.id); + } + + /** + * Reply to a connection on the specified channel. + * + * @param connection + * @param channel + * @param message + * @return the reply + */ + protected String reply(FanoutServiceConnection connection, String channel, String message) { + if (channel != null && channel.length() > 0) { + increment(totalMessages); + } + return connection.reply(channel, message); + } + + /** + * Service method to broadcast a message to all connections. + * + * @param message + */ + public void broadcastAll(String message) { + broadcast(connections.values(), FanoutConstants.CH_ALL, message); + increment(totalAnnouncements); + } + + /** + * Service method to broadcast a message to connections subscribed to the + * channel. + * + * @param message + */ + public void broadcast(String channel, String message) { + List connections = new ArrayList(subscriptions.get(channel)); + broadcast(connections, channel, message); + increment(totalAnnouncements); + } + + /** + * Broadcast a message to connections subscribed to the specified channel. + * + * @param connections + * @param channel + * @param message + */ + protected void broadcast(Collection connections, String channel, String message) { + for (FanoutServiceConnection connection : connections) { + reply(connection, channel, message); + } + } + + /** + * Process an incoming Fanout request. + * + * @param connection + * @param req + * @return the reply to the request, may be null + */ + protected String processRequest(FanoutServiceConnection connection, String req) { + logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req)); + String[] fields = req.split(" ", 3); + String action = fields[0]; + String channel = fields.length >= 2 ? fields[1] : null; + String message = fields.length >= 3 ? fields[2] : null; + try { + return processRequest(connection, action, channel, message); + } catch (IllegalArgumentException e) { + // invalid action + logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action)); + logger.error(asHexArray(req)); + } + return null; + } + + /** + * Process the Fanout request. + * + * @param connection + * @param action + * @param channel + * @param message + * @return the reply to the request, may be null + * @throws IllegalArgumentException + */ + protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException { + if ("ping".equals(action)) { + // ping + increment(totalPings); + return reply(connection, null, "" + System.currentTimeMillis()); + } else if ("info".equals(action)) { + // info + String info = getStatistics().info(); + return reply(connection, null, info); + } else if ("announce".equals(action)) { + // announcement + if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) { + // prohibiting connection-sourced all announcements + logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message)); + } else if ("debug".equals(channel)) { + // prohibiting connection-sourced debug announcements + logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message)); + } else { + // acceptable announcement + List connections = new ArrayList(subscriptions.get(channel)); + connections.remove(connection); // remove announcer + broadcast(connections, channel, message); + increment(totalAnnouncements); + } + } else if ("subscribe".equals(action)) { + // subscribe + if (!subscriptions.containsKey(channel)) { + logger.info(MessageFormat.format("fanout new channel {0}", channel)); + subscriptions.put(channel, new ConcurrentSkipListSet()); + } + subscriptions.get(channel).add(connection); + logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel)); + increment(totalSubscribes); + } else if ("unsubscribe".equals(action)) { + // unsubscribe + if (subscriptions.containsKey(channel)) { + subscriptions.get(channel).remove(connection); + if (subscriptions.get(channel).size() == 0) { + subscriptions.remove(channel); + } + increment(totalUnsubscribes); + } + } else { + // invalid action + throw new IllegalArgumentException(action); + } + return null; + } + + private String asHexArray(String req) { + StringBuilder sb = new StringBuilder(); + for (char c : req.toCharArray()) { + sb.append(Integer.toHexString(c)).append(' '); + } + return "[ " + sb.toString().trim() + " ]"; + } + + /** + * Increment a long and prevent negative rollover. + * + * @param counter + */ + private void increment(AtomicLong counter) { + long v = counter.incrementAndGet(); + if (v < 0) { + counter.set(0); + } + } + + @Override + public String toString() { + return name; + } +} \ No newline at end of file diff --git a/src/com/gitblit/fanout/FanoutServiceConnection.java b/src/com/gitblit/fanout/FanoutServiceConnection.java new file mode 100644 index 00000000..f7f2c959 --- /dev/null +++ b/src/com/gitblit/fanout/FanoutServiceConnection.java @@ -0,0 +1,105 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.IOException; +import java.net.Socket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * FanoutServiceConnection handles reading/writing messages from a remote fanout + * connection. + * + * @author James Moger + * + */ +public abstract class FanoutServiceConnection implements Comparable { + + private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class); + + public final String id; + + protected FanoutServiceConnection(Socket socket) { + this.id = FanoutConstants.getRemoteSocketId(socket); + } + + protected abstract void reply(String content) throws IOException; + + /** + * Send the connection a debug channel connected message. + * + * @param message + */ + protected void connected() { + reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED); + } + + /** + * Send the connection a debug channel busy message. + * + * @param message + */ + protected void busy() { + reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY); + } + + /** + * Send the connection a message for the specified channel. + * + * @param channel + * @param message + * @return the reply + */ + protected String reply(String channel, String message) { + String content; + if (channel != null) { + content = channel + "!" + message; + } else { + content = message; + } + try { + reply(content); + } catch (Exception e) { + logger.error("failed to reply to fanout connection " + id, e); + } + return content; + } + + @Override + public int compareTo(FanoutServiceConnection c) { + return id.compareTo(c.id); + } + + @Override + public boolean equals(Object o) { + if (o instanceof FanoutServiceConnection) { + return id.equals(((FanoutServiceConnection) o).id); + } + return false; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id; + } +} \ No newline at end of file diff --git a/src/com/gitblit/fanout/FanoutSocketService.java b/src/com/gitblit/fanout/FanoutSocketService.java new file mode 100644 index 00000000..07c18f90 --- /dev/null +++ b/src/com/gitblit/fanout/FanoutSocketService.java @@ -0,0 +1,234 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.text.MessageFormat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A multi-threaded socket implementation of https://github.com/travisghansen/fanout + * + * This implementation creates a master acceptor thread which accepts incoming + * fanout connections and then spawns a daemon thread for each accepted connection. + * If there are 100 concurrent fanout connections, there are 101 threads. + * + * @author James Moger + * + */ +public class FanoutSocketService extends FanoutService { + + private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class); + + private volatile ServerSocket serviceSocket; + + public static void main(String[] args) throws Exception { + FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT); + pubsub.setStrictRequestTermination(false); + pubsub.setAllowAllChannelAnnouncements(false); + pubsub.start(); + } + + /** + * Create a multi-threaded fanout service. + * + * @param port + * the port for running the fanout PubSub service + * @throws IOException + */ + public FanoutSocketService(int port) { + this(null, port); + } + + /** + * Create a multi-threaded fanout service. + * + * @param bindInterface + * the ip address to bind for the service, may be null + * @param port + * the port for running the fanout PubSub service + * @throws IOException + */ + public FanoutSocketService(String bindInterface, int port) { + super(bindInterface, port, "Fanout socket service"); + } + + @Override + protected boolean isConnected() { + return serviceSocket != null; + } + + @Override + protected boolean connect() { + if (serviceSocket == null) { + try { + serviceSocket = new ServerSocket(); + serviceSocket.setReuseAddress(true); + serviceSocket.setSoTimeout(serviceTimeout); + serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); + logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", + name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort())); + } catch (IOException e) { + logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", + name, host == null ? "0.0.0.0" : host, port), e); + return false; + } + } + return true; + } + + @Override + protected void disconnect() { + try { + if (serviceSocket != null) { + logger.debug(MessageFormat.format("closing {0} server socket", name)); + serviceSocket.close(); + serviceSocket = null; + } + } catch (IOException e) { + logger.error(MessageFormat.format("failed to disconnect {0}", name), e); + } + } + + /** + * This accepts incoming fanout connections and spawns connection threads. + */ + @Override + protected void listen() throws IOException { + try { + Socket socket; + socket = serviceSocket.accept(); + configureClientSocket(socket); + + FanoutSocketConnection connection = new FanoutSocketConnection(socket); + + if (addConnection(connection)) { + // spawn connection daemon thread + Thread connectionThread = new Thread(connection); + connectionThread.setDaemon(true); + connectionThread.setName("Fanout " + connection.id); + connectionThread.start(); + } else { + // synchronously close the connection and remove it + removeConnection(connection); + connection.closeConnection(); + connection = null; + } + } catch (SocketTimeoutException e) { + // ignore accept timeout exceptions + } + } + + /** + * FanoutSocketConnection handles reading/writing messages from a remote fanout + * connection. + * + * @author James Moger + * + */ + class FanoutSocketConnection extends FanoutServiceConnection implements Runnable { + Socket socket; + + FanoutSocketConnection(Socket socket) { + super(socket); + this.socket = socket; + } + + /** + * Connection thread read/write method. + */ + @Override + public void run() { + try { + StringBuilder sb = new StringBuilder(); + BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); + byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH]; + int len = 0; + while (true) { + while (is.available() > 0) { + len = is.read(buffer); + for (int i = 0; i < len; i++) { + byte b = buffer[i]; + if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) { + String req = sb.toString(); + sb.setLength(0); + if (req.length() > 0) { + // ignore empty request strings + processRequest(this, req); + } + } else { + sb.append((char) b); + } + } + } + + if (!isRunning.get()) { + // service has stopped, terminate client connection + break; + } else { + Thread.sleep(500); + } + } + } catch (Throwable t) { + if (t instanceof SocketException) { + logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); + } else if (t instanceof SocketTimeoutException) { + logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); + } else { + logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t); + } + } finally { + closeConnection(); + } + + logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id)); + } + + @Override + protected void reply(String content) throws IOException { + // synchronously send reply + logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content)); + OutputStream os = socket.getOutputStream(); + byte [] bytes = content.getBytes(FanoutConstants.CHARSET); + os.write(bytes); + if (bytes[bytes.length - 1] != 0xa) { + os.write(0xa); + } + os.flush(); + } + + protected void closeConnection() { + // close the connection socket + try { + socket.close(); + } catch (IOException e) { + } + socket = null; + + // remove this connection from the service + removeConnection(this); + } + } +} \ No newline at end of file diff --git a/src/com/gitblit/fanout/FanoutStats.java b/src/com/gitblit/fanout/FanoutStats.java new file mode 100644 index 00000000..b06884d3 --- /dev/null +++ b/src/com/gitblit/fanout/FanoutStats.java @@ -0,0 +1,98 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.fanout; + +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.Date; + +/** + * Encapsulates the runtime stats of a fanout service. + * + * @author James Moger + * + */ +public class FanoutStats implements Serializable { + + private static final long serialVersionUID = 1L; + + public long concurrentConnectionLimit; + public boolean allowAllChannelAnnouncements; + public boolean strictRequestTermination; + + public Date bootDate; + public long rejectedConnectionCount; + public int peakConnectionCount; + public long currentChannels; + public long currentSubscriptions; + public long currentConnections; + public long totalConnections; + public long totalAnnouncements; + public long totalMessages; + public long totalSubscribes; + public long totalUnsubscribes; + public long totalPings; + + public String info() { + int i = 0; + StringBuilder sb = new StringBuilder(); + sb.append(infoStr(i++, "boot date")); + sb.append(infoStr(i++, "strict request termination")); + sb.append(infoStr(i++, "allow connection \"all\" announcements")); + sb.append(infoInt(i++, "concurrent connection limit")); + sb.append(infoInt(i++, "concurrent limit rejected connections")); + sb.append(infoInt(i++, "peak connections")); + sb.append(infoInt(i++, "current connections")); + sb.append(infoInt(i++, "current channels")); + sb.append(infoInt(i++, "current subscriptions")); + sb.append(infoInt(i++, "user-requested subscriptions")); + sb.append(infoInt(i++, "total connections")); + sb.append(infoInt(i++, "total announcements")); + sb.append(infoInt(i++, "total messages")); + sb.append(infoInt(i++, "total subscribes")); + sb.append(infoInt(i++, "total unsubscribes")); + sb.append(infoInt(i++, "total pings")); + String template = sb.toString(); + + String info = MessageFormat.format(template, + bootDate.toString(), + Boolean.toString(strictRequestTermination), + Boolean.toString(allowAllChannelAnnouncements), + concurrentConnectionLimit, + rejectedConnectionCount, + peakConnectionCount, + currentConnections, + currentChannels, + currentSubscriptions, + currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections), + totalConnections, + totalAnnouncements, + totalMessages, + totalSubscribes, + totalUnsubscribes, + totalPings); + return info; + } + + private String infoStr(int index, String label) { + return label + ": {" + index + "}\n"; + } + + private String infoInt(int index, String label) { + return label + ": {" + index + ",number,0}\n"; + } + +} diff --git a/tests/com/gitblit/tests/FanoutServiceTest.java b/tests/com/gitblit/tests/FanoutServiceTest.java new file mode 100644 index 00000000..28e5d82d --- /dev/null +++ b/tests/com/gitblit/tests/FanoutServiceTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2013 gitblit.com. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gitblit.tests; + +import static org.junit.Assert.assertEquals; + +import java.text.MessageFormat; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import com.gitblit.fanout.FanoutService; +import com.gitblit.fanout.FanoutClient; +import com.gitblit.fanout.FanoutClient.FanoutAdapter; +import com.gitblit.fanout.FanoutNioService; +import com.gitblit.fanout.FanoutService; +import com.gitblit.fanout.FanoutSocketService; + +public class FanoutServiceTest { + + int fanoutPort = FanoutService.DEFAULT_PORT; + + @Test + public void testNioPubSub() throws Exception { + testPubSub(new FanoutNioService(fanoutPort)); + } + + @Test + public void testSocketPubSub() throws Exception { + testPubSub(new FanoutSocketService(fanoutPort)); + } + + @Test + public void testNioDisruptionAndRecovery() throws Exception { + testDisruption(new FanoutNioService(fanoutPort)); + } + + @Test + public void testSocketDisruptionAndRecovery() throws Exception { + testDisruption(new FanoutSocketService(fanoutPort)); + } + + protected void testPubSub(FanoutService service) throws Exception { + System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString())); + service.startSynchronously(); + + final Map announcementsA = new ConcurrentHashMap(); + FanoutClient clientA = new FanoutClient("localhost", fanoutPort); + clientA.addListener(new FanoutAdapter() { + + @Override + public void announcement(String channel, String message) { + announcementsA.put(channel, message); + } + }); + + clientA.startSynchronously(); + + final Map announcementsB = new ConcurrentHashMap(); + FanoutClient clientB = new FanoutClient("localhost", fanoutPort); + clientB.addListener(new FanoutAdapter() { + @Override + public void announcement(String channel, String message) { + announcementsB.put(channel, message); + } + }); + clientB.startSynchronously(); + + + // subscribe clients A and B to the channels + clientA.subscribe("a"); + clientA.subscribe("b"); + clientA.subscribe("c"); + + clientB.subscribe("a"); + clientB.subscribe("b"); + clientB.subscribe("c"); + + // give async messages a chance to be delivered + Thread.sleep(1000); + + clientA.announce("a", "apple"); + clientA.announce("b", "banana"); + clientA.announce("c", "cantelope"); + + clientB.announce("a", "avocado"); + clientB.announce("b", "beet"); + clientB.announce("c", "carrot"); + + // give async messages a chance to be delivered + Thread.sleep(2000); + + // confirm that client B received client A's announcements + assertEquals("apple", announcementsB.get("a")); + assertEquals("banana", announcementsB.get("b")); + assertEquals("cantelope", announcementsB.get("c")); + + // confirm that client A received client B's announcements + assertEquals("avocado", announcementsA.get("a")); + assertEquals("beet", announcementsA.get("b")); + assertEquals("carrot", announcementsA.get("c")); + + clientA.stop(); + clientB.stop(); + service.stop(); + } + + protected void testDisruption(FanoutService service) throws Exception { + System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString())); + service.startSynchronously(); + + final AtomicInteger pongCount = new AtomicInteger(0); + FanoutClient client = new FanoutClient("localhost", fanoutPort); + client.addListener(new FanoutAdapter() { + @Override + public void pong(Date timestamp) { + pongCount.incrementAndGet(); + } + }); + client.startSynchronously(); + + // ping and wait for pong + client.ping(); + Thread.sleep(500); + + // restart client + client.stop(); + Thread.sleep(1000); + client.startSynchronously(); + + // ping and wait for pong + client.ping(); + Thread.sleep(500); + + assertEquals(2, pongCount.get()); + + // now disrupt service + service.stop(); + Thread.sleep(2000); + service.startSynchronously(); + + // wait for reconnect + Thread.sleep(2000); + + // ping and wait for pong + client.ping(); + Thread.sleep(500); + + // kill all + client.stop(); + service.stop(); + + // confirm expected pong count + assertEquals(3, pongCount.get()); + } +} \ No newline at end of file diff --git a/tests/com/gitblit/tests/GitBlitSuite.java b/tests/com/gitblit/tests/GitBlitSuite.java index bb734eb7..5220a6a3 100644 --- a/tests/com/gitblit/tests/GitBlitSuite.java +++ b/tests/com/gitblit/tests/GitBlitSuite.java @@ -59,7 +59,8 @@ import com.gitblit.utils.JGitUtils; MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class, DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class, GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class, - GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class }) + GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class, + FanoutServiceTest.class }) public class GitBlitSuite { public static final File REPOSITORIES = new File("git"); -- 2.39.5