diff options
author | James Moger <james.moger@gitblit.com> | 2013-09-30 09:30:04 -0400 |
---|---|---|
committer | James Moger <james.moger@gitblit.com> | 2013-09-30 10:11:28 -0400 |
commit | 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 (patch) | |
tree | 4a9ea25c258caeae3dea4bc1de809f47bc615d81 /src/main/java/com/gitblit/fanout/FanoutNioService.java | |
parent | 235ad956fa84cad4fac1b2e69a0c9e4f50376ea3 (diff) | |
download | gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.tar.gz gitblit-699e71e76b15081baf746c6ce9c9144f7e5f1ff9.zip |
Trim trailing whitespace and organize imports
Change-Id: I9f91138b20219be6e3c4b28251487df262bff6cc
Diffstat (limited to 'src/main/java/com/gitblit/fanout/FanoutNioService.java')
-rw-r--r-- | src/main/java/com/gitblit/fanout/FanoutNioService.java | 43 |
1 files changed, 22 insertions, 21 deletions
diff --git a/src/main/java/com/gitblit/fanout/FanoutNioService.java b/src/main/java/com/gitblit/fanout/FanoutNioService.java index 65d022ab..e7aff34b 100644 --- a/src/main/java/com/gitblit/fanout/FanoutNioService.java +++ b/src/main/java/com/gitblit/fanout/FanoutNioService.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; *
* This implementation uses channels and selectors, which are the Java analog of
* the Linux epoll mechanism used in the original fanout C code.
- *
+ *
* @author James Moger
*
*/
@@ -54,7 +54,7 @@ public class FanoutNioService extends FanoutService { private volatile ServerSocketChannel serviceCh;
private volatile Selector selector;
-
+
public static void main(String[] args) throws Exception {
FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
pubsub.setStrictRequestTermination(false);
@@ -64,7 +64,7 @@ public class FanoutNioService extends FanoutService { /**
* Create a single-threaded fanout service.
- *
+ *
* @param host
* @param port
* the port for running the fanout PubSub service
@@ -73,10 +73,10 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(int port) {
this(null, port);
}
-
+
/**
* Create a single-threaded fanout service.
- *
+ *
* @param bindInterface
* the ip address to bind for the service, may be null
* @param port
@@ -86,7 +86,7 @@ public class FanoutNioService extends FanoutService { public FanoutNioService(String bindInterface, int port) {
super(bindInterface, port, "Fanout nio service");
}
-
+
@Override
protected boolean isConnected() {
return serviceCh != null;
@@ -102,10 +102,10 @@ public class FanoutNioService extends FanoutService { serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
selector = Selector.open();
serviceCh.register(selector, SelectionKey.OP_ACCEPT);
- logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
name, host == null ? "0.0.0.0" : host, port));
} catch (IOException e) {
- logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
name, name, host == null ? "0.0.0.0" : host, port), e);
return false;
}
@@ -122,11 +122,11 @@ public class FanoutNioService extends FanoutService { for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
closeClientSocket(client.getKey(), client.getValue());
}
-
+
// close service socket channel
logger.debug(MessageFormat.format("closing {0} socket channel", name));
serviceCh.socket().close();
- serviceCh.close();
+ serviceCh.close();
serviceCh = null;
selector.close();
selector = null;
@@ -142,7 +142,7 @@ public class FanoutNioService extends FanoutService { Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyItr = keys.iterator();
while (keyItr.hasNext()) {
- SelectionKey key = (SelectionKey) keyItr.next();
+ SelectionKey key = keyItr.next();
if (key.isAcceptable()) {
// new fanout client connection
ServerSocketChannel sch = (ServerSocketChannel) key.channel();
@@ -213,7 +213,7 @@ public class FanoutNioService extends FanoutService { }
}
}
-
+
protected void closeClientSocket(String id, SocketChannel ch) {
try {
ch.close();
@@ -221,10 +221,11 @@ public class FanoutNioService extends FanoutService { logger.error(MessageFormat.format("fanout connection {0}", id), e);
}
}
-
+
+ @Override
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
super.broadcast(connections, channel, message);
-
+
// register queued write
Map<String, SocketChannel> sockets = getCurrentClientSockets();
for (FanoutServiceConnection connection : connections) {
@@ -241,7 +242,7 @@ public class FanoutNioService extends FanoutService { }
}
}
-
+
protected Map<String, SocketChannel> getCurrentClientSockets() {
Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
for (SelectionKey key : selector.keys()) {
@@ -253,11 +254,11 @@ public class FanoutNioService extends FanoutService { }
return sockets;
}
-
+
/**
* FanoutNioConnection handles reading/writing messages from a remote fanout
* connection.
- *
+ *
* @author James Moger
*
*/
@@ -276,7 +277,7 @@ public class FanoutNioService extends FanoutService { replyQueue = new ArrayList<String>();
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
}
-
+
protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
long bytesRead = 0;
readBuffer.clear();
@@ -293,7 +294,7 @@ public class FanoutNioService extends FanoutService { String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
requestQueue.addAll(Arrays.asList(lines));
}
-
+
protected void write(SocketChannel ch) throws IOException {
Iterator<String> itr = replyQueue.iterator();
while (itr.hasNext()) {
@@ -306,7 +307,7 @@ public class FanoutNioService extends FanoutService { writeBuffer.put((byte) 0xa);
}
writeBuffer.flip();
-
+
// loop until write buffer has been completely sent
int written = 0;
int toWrite = writeBuffer.remaining();
@@ -316,7 +317,7 @@ public class FanoutNioService extends FanoutService { Thread.sleep(10);
} catch (Exception x) {
}
- }
+ }
itr.remove();
}
writeBuffer.clear();
|