/* * Copyright 2013 gitblit.com. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.gitblit.fanout; import java.io.BufferedInputStream; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.text.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A multi-threaded socket implementation of https://github.com/travisghansen/fanout * * This implementation creates a master acceptor thread which accepts incoming * fanout connections and then spawns a daemon thread for each accepted connection. * If there are 100 concurrent fanout connections, there are 101 threads. * * @author James Moger * */ public class FanoutSocketService extends FanoutService { private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class); private volatile ServerSocket serviceSocket; public static void main(String[] args) throws Exception { FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT); pubsub.setStrictRequestTermination(false); pubsub.setAllowAllChannelAnnouncements(false); pubsub.start(); } /** * Create a multi-threaded fanout service. * * @param port * the port for running the fanout PubSub service * @throws IOException */ public FanoutSocketService(int port) { this(null, port); } /** * Create a multi-threaded fanout service. * * @param bindInterface * the ip address to bind for the service, may be null * @param port * the port for running the fanout PubSub service * @throws IOException */ public FanoutSocketService(String bindInterface, int port) { super(bindInterface, port, "Fanout socket service"); } @Override protected boolean isConnected() { return serviceSocket != null; } @Override protected boolean connect() { if (serviceSocket == null) { try { serviceSocket = new ServerSocket(); serviceSocket.setReuseAddress(true); serviceSocket.setSoTimeout(serviceTimeout); serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port)); logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort())); } catch (IOException e) { logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", name, host == null ? "0.0.0.0" : host, port), e); return false; } } return true; } @Override protected void disconnect() { try { if (serviceSocket != null) { logger.debug(MessageFormat.format("closing {0} server socket", name)); serviceSocket.close(); serviceSocket = null; } } catch (IOException e) { logger.error(MessageFormat.format("failed to disconnect {0}", name), e); } } /** * This accepts incoming fanout connections and spawns connection threads. */ @Override protected void listen() throws IOException { try { Socket socket; socket = serviceSocket.accept(); configureClientSocket(socket); FanoutSocketConnection connection = new FanoutSocketConnection(socket); if (addConnection(connection)) { // spawn connection daemon thread Thread connectionThread = new Thread(connection); connectionThread.setDaemon(true); connectionThread.setName("Fanout " + connection.id); connectionThread.start(); } else { // synchronously close the connection and remove it removeConnection(connection); connection.closeConnection(); connection = null; } } catch (SocketTimeoutException e) { // ignore accept timeout exceptions } } /** * FanoutSocketConnection handles reading/writing messages from a remote fanout * connection. * * @author James Moger * */ class FanoutSocketConnection extends FanoutServiceConnection implements Runnable { Socket socket; FanoutSocketConnection(Socket socket) { super(socket); this.socket = socket; } /** * Connection thread read/write method. */ @Override public void run() { try { StringBuilder sb = new StringBuilder(); BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH]; int len = 0; while (true) { while (is.available() > 0) { len = is.read(buffer); for (int i = 0; i < len; i++) { byte b = buffer[i]; if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) { String req = sb.toString(); sb.setLength(0); if (req.length() > 0) { // ignore empty request strings processRequest(this, req); } } else { sb.append((char) b); } } } if (!isRunning.get()) { // service has stopped, terminate client connection break; } else { Thread.sleep(500); } } } catch (Throwable t) { if (t instanceof SocketException) { logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); } else if (t instanceof SocketTimeoutException) { logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage())); } else { logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t); } } finally { closeConnection(); } logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id)); } @Override protected void reply(String content) throws IOException { // synchronously send reply logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content)); OutputStream os = socket.getOutputStream(); byte [] bytes = content.getBytes(FanoutConstants.CHARSET); os.write(bytes); if (bytes[bytes.length - 1] != 0xa) { os.write(0xa); } os.flush(); } protected void closeConnection() { // close the connection socket try { socket.close(); } catch (IOException e) { } socket = null; // remove this connection from the service removeConnection(this); } } }