diff options
Diffstat (limited to 'src/main/java/com/gitblit/fanout/FanoutClient.java')
-rw-r--r-- | src/main/java/com/gitblit/fanout/FanoutClient.java | 64 |
1 files changed, 33 insertions, 31 deletions
diff --git a/src/main/java/com/gitblit/fanout/FanoutClient.java b/src/main/java/com/gitblit/fanout/FanoutClient.java index b9ace4be..a676abcd 100644 --- a/src/main/java/com/gitblit/fanout/FanoutClient.java +++ b/src/main/java/com/gitblit/fanout/FanoutClient.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; /**
* Fanout client class.
- *
+ *
* @author James Moger
*
*/
@@ -57,24 +57,26 @@ public class FanoutClient implements Runnable { private volatile Selector selector;
private volatile SocketChannel socketCh;
private Thread clientThread;
-
+
private final AtomicBoolean isConnected;
private final AtomicBoolean isRunning;
private final AtomicBoolean isAutomaticReconnect;
private final ByteBuffer writeBuffer;
private final ByteBuffer readBuffer;
private final CharsetDecoder decoder;
-
+
private final Set<String> subscriptions;
private boolean resubscribe;
-
+
public interface FanoutListener {
public void pong(Date timestamp);
public void announcement(String channel, String message);
}
-
+
public static class FanoutAdapter implements FanoutListener {
+ @Override
public void pong(Date timestamp) { }
+ @Override
public void announcement(String channel, String message) { }
}
@@ -86,20 +88,20 @@ public class FanoutClient implements Runnable { public void pong(Date timestamp) {
System.out.println("Pong. " + timestamp);
}
-
+
@Override
public void announcement(String channel, String message) {
System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
}
});
client.start();
-
+
Thread.sleep(5000);
client.ping();
client.subscribe("james");
- client.announce("james", "12345");
+ client.announce("james", "12345");
client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
-
+
while (true) {
Thread.sleep(10000);
client.ping();
@@ -126,11 +128,11 @@ public class FanoutClient implements Runnable { public void removeListener(FanoutListener listener) {
listeners.remove(listener);
}
-
+
public boolean isAutomaticReconnect() {
return isAutomaticReconnect.get();
}
-
+
public void setAutomaticReconnect(boolean value) {
isAutomaticReconnect.set(value);
}
@@ -144,21 +146,21 @@ public class FanoutClient implements Runnable { confirmConnection();
write("status");
}
-
+
public void subscribe(String channel) {
confirmConnection();
if (subscriptions.add(channel)) {
write("subscribe " + channel);
}
}
-
+
public void unsubscribe(String channel) {
confirmConnection();
if (subscriptions.remove(channel)) {
write("unsubscribe " + channel);
}
}
-
+
public void announce(String channel, String message) {
confirmConnection();
write("announce " + channel + " " + message);
@@ -169,11 +171,11 @@ public class FanoutClient implements Runnable { throw new RuntimeException("Fanout client is disconnected!");
}
}
-
+
public boolean isConnected() {
return isRunning.get() && socketCh != null && isConnected.get();
}
-
+
/**
* Start client connection and return immediately.
*/
@@ -185,13 +187,13 @@ public class FanoutClient implements Runnable { clientThread = new Thread(this, "Fanout client");
clientThread.start();
}
-
+
/**
* Start client connection and wait until it has connected.
*/
public void startSynchronously() {
start();
- while (!isConnected()) {
+ while (!isConnected()) {
try {
Thread.sleep(100);
} catch (Exception e) {
@@ -221,8 +223,8 @@ public class FanoutClient implements Runnable { @Override
public void run() {
resetState();
-
- isRunning.set(true);
+
+ isRunning.set(true);
while (isRunning.get()) {
// (re)connect
if (socketCh == null) {
@@ -231,7 +233,7 @@ public class FanoutClient implements Runnable { socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
socketCh.configureBlocking(false);
selector = Selector.open();
- id = FanoutConstants.getLocalSocketId(socketCh.socket());
+ id = FanoutConstants.getLocalSocketId(socketCh.socket());
socketCh.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
@@ -242,7 +244,7 @@ public class FanoutClient implements Runnable { continue;
}
}
-
+
// read/write
try {
selector.select(clientTimeout);
@@ -251,7 +253,7 @@ public class FanoutClient implements Runnable { while (i.hasNext()) {
SelectionKey key = i.next();
i.remove();
-
+
if (key.isReadable()) {
// read message
String content = read();
@@ -266,7 +268,7 @@ public class FanoutClient implements Runnable { // resubscribe
if (resubscribe) {
resubscribe = false;
- logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
+ logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
for (String subscription : subscriptions) {
write("subscribe " + subscription);
}
@@ -276,25 +278,25 @@ public class FanoutClient implements Runnable { }
} catch (IOException e) {
logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
- closeChannel();
+ closeChannel();
if (!isAutomaticReconnect.get()) {
isRunning.set(false);
continue;
}
}
}
-
+
closeChannel();
resetState();
}
-
+
protected void resetState() {
readBuffer.clear();
writeBuffer.clear();
isRunning.set(false);
isConnected.set(false);
}
-
+
private void closeChannel() {
try {
if (socketCh != null) {
@@ -315,7 +317,7 @@ public class FanoutClient implements Runnable { long time = Long.parseLong(fields[0]);
Date date = new Date(time);
firePong(date);
- } catch (Exception e) {
+ } catch (Exception e) {
}
return true;
} else if (fields.length == 2) {
@@ -366,7 +368,7 @@ public class FanoutClient implements Runnable { }
}
}
-
+
protected synchronized String read() throws IOException {
readBuffer.clear();
long len = socketCh.read(readBuffer);
@@ -382,7 +384,7 @@ public class FanoutClient implements Runnable { return content;
}
}
-
+
protected synchronized boolean write(String message) {
try {
logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
|