diff options
author | James Moger <james.moger@gitblit.com> | 2013-09-30 09:30:04 -0400 |
---|---|---|
committer | James Moger <james.moger@gitblit.com> | 2013-09-30 10:11:28 -0400 |
commit | 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 (patch) | |
tree | 4a9ea25c258caeae3dea4bc1de809f47bc615d81 /src/main/java/com/gitblit/fanout | |
parent | 235ad956fa84cad4fac1b2e69a0c9e4f50376ea3 (diff) | |
download | gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.tar.gz gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.zip |
Trim trailing whitespace and organize imports
Change-Id: I9f91138b20219be6e3c4b28251487df262bff6cc
Diffstat (limited to 'src/main/java/com/gitblit/fanout')
7 files changed, 158 insertions, 155 deletions
diff --git a/src/main/java/com/gitblit/fanout/FanoutClient.java b/src/main/java/com/gitblit/fanout/FanoutClient.java index b9ace4be..a676abcd 100644 --- a/src/main/java/com/gitblit/fanout/FanoutClient.java +++ b/src/main/java/com/gitblit/fanout/FanoutClient.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; /**
* Fanout client class.
- *
+ *
* @author James Moger
*
*/
@@ -57,24 +57,26 @@ public class FanoutClient implements Runnable { private volatile Selector selector;
private volatile SocketChannel socketCh;
private Thread clientThread;
-
+
private final AtomicBoolean isConnected;
private final AtomicBoolean isRunning;
private final AtomicBoolean isAutomaticReconnect;
private final ByteBuffer writeBuffer;
private final ByteBuffer readBuffer;
private final CharsetDecoder decoder;
-
+
private final Set<String> subscriptions;
private boolean resubscribe;
-
+
public interface FanoutListener {
public void pong(Date timestamp);
public void announcement(String channel, String message);
}
-
+
public static class FanoutAdapter implements FanoutListener {
+ @Override
public void pong(Date timestamp) { }
+ @Override
public void announcement(String channel, String message) { }
}
@@ -86,20 +88,20 @@ public class FanoutClient implements Runnable { public void pong(Date timestamp) {
System.out.println("Pong. " + timestamp);
}
-
+
@Override
public void announcement(String channel, String message) {
System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
}
});
client.start();
-
+
Thread.sleep(5000);
client.ping();
client.subscribe("james");
- client.announce("james", "12345");
+ client.announce("james", "12345");
client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
-
+
while (true) {
Thread.sleep(10000);
client.ping();
@@ -126,11 +128,11 @@ public class FanoutClient implements Runnable { public void removeListener(FanoutListener listener) {
listeners.remove(listener);
}
-
+
public boolean isAutomaticReconnect() {
return isAutomaticReconnect.get();
}
-
+
public void setAutomaticReconnect(boolean value) {
isAutomaticReconnect.set(value);
}
@@ -144,21 +146,21 @@ public class FanoutClient implements Runnable { confirmConnection();
write("status");
}
-
+
public void subscribe(String channel) {
confirmConnection();
if (subscriptions.add(channel)) {
write("subscribe " + channel);
}
}
-
+
public void unsubscribe(String channel) {
confirmConnection();
if (subscriptions.remove(channel)) {
write("unsubscribe " + channel);
}
}
-
+
public void announce(String channel, String message) {
confirmConnection();
write("announce " + channel + " " + message);
@@ -169,11 +171,11 @@ public class FanoutClient implements Runnable { throw new RuntimeException("Fanout client is disconnected!");
}
}
-
+
public boolean isConnected() {
return isRunning.get() && socketCh != null && isConnected.get();
}
-
+
/**
* Start client connection and return immediately.
*/
@@ -185,13 +187,13 @@ public class FanoutClient implements Runnable { clientThread = new Thread(this, "Fanout client");
clientThread.start();
}
-
+
/**
* Start client connection and wait until it has connected.
*/
public void startSynchronously() {
start();
- while (!isConnected()) {
+ while (!isConnected()) {
try {
Thread.sleep(100);
} catch (Exception e) {
@@ -221,8 +223,8 @@ public class FanoutClient implements Runnable { @Override
public void run() {
resetState();
-
- isRunning.set(true);
+
+ isRunning.set(true);
while (isRunning.get()) {
// (re)connect
if (socketCh == null) {
@@ -231,7 +233,7 @@ public class FanoutClient implements Runnable { socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
socketCh.configureBlocking(false);
selector = Selector.open();
- id = FanoutConstants.getLocalSocketId(socketCh.socket());
+ id = FanoutConstants.getLocalSocketId(socketCh.socket());
socketCh.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
@@ -242,7 +244,7 @@ public class FanoutClient implements Runnable { continue;
}
}
-
+
// read/write
try {
selector.select(clientTimeout);
@@ -251,7 +253,7 @@ public class FanoutClient implements Runnable { while (i.hasNext()) {
SelectionKey key = i.next();
i.remove();
-
+
if (key.isReadable()) {
// read message
String content = read();
@@ -266,7 +268,7 @@ public class FanoutClient implements Runnable { // resubscribe
if (resubscribe) {
resubscribe = false;
- logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
+ logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
for (String subscription : subscriptions) {
write("subscribe " + subscription);
}
@@ -276,25 +278,25 @@ public class FanoutClient implements Runnable { }
} catch (IOException e) {
logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
- closeChannel();
+ closeChannel();
if (!isAutomaticReconnect.get()) {
isRunning.set(false);
continue;
}
}
}
-
+
closeChannel();
resetState();
}
-
+
protected void resetState() {
readBuffer.clear();
writeBuffer.clear();
isRunning.set(false);
isConnected.set(false);
}
-
+
private void closeChannel() {
try {
if (socketCh != null) {
@@ -315,7 +317,7 @@ public class FanoutClient implements Runnable { long time = Long.parseLong(fields[0]);
Date date = new Date(time);
firePong(date);
- } catch (Exception e) {
+ } catch (Exception e) {
}
return true;
} else if (fields.length == 2) {
@@ -366,7 +368,7 @@ public class FanoutClient implements Runnable { }
}
}
-
+
protected synchronized String read() throws IOException {
readBuffer.clear();
long len = socketCh.read(readBuffer);
@@ -382,7 +384,7 @@ public class FanoutClient implements Runnable { return content;
}
}
-
+
protected synchronized boolean write(String message) {
try {
logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
diff --git a/src/main/java/com/gitblit/fanout/FanoutConstants.java b/src/main/java/com/gitblit/fanout/FanoutConstants.java index 6e6964c9..84fd2c59 100644 --- a/src/main/java/com/gitblit/fanout/FanoutConstants.java +++ b/src/main/java/com/gitblit/fanout/FanoutConstants.java @@ -25,11 +25,11 @@ public class FanoutConstants { public final static String CH_DEBUG = "debug";
public final static String MSG_CONNECTED = "connected...";
public final static String MSG_BUSY = "busy";
-
+
public static String getRemoteSocketId(Socket socket) {
return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
}
-
+
public static String getLocalSocketId(Socket socket) {
return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();
}
diff --git a/src/main/java/com/gitblit/fanout/FanoutNioService.java b/src/main/java/com/gitblit/fanout/FanoutNioService.java index 65d022ab..e7aff34b 100644 --- a/src/main/java/com/gitblit/fanout/FanoutNioService.java +++ b/src/main/java/com/gitblit/fanout/FanoutNioService.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; *
* This implementation uses channels and selectors, which are the Java analog of
* the Linux epoll mechanism used in the original fanout C code.
- *
+ *
* @author James Moger
*
*/
@@ -54,7 +54,7 @@ public class FanoutNioService extends FanoutService { private volatile ServerSocketChannel serviceCh;
private volatile Selector selector;
-
+
public static void main(String[] args) throws Exception {
FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
pubsub.setStrictRequestTermination(false);
@@ -64,7 +64,7 @@ public class FanoutNioService extends FanoutService { /**
* Create a single-threaded fanout service.
- *
+ *
* @param host
* @param port
* the port for running the fanout PubSub service
@@ -73,10 +73,10 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(int port) {
this(null, port);
}
-
+
/**
* Create a single-threaded fanout service.
- *
+ *
* @param bindInterface
* the ip address to bind for the service, may be null
* @param port
@@ -86,7 +86,7 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(String bindInterface, int port) {
super(bindInterface, port, "Fanout nio service");
}
-
+
@Override
protected boolean isConnected() {
return serviceCh != null;
@@ -102,10 +102,10 @@ public class FanoutNioService extends FanoutService { serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
selector = Selector.open();
serviceCh.register(selector, SelectionKey.OP_ACCEPT);
- logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
name, host == null ? "0.0.0.0" : host, port));
} catch (IOException e) {
- logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
name, name, host == null ? "0.0.0.0" : host, port), e);
return false;
}
@@ -122,11 +122,11 @@ public class FanoutNioService extends FanoutService { for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
closeClientSocket(client.getKey(), client.getValue());
}
-
+
// close service socket channel
logger.debug(MessageFormat.format("closing {0} socket channel", name));
serviceCh.socket().close();
- serviceCh.close();
+ serviceCh.close();
serviceCh = null;
selector.close();
selector = null;
@@ -142,7 +142,7 @@ public class FanoutNioService extends FanoutService { Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyItr = keys.iterator();
while (keyItr.hasNext()) {
- SelectionKey key = (SelectionKey) keyItr.next();
+ SelectionKey key = keyItr.next();
if (key.isAcceptable()) {
// new fanout client connection
ServerSocketChannel sch = (ServerSocketChannel) key.channel();
@@ -213,7 +213,7 @@ public class FanoutNioService extends FanoutService { }
}
}
-
+
protected void closeClientSocket(String id, SocketChannel ch) {
try {
ch.close();
@@ -221,10 +221,11 @@ public class FanoutNioService extends FanoutService { logger.error(MessageFormat.format("fanout connection {0}", id), e);
}
}
-
+
+ @Override
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
super.broadcast(connections, channel, message);
-
+
// register queued write
Map<String, SocketChannel> sockets = getCurrentClientSockets();
for (FanoutServiceConnection connection : connections) {
@@ -241,7 +242,7 @@ public class FanoutNioService extends FanoutService { }
}
}
-
+
protected Map<String, SocketChannel> getCurrentClientSockets() {
Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
for (SelectionKey key : selector.keys()) {
@@ -253,11 +254,11 @@ public class FanoutNioService extends FanoutService { }
return sockets;
}
-
+
/**
* FanoutNioConnection handles reading/writing messages from a remote fanout
* connection.
- *
+ *
* @author James Moger
*
*/
@@ -276,7 +277,7 @@ public class FanoutNioService extends FanoutService { replyQueue = new ArrayList<String>();
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
}
-
+
protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
long bytesRead = 0;
readBuffer.clear();
@@ -293,7 +294,7 @@ public class FanoutNioService extends FanoutService { String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
requestQueue.addAll(Arrays.asList(lines));
}
-
+
protected void write(SocketChannel ch) throws IOException {
Iterator<String> itr = replyQueue.iterator();
while (itr.hasNext()) {
@@ -306,7 +307,7 @@ public class FanoutNioService extends FanoutService { writeBuffer.put((byte) 0xa);
}
writeBuffer.flip();
-
+
// loop until write buffer has been completely sent
int written = 0;
int toWrite = writeBuffer.remaining();
@@ -316,7 +317,7 @@ public class FanoutNioService extends FanoutService { Thread.sleep(10);
} catch (Exception x) {
}
- }
+ }
itr.remove();
}
writeBuffer.clear();
diff --git a/src/main/java/com/gitblit/fanout/FanoutService.java b/src/main/java/com/gitblit/fanout/FanoutService.java index cbfd8a24..e0e4d64d 100644 --- a/src/main/java/com/gitblit/fanout/FanoutService.java +++ b/src/main/java/com/gitblit/fanout/FanoutService.java @@ -37,29 +37,29 @@ import org.slf4j.LoggerFactory; /**
* Base class for Fanout service implementations.
- *
+ *
* Subclass implementations can be used as a Sparkleshare PubSub notification
* server. This allows Sparkleshare to be used in conjunction with Gitblit
* behind a corporate firewall that restricts or prohibits client internet access
* to the default Sparkleshare PubSub server: notifications.sparkleshare.org
- *
+ *
* @author James Moger
*
*/
public abstract class FanoutService implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
-
+
public final static int DEFAULT_PORT = 17000;
-
+
protected final static int serviceTimeout = 5000;
protected final String host;
protected final int port;
- protected final String name;
-
+ protected final String name;
+
private Thread serviceThread;
-
+
private final Map<String, FanoutServiceConnection> connections;
private final Map<String, Set<FanoutServiceConnection>> subscriptions;
@@ -67,7 +67,7 @@ public abstract class FanoutService implements Runnable { private final AtomicBoolean strictRequestTermination;
private final AtomicBoolean allowAllChannelAnnouncements;
private final AtomicInteger concurrentConnectionLimit;
-
+
private final Date bootDate;
private final AtomicLong rejectedConnectionCount;
private final AtomicInteger peakConnectionCount;
@@ -82,16 +82,16 @@ public abstract class FanoutService implements Runnable { this.host = host;
this.port = port;
this.name = name;
-
+
connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
-
+
isRunning = new AtomicBoolean(false);
strictRequestTermination = new AtomicBoolean(false);
allowAllChannelAnnouncements = new AtomicBoolean(false);
concurrentConnectionLimit = new AtomicInteger(0);
-
+
bootDate = new Date();
rejectedConnectionCount = new AtomicLong(0);
peakConnectionCount = new AtomicInteger(0);
@@ -106,18 +106,18 @@ public abstract class FanoutService implements Runnable { /*
* Abstract methods
*/
-
+
protected abstract boolean isConnected();
-
+
protected abstract boolean connect();
-
+
protected abstract void listen() throws IOException;
-
+
protected abstract void disconnect();
-
+
/**
* Returns true if the service requires \n request termination.
- *
+ *
* @return true if request requires \n termination
*/
public boolean isStrictRequestTermination() {
@@ -128,62 +128,62 @@ public abstract class FanoutService implements Runnable { * Control the termination of fanout requests. If true, fanout requests must
* be terminated with \n. If false, fanout requests may be terminated with
* \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
- *
+ *
* @param isStrictTermination
*/
public void setStrictRequestTermination(boolean isStrictTermination) {
strictRequestTermination.set(isStrictTermination);
}
-
+
/**
* Returns the maximum allowable concurrent fanout connections.
- *
+ *
* @return the maximum allowable concurrent connection count
*/
public int getConcurrentConnectionLimit() {
return concurrentConnectionLimit.get();
}
-
+
/**
* Sets the maximum allowable concurrent fanout connection count.
- *
+ *
* @param value
*/
public void setConcurrentConnectionLimit(int value) {
concurrentConnectionLimit.set(value);
}
-
+
/**
* Returns true if connections are allowed to announce on the all channel.
- *
+ *
* @return true if connections are allowed to announce on the all channel
*/
public boolean allowAllChannelAnnouncements() {
return allowAllChannelAnnouncements.get();
}
-
+
/**
* Allows/prohibits connections from announcing on the ALL channel.
- *
+ *
* @param value
*/
public void setAllowAllChannelAnnouncements(boolean value) {
allowAllChannelAnnouncements.set(value);
}
-
+
/**
* Returns the current connections
- *
+ *
* @param channel
* @return map of current connections keyed by their id
*/
public Map<String, FanoutServiceConnection> getCurrentConnections() {
return connections;
}
-
+
/**
* Returns all subscriptions
- *
+ *
* @return map of current subscriptions keyed by channel name
*/
public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
@@ -192,7 +192,7 @@ public abstract class FanoutService implements Runnable { /**
* Returns the subscriptions for the specified channel
- *
+ *
* @param channel
* @return set of subscribed connections for the specified channel
*/
@@ -202,17 +202,17 @@ public abstract class FanoutService implements Runnable { /**
* Returns the runtime statistics object for this service.
- *
+ *
* @return stats
*/
public FanoutStats getStatistics() {
FanoutStats stats = new FanoutStats();
-
+
// settings
stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
stats.strictRequestTermination = isStrictRequestTermination();
-
+
// runtime stats
stats.bootDate = bootDate;
stats.rejectedConnectionCount = rejectedConnectionCount.get();
@@ -222,16 +222,16 @@ public abstract class FanoutService implements Runnable { stats.totalMessages = totalMessages.get();
stats.totalSubscribes = totalSubscribes.get();
stats.totalUnsubscribes = totalUnsubscribes.get();
- stats.totalPings = totalPings.get();
+ stats.totalPings = totalPings.get();
stats.currentConnections = connections.size();
stats.currentChannels = subscriptions.size();
stats.currentSubscriptions = subscriptions.size() * connections.size();
return stats;
}
-
+
/**
* Returns true if the service is ready.
- *
+ *
* @return true, if the service is ready
*/
public boolean isReady() {
@@ -243,7 +243,7 @@ public abstract class FanoutService implements Runnable { /**
* Start the Fanout service thread and immediatel return.
- *
+ *
*/
public void start() {
if (isRunning.get()) {
@@ -254,10 +254,10 @@ public abstract class FanoutService implements Runnable { serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
serviceThread.start();
}
-
+
/**
* Start the Fanout service thread and wait until it is accepting connections.
- *
+ *
*/
public void startSynchronously() {
start();
@@ -268,7 +268,7 @@ public abstract class FanoutService implements Runnable { }
}
}
-
+
/**
* Stop the Fanout service. This method returns when the service has been
* completely shutdown.
@@ -290,7 +290,7 @@ public abstract class FanoutService implements Runnable { }
logger.info(MessageFormat.format("stopped {0}", name));
}
-
+
/**
* Main execution method of the service
*/
@@ -314,10 +314,10 @@ public abstract class FanoutService implements Runnable { }
}
}
- disconnect();
+ disconnect();
resetState();
}
-
+
protected void resetState() {
// reset state data
connections.clear();
@@ -334,23 +334,23 @@ public abstract class FanoutService implements Runnable { /**
* Configure the client connection socket.
- *
+ *
* @param socket
* @throws SocketException
*/
protected void configureClientSocket(Socket socket) throws SocketException {
- socket.setKeepAlive(true);
+ socket.setKeepAlive(true);
socket.setSoLinger(true, 0); // immediately discard any remaining data
}
-
+
/**
* Add the connection to the connections map.
- *
+ *
* @param connection
* @return false if the connection was rejected due to too many concurrent
* connections
*/
- protected boolean addConnection(FanoutServiceConnection connection) {
+ protected boolean addConnection(FanoutServiceConnection connection) {
int limit = getConcurrentConnectionLimit();
if (limit > 0 && connections.size() > limit) {
logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
@@ -358,7 +358,7 @@ public abstract class FanoutService implements Runnable { connection.busy();
return false;
}
-
+
// add the connection to our map
connections.put(connection.id, connection);
@@ -371,10 +371,10 @@ public abstract class FanoutService implements Runnable { connection.connected();
return true;
}
-
+
/**
* Remove the connection from the connections list and from subscriptions.
- *
+ *
* @param connection
*/
protected void removeConnection(FanoutServiceConnection connection) {
@@ -393,46 +393,46 @@ public abstract class FanoutService implements Runnable { }
logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
}
-
+
/**
* Tests to see if the connection is being monitored by the service.
- *
+ *
* @param connection
* @return true if the service is monitoring the connection
*/
protected boolean hasConnection(FanoutServiceConnection connection) {
return connections.containsKey(connection.id);
}
-
+
/**
* Reply to a connection on the specified channel.
- *
+ *
* @param connection
* @param channel
* @param message
* @return the reply
*/
- protected String reply(FanoutServiceConnection connection, String channel, String message) {
+ protected String reply(FanoutServiceConnection connection, String channel, String message) {
if (channel != null && channel.length() > 0) {
increment(totalMessages);
}
return connection.reply(channel, message);
}
-
+
/**
* Service method to broadcast a message to all connections.
- *
+ *
* @param message
*/
public void broadcastAll(String message) {
broadcast(connections.values(), FanoutConstants.CH_ALL, message);
increment(totalAnnouncements);
}
-
+
/**
* Service method to broadcast a message to connections subscribed to the
* channel.
- *
+ *
* @param message
*/
public void broadcast(String channel, String message) {
@@ -440,10 +440,10 @@ public abstract class FanoutService implements Runnable { broadcast(connections, channel, message);
increment(totalAnnouncements);
}
-
+
/**
* Broadcast a message to connections subscribed to the specified channel.
- *
+ *
* @param connections
* @param channel
* @param message
@@ -453,10 +453,10 @@ public abstract class FanoutService implements Runnable { reply(connection, channel, message);
}
}
-
+
/**
* Process an incoming Fanout request.
- *
+ *
* @param connection
* @param req
* @return the reply to the request, may be null
@@ -476,10 +476,10 @@ public abstract class FanoutService implements Runnable { }
return null;
}
-
+
/**
* Process the Fanout request.
- *
+ *
* @param connection
* @param action
* @param channel
@@ -535,7 +535,7 @@ public abstract class FanoutService implements Runnable { }
return null;
}
-
+
private String asHexArray(String req) {
StringBuilder sb = new StringBuilder();
for (char c : req.toCharArray()) {
@@ -543,10 +543,10 @@ public abstract class FanoutService implements Runnable { }
return "[ " + sb.toString().trim() + " ]";
}
-
+
/**
* Increment a long and prevent negative rollover.
- *
+ *
* @param counter
*/
private void increment(AtomicLong counter) {
@@ -555,7 +555,7 @@ public abstract class FanoutService implements Runnable { counter.set(0);
}
}
-
+
@Override
public String toString() {
return name;
diff --git a/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java b/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java index f7f2c959..2515ab1d 100644 --- a/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java +++ b/src/main/java/com/gitblit/fanout/FanoutServiceConnection.java @@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory; /**
* FanoutServiceConnection handles reading/writing messages from a remote fanout
* connection.
- *
+ *
* @author James Moger
- *
+ *
*/
public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {
-
+
private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);
-
+
public final String id;
protected FanoutServiceConnection(Socket socket) {
@@ -42,25 +42,25 @@ public abstract class FanoutServiceConnection implements Comparable<FanoutServic /**
* Send the connection a debug channel connected message.
- *
+ *
* @param message
*/
protected void connected() {
reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);
}
-
+
/**
* Send the connection a debug channel busy message.
- *
+ *
* @param message
*/
protected void busy() {
reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);
}
-
+
/**
* Send the connection a message for the specified channel.
- *
+ *
* @param channel
* @param message
* @return the reply
diff --git a/src/main/java/com/gitblit/fanout/FanoutSocketService.java b/src/main/java/com/gitblit/fanout/FanoutSocketService.java index 07c18f90..342b2774 100644 --- a/src/main/java/com/gitblit/fanout/FanoutSocketService.java +++ b/src/main/java/com/gitblit/fanout/FanoutSocketService.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; * This implementation creates a master acceptor thread which accepts incoming
* fanout connections and then spawns a daemon thread for each accepted connection.
* If there are 100 concurrent fanout connections, there are 101 threads.
- *
+ *
* @author James Moger
*
*/
@@ -50,10 +50,10 @@ public class FanoutSocketService extends FanoutService { pubsub.setAllowAllChannelAnnouncements(false);
pubsub.start();
}
-
+
/**
* Create a multi-threaded fanout service.
- *
+ *
* @param port
* the port for running the fanout PubSub service
* @throws IOException
@@ -64,7 +64,7 @@ public class FanoutSocketService extends FanoutService { /**
* Create a multi-threaded fanout service.
- *
+ *
* @param bindInterface
* the ip address to bind for the service, may be null
* @param port
@@ -74,12 +74,12 @@ public class FanoutSocketService extends FanoutService { public FanoutSocketService(String bindInterface, int port) {
super(bindInterface, port, "Fanout socket service");
}
-
+
@Override
protected boolean isConnected() {
return serviceSocket != null;
}
-
+
@Override
protected boolean connect() {
if (serviceSocket == null) {
@@ -88,7 +88,7 @@ public class FanoutSocketService extends FanoutService { serviceSocket.setReuseAddress(true);
serviceSocket.setSoTimeout(serviceTimeout);
serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
- logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
} catch (IOException e) {
logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
@@ -140,17 +140,17 @@ public class FanoutSocketService extends FanoutService { // ignore accept timeout exceptions
}
}
-
+
/**
* FanoutSocketConnection handles reading/writing messages from a remote fanout
* connection.
- *
+ *
* @author James Moger
*
*/
class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
Socket socket;
-
+
FanoutSocketConnection(Socket socket) {
super(socket);
this.socket = socket;
@@ -205,7 +205,7 @@ public class FanoutSocketService extends FanoutService { logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
}
-
+
@Override
protected void reply(String content) throws IOException {
// synchronously send reply
@@ -218,7 +218,7 @@ public class FanoutSocketService extends FanoutService { }
os.flush();
}
-
+
protected void closeConnection() {
// close the connection socket
try {
@@ -226,7 +226,7 @@ public class FanoutSocketService extends FanoutService { } catch (IOException e) {
}
socket = null;
-
+
// remove this connection from the service
removeConnection(this);
}
diff --git a/src/main/java/com/gitblit/fanout/FanoutStats.java b/src/main/java/com/gitblit/fanout/FanoutStats.java index b06884d3..3545472f 100644 --- a/src/main/java/com/gitblit/fanout/FanoutStats.java +++ b/src/main/java/com/gitblit/fanout/FanoutStats.java @@ -21,18 +21,18 @@ import java.util.Date; /**
* Encapsulates the runtime stats of a fanout service.
- *
+ *
* @author James Moger
*
*/
public class FanoutStats implements Serializable {
private static final long serialVersionUID = 1L;
-
+
public long concurrentConnectionLimit;
public boolean allowAllChannelAnnouncements;
public boolean strictRequestTermination;
-
+
public Date bootDate;
public long rejectedConnectionCount;
public int peakConnectionCount;
@@ -45,7 +45,7 @@ public class FanoutStats implements Serializable { public long totalSubscribes;
public long totalUnsubscribes;
public long totalPings;
-
+
public String info() {
int i = 0;
StringBuilder sb = new StringBuilder();
@@ -67,14 +67,14 @@ public class FanoutStats implements Serializable { sb.append(infoInt(i++, "total pings"));
String template = sb.toString();
- String info = MessageFormat.format(template,
+ String info = MessageFormat.format(template,
bootDate.toString(),
Boolean.toString(strictRequestTermination),
Boolean.toString(allowAllChannelAnnouncements),
concurrentConnectionLimit,
rejectedConnectionCount,
peakConnectionCount,
- currentConnections,
+ currentConnections,
currentChannels,
currentSubscriptions,
currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),
@@ -86,11 +86,11 @@ public class FanoutStats implements Serializable { totalPings);
return info;
}
-
+
private String infoStr(int index, String label) {
return label + ": {" + index + "}\n";
}
-
+
private String infoInt(int index, String label) {
return label + ": {" + index + ",number,0}\n";
}
|