diff options
author | James Moger <james.moger@gitblit.com> | 2013-09-30 09:30:04 -0400 |
---|---|---|
committer | James Moger <james.moger@gitblit.com> | 2013-09-30 10:11:28 -0400 |
commit | 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 (patch) | |
tree | 4a9ea25c258caeae3dea4bc1de809f47bc615d81 /src/main/java/com/gitblit/fanout/FanoutService.java | |
parent | 235ad956fa84cad4fac1b2e69a0c9e4f50376ea3 (diff) | |
download | gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.tar.gz gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.zip |
Trim trailing whitespace and organize imports
Change-Id: I9f91138b20219be6e3c4b28251487df262bff6cc
Diffstat (limited to 'src/main/java/com/gitblit/fanout/FanoutService.java')
-rw-r--r-- | src/main/java/com/gitblit/fanout/FanoutService.java | 142 |
1 files changed, 71 insertions, 71 deletions
diff --git a/src/main/java/com/gitblit/fanout/FanoutService.java b/src/main/java/com/gitblit/fanout/FanoutService.java index cbfd8a24..e0e4d64d 100644 --- a/src/main/java/com/gitblit/fanout/FanoutService.java +++ b/src/main/java/com/gitblit/fanout/FanoutService.java @@ -37,29 +37,29 @@ import org.slf4j.LoggerFactory; /**
* Base class for Fanout service implementations.
- *
+ *
* Subclass implementations can be used as a Sparkleshare PubSub notification
* server. This allows Sparkleshare to be used in conjunction with Gitblit
* behind a corporate firewall that restricts or prohibits client internet access
* to the default Sparkleshare PubSub server: notifications.sparkleshare.org
- *
+ *
* @author James Moger
*
*/
public abstract class FanoutService implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
-
+
public final static int DEFAULT_PORT = 17000;
-
+
protected final static int serviceTimeout = 5000;
protected final String host;
protected final int port;
- protected final String name;
-
+ protected final String name;
+
private Thread serviceThread;
-
+
private final Map<String, FanoutServiceConnection> connections;
private final Map<String, Set<FanoutServiceConnection>> subscriptions;
@@ -67,7 +67,7 @@ public abstract class FanoutService implements Runnable { private final AtomicBoolean strictRequestTermination;
private final AtomicBoolean allowAllChannelAnnouncements;
private final AtomicInteger concurrentConnectionLimit;
-
+
private final Date bootDate;
private final AtomicLong rejectedConnectionCount;
private final AtomicInteger peakConnectionCount;
@@ -82,16 +82,16 @@ public abstract class FanoutService implements Runnable { this.host = host;
this.port = port;
this.name = name;
-
+
connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
-
+
isRunning = new AtomicBoolean(false);
strictRequestTermination = new AtomicBoolean(false);
allowAllChannelAnnouncements = new AtomicBoolean(false);
concurrentConnectionLimit = new AtomicInteger(0);
-
+
bootDate = new Date();
rejectedConnectionCount = new AtomicLong(0);
peakConnectionCount = new AtomicInteger(0);
@@ -106,18 +106,18 @@ public abstract class FanoutService implements Runnable { /*
* Abstract methods
*/
-
+
protected abstract boolean isConnected();
-
+
protected abstract boolean connect();
-
+
protected abstract void listen() throws IOException;
-
+
protected abstract void disconnect();
-
+
/**
* Returns true if the service requires \n request termination.
- *
+ *
* @return true if request requires \n termination
*/
public boolean isStrictRequestTermination() {
@@ -128,62 +128,62 @@ public abstract class FanoutService implements Runnable { * Control the termination of fanout requests. If true, fanout requests must
* be terminated with \n. If false, fanout requests may be terminated with
* \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
- *
+ *
* @param isStrictTermination
*/
public void setStrictRequestTermination(boolean isStrictTermination) {
strictRequestTermination.set(isStrictTermination);
}
-
+
/**
* Returns the maximum allowable concurrent fanout connections.
- *
+ *
* @return the maximum allowable concurrent connection count
*/
public int getConcurrentConnectionLimit() {
return concurrentConnectionLimit.get();
}
-
+
/**
* Sets the maximum allowable concurrent fanout connection count.
- *
+ *
* @param value
*/
public void setConcurrentConnectionLimit(int value) {
concurrentConnectionLimit.set(value);
}
-
+
/**
* Returns true if connections are allowed to announce on the all channel.
- *
+ *
* @return true if connections are allowed to announce on the all channel
*/
public boolean allowAllChannelAnnouncements() {
return allowAllChannelAnnouncements.get();
}
-
+
/**
* Allows/prohibits connections from announcing on the ALL channel.
- *
+ *
* @param value
*/
public void setAllowAllChannelAnnouncements(boolean value) {
allowAllChannelAnnouncements.set(value);
}
-
+
/**
* Returns the current connections
- *
+ *
* @param channel
* @return map of current connections keyed by their id
*/
public Map<String, FanoutServiceConnection> getCurrentConnections() {
return connections;
}
-
+
/**
* Returns all subscriptions
- *
+ *
* @return map of current subscriptions keyed by channel name
*/
public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
@@ -192,7 +192,7 @@ public abstract class FanoutService implements Runnable { /**
* Returns the subscriptions for the specified channel
- *
+ *
* @param channel
* @return set of subscribed connections for the specified channel
*/
@@ -202,17 +202,17 @@ public abstract class FanoutService implements Runnable { /**
* Returns the runtime statistics object for this service.
- *
+ *
* @return stats
*/
public FanoutStats getStatistics() {
FanoutStats stats = new FanoutStats();
-
+
// settings
stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
stats.strictRequestTermination = isStrictRequestTermination();
-
+
// runtime stats
stats.bootDate = bootDate;
stats.rejectedConnectionCount = rejectedConnectionCount.get();
@@ -222,16 +222,16 @@ public abstract class FanoutService implements Runnable { stats.totalMessages = totalMessages.get();
stats.totalSubscribes = totalSubscribes.get();
stats.totalUnsubscribes = totalUnsubscribes.get();
- stats.totalPings = totalPings.get();
+ stats.totalPings = totalPings.get();
stats.currentConnections = connections.size();
stats.currentChannels = subscriptions.size();
stats.currentSubscriptions = subscriptions.size() * connections.size();
return stats;
}
-
+
/**
* Returns true if the service is ready.
- *
+ *
* @return true, if the service is ready
*/
public boolean isReady() {
@@ -243,7 +243,7 @@ public abstract class FanoutService implements Runnable { /**
* Start the Fanout service thread and immediatel return.
- *
+ *
*/
public void start() {
if (isRunning.get()) {
@@ -254,10 +254,10 @@ public abstract class FanoutService implements Runnable { serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
serviceThread.start();
}
-
+
/**
* Start the Fanout service thread and wait until it is accepting connections.
- *
+ *
*/
public void startSynchronously() {
start();
@@ -268,7 +268,7 @@ public abstract class FanoutService implements Runnable { }
}
}
-
+
/**
* Stop the Fanout service. This method returns when the service has been
* completely shutdown.
@@ -290,7 +290,7 @@ public abstract class FanoutService implements Runnable { }
logger.info(MessageFormat.format("stopped {0}", name));
}
-
+
/**
* Main execution method of the service
*/
@@ -314,10 +314,10 @@ public abstract class FanoutService implements Runnable { }
}
}
- disconnect();
+ disconnect();
resetState();
}
-
+
protected void resetState() {
// reset state data
connections.clear();
@@ -334,23 +334,23 @@ public abstract class FanoutService implements Runnable { /**
* Configure the client connection socket.
- *
+ *
* @param socket
* @throws SocketException
*/
protected void configureClientSocket(Socket socket) throws SocketException {
- socket.setKeepAlive(true);
+ socket.setKeepAlive(true);
socket.setSoLinger(true, 0); // immediately discard any remaining data
}
-
+
/**
* Add the connection to the connections map.
- *
+ *
* @param connection
* @return false if the connection was rejected due to too many concurrent
* connections
*/
- protected boolean addConnection(FanoutServiceConnection connection) {
+ protected boolean addConnection(FanoutServiceConnection connection) {
int limit = getConcurrentConnectionLimit();
if (limit > 0 && connections.size() > limit) {
logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
@@ -358,7 +358,7 @@ public abstract class FanoutService implements Runnable { connection.busy();
return false;
}
-
+
// add the connection to our map
connections.put(connection.id, connection);
@@ -371,10 +371,10 @@ public abstract class FanoutService implements Runnable { connection.connected();
return true;
}
-
+
/**
* Remove the connection from the connections list and from subscriptions.
- *
+ *
* @param connection
*/
protected void removeConnection(FanoutServiceConnection connection) {
@@ -393,46 +393,46 @@ public abstract class FanoutService implements Runnable { }
logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
}
-
+
/**
* Tests to see if the connection is being monitored by the service.
- *
+ *
* @param connection
* @return true if the service is monitoring the connection
*/
protected boolean hasConnection(FanoutServiceConnection connection) {
return connections.containsKey(connection.id);
}
-
+
/**
* Reply to a connection on the specified channel.
- *
+ *
* @param connection
* @param channel
* @param message
* @return the reply
*/
- protected String reply(FanoutServiceConnection connection, String channel, String message) {
+ protected String reply(FanoutServiceConnection connection, String channel, String message) {
if (channel != null && channel.length() > 0) {
increment(totalMessages);
}
return connection.reply(channel, message);
}
-
+
/**
* Service method to broadcast a message to all connections.
- *
+ *
* @param message
*/
public void broadcastAll(String message) {
broadcast(connections.values(), FanoutConstants.CH_ALL, message);
increment(totalAnnouncements);
}
-
+
/**
* Service method to broadcast a message to connections subscribed to the
* channel.
- *
+ *
* @param message
*/
public void broadcast(String channel, String message) {
@@ -440,10 +440,10 @@ public abstract class FanoutService implements Runnable { broadcast(connections, channel, message);
increment(totalAnnouncements);
}
-
+
/**
* Broadcast a message to connections subscribed to the specified channel.
- *
+ *
* @param connections
* @param channel
* @param message
@@ -453,10 +453,10 @@ public abstract class FanoutService implements Runnable { reply(connection, channel, message);
}
}
-
+
/**
* Process an incoming Fanout request.
- *
+ *
* @param connection
* @param req
* @return the reply to the request, may be null
@@ -476,10 +476,10 @@ public abstract class FanoutService implements Runnable { }
return null;
}
-
+
/**
* Process the Fanout request.
- *
+ *
* @param connection
* @param action
* @param channel
@@ -535,7 +535,7 @@ public abstract class FanoutService implements Runnable { }
return null;
}
-
+
private String asHexArray(String req) {
StringBuilder sb = new StringBuilder();
for (char c : req.toCharArray()) {
@@ -543,10 +543,10 @@ public abstract class FanoutService implements Runnable { }
return "[ " + sb.toString().trim() + " ]";
}
-
+
/**
* Increment a long and prevent negative rollover.
- *
+ *
* @param counter
*/
private void increment(AtomicLong counter) {
@@ -555,7 +555,7 @@ public abstract class FanoutService implements Runnable { counter.set(0);
}
}
-
+
@Override
public String toString() {
return name;
|