summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/gitblit/fanout/FanoutService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/gitblit/fanout/FanoutService.java')
-rw-r--r--src/main/java/com/gitblit/fanout/FanoutService.java563
1 files changed, 563 insertions, 0 deletions
diff --git a/src/main/java/com/gitblit/fanout/FanoutService.java b/src/main/java/com/gitblit/fanout/FanoutService.java
new file mode 100644
index 00000000..cbfd8a24
--- /dev/null
+++ b/src/main/java/com/gitblit/fanout/FanoutService.java
@@ -0,0 +1,563 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Fanout service implementations.
+ *
+ * Subclass implementations can be used as a Sparkleshare PubSub notification
+ * server. This allows Sparkleshare to be used in conjunction with Gitblit
+ * behind a corporate firewall that restricts or prohibits client internet access
+ * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
+ *
+ * @author James Moger
+ *
+ */
+public abstract class FanoutService implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
+
+ public final static int DEFAULT_PORT = 17000;
+
+ protected final static int serviceTimeout = 5000;
+
+ protected final String host;
+ protected final int port;
+ protected final String name;
+
+ private Thread serviceThread;
+
+ private final Map<String, FanoutServiceConnection> connections;
+ private final Map<String, Set<FanoutServiceConnection>> subscriptions;
+
+ protected final AtomicBoolean isRunning;
+ private final AtomicBoolean strictRequestTermination;
+ private final AtomicBoolean allowAllChannelAnnouncements;
+ private final AtomicInteger concurrentConnectionLimit;
+
+ private final Date bootDate;
+ private final AtomicLong rejectedConnectionCount;
+ private final AtomicInteger peakConnectionCount;
+ private final AtomicLong totalConnections;
+ private final AtomicLong totalAnnouncements;
+ private final AtomicLong totalMessages;
+ private final AtomicLong totalSubscribes;
+ private final AtomicLong totalUnsubscribes;
+ private final AtomicLong totalPings;
+
+ protected FanoutService(String host, int port, String name) {
+ this.host = host;
+ this.port = port;
+ this.name = name;
+
+ connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
+ subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
+ subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
+
+ isRunning = new AtomicBoolean(false);
+ strictRequestTermination = new AtomicBoolean(false);
+ allowAllChannelAnnouncements = new AtomicBoolean(false);
+ concurrentConnectionLimit = new AtomicInteger(0);
+
+ bootDate = new Date();
+ rejectedConnectionCount = new AtomicLong(0);
+ peakConnectionCount = new AtomicInteger(0);
+ totalConnections = new AtomicLong(0);
+ totalAnnouncements = new AtomicLong(0);
+ totalMessages = new AtomicLong(0);
+ totalSubscribes = new AtomicLong(0);
+ totalUnsubscribes = new AtomicLong(0);
+ totalPings = new AtomicLong(0);
+ }
+
+ /*
+ * Abstract methods
+ */
+
+ protected abstract boolean isConnected();
+
+ protected abstract boolean connect();
+
+ protected abstract void listen() throws IOException;
+
+ protected abstract void disconnect();
+
+ /**
+ * Returns true if the service requires \n request termination.
+ *
+ * @return true if request requires \n termination
+ */
+ public boolean isStrictRequestTermination() {
+ return strictRequestTermination.get();
+ }
+
+ /**
+ * Control the termination of fanout requests. If true, fanout requests must
+ * be terminated with \n. If false, fanout requests may be terminated with
+ * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
+ *
+ * @param isStrictTermination
+ */
+ public void setStrictRequestTermination(boolean isStrictTermination) {
+ strictRequestTermination.set(isStrictTermination);
+ }
+
+ /**
+ * Returns the maximum allowable concurrent fanout connections.
+ *
+ * @return the maximum allowable concurrent connection count
+ */
+ public int getConcurrentConnectionLimit() {
+ return concurrentConnectionLimit.get();
+ }
+
+ /**
+ * Sets the maximum allowable concurrent fanout connection count.
+ *
+ * @param value
+ */
+ public void setConcurrentConnectionLimit(int value) {
+ concurrentConnectionLimit.set(value);
+ }
+
+ /**
+ * Returns true if connections are allowed to announce on the all channel.
+ *
+ * @return true if connections are allowed to announce on the all channel
+ */
+ public boolean allowAllChannelAnnouncements() {
+ return allowAllChannelAnnouncements.get();
+ }
+
+ /**
+ * Allows/prohibits connections from announcing on the ALL channel.
+ *
+ * @param value
+ */
+ public void setAllowAllChannelAnnouncements(boolean value) {
+ allowAllChannelAnnouncements.set(value);
+ }
+
+ /**
+ * Returns the current connections
+ *
+ * @param channel
+ * @return map of current connections keyed by their id
+ */
+ public Map<String, FanoutServiceConnection> getCurrentConnections() {
+ return connections;
+ }
+
+ /**
+ * Returns all subscriptions
+ *
+ * @return map of current subscriptions keyed by channel name
+ */
+ public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
+ return subscriptions;
+ }
+
+ /**
+ * Returns the subscriptions for the specified channel
+ *
+ * @param channel
+ * @return set of subscribed connections for the specified channel
+ */
+ public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
+ return subscriptions.get(channel);
+ }
+
+ /**
+ * Returns the runtime statistics object for this service.
+ *
+ * @return stats
+ */
+ public FanoutStats getStatistics() {
+ FanoutStats stats = new FanoutStats();
+
+ // settings
+ stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
+ stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
+ stats.strictRequestTermination = isStrictRequestTermination();
+
+ // runtime stats
+ stats.bootDate = bootDate;
+ stats.rejectedConnectionCount = rejectedConnectionCount.get();
+ stats.peakConnectionCount = peakConnectionCount.get();
+ stats.totalConnections = totalConnections.get();
+ stats.totalAnnouncements = totalAnnouncements.get();
+ stats.totalMessages = totalMessages.get();
+ stats.totalSubscribes = totalSubscribes.get();
+ stats.totalUnsubscribes = totalUnsubscribes.get();
+ stats.totalPings = totalPings.get();
+ stats.currentConnections = connections.size();
+ stats.currentChannels = subscriptions.size();
+ stats.currentSubscriptions = subscriptions.size() * connections.size();
+ return stats;
+ }
+
+ /**
+ * Returns true if the service is ready.
+ *
+ * @return true, if the service is ready
+ */
+ public boolean isReady() {
+ if (isRunning.get()) {
+ return isConnected();
+ }
+ return false;
+ }
+
+ /**
+ * Start the Fanout service thread and immediatel return.
+ *
+ */
+ public void start() {
+ if (isRunning.get()) {
+ logger.warn(MessageFormat.format("{0} is already running", name));
+ return;
+ }
+ serviceThread = new Thread(this);
+ serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
+ serviceThread.start();
+ }
+
+ /**
+ * Start the Fanout service thread and wait until it is accepting connections.
+ *
+ */
+ public void startSynchronously() {
+ start();
+ while (!isReady()) {
+ try {
+ Thread.sleep(100);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ /**
+ * Stop the Fanout service. This method returns when the service has been
+ * completely shutdown.
+ */
+ public void stop() {
+ if (!isRunning.get()) {
+ logger.warn(MessageFormat.format("{0} is not running", name));
+ return;
+ }
+ logger.info(MessageFormat.format("stopping {0}...", name));
+ isRunning.set(false);
+ try {
+ if (serviceThread != null) {
+ serviceThread.join();
+ serviceThread = null;
+ }
+ } catch (InterruptedException e1) {
+ logger.error("", e1);
+ }
+ logger.info(MessageFormat.format("stopped {0}", name));
+ }
+
+ /**
+ * Main execution method of the service
+ */
+ @Override
+ public final void run() {
+ disconnect();
+ resetState();
+ isRunning.set(true);
+ while (isRunning.get()) {
+ if (connect()) {
+ try {
+ listen();
+ } catch (IOException e) {
+ logger.error(MessageFormat.format("error processing {0}", name), e);
+ isRunning.set(false);
+ }
+ } else {
+ try {
+ Thread.sleep(serviceTimeout);
+ } catch (InterruptedException x) {
+ }
+ }
+ }
+ disconnect();
+ resetState();
+ }
+
+ protected void resetState() {
+ // reset state data
+ connections.clear();
+ subscriptions.clear();
+ rejectedConnectionCount.set(0);
+ peakConnectionCount.set(0);
+ totalConnections.set(0);
+ totalAnnouncements.set(0);
+ totalMessages.set(0);
+ totalSubscribes.set(0);
+ totalUnsubscribes.set(0);
+ totalPings.set(0);
+ }
+
+ /**
+ * Configure the client connection socket.
+ *
+ * @param socket
+ * @throws SocketException
+ */
+ protected void configureClientSocket(Socket socket) throws SocketException {
+ socket.setKeepAlive(true);
+ socket.setSoLinger(true, 0); // immediately discard any remaining data
+ }
+
+ /**
+ * Add the connection to the connections map.
+ *
+ * @param connection
+ * @return false if the connection was rejected due to too many concurrent
+ * connections
+ */
+ protected boolean addConnection(FanoutServiceConnection connection) {
+ int limit = getConcurrentConnectionLimit();
+ if (limit > 0 && connections.size() > limit) {
+ logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
+ increment(rejectedConnectionCount);
+ connection.busy();
+ return false;
+ }
+
+ // add the connection to our map
+ connections.put(connection.id, connection);
+
+ // track peak number of concurrent connections
+ if (connections.size() > peakConnectionCount.get()) {
+ peakConnectionCount.set(connections.size());
+ }
+
+ logger.info("fanout new connection " + connection.id);
+ connection.connected();
+ return true;
+ }
+
+ /**
+ * Remove the connection from the connections list and from subscriptions.
+ *
+ * @param connection
+ */
+ protected void removeConnection(FanoutServiceConnection connection) {
+ connections.remove(connection.id);
+ Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
+ Set<FanoutServiceConnection> subscriptions = entry.getValue();
+ subscriptions.remove(connection);
+ if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {
+ if (subscriptions.size() == 0) {
+ itr.remove();
+ logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
+ }
+ }
+ }
+ logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
+ }
+
+ /**
+ * Tests to see if the connection is being monitored by the service.
+ *
+ * @param connection
+ * @return true if the service is monitoring the connection
+ */
+ protected boolean hasConnection(FanoutServiceConnection connection) {
+ return connections.containsKey(connection.id);
+ }
+
+ /**
+ * Reply to a connection on the specified channel.
+ *
+ * @param connection
+ * @param channel
+ * @param message
+ * @return the reply
+ */
+ protected String reply(FanoutServiceConnection connection, String channel, String message) {
+ if (channel != null && channel.length() > 0) {
+ increment(totalMessages);
+ }
+ return connection.reply(channel, message);
+ }
+
+ /**
+ * Service method to broadcast a message to all connections.
+ *
+ * @param message
+ */
+ public void broadcastAll(String message) {
+ broadcast(connections.values(), FanoutConstants.CH_ALL, message);
+ increment(totalAnnouncements);
+ }
+
+ /**
+ * Service method to broadcast a message to connections subscribed to the
+ * channel.
+ *
+ * @param message
+ */
+ public void broadcast(String channel, String message) {
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+ broadcast(connections, channel, message);
+ increment(totalAnnouncements);
+ }
+
+ /**
+ * Broadcast a message to connections subscribed to the specified channel.
+ *
+ * @param connections
+ * @param channel
+ * @param message
+ */
+ protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
+ for (FanoutServiceConnection connection : connections) {
+ reply(connection, channel, message);
+ }
+ }
+
+ /**
+ * Process an incoming Fanout request.
+ *
+ * @param connection
+ * @param req
+ * @return the reply to the request, may be null
+ */
+ protected String processRequest(FanoutServiceConnection connection, String req) {
+ logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
+ String[] fields = req.split(" ", 3);
+ String action = fields[0];
+ String channel = fields.length >= 2 ? fields[1] : null;
+ String message = fields.length >= 3 ? fields[2] : null;
+ try {
+ return processRequest(connection, action, channel, message);
+ } catch (IllegalArgumentException e) {
+ // invalid action
+ logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
+ logger.error(asHexArray(req));
+ }
+ return null;
+ }
+
+ /**
+ * Process the Fanout request.
+ *
+ * @param connection
+ * @param action
+ * @param channel
+ * @param message
+ * @return the reply to the request, may be null
+ * @throws IllegalArgumentException
+ */
+ protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
+ if ("ping".equals(action)) {
+ // ping
+ increment(totalPings);
+ return reply(connection, null, "" + System.currentTimeMillis());
+ } else if ("info".equals(action)) {
+ // info
+ String info = getStatistics().info();
+ return reply(connection, null, info);
+ } else if ("announce".equals(action)) {
+ // announcement
+ if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
+ // prohibiting connection-sourced all announcements
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
+ } else if ("debug".equals(channel)) {
+ // prohibiting connection-sourced debug announcements
+ logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
+ } else {
+ // acceptable announcement
+ List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+ connections.remove(connection); // remove announcer
+ broadcast(connections, channel, message);
+ increment(totalAnnouncements);
+ }
+ } else if ("subscribe".equals(action)) {
+ // subscribe
+ if (!subscriptions.containsKey(channel)) {
+ logger.info(MessageFormat.format("fanout new channel {0}", channel));
+ subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());
+ }
+ subscriptions.get(channel).add(connection);
+ logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
+ increment(totalSubscribes);
+ } else if ("unsubscribe".equals(action)) {
+ // unsubscribe
+ if (subscriptions.containsKey(channel)) {
+ subscriptions.get(channel).remove(connection);
+ if (subscriptions.get(channel).size() == 0) {
+ subscriptions.remove(channel);
+ }
+ increment(totalUnsubscribes);
+ }
+ } else {
+ // invalid action
+ throw new IllegalArgumentException(action);
+ }
+ return null;
+ }
+
+ private String asHexArray(String req) {
+ StringBuilder sb = new StringBuilder();
+ for (char c : req.toCharArray()) {
+ sb.append(Integer.toHexString(c)).append(' ');
+ }
+ return "[ " + sb.toString().trim() + " ]";
+ }
+
+ /**
+ * Increment a long and prevent negative rollover.
+ *
+ * @param counter
+ */
+ private void increment(AtomicLong counter) {
+ long v = counter.incrementAndGet();
+ if (v < 0) {
+ counter.set(0);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+} \ No newline at end of file