From 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 Mon Sep 17 00:00:00 2001 From: James Moger Date: Mon, 30 Sep 2013 09:30:04 -0400 Subject: Trim trailing whitespace and organize imports Change-Id: I9f91138b20219be6e3c4b28251487df262bff6cc --- src/main/java/com/gitblit/fanout/FanoutClient.java | 64 +++++----- .../java/com/gitblit/fanout/FanoutConstants.java | 4 +- .../java/com/gitblit/fanout/FanoutNioService.java | 43 ++++--- .../java/com/gitblit/fanout/FanoutService.java | 142 ++++++++++----------- .../gitblit/fanout/FanoutServiceConnection.java | 18 +-- .../com/gitblit/fanout/FanoutSocketService.java | 26 ++-- src/main/java/com/gitblit/fanout/FanoutStats.java | 16 +-- 7 files changed, 158 insertions(+), 155 deletions(-) (limited to 'src/main/java/com/gitblit/fanout') diff --git a/src/main/java/com/gitblit/fanout/FanoutClient.java b/src/main/java/com/gitblit/fanout/FanoutClient.java index b9ace4be..a676abcd 100644 --- a/src/main/java/com/gitblit/fanout/FanoutClient.java +++ b/src/main/java/com/gitblit/fanout/FanoutClient.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * Fanout client class. - * + * * @author James Moger * */ @@ -57,24 +57,26 @@ public class FanoutClient implements Runnable { private volatile Selector selector; private volatile SocketChannel socketCh; private Thread clientThread; - + private final AtomicBoolean isConnected; private final AtomicBoolean isRunning; private final AtomicBoolean isAutomaticReconnect; private final ByteBuffer writeBuffer; private final ByteBuffer readBuffer; private final CharsetDecoder decoder; - + private final Set subscriptions; private boolean resubscribe; - + public interface FanoutListener { public void pong(Date timestamp); public void announcement(String channel, String message); } - + public static class FanoutAdapter implements FanoutListener { + @Override public void pong(Date timestamp) { } + @Override public void announcement(String channel, String message) { } } @@ -86,20 +88,20 @@ public class FanoutClient implements Runnable { public void pong(Date timestamp) { System.out.println("Pong. " + timestamp); } - + @Override public void announcement(String channel, String message) { System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message)); } }); client.start(); - + Thread.sleep(5000); client.ping(); client.subscribe("james"); - client.announce("james", "12345"); + client.announce("james", "12345"); client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5"); - + while (true) { Thread.sleep(10000); client.ping(); @@ -126,11 +128,11 @@ public class FanoutClient implements Runnable { public void removeListener(FanoutListener listener) { listeners.remove(listener); } - + public boolean isAutomaticReconnect() { return isAutomaticReconnect.get(); } - + public void setAutomaticReconnect(boolean value) { isAutomaticReconnect.set(value); } @@ -144,21 +146,21 @@ public class FanoutClient implements Runnable { confirmConnection(); write("status"); } - + public void subscribe(String channel) { confirmConnection(); if (subscriptions.add(channel)) { write("subscribe " + channel); } } - + public void unsubscribe(String channel) { confirmConnection(); if (subscriptions.remove(channel)) { write("unsubscribe " + channel); } } - + public void announce(String channel, String message) { confirmConnection(); write("announce " + channel + " " + message); @@ -169,11 +171,11 @@ public class FanoutClient implements Runnable { throw new RuntimeException("Fanout client is disconnected!"); } } - + public boolean isConnected() { return isRunning.get() && socketCh != null && isConnected.get(); } - + /** * Start client connection and return immediately. */ @@ -185,13 +187,13 @@ public class FanoutClient implements Runnable { clientThread = new Thread(this, "Fanout client"); clientThread.start(); } - + /** * Start client connection and wait until it has connected. */ public void startSynchronously() { start(); - while (!isConnected()) { + while (!isConnected()) { try { Thread.sleep(100); } catch (Exception e) { @@ -221,8 +223,8 @@ public class FanoutClient implements Runnable { @Override public void run() { resetState(); - - isRunning.set(true); + + isRunning.set(true); while (isRunning.get()) { // (re)connect if (socketCh == null) { @@ -231,7 +233,7 @@ public class FanoutClient implements Runnable { socketCh = SocketChannel.open(new InetSocketAddress(addr, port)); socketCh.configureBlocking(false); selector = Selector.open(); - id = FanoutConstants.getLocalSocketId(socketCh.socket()); + id = FanoutConstants.getLocalSocketId(socketCh.socket()); socketCh.register(selector, SelectionKey.OP_READ); } catch (Exception e) { logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e); @@ -242,7 +244,7 @@ public class FanoutClient implements Runnable { continue; } } - + // read/write try { selector.select(clientTimeout); @@ -251,7 +253,7 @@ public class FanoutClient implements Runnable { while (i.hasNext()) { SelectionKey key = i.next(); i.remove(); - + if (key.isReadable()) { // read message String content = read(); @@ -266,7 +268,7 @@ public class FanoutClient implements Runnable { // resubscribe if (resubscribe) { resubscribe = false; - logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); + logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); for (String subscription : subscriptions) { write("subscribe " + subscription); } @@ -276,25 +278,25 @@ public class FanoutClient implements Runnable { } } catch (IOException e) { logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage())); - closeChannel(); + closeChannel(); if (!isAutomaticReconnect.get()) { isRunning.set(false); continue; } } } - + closeChannel(); resetState(); } - + protected void resetState() { readBuffer.clear(); writeBuffer.clear(); isRunning.set(false); isConnected.set(false); } - + private void closeChannel() { try { if (socketCh != null) { @@ -315,7 +317,7 @@ public class FanoutClient implements Runnable { long time = Long.parseLong(fields[0]); Date date = new Date(time); firePong(date); - } catch (Exception e) { + } catch (Exception e) { } return true; } else if (fields.length == 2) { @@ -366,7 +368,7 @@ public class FanoutClient implements Runnable { } } } - + protected synchronized String read() throws IOException { readBuffer.clear(); long len = socketCh.read(readBuffer); @@ -382,7 +384,7 @@ public class FanoutClient implements Runnable { return content; } } - + protected synchronized boolean write(String message) { try { logger.info(MessageFormat.format("fanout client {0} > {1}", id, message)); diff --git a/src/main/java/com/gitblit/fanout/FanoutConstants.java b/src/main/java/com/gitblit/fanout/FanoutConstants.java index 6e6964c9..84fd2c59 100644 --- a/src/main/java/com/gitblit/fanout/FanoutConstants.java +++ b/src/main/java/com/gitblit/fanout/FanoutConstants.java @@ -25,11 +25,11 @@ public class FanoutConstants { public final static String CH_DEBUG = "debug"; public final static String MSG_CONNECTED = "connected..."; public final static String MSG_BUSY = "busy"; - + public static String getRemoteSocketId(Socket socket) { return socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); } - + public static String getLocalSocketId(Socket socket) { return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort(); } diff --git a/src/main/java/com/gitblit/fanout/FanoutNioService.java b/src/main/java/com/gitblit/fanout/FanoutNioService.java index 65d022ab..e7aff34b 100644 --- a/src/main/java/com/gitblit/fanout/FanoutNioService.java +++ b/src/main/java/com/gitblit/fanout/FanoutNioService.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; * * This implementation uses channels and selectors, which are the Java analog of * the Linux epoll mechanism used in the original fanout C code. - * + * * @author James Moger * */ @@ -54,7 +54,7 @@ public class FanoutNioService extends FanoutService { private volatile ServerSocketChannel serviceCh; private volatile Selector selector; - + public static void main(String[] args) throws Exception { FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT); pubsub.setStrictRequestTermination(false); @@ -64,7 +64,7 @@ public class FanoutNioService extends FanoutService { /** * Create a single-threaded fanout service. - * + * * @param host * @param port * the port for running the fanout PubSub service @@ -73,10 +73,10 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(int port) { this(null, port); } - + /** * Create a single-threaded fanout service. - * + * * @param bindInterface * the ip address to bind for the service, may be null * @param port @@ -86,7 +86,7 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(String bindInterface, int port) { super(bindInterface, port, "Fanout nio service"); } - + @Override protected boolean isConnected() { return serviceCh != null; @@ -102,10 +102,10 @@ public class FanoutNioService extends FanoutService { serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); selector = Selector.open(); serviceCh.register(selector, SelectionKey.OP_ACCEPT); - logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", + logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", name, host == null ? "0.0.0.0" : host, port)); } catch (IOException e) { - logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", + logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", name, name, host == null ? "0.0.0.0" : host, port), e); return false; } @@ -122,11 +122,11 @@ public class FanoutNioService extends FanoutService { for (Map.Entry client : clients.entrySet()) { closeClientSocket(client.getKey(), client.getValue()); } - + // close service socket channel logger.debug(MessageFormat.format("closing {0} socket channel", name)); serviceCh.socket().close(); - serviceCh.close(); + serviceCh.close(); serviceCh = null; selector.close(); selector = null; @@ -142,7 +142,7 @@ public class FanoutNioService extends FanoutService { Set keys = selector.selectedKeys(); Iterator keyItr = keys.iterator(); while (keyItr.hasNext()) { - SelectionKey key = (SelectionKey) keyItr.next(); + SelectionKey key = keyItr.next(); if (key.isAcceptable()) { // new fanout client connection ServerSocketChannel sch = (ServerSocketChannel) key.channel(); @@ -213,7 +213,7 @@ public class FanoutNioService extends FanoutService { } } } - + protected void closeClientSocket(String id, SocketChannel ch) { try { ch.close(); @@ -221,10 +221,11 @@ public class FanoutNioService extends FanoutService { logger.error(MessageFormat.format("fanout connection {0}", id), e); } } - + + @Override protected void broadcast(Collection connections, String channel, String message) { super.broadcast(connections, channel, message); - + // register queued write Map sockets = getCurrentClientSockets(); for (FanoutServiceConnection connection : connections) { @@ -241,7 +242,7 @@ public class FanoutNioService extends FanoutService { } } } - + protected Map getCurrentClientSockets() { Map sockets = new HashMap(); for (SelectionKey key : selector.keys()) { @@ -253,11 +254,11 @@ public class FanoutNioService extends FanoutService { } return sockets; } - + /** * FanoutNioConnection handles reading/writing messages from a remote fanout * connection. - * + * * @author James Moger * */ @@ -276,7 +277,7 @@ public class FanoutNioService extends FanoutService { replyQueue = new ArrayList(); decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder(); } - + protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException { long bytesRead = 0; readBuffer.clear(); @@ -293,7 +294,7 @@ public class FanoutNioService extends FanoutService { String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r"); requestQueue.addAll(Arrays.asList(lines)); } - + protected void write(SocketChannel ch) throws IOException { Iterator itr = replyQueue.iterator(); while (itr.hasNext()) { @@ -306,7 +307,7 @@ public class FanoutNioService extends FanoutService { writeBuffer.put((byte) 0xa); } writeBuffer.flip(); - + // loop until write buffer has been completely sent int written = 0; int toWrite = writeBuffer.remaining(); @@ -316,7 +317,7 @@ public class FanoutNioService extends FanoutService { Thread.sleep(10); } catch (Exception x) { } - } + } itr.remove(); } writeBuffer.clear(); diff --git a/src/main/java/com/gitblit/fanout/FanoutService.java b/src/main/java/com/gitblit/fanout/FanoutService.java index cbfd8a24..e0e4d64d 100644 --- a/src/main/java/com/gitblit/fanout/FanoutService.java +++ b/src/main/java/com/gitblit/fanout/FanoutService.java @@ -37,29 +37,29 @@ import org.slf4j.LoggerFactory; /** * Base class for Fanout service implementations. - * + * * Subclass implementations can be used as a Sparkleshare PubSub notification * server. This allows Sparkleshare to be used in conjunction with Gitblit * behind a corporate firewall that restricts or prohibits client internet access * to the default Sparkleshare PubSub server: notifications.sparkleshare.org - * + * * @author James Moger * */ public abstract class FanoutService implements Runnable { private final static Logger logger = LoggerFactory.getLogger(FanoutService.class); - + public final static int DEFAULT_PORT = 17000; - + protected final static int serviceTimeout = 5000; protected final String host; protected final int port; - protected final String name; - + protected final String name; + private Thread serviceThread; - + private final Map connections; private final Map> subscriptions; @@ -67,7 +67,7 @@ public abstract class FanoutService implements Runnable { private final AtomicBoolean strictRequestTermination; private final AtomicBoolean allowAllChannelAnnouncements; private final AtomicInteger concurrentConnectionLimit; - + private final Date bootDate; private final AtomicLong rejectedConnectionCount; private final AtomicInteger peakConnectionCount; @@ -82,16 +82,16 @@ public abstract class FanoutService implements Runnable { this.host = host; this.port = port; this.name = name; - + connections = new ConcurrentHashMap(); subscriptions = new ConcurrentHashMap>(); subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet()); - + isRunning = new AtomicBoolean(false); strictRequestTermination = new AtomicBoolean(false); allowAllChannelAnnouncements = new AtomicBoolean(false); concurrentConnectionLimit = new AtomicInteger(0); - + bootDate = new Date(); rejectedConnectionCount = new AtomicLong(0); peakConnectionCount = new AtomicInteger(0); @@ -106,18 +106,18 @@ public abstract class FanoutService implements Runnable { /* * Abstract methods */ - + protected abstract boolean isConnected(); - + protected abstract boolean connect(); - + protected abstract void listen() throws IOException; - + protected abstract void disconnect(); - + /** * Returns true if the service requires \n request termination. - * + * * @return true if request requires \n termination */ public boolean isStrictRequestTermination() { @@ -128,62 +128,62 @@ public abstract class FanoutService implements Runnable { * Control the termination of fanout requests. If true, fanout requests must * be terminated with \n. If false, fanout requests may be terminated with * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client. - * + * * @param isStrictTermination */ public void setStrictRequestTermination(boolean isStrictTermination) { strictRequestTermination.set(isStrictTermination); } - + /** * Returns the maximum allowable concurrent fanout connections. - * + * * @return the maximum allowable concurrent connection count */ public int getConcurrentConnectionLimit() { return concurrentConnectionLimit.get(); } - + /** * Sets the maximum allowable concurrent fanout connection count. - * + * * @param value */ public void setConcurrentConnectionLimit(int value) { concurrentConnectionLimit.set(value); } - + /** * Returns true if connections are allowed to announce on the all channel. - * + * * @return true if connections are allowed to announce on the all channel */ public boolean allowAllChannelAnnouncements() { return allowAllChannelAnnouncements.get(); } - + /** * Allows/prohibits connections from announcing on the ALL channel. - * + * * @param value */ public void setAllowAllChannelAnnouncements(boolean value) { allowAllChannelAnnouncements.set(value); } - + /** * Returns the current connections - * + * * @param channel * @return map of current connections keyed by their id */ public Map getCurrentConnections() { return connections; } - + /** * Returns all subscriptions - * + * * @return map of current subscriptions keyed by channel name */ public Map> getCurrentSubscriptions() { @@ -192,7 +192,7 @@ public abstract class FanoutService implements Runnable { /** * Returns the subscriptions for the specified channel - * + * * @param channel * @return set of subscribed connections for the specified channel */ @@ -202,17 +202,17 @@ public abstract class FanoutService implements Runnable { /** * Returns the runtime statistics object for this service. - * + * * @return stats */ public FanoutStats getStatistics() { FanoutStats stats = new FanoutStats(); - + // settings stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements(); stats.concurrentConnectionLimit = getConcurrentConnectionLimit(); stats.strictRequestTermination = isStrictRequestTermination(); - + // runtime stats stats.bootDate = bootDate; stats.rejectedConnectionCount = rejectedConnectionCount.get(); @@ -222,16 +222,16 @@ public abstract class FanoutService implements Runnable { stats.totalMessages = totalMessages.get(); stats.totalSubscribes = totalSubscribes.get(); stats.totalUnsubscribes = totalUnsubscribes.get(); - stats.totalPings = totalPings.get(); + stats.totalPings = totalPings.get(); stats.currentConnections = connections.size(); stats.currentChannels = subscriptions.size(); stats.currentSubscriptions = subscriptions.size() * connections.size(); return stats; } - + /** * Returns true if the service is ready. - * + * * @return true, if the service is ready */ public boolean isReady() { @@ -243,7 +243,7 @@ public abstract class FanoutService implements Runnable { /** * Start the Fanout service thread and immediatel return. - * + * */ public void start() { if (isRunning.get()) { @@ -254,10 +254,10 @@ public abstract class FanoutService implements Runnable { serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port)); serviceThread.start(); } - + /** * Start the Fanout service thread and wait until it is accepting connections. - * + * */ public void startSynchronously() { start(); @@ -268,7 +268,7 @@ public abstract class FanoutService implements Runnable { } } } - + /** * Stop the Fanout service. This method returns when the service has been * completely shutdown. @@ -290,7 +290,7 @@ public abstract class FanoutService implements Runnable { } logger.info(MessageFormat.format("stopped {0}", name)); } - + /** * Main execution method of the service */ @@ -314,10 +314,10 @@ public abstract class FanoutService implements Runnable { } } } - disconnect(); + disconnect(); resetState(); } - + protected void resetState() { // reset state data connections.clear(); @@ -334,23 +334,23 @@ public abstract class FanoutService implements Runnable { /** * Configure the client connection socket. - * + * * @param socket * @throws SocketException */ protected void configureClientSocket(Socket socket) throws SocketException { - socket.setKeepAlive(true); + socket.setKeepAlive(true); socket.setSoLinger(true, 0); // immediately discard any remaining data } - + /** * Add the connection to the connections map. - * + * * @param connection * @return false if the connection was rejected due to too many concurrent * connections */ - protected boolean addConnection(FanoutServiceConnection connection) { + protected boolean addConnection(FanoutServiceConnection connection) { int limit = getConcurrentConnectionLimit(); if (limit > 0 && connections.size() > limit) { logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit)); @@ -358,7 +358,7 @@ public abstract class FanoutService implements Runnable { connection.busy(); return false; } - + // add the connection to our map connections.put(connection.id, connection); @@ -371,10 +371,10 @@ public abstract class FanoutService implements Runnable { connection.connected(); return true; } - + /** * Remove the connection from the connections list and from subscriptions. - * + * * @param connection */ protected void removeConnection(FanoutServiceConnection connection) { @@ -393,46 +393,46 @@ public abstract class FanoutService implements Runnable { } logger.info(MessageFormat.format("fanout connection {0} removed", connection.id)); } - + /** * Tests to see if the connection is being monitored by the service. - * + * * @param connection * @return true if the service is monitoring the connection */ protected boolean hasConnection(FanoutServiceConnection connection) { return connections.containsKey(connection.id); } - + /** * Reply to a connection on the specified channel. - * + * * @param connection * @param channel * @param message * @return the reply */ - protected String reply(FanoutServiceConnection connection, String channel, String message) { + protected String reply(FanoutServiceConnection connection, String channel, String message) { if (channel != null && channel.length() > 0) { increment(totalMessages); } return connection.reply(channel, message); } - + /** * Service method to broadcast a message to all connections. - * + * * @param message */ public void broadcastAll(String message) { broadcast(connections.values(), FanoutConstants.CH_ALL, message); increment(totalAnnouncements); } - + /** * Service method to broadcast a message to connections subscribed to the * channel. - * + * * @param message */ public void broadcast(String channel, String message) { @@ -440,10 +440,10 @@ public abstract class FanoutService implements Runnable { broadcast(connections, channel, message); increment(totalAnnouncements); } - + /** * Broadcast a message to connections subscribed to the specified channel. - * + * * @param connections * @param channel * @param message @@ -453,10 +453,10 @@ public abstract class FanoutService implements Runnable { reply(connection, channel, message); } } - + /** * Process an incoming Fanout request. - * + * * @param connection * @param req * @return the reply to the request, may be null @@ -476,10 +476,10 @@ public abstract class FanoutService implements Runnable { } return null; } - + /** * Process the Fanout request. - * + * * @param connection * @param action * @param channel @@ -535,7 +535,7 @@ public abstract class FanoutService implements Runnable { } return null; } - + private String asHexArray(String req) { StringBuilder sb = new StringBuilder(); for (char c : req.toCharArray()) { @@ -543,10 +543,10 @@ public abstract class FanoutService implements Runnable { } return "[ " + sb.toString().trim() + " ]"; } - + /** * Increment a long and prevent negative rollover. - * + * * @param counter */ private void increment(AtomicLong counter) { @@ -555,7 +555,7 @@ public abstract class FanoutService implements Runnable { counter.set(0); } } - + @Override public String toString() { return name; diff --git a/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java b/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java index f7f2c959..2515ab1d 100644 --- a/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java +++ b/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java @@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory; /** * FanoutServiceConnection handles reading/writing messages from a remote fanout * connection. - * + * * @author James Moger - * + * */ public abstract class FanoutServiceConnection implements Comparable { - + private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class); - + public final String id; protected FanoutServiceConnection(Socket socket) { @@ -42,25 +42,25 @@ public abstract class FanoutServiceConnection implements Comparable