/* * Copyright 2013 gitblit.com. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.gitblit.fanout; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Fanout client class. * * @author James Moger * */ public class FanoutClient implements Runnable { private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class); private final int clientTimeout = 500; private final int reconnectTimeout = 2000; private final String host; private final int port; private final List listeners; private String id; private volatile Selector selector; private volatile SocketChannel socketCh; private Thread clientThread; private final AtomicBoolean isConnected; private final AtomicBoolean isRunning; private final AtomicBoolean isAutomaticReconnect; private final ByteBuffer writeBuffer; private final ByteBuffer readBuffer; private final CharsetDecoder decoder; private final Set subscriptions; private boolean resubscribe; public interface FanoutListener { public void pong(Date timestamp); public void announcement(String channel, String message); } public static class FanoutAdapter implements FanoutListener { @Override public void pong(Date timestamp) { } @Override public void announcement(String channel, String message) { } } public static void main(String args[]) throws Exception { FanoutClient client = new FanoutClient("localhost", 2000); client.addListener(new FanoutAdapter() { @Override public void pong(Date timestamp) { System.out.println("Pong. " + timestamp); } @Override public void announcement(String channel, String message) { System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message)); } }); client.start(); Thread.sleep(5000); client.ping(); client.subscribe("james"); client.announce("james", "12345"); client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5"); while (true) { Thread.sleep(10000); client.ping(); } } public FanoutClient(String host, int port) { this.host = host; this.port = port; readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH); writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH); decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder(); listeners = Collections.synchronizedList(new ArrayList()); subscriptions = new LinkedHashSet(); isRunning = new AtomicBoolean(false); isConnected = new AtomicBoolean(false); isAutomaticReconnect = new AtomicBoolean(true); } public void addListener(FanoutListener listener) { listeners.add(listener); } public void removeListener(FanoutListener listener) { listeners.remove(listener); } public boolean isAutomaticReconnect() { return isAutomaticReconnect.get(); } public void setAutomaticReconnect(boolean value) { isAutomaticReconnect.set(value); } public void ping() { confirmConnection(); write("ping"); } public void status() { confirmConnection(); write("status"); } public void subscribe(String channel) { confirmConnection(); if (subscriptions.add(channel)) { write("subscribe " + channel); } } public void unsubscribe(String channel) { confirmConnection(); if (subscriptions.remove(channel)) { write("unsubscribe " + channel); } } public void announce(String channel, String message) { confirmConnection(); write("announce " + channel + " " + message); } private void confirmConnection() { if (!isConnected()) { throw new RuntimeException("Fanout client is disconnected!"); } } public boolean isConnected() { return isRunning.get() && socketCh != null && isConnected.get(); } /** * Start client connection and return immediately. */ public void start() { if (isRunning.get()) { logger.warn("Fanout client is already running"); return; } clientThread = new Thread(this, "Fanout client"); clientThread.start(); } /** * Start client connection and wait until it has connected. */ public void startSynchronously() { start(); while (!isConnected()) { try { Thread.sleep(100); } catch (Exception e) { } } } /** * Stops client connection. This method returns when the connection has * been completely shutdown. */ public void stop() { if (!isRunning.get()) { logger.warn("Fanout client is not running"); return; } isRunning.set(false); try { if (clientThread != null) { clientThread.join(); clientThread = null; } } catch (InterruptedException e1) { } } @Override public void run() { resetState(); isRunning.set(true); while (isRunning.get()) { // (re)connect if (socketCh == null) { try { InetAddress addr = InetAddress.getByName(host); socketCh = SocketChannel.open(new InetSocketAddress(addr, port)); socketCh.configureBlocking(false); selector = Selector.open(); id = FanoutConstants.getLocalSocketId(socketCh.socket()); socketCh.register(selector, SelectionKey.OP_READ); } catch (Exception e) { logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e); try { Thread.sleep(reconnectTimeout); } catch (InterruptedException x) { } continue; } } // read/write try { selector.select(clientTimeout); Iterator i = selector.selectedKeys().iterator(); while (i.hasNext()) { SelectionKey key = i.next(); i.remove(); if (key.isReadable()) { // read message String content = read(); String[] lines = content.split("\n"); for (String reply : lines) { logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply)); if (!processReply(reply)) { logger.error(MessageFormat.format("fanout client {0} received unknown message", id)); } } } else if (key.isWritable()) { // resubscribe if (resubscribe) { resubscribe = false; logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); for (String subscription : subscriptions) { write("subscribe " + subscription); } } socketCh.register(selector, SelectionKey.OP_READ); } } } catch (IOException e) { logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage())); closeChannel(); if (!isAutomaticReconnect.get()) { isRunning.set(false); continue; } } } closeChannel(); resetState(); } protected void resetState() { readBuffer.clear(); writeBuffer.clear(); isRunning.set(false); isConnected.set(false); } private void closeChannel() { try { if (socketCh != null) { socketCh.close(); socketCh = null; selector.close(); selector = null; isConnected.set(false); } } catch (IOException x) { } } protected boolean processReply(String reply) { String[] fields = reply.split("!", 2); if (fields.length == 1) { try { long time = Long.parseLong(fields[0]); Date date = new Date(time); firePong(date); } catch (Exception e) { } return true; } else if (fields.length == 2) { String channel = fields[0]; String message = fields[1]; if (FanoutConstants.CH_DEBUG.equals(channel)) { // debug messages are for internal use if (FanoutConstants.MSG_CONNECTED.equals(message)) { isConnected.set(true); resubscribe = subscriptions.size() > 0; if (resubscribe) { try { // register for async resubscribe socketCh.register(selector, SelectionKey.OP_WRITE); } catch (Exception e) { logger.error("an error occurred", e); } } } logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply)); } else { fireAnnouncement(channel, message); } return true; } else { // unknown message return false; } } protected void firePong(Date timestamp) { logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp)); for (FanoutListener listener : listeners) { try { listener.pong(timestamp); } catch (Throwable t) { logger.error("FanoutListener threw an exception!", t); } } } protected void fireAnnouncement(String channel, String message) { logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message)); for (FanoutListener listener : listeners) { try { listener.announcement(channel, message); } catch (Throwable t) { logger.error("FanoutListener threw an exception!", t); } } } protected synchronized String read() throws IOException { readBuffer.clear(); long len = socketCh.read(readBuffer); if (len == -1) { logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port)); socketCh.close(); return null; } else { readBuffer.flip(); String content = decoder.decode(readBuffer).toString(); readBuffer.clear(); return content; } } protected synchronized boolean write(String message) { try { logger.info(MessageFormat.format("fanout client {0} > {1}", id, message)); byte [] bytes = message.getBytes(FanoutConstants.CHARSET); writeBuffer.clear(); writeBuffer.put(bytes); if (bytes[bytes.length - 1] != 0xa) { writeBuffer.put((byte) 0xa); } writeBuffer.flip(); // loop until write buffer has been completely sent long written = 0; long toWrite = writeBuffer.remaining(); while (written != toWrite) { written += socketCh.write(writeBuffer); try { Thread.sleep(10); } catch (Exception x) { } } return true; } catch (IOException e) { logger.error("fanout client {0} error: {1}", id, e.getMessage()); } return false; } }