James Moger
2013-01-11 5316d20e861640867d10405b25cfe75aeca0a34c
Fanout service for Sparkleshare clients
8 files added
5 files modified
2047 ■■■■■ changed files
distrib/gitblit.properties 47 ●●●●● patch | view | raw | blame | history
docs/01_features.mkd 1 ●●●● patch | view | raw | blame | history
docs/04_releases.mkd 9 ●●●● patch | view | raw | blame | history
src/com/gitblit/GitBlit.java 34 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutClient.java 413 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutConstants.java 36 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutNioService.java 332 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutService.java 563 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutServiceConnection.java 105 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutSocketService.java 234 ●●●●● patch | view | raw | blame | history
src/com/gitblit/fanout/FanoutStats.java 98 ●●●●● patch | view | raw | blame | history
tests/com/gitblit/tests/FanoutServiceTest.java 172 ●●●●● patch | view | raw | blame | history
tests/com/gitblit/tests/GitBlitSuite.java 3 ●●●● patch | view | raw | blame | history
distrib/gitblit.properties
@@ -366,6 +366,53 @@
groovy.customFields = 
#
# Fanout Settings
#
# Fanout is a PubSub notification service that can be used by Sparkleshare
# to eliminate repository change polling.  The fanout service runs in a separate
# thread on a separate port from the Gitblit http/https application.
# This service is provided so that Sparkleshare may be used with Gitblit in
# firewalled environments or where reliance on Sparkleshare's default notifications
# server (notifications.sparkleshare.org) is unwanted.
#
# This service maintains an open socket connection from the client to the
# Fanout PubSub service. This service may not work properly behind a proxy server.
# Specify the interface for Fanout to bind it's service.
# You may specify an ip or an empty value to bind to all interfaces.
# Specifying localhost will result in Gitblit ONLY listening to requests to
# localhost.
#
# SINCE 1.2.1
# RESTART REQUIRED
fanout.bindInterface = localhost
# port for serving the Fanout PubSub service.  <= 0 disables this service.
# On Unix/Linux systems, ports < 1024 require root permissions.
# Recommended value: 17000
#
# SINCE 1.2.1
# RESTART REQUIRED
fanout.port = 0
# Use Fanout NIO service.  If false, a multi-threaded socket service will be used.
# Be advised, the socket implementation spawns a thread per connection plus the
# connection acceptor thread.  The NIO implementation is completely single-threaded.
#
# SINCE 1.2.1
# RESTART REQUIRED
fanout.useNio = true
# Concurrent connection limit.  <= 0 disables concurrent connection throttling.
# If > 0, only the specified number of concurrent connections will be allowed
# and all other connections will be rejected.
#
# SINCE 1.2.1
# RESTART REQUIRED
fanout.connectionLimit = 0
#
# Authentication Settings
#
docs/01_features.mkd
@@ -37,6 +37,7 @@
- Git-notes display support
- Submodule support
- Push log based on a hidden, orphan branch refs/gitblit/pushes
- Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use
- gh-pages display support (Jekyll is not supported)
- Branch metrics (uses Google Charts)
- HEAD and Branch RSS feeds
docs/04_releases.mkd
@@ -14,7 +14,14 @@
#### additions
- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)
- Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.<br/>
This service is disabled by default.<br/>
    **New:** *fanout.bindInterface = localhost*<br/>
    **New:** *fanout.port = 0*<br/>
    **New:** *fanout.useNio = true*<br/>
    **New:** *fanout.connectionLimit = 0*
- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)<br/>
The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release.
- Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183)
- Added Dutch translation (github/kwoot)
src/com/gitblit/GitBlit.java
@@ -85,6 +85,9 @@
import com.gitblit.Constants.FederationToken;
import com.gitblit.Constants.PermissionType;
import com.gitblit.Constants.RegistrantType;
import com.gitblit.fanout.FanoutNioService;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutSocketService;
import com.gitblit.models.FederationModel;
import com.gitblit.models.FederationProposal;
import com.gitblit.models.FederationSet;
@@ -180,6 +183,8 @@
    private TimeZone timezone;
    
    private FileBasedConfig projectConfigs;
    private FanoutService fanoutService;
    public GitBlit() {
        if (gitblit == null) {
@@ -3133,6 +3138,32 @@
        }
        ContainerUtils.CVE_2007_0450.test();
        // startup Fanout PubSub service
        if (settings.getInteger(Keys.fanout.port, 0) > 0) {
            String bindInterface = settings.getString(Keys.fanout.bindInterface, null);
            int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);
            boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);
            int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);
            if (useNio) {
                if (StringUtils.isEmpty(bindInterface)) {
                    fanoutService = new FanoutNioService(port);
                } else {
                    fanoutService = new FanoutNioService(bindInterface, port);
                }
            } else {
                if (StringUtils.isEmpty(bindInterface)) {
                    fanoutService = new FanoutSocketService(port);
                } else {
                    fanoutService = new FanoutSocketService(bindInterface, port);
                }
            }
            fanoutService.setConcurrentConnectionLimit(limit);
            fanoutService.setAllowAllChannelAnnouncements(false);
            fanoutService.start();
        }
    }
    
    private void logTimezone(String type, TimeZone zone) {
@@ -3206,6 +3237,9 @@
        scheduledExecutor.shutdownNow();
        luceneExecutor.close();
        gcExecutor.close();
        if (fanoutService != null) {
            fanoutService.stop();
        }
    }
    
    /**
src/com/gitblit/fanout/FanoutClient.java
New file
@@ -0,0 +1,413 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Fanout client class.
 *
 * @author James Moger
 *
 */
public class FanoutClient implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
    private final int clientTimeout = 500;
    private final int reconnectTimeout = 2000;
    private final String host;
    private final int port;
    private final List<FanoutListener> listeners;
    private String id;
    private volatile Selector selector;
    private volatile SocketChannel socketCh;
    private Thread clientThread;
    private final AtomicBoolean isConnected;
    private final AtomicBoolean isRunning;
    private final AtomicBoolean isAutomaticReconnect;
    private final ByteBuffer writeBuffer;
    private final ByteBuffer readBuffer;
    private final CharsetDecoder decoder;
    private final Set<String> subscriptions;
    private boolean resubscribe;
    public interface FanoutListener {
        public void pong(Date timestamp);
        public void announcement(String channel, String message);
    }
    public static class FanoutAdapter implements FanoutListener {
        public void pong(Date timestamp) { }
        public void announcement(String channel, String message) { }
    }
    public static void main(String args[]) throws Exception {
        FanoutClient client = new FanoutClient("localhost", 2000);
        client.addListener(new FanoutAdapter() {
            @Override
            public void pong(Date timestamp) {
                System.out.println("Pong. " + timestamp);
            }
            @Override
            public void announcement(String channel, String message) {
                System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
            }
        });
        client.start();
        Thread.sleep(5000);
        client.ping();
        client.subscribe("james");
        client.announce("james", "12345");
        client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
        while (true) {
            Thread.sleep(10000);
            client.ping();
        }
    }
    public FanoutClient(String host, int port) {
        this.host = host;
        this.port = port;
        readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
        writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
        decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
        listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
        subscriptions = new LinkedHashSet<String>();
        isRunning = new AtomicBoolean(false);
        isConnected = new AtomicBoolean(false);
        isAutomaticReconnect = new AtomicBoolean(true);
    }
    public void addListener(FanoutListener listener) {
        listeners.add(listener);
    }
    public void removeListener(FanoutListener listener) {
        listeners.remove(listener);
    }
    public boolean isAutomaticReconnect() {
        return isAutomaticReconnect.get();
    }
    public void setAutomaticReconnect(boolean value) {
        isAutomaticReconnect.set(value);
    }
    public void ping() {
        confirmConnection();
        write("ping");
    }
    public void status() {
        confirmConnection();
        write("status");
    }
    public void subscribe(String channel) {
        confirmConnection();
        if (subscriptions.add(channel)) {
            write("subscribe " + channel);
        }
    }
    public void unsubscribe(String channel) {
        confirmConnection();
        if (subscriptions.remove(channel)) {
            write("unsubscribe " + channel);
        }
    }
    public void announce(String channel, String message) {
        confirmConnection();
        write("announce " + channel + " " + message);
    }
    private void confirmConnection() {
        if (!isConnected()) {
            throw new RuntimeException("Fanout client is disconnected!");
        }
    }
    public boolean isConnected() {
        return isRunning.get() && socketCh != null && isConnected.get();
    }
    /**
     * Start client connection and return immediately.
     */
    public void start() {
        if (isRunning.get()) {
            logger.warn("Fanout client is already running");
            return;
        }
        clientThread = new Thread(this, "Fanout client");
        clientThread.start();
    }
    /**
     * Start client connection and wait until it has connected.
     */
    public void startSynchronously() {
        start();
        while (!isConnected()) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
            }
        }
    }
    /**
     * Stops client connection.  This method returns when the connection has
     * been completely shutdown.
     */
    public void stop() {
        if (!isRunning.get()) {
            logger.warn("Fanout client is not running");
            return;
        }
        isRunning.set(false);
        try {
            if (clientThread != null) {
                clientThread.join();
                clientThread = null;
            }
        } catch (InterruptedException e1) {
        }
    }
    @Override
    public void run() {
        resetState();
        isRunning.set(true);
        while (isRunning.get()) {
            // (re)connect
            if (socketCh == null) {
                try {
                    InetAddress addr = InetAddress.getByName(host);
                    socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
                    socketCh.configureBlocking(false);
                    selector = Selector.open();
                    id = FanoutConstants.getLocalSocketId(socketCh.socket());
                    socketCh.register(selector, SelectionKey.OP_READ);
                } catch (Exception e) {
                    logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
                    try {
                        Thread.sleep(reconnectTimeout);
                    } catch (InterruptedException x) {
                    }
                    continue;
                }
            }
            // read/write
            try {
                selector.select(clientTimeout);
                Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    i.remove();
                    if (key.isReadable()) {
                        // read message
                        String content = read();
                        String[] lines = content.split("\n");
                        for (String reply : lines) {
                            logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
                            if (!processReply(reply)) {
                                logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
                            }
                        }
                    } else if (key.isWritable()) {
                        // resubscribe
                        if (resubscribe) {
                            resubscribe = false;
                            logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));
                            for (String subscription : subscriptions) {
                                write("subscribe " + subscription);
                            }
                        }
                        socketCh.register(selector, SelectionKey.OP_READ);
                    }
                }
            } catch (IOException e) {
                logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
                closeChannel();
                if (!isAutomaticReconnect.get()) {
                    isRunning.set(false);
                    continue;
                }
            }
        }
        closeChannel();
        resetState();
    }
    protected void resetState() {
        readBuffer.clear();
        writeBuffer.clear();
        isRunning.set(false);
        isConnected.set(false);
    }
    private void closeChannel() {
        try {
            if (socketCh != null) {
                socketCh.close();
                socketCh = null;
                selector.close();
                selector = null;
                isConnected.set(false);
            }
        } catch (IOException x) {
        }
    }
    protected boolean processReply(String reply) {
        String[] fields = reply.split("!", 2);
        if (fields.length == 1) {
            try {
                long time = Long.parseLong(fields[0]);
                Date date = new Date(time);
                firePong(date);
            } catch (Exception e) {
            }
            return true;
        } else if (fields.length == 2) {
            String channel = fields[0];
            String message = fields[1];
            if (FanoutConstants.CH_DEBUG.equals(channel)) {
                // debug messages are for internal use
                if (FanoutConstants.MSG_CONNECTED.equals(message)) {
                    isConnected.set(true);
                    resubscribe = subscriptions.size() > 0;
                    if (resubscribe) {
                        try {
                            // register for async resubscribe
                            socketCh.register(selector, SelectionKey.OP_WRITE);
                        } catch (Exception e) {
                            logger.error("an error occurred", e);
                        }
                    }
                }
                logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
            } else {
                fireAnnouncement(channel, message);
            }
            return true;
        } else {
            // unknown message
            return false;
        }
    }
    protected void firePong(Date timestamp) {
        logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
        for (FanoutListener listener : listeners) {
            try {
                listener.pong(timestamp);
            } catch (Throwable t) {
                logger.error("FanoutListener threw an exception!", t);
            }
        }
    }
    protected void fireAnnouncement(String channel, String message) {
        logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
        for (FanoutListener listener : listeners) {
            try {
                listener.announcement(channel, message);
            } catch (Throwable t) {
                logger.error("FanoutListener threw an exception!", t);
            }
        }
    }
    protected synchronized String read() throws IOException {
        readBuffer.clear();
        long len = socketCh.read(readBuffer);
        if (len == -1) {
            logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
            socketCh.close();
            return null;
        } else {
            readBuffer.flip();
            String content = decoder.decode(readBuffer).toString();
            readBuffer.clear();
            return content;
        }
    }
    protected synchronized boolean write(String message) {
        try {
            logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
            byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
            writeBuffer.clear();
            writeBuffer.put(bytes);
            if (bytes[bytes.length - 1] != 0xa) {
                writeBuffer.put((byte) 0xa);
            }
            writeBuffer.flip();
            // loop until write buffer has been completely sent
            long written = 0;
            long toWrite = writeBuffer.remaining();
            while (written != toWrite) {
                written += socketCh.write(writeBuffer);
                try {
                    Thread.sleep(10);
                } catch (Exception x) {
                }
            }
            return true;
        } catch (IOException e) {
            logger.error("fanout client {0} error: {1}", id, e.getMessage());
        }
        return false;
    }
}
src/com/gitblit/fanout/FanoutConstants.java
New file
@@ -0,0 +1,36 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.net.Socket;
public class FanoutConstants {
    public final static String CHARSET = "ISO-8859-1";
    public final static int BUFFER_LENGTH = 512;
    public final static String CH_ALL = "all";
    public final static String CH_DEBUG = "debug";
    public final static String MSG_CONNECTED = "connected...";
    public final static String MSG_BUSY = "busy";
    public static String getRemoteSocketId(Socket socket) {
        return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    }
    public static String getLocalSocketId(Socket socket) {
        return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();
    }
}
src/com/gitblit/fanout/FanoutNioService.java
New file
@@ -0,0 +1,332 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A single-thread NIO implementation of https://github.com/travisghansen/fanout
 *
 * This implementation uses channels and selectors, which are the Java analog of
 * the Linux epoll mechanism used in the original fanout C code.
 *
 * @author James Moger
 *
 */
public class FanoutNioService extends FanoutService {
    private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
    private volatile ServerSocketChannel serviceCh;
    private volatile Selector selector;
    public static void main(String[] args) throws Exception {
        FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
        pubsub.setStrictRequestTermination(false);
        pubsub.setAllowAllChannelAnnouncements(false);
        pubsub.start();
    }
    /**
     * Create a single-threaded fanout service.
     *
     * @param host
     * @param port
     *            the port for running the fanout PubSub service
     * @throws IOException
     */
    public FanoutNioService(int port) {
        this(null, port);
    }
    /**
     * Create a single-threaded fanout service.
     *
     * @param bindInterface
     *            the ip address to bind for the service, may be null
     * @param port
     *            the port for running the fanout PubSub service
     * @throws IOException
     */
    public FanoutNioService(String bindInterface, int port) {
        super(bindInterface, port, "Fanout nio service");
    }
    @Override
    protected boolean isConnected() {
        return serviceCh != null;
    }
    @Override
    protected boolean connect() {
        if (serviceCh == null) {
            try {
                serviceCh = ServerSocketChannel.open();
                serviceCh.configureBlocking(false);
                serviceCh.socket().setReuseAddress(true);
                serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
                selector = Selector.open();
                serviceCh.register(selector, SelectionKey.OP_ACCEPT);
                logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
                        name, host == null ? "0.0.0.0" : host, port));
            } catch (IOException e) {
                logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
                        name, name, host == null ? "0.0.0.0" : host, port), e);
                return false;
            }
        }
        return true;
    }
    @Override
    protected void disconnect() {
        try {
            if (serviceCh != null) {
                // close all active client connections
                Map<String, SocketChannel> clients = getCurrentClientSockets();
                for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
                    closeClientSocket(client.getKey(), client.getValue());
                }
                // close service socket channel
                logger.debug(MessageFormat.format("closing {0} socket channel", name));
                serviceCh.socket().close();
                serviceCh.close();
                serviceCh = null;
                selector.close();
                selector = null;
            }
        } catch (IOException e) {
            logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
        }
    }
    @Override
    protected void listen() throws IOException {
        while (selector.select(serviceTimeout) > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyItr = keys.iterator();
            while (keyItr.hasNext()) {
                SelectionKey key = (SelectionKey) keyItr.next();
                if (key.isAcceptable()) {
                    // new fanout client connection
                    ServerSocketChannel sch = (ServerSocketChannel) key.channel();
                    try {
                        SocketChannel ch = sch.accept();
                        ch.configureBlocking(false);
                        configureClientSocket(ch.socket());
                        FanoutNioConnection connection = new FanoutNioConnection(ch);
                        addConnection(connection);
                        // register to send the queued message
                        ch.register(selector, SelectionKey.OP_WRITE, connection);
                    } catch (IOException e) {
                        logger.error("error accepting fanout connection", e);
                    }
                } else if (key.isReadable()) {
                    // read fanout client request
                    SocketChannel ch = (SocketChannel) key.channel();
                    FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
                    try {
                        connection.read(ch, isStrictRequestTermination());
                        int replies = 0;
                        Iterator<String> reqItr = connection.requestQueue.iterator();
                        while (reqItr.hasNext()) {
                            String req = reqItr.next();
                            String reply = processRequest(connection, req);
                            reqItr.remove();
                            if (reply != null) {
                                replies++;
                            }
                        }
                        if (replies > 0) {
                            // register to send the replies to requests
                            ch.register(selector, SelectionKey.OP_WRITE, connection);
                        } else {
                            // re-register for next read
                            ch.register(selector, SelectionKey.OP_READ, connection);
                        }
                    } catch (IOException e) {
                        logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
                        removeConnection(connection);
                        closeClientSocket(connection.id, ch);
                    }
                } else if (key.isWritable()) {
                    // asynchronous reply to fanout client request
                    SocketChannel ch = (SocketChannel) key.channel();
                    FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
                    try {
                        connection.write(ch);
                        if (hasConnection(connection)) {
                            // register for next read
                            ch.register(selector, SelectionKey.OP_READ, connection);
                        } else {
                            // Connection was rejected due to load or
                            // some other reason. Close it.
                            closeClientSocket(connection.id, ch);
                        }
                    } catch (IOException e) {
                        logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
                        removeConnection(connection);
                        closeClientSocket(connection.id, ch);
                    }
                }
                keyItr.remove();
            }
        }
    }
    protected void closeClientSocket(String id, SocketChannel ch) {
        try {
            ch.close();
        } catch (IOException e) {
            logger.error(MessageFormat.format("fanout connection {0}", id), e);
        }
    }
    protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
        super.broadcast(connections, channel, message);
        // register queued write
        Map<String, SocketChannel> sockets = getCurrentClientSockets();
        for (FanoutServiceConnection connection : connections) {
            SocketChannel ch = sockets.get(connection.id);
            if (ch == null) {
                logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
                removeConnection(connection);
                continue;
            }
            try {
                ch.register(selector, SelectionKey.OP_WRITE, connection);
            } catch (IOException e) {
                logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
            }
        }
    }
    protected Map<String, SocketChannel> getCurrentClientSockets() {
        Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
        for (SelectionKey key : selector.keys()) {
            if (key.channel() instanceof SocketChannel) {
                SocketChannel ch = (SocketChannel) key.channel();
                String id = FanoutConstants.getRemoteSocketId(ch.socket());
                sockets.put(id, ch);
            }
        }
        return sockets;
    }
    /**
     * FanoutNioConnection handles reading/writing messages from a remote fanout
     * connection.
     *
     * @author James Moger
     *
     */
    static class FanoutNioConnection extends FanoutServiceConnection {
        final ByteBuffer readBuffer;
        final ByteBuffer writeBuffer;
        final List<String> requestQueue;
        final List<String> replyQueue;
        final CharsetDecoder decoder;
        FanoutNioConnection(SocketChannel ch) {
            super(ch.socket());
            readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
            writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
            requestQueue = new ArrayList<String>();
            replyQueue = new ArrayList<String>();
            decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
        }
        protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
            long bytesRead = 0;
            readBuffer.clear();
            bytesRead = ch.read(readBuffer);
            readBuffer.flip();
            if (bytesRead == -1) {
                throw new IOException("lost client connection, end of stream");
            }
            if (readBuffer.limit() == 0) {
                return;
            }
            CharBuffer cbuf = decoder.decode(readBuffer);
            String req = cbuf.toString();
            String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
            requestQueue.addAll(Arrays.asList(lines));
        }
        protected void write(SocketChannel ch) throws IOException {
            Iterator<String> itr = replyQueue.iterator();
            while (itr.hasNext()) {
                String reply = itr.next();
                writeBuffer.clear();
                logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));
                byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);
                writeBuffer.put(bytes);
                if (bytes[bytes.length - 1] != 0xa) {
                    writeBuffer.put((byte) 0xa);
                }
                writeBuffer.flip();
                // loop until write buffer has been completely sent
                int written = 0;
                int toWrite = writeBuffer.remaining();
                while (written != toWrite) {
                    written += ch.write(writeBuffer);
                    try {
                        Thread.sleep(10);
                    } catch (Exception x) {
                    }
                }
                itr.remove();
            }
            writeBuffer.clear();
        }
        @Override
        protected void reply(String content) throws IOException {
            // queue the reply
            // replies are transmitted asynchronously from the requests
            replyQueue.add(content);
        }
    }
}
src/com/gitblit/fanout/FanoutService.java
New file
@@ -0,0 +1,563 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for Fanout service implementations.
 *
 * Subclass implementations can be used as a Sparkleshare PubSub notification
 * server.  This allows Sparkleshare to be used in conjunction with Gitblit
 * behind a corporate firewall that restricts or prohibits client internet access
 * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
 *
 * @author James Moger
 *
 */
public abstract class FanoutService implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
    public final static int DEFAULT_PORT = 17000;
    protected final static int serviceTimeout = 5000;
    protected final String host;
    protected final int port;
    protected final String name;
    private Thread serviceThread;
    private final Map<String, FanoutServiceConnection> connections;
    private final Map<String, Set<FanoutServiceConnection>> subscriptions;
    protected final AtomicBoolean isRunning;
    private final AtomicBoolean strictRequestTermination;
    private final AtomicBoolean allowAllChannelAnnouncements;
    private final AtomicInteger concurrentConnectionLimit;
    private final Date bootDate;
    private final AtomicLong rejectedConnectionCount;
    private final AtomicInteger peakConnectionCount;
    private final AtomicLong totalConnections;
    private final AtomicLong totalAnnouncements;
    private final AtomicLong totalMessages;
    private final AtomicLong totalSubscribes;
    private final AtomicLong totalUnsubscribes;
    private final AtomicLong totalPings;
    protected FanoutService(String host, int port, String name) {
        this.host = host;
        this.port = port;
        this.name = name;
        connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
        subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
        subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
        isRunning = new AtomicBoolean(false);
        strictRequestTermination = new AtomicBoolean(false);
        allowAllChannelAnnouncements = new AtomicBoolean(false);
        concurrentConnectionLimit = new AtomicInteger(0);
        bootDate = new Date();
        rejectedConnectionCount = new AtomicLong(0);
        peakConnectionCount = new AtomicInteger(0);
        totalConnections = new AtomicLong(0);
        totalAnnouncements = new AtomicLong(0);
        totalMessages = new AtomicLong(0);
        totalSubscribes = new AtomicLong(0);
        totalUnsubscribes = new AtomicLong(0);
        totalPings = new AtomicLong(0);
    }
    /*
     * Abstract methods
     */
    protected abstract boolean isConnected();
    protected abstract boolean connect();
    protected abstract void listen() throws IOException;
    protected abstract void disconnect();
    /**
     * Returns true if the service requires \n request termination.
     *
     * @return true if request requires \n termination
     */
    public boolean isStrictRequestTermination() {
        return strictRequestTermination.get();
    }
    /**
     * Control the termination of fanout requests. If true, fanout requests must
     * be terminated with \n. If false, fanout requests may be terminated with
     * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
     *
     * @param isStrictTermination
     */
    public void setStrictRequestTermination(boolean isStrictTermination) {
        strictRequestTermination.set(isStrictTermination);
    }
    /**
     * Returns the maximum allowable concurrent fanout connections.
     *
     * @return the maximum allowable concurrent connection count
     */
    public int getConcurrentConnectionLimit() {
        return concurrentConnectionLimit.get();
    }
    /**
     * Sets the maximum allowable concurrent fanout connection count.
     *
     * @param value
     */
    public void setConcurrentConnectionLimit(int value) {
        concurrentConnectionLimit.set(value);
    }
    /**
     * Returns true if connections are allowed to announce on the all channel.
     *
     * @return true if connections are allowed to announce on the all channel
     */
    public boolean allowAllChannelAnnouncements() {
        return allowAllChannelAnnouncements.get();
    }
    /**
     * Allows/prohibits connections from announcing on the ALL channel.
     *
     * @param value
     */
    public void setAllowAllChannelAnnouncements(boolean value) {
        allowAllChannelAnnouncements.set(value);
    }
    /**
     * Returns the current connections
     *
     * @param channel
     * @return map of current connections keyed by their id
     */
    public Map<String, FanoutServiceConnection> getCurrentConnections() {
        return connections;
    }
    /**
     * Returns all subscriptions
     *
     * @return map of current subscriptions keyed by channel name
     */
    public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
        return subscriptions;
    }
    /**
     * Returns the subscriptions for the specified channel
     *
     * @param channel
     * @return set of subscribed connections for the specified channel
     */
    public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
        return subscriptions.get(channel);
    }
    /**
     * Returns the runtime statistics object for this service.
     *
     * @return stats
     */
    public FanoutStats getStatistics() {
        FanoutStats stats = new FanoutStats();
        // settings
        stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
        stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
        stats.strictRequestTermination = isStrictRequestTermination();
        // runtime stats
        stats.bootDate = bootDate;
        stats.rejectedConnectionCount = rejectedConnectionCount.get();
        stats.peakConnectionCount = peakConnectionCount.get();
        stats.totalConnections = totalConnections.get();
        stats.totalAnnouncements = totalAnnouncements.get();
        stats.totalMessages = totalMessages.get();
        stats.totalSubscribes = totalSubscribes.get();
        stats.totalUnsubscribes = totalUnsubscribes.get();
        stats.totalPings = totalPings.get();
        stats.currentConnections = connections.size();
        stats.currentChannels = subscriptions.size();
        stats.currentSubscriptions = subscriptions.size() * connections.size();
        return stats;
    }
    /**
     * Returns true if the service is ready.
     *
     * @return true, if the service is ready
     */
    public boolean isReady() {
        if (isRunning.get()) {
            return isConnected();
        }
        return false;
    }
    /**
     * Start the Fanout service thread and immediatel return.
     *
     */
    public void start() {
        if (isRunning.get()) {
            logger.warn(MessageFormat.format("{0} is already running", name));
            return;
        }
        serviceThread = new Thread(this);
        serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
        serviceThread.start();
    }
    /**
     * Start the Fanout service thread and wait until it is accepting connections.
     *
     */
    public void startSynchronously() {
        start();
        while (!isReady()) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
            }
        }
    }
    /**
     * Stop the Fanout service.  This method returns when the service has been
     * completely shutdown.
     */
    public void stop() {
        if (!isRunning.get()) {
            logger.warn(MessageFormat.format("{0} is not running", name));
            return;
        }
        logger.info(MessageFormat.format("stopping {0}...", name));
        isRunning.set(false);
        try {
            if (serviceThread != null) {
                serviceThread.join();
                serviceThread = null;
            }
        } catch (InterruptedException e1) {
            logger.error("", e1);
        }
        logger.info(MessageFormat.format("stopped {0}", name));
    }
    /**
     * Main execution method of the service
     */
    @Override
    public final void run() {
        disconnect();
        resetState();
        isRunning.set(true);
        while (isRunning.get()) {
            if (connect()) {
                try {
                    listen();
                } catch (IOException e) {
                    logger.error(MessageFormat.format("error processing {0}", name), e);
                    isRunning.set(false);
                }
            } else {
                try {
                    Thread.sleep(serviceTimeout);
                } catch (InterruptedException x) {
                }
            }
        }
        disconnect();
        resetState();
    }
    protected void resetState() {
        // reset state data
        connections.clear();
        subscriptions.clear();
        rejectedConnectionCount.set(0);
        peakConnectionCount.set(0);
        totalConnections.set(0);
        totalAnnouncements.set(0);
        totalMessages.set(0);
        totalSubscribes.set(0);
        totalUnsubscribes.set(0);
        totalPings.set(0);
    }
    /**
     * Configure the client connection socket.
     *
     * @param socket
     * @throws SocketException
     */
    protected void configureClientSocket(Socket socket) throws SocketException {
        socket.setKeepAlive(true);
        socket.setSoLinger(true, 0); // immediately discard any remaining data
    }
    /**
     * Add the connection to the connections map.
     *
     * @param connection
     * @return false if the connection was rejected due to too many concurrent
     *         connections
     */
    protected boolean addConnection(FanoutServiceConnection connection) {
        int limit = getConcurrentConnectionLimit();
        if (limit > 0 && connections.size() > limit) {
            logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
            increment(rejectedConnectionCount);
            connection.busy();
            return false;
        }
        // add the connection to our map
        connections.put(connection.id, connection);
        // track peak number of concurrent connections
        if (connections.size() > peakConnectionCount.get()) {
            peakConnectionCount.set(connections.size());
        }
        logger.info("fanout new connection " + connection.id);
        connection.connected();
        return true;
    }
    /**
     * Remove the connection from the connections list and from subscriptions.
     *
     * @param connection
     */
    protected void removeConnection(FanoutServiceConnection connection) {
        connections.remove(connection.id);
        Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
            Set<FanoutServiceConnection> subscriptions = entry.getValue();
            subscriptions.remove(connection);
            if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {
                if (subscriptions.size() == 0) {
                    itr.remove();
                    logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
                }
            }
        }
        logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
    }
    /**
     * Tests to see if the connection is being monitored by the service.
     *
     * @param connection
     * @return true if the service is monitoring the connection
     */
    protected boolean hasConnection(FanoutServiceConnection connection) {
        return connections.containsKey(connection.id);
    }
    /**
     * Reply to a connection on the specified channel.
     *
     * @param connection
     * @param channel
     * @param message
     * @return the reply
     */
    protected String reply(FanoutServiceConnection connection, String channel, String message) {
        if (channel != null && channel.length() > 0) {
            increment(totalMessages);
        }
        return connection.reply(channel, message);
    }
    /**
     * Service method to broadcast a message to all connections.
     *
     * @param message
     */
    public void broadcastAll(String message) {
        broadcast(connections.values(), FanoutConstants.CH_ALL, message);
        increment(totalAnnouncements);
    }
    /**
     * Service method to broadcast a message to connections subscribed to the
     * channel.
     *
     * @param message
     */
    public void broadcast(String channel, String message) {
        List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
        broadcast(connections, channel, message);
        increment(totalAnnouncements);
    }
    /**
     * Broadcast a message to connections subscribed to the specified channel.
     *
     * @param connections
     * @param channel
     * @param message
     */
    protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
        for (FanoutServiceConnection connection : connections) {
            reply(connection, channel, message);
        }
    }
    /**
     * Process an incoming Fanout request.
     *
     * @param connection
     * @param req
     * @return the reply to the request, may be null
     */
    protected String processRequest(FanoutServiceConnection connection, String req) {
        logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
        String[] fields = req.split(" ", 3);
        String action = fields[0];
        String channel = fields.length >= 2 ? fields[1] : null;
        String message = fields.length >= 3 ? fields[2] : null;
        try {
            return processRequest(connection, action, channel, message);
        } catch (IllegalArgumentException e) {
            // invalid action
            logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
            logger.error(asHexArray(req));
        }
        return null;
    }
    /**
     * Process the Fanout request.
     *
     * @param connection
     * @param action
     * @param channel
     * @param message
     * @return the reply to the request, may be null
     * @throws IllegalArgumentException
     */
    protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
        if ("ping".equals(action)) {
            // ping
            increment(totalPings);
            return reply(connection, null, "" + System.currentTimeMillis());
        } else if ("info".equals(action)) {
            // info
            String info = getStatistics().info();
            return reply(connection, null, info);
        } else if ("announce".equals(action)) {
            // announcement
            if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
                // prohibiting connection-sourced all announcements
                logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
            } else if ("debug".equals(channel)) {
                // prohibiting connection-sourced debug announcements
                logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
            } else {
                // acceptable announcement
                List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
                connections.remove(connection); // remove announcer
                broadcast(connections, channel, message);
                increment(totalAnnouncements);
            }
        } else if ("subscribe".equals(action)) {
            // subscribe
            if (!subscriptions.containsKey(channel)) {
                logger.info(MessageFormat.format("fanout new channel {0}", channel));
                subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());
            }
            subscriptions.get(channel).add(connection);
            logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
            increment(totalSubscribes);
        } else if ("unsubscribe".equals(action)) {
            // unsubscribe
            if (subscriptions.containsKey(channel)) {
                subscriptions.get(channel).remove(connection);
                if (subscriptions.get(channel).size() == 0) {
                    subscriptions.remove(channel);
                }
                increment(totalUnsubscribes);
            }
        } else {
            // invalid action
            throw new IllegalArgumentException(action);
        }
        return null;
    }
    private String asHexArray(String req) {
        StringBuilder sb = new StringBuilder();
        for (char c : req.toCharArray()) {
            sb.append(Integer.toHexString(c)).append(' ');
        }
        return "[ " + sb.toString().trim() + " ]";
    }
    /**
     * Increment a long and prevent negative rollover.
     *
     * @param counter
     */
    private void increment(AtomicLong counter) {
        long v = counter.incrementAndGet();
        if (v < 0) {
            counter.set(0);
        }
    }
    @Override
    public String toString() {
        return name;
    }
}
src/com/gitblit/fanout/FanoutServiceConnection.java
New file
@@ -0,0 +1,105 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.IOException;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * FanoutServiceConnection handles reading/writing messages from a remote fanout
 * connection.
 *
 * @author James Moger
 *
 */
public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {
    private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);
    public final String id;
    protected FanoutServiceConnection(Socket socket) {
        this.id = FanoutConstants.getRemoteSocketId(socket);
    }
    protected abstract void reply(String content) throws IOException;
    /**
     * Send the connection a debug channel connected message.
     *
     * @param message
     */
    protected void connected() {
        reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);
    }
    /**
     * Send the connection a debug channel busy message.
     *
     * @param message
     */
    protected void busy() {
        reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);
    }
    /**
     * Send the connection a message for the specified channel.
     *
     * @param channel
     * @param message
     * @return the reply
     */
    protected String reply(String channel, String message) {
        String content;
        if (channel != null) {
            content = channel + "!" + message;
        } else {
            content = message;
        }
        try {
            reply(content);
        } catch (Exception e) {
            logger.error("failed to reply to fanout connection " + id, e);
        }
        return content;
    }
    @Override
    public int compareTo(FanoutServiceConnection c) {
        return id.compareTo(c.id);
    }
    @Override
    public boolean equals(Object o) {
        if (o instanceof FanoutServiceConnection) {
            return id.equals(((FanoutServiceConnection) o).id);
        }
        return false;
    }
    @Override
    public int hashCode() {
        return id.hashCode();
    }
    @Override
    public String toString() {
        return id;
    }
}
src/com/gitblit/fanout/FanoutSocketService.java
New file
@@ -0,0 +1,234 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A multi-threaded socket implementation of https://github.com/travisghansen/fanout
 *
 * This implementation creates a master acceptor thread which accepts incoming
 * fanout connections and then spawns a daemon thread for each accepted connection.
 * If there are 100 concurrent fanout connections, there are 101 threads.
 *
 * @author James Moger
 *
 */
public class FanoutSocketService extends FanoutService {
    private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);
    private volatile ServerSocket serviceSocket;
    public static void main(String[] args) throws Exception {
        FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);
        pubsub.setStrictRequestTermination(false);
        pubsub.setAllowAllChannelAnnouncements(false);
        pubsub.start();
    }
    /**
     * Create a multi-threaded fanout service.
     *
     * @param port
     *            the port for running the fanout PubSub service
     * @throws IOException
     */
    public FanoutSocketService(int port) {
        this(null, port);
    }
    /**
     * Create a multi-threaded fanout service.
     *
     * @param bindInterface
     *            the ip address to bind for the service, may be null
     * @param port
     *            the port for running the fanout PubSub service
     * @throws IOException
     */
    public FanoutSocketService(String bindInterface, int port) {
        super(bindInterface, port, "Fanout socket service");
    }
    @Override
    protected boolean isConnected() {
        return serviceSocket != null;
    }
    @Override
    protected boolean connect() {
        if (serviceSocket == null) {
            try {
                serviceSocket = new ServerSocket();
                serviceSocket.setReuseAddress(true);
                serviceSocket.setSoTimeout(serviceTimeout);
                serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
                logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
                        name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
            } catch (IOException e) {
                logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
                        name, host == null ? "0.0.0.0" : host, port), e);
                return false;
            }
        }
        return true;
    }
    @Override
    protected void disconnect() {
        try {
            if (serviceSocket != null) {
                logger.debug(MessageFormat.format("closing {0} server socket", name));
                serviceSocket.close();
                serviceSocket = null;
            }
        } catch (IOException e) {
            logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
        }
    }
    /**
     * This accepts incoming fanout connections and spawns connection threads.
     */
    @Override
    protected void listen() throws IOException {
        try {
            Socket socket;
            socket = serviceSocket.accept();
            configureClientSocket(socket);
            FanoutSocketConnection connection = new FanoutSocketConnection(socket);
            if (addConnection(connection)) {
                // spawn connection daemon thread
                Thread connectionThread = new Thread(connection);
                connectionThread.setDaemon(true);
                connectionThread.setName("Fanout " + connection.id);
                connectionThread.start();
            } else {
                // synchronously close the connection and remove it
                removeConnection(connection);
                connection.closeConnection();
                connection = null;
            }
        } catch (SocketTimeoutException e) {
            // ignore accept timeout exceptions
        }
    }
    /**
     * FanoutSocketConnection handles reading/writing messages from a remote fanout
     * connection.
     *
     * @author James Moger
     *
     */
    class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
        Socket socket;
        FanoutSocketConnection(Socket socket) {
            super(socket);
            this.socket = socket;
        }
        /**
         * Connection thread read/write method.
         */
        @Override
        public void run() {
            try {
                StringBuilder sb = new StringBuilder();
                BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
                byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];
                int len = 0;
                while (true) {
                    while (is.available() > 0) {
                        len = is.read(buffer);
                        for (int i = 0; i < len; i++) {
                            byte b = buffer[i];
                            if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {
                                String req = sb.toString();
                                sb.setLength(0);
                                if (req.length() > 0) {
                                    // ignore empty request strings
                                    processRequest(this, req);
                                }
                            } else {
                                sb.append((char) b);
                            }
                        }
                    }
                    if (!isRunning.get()) {
                        // service has stopped, terminate client connection
                        break;
                    } else {
                        Thread.sleep(500);
                    }
                }
            } catch (Throwable t) {
                if (t instanceof SocketException) {
                    logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
                } else if (t instanceof SocketTimeoutException) {
                    logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
                } else {
                    logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);
                }
            } finally {
                closeConnection();
            }
            logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
        }
        @Override
        protected void reply(String content) throws IOException {
            // synchronously send reply
            logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));
            OutputStream os = socket.getOutputStream();
            byte [] bytes = content.getBytes(FanoutConstants.CHARSET);
            os.write(bytes);
            if (bytes[bytes.length - 1] != 0xa) {
                os.write(0xa);
            }
            os.flush();
        }
        protected void closeConnection() {
            // close the connection socket
            try {
                socket.close();
            } catch (IOException e) {
            }
            socket = null;
            // remove this connection from the service
            removeConnection(this);
        }
    }
}
src/com/gitblit/fanout/FanoutStats.java
New file
@@ -0,0 +1,98 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.fanout;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.Date;
/**
 * Encapsulates the runtime stats of a fanout service.
 *
 * @author James Moger
 *
 */
public class FanoutStats implements Serializable {
    private static final long serialVersionUID = 1L;
    public long concurrentConnectionLimit;
    public boolean allowAllChannelAnnouncements;
    public boolean strictRequestTermination;
    public Date bootDate;
    public long rejectedConnectionCount;
    public int peakConnectionCount;
    public long currentChannels;
    public long currentSubscriptions;
    public long currentConnections;
    public long totalConnections;
    public long totalAnnouncements;
    public long totalMessages;
    public long totalSubscribes;
    public long totalUnsubscribes;
    public long totalPings;
    public String info() {
        int i = 0;
        StringBuilder sb = new StringBuilder();
        sb.append(infoStr(i++, "boot date"));
        sb.append(infoStr(i++, "strict request termination"));
        sb.append(infoStr(i++, "allow connection \"all\" announcements"));
        sb.append(infoInt(i++, "concurrent connection limit"));
        sb.append(infoInt(i++, "concurrent limit rejected connections"));
        sb.append(infoInt(i++, "peak connections"));
        sb.append(infoInt(i++, "current connections"));
        sb.append(infoInt(i++, "current channels"));
        sb.append(infoInt(i++, "current subscriptions"));
        sb.append(infoInt(i++, "user-requested subscriptions"));
        sb.append(infoInt(i++, "total connections"));
        sb.append(infoInt(i++, "total announcements"));
        sb.append(infoInt(i++, "total messages"));
        sb.append(infoInt(i++, "total subscribes"));
        sb.append(infoInt(i++, "total unsubscribes"));
        sb.append(infoInt(i++, "total pings"));
        String template = sb.toString();
        String info = MessageFormat.format(template,
                bootDate.toString(),
                Boolean.toString(strictRequestTermination),
                Boolean.toString(allowAllChannelAnnouncements),
                concurrentConnectionLimit,
                rejectedConnectionCount,
                peakConnectionCount,
                currentConnections,
                currentChannels,
                currentSubscriptions,
                currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),
                        totalConnections,
                        totalAnnouncements,
                        totalMessages,
                        totalSubscribes,
                        totalUnsubscribes,
                        totalPings);
        return info;
    }
    private String infoStr(int index, String label) {
        return label + ": {" + index + "}\n";
    }
    private String infoInt(int index, String label) {
        return label + ": {" + index + ",number,0}\n";
    }
}
tests/com/gitblit/tests/FanoutServiceTest.java
New file
@@ -0,0 +1,172 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.tests;
import static org.junit.Assert.assertEquals;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutClient;
import com.gitblit.fanout.FanoutClient.FanoutAdapter;
import com.gitblit.fanout.FanoutNioService;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutSocketService;
public class FanoutServiceTest {
    int fanoutPort = FanoutService.DEFAULT_PORT;
    @Test
    public void testNioPubSub() throws Exception {
        testPubSub(new FanoutNioService(fanoutPort));
    }
    @Test
    public void testSocketPubSub() throws Exception {
        testPubSub(new FanoutSocketService(fanoutPort));
    }
    @Test
    public void testNioDisruptionAndRecovery() throws Exception {
        testDisruption(new FanoutNioService(fanoutPort));
    }
    @Test
    public void testSocketDisruptionAndRecovery() throws Exception {
        testDisruption(new FanoutSocketService(fanoutPort));
    }
    protected void testPubSub(FanoutService service) throws Exception {
        System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
        service.startSynchronously();
        final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
        FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
        clientA.addListener(new FanoutAdapter() {
            @Override
            public void announcement(String channel, String message) {
                announcementsA.put(channel, message);
            }
        });
        clientA.startSynchronously();
        final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
        FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
        clientB.addListener(new FanoutAdapter() {
            @Override
            public void announcement(String channel, String message) {
                announcementsB.put(channel, message);
            }
        });
        clientB.startSynchronously();
        // subscribe clients A and B to the channels
        clientA.subscribe("a");
        clientA.subscribe("b");
        clientA.subscribe("c");
        clientB.subscribe("a");
        clientB.subscribe("b");
        clientB.subscribe("c");
        // give async messages a chance to be delivered
        Thread.sleep(1000);
        clientA.announce("a", "apple");
        clientA.announce("b", "banana");
        clientA.announce("c", "cantelope");
        clientB.announce("a", "avocado");
        clientB.announce("b", "beet");
        clientB.announce("c", "carrot");
        // give async messages a chance to be delivered
        Thread.sleep(2000);
        // confirm that client B received client A's announcements
        assertEquals("apple", announcementsB.get("a"));
        assertEquals("banana", announcementsB.get("b"));
        assertEquals("cantelope", announcementsB.get("c"));
        // confirm that client A received client B's announcements
        assertEquals("avocado", announcementsA.get("a"));
        assertEquals("beet", announcementsA.get("b"));
        assertEquals("carrot", announcementsA.get("c"));
        clientA.stop();
        clientB.stop();
        service.stop();
    }
    protected void testDisruption(FanoutService service) throws Exception  {
        System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
        service.startSynchronously();
        final AtomicInteger pongCount = new AtomicInteger(0);
        FanoutClient client = new FanoutClient("localhost", fanoutPort);
        client.addListener(new FanoutAdapter() {
            @Override
            public void pong(Date timestamp) {
                pongCount.incrementAndGet();
            }
        });
        client.startSynchronously();
        // ping and wait for pong
        client.ping();
        Thread.sleep(500);
        // restart client
        client.stop();
        Thread.sleep(1000);
        client.startSynchronously();
        // ping and wait for pong
        client.ping();
        Thread.sleep(500);
        assertEquals(2, pongCount.get());
        // now disrupt service
        service.stop();
        Thread.sleep(2000);
        service.startSynchronously();
        // wait for reconnect
        Thread.sleep(2000);
        // ping and wait for pong
        client.ping();
        Thread.sleep(500);
        // kill all
        client.stop();
        service.stop();
        // confirm expected pong count
        assertEquals(3, pongCount.get());
    }
}
tests/com/gitblit/tests/GitBlitSuite.java
@@ -59,7 +59,8 @@
        MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class,
        DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class,
        GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class,
        GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class })
        GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class,
        FanoutServiceTest.class })
public class GitBlitSuite {
    public static final File REPOSITORIES = new File("git");