Fanout service for Sparkleshare clients
8 files added
5 files modified
| | |
| | | groovy.customFields =
|
| | |
|
| | | #
|
| | | # Fanout Settings
|
| | | #
|
| | |
|
| | | # Fanout is a PubSub notification service that can be used by Sparkleshare
|
| | | # to eliminate repository change polling. The fanout service runs in a separate
|
| | | # thread on a separate port from the Gitblit http/https application.
|
| | | # This service is provided so that Sparkleshare may be used with Gitblit in
|
| | | # firewalled environments or where reliance on Sparkleshare's default notifications
|
| | | # server (notifications.sparkleshare.org) is unwanted.
|
| | | #
|
| | | # This service maintains an open socket connection from the client to the
|
| | | # Fanout PubSub service. This service may not work properly behind a proxy server. |
| | |
|
| | | # Specify the interface for Fanout to bind its service.
|
| | | # You may specify an ip or an empty value to bind to all interfaces.
|
| | | # Specifying localhost will result in Gitblit ONLY listening to requests to
|
| | | # localhost.
|
| | | #
|
| | | # SINCE 1.2.1
|
| | | # RESTART REQUIRED
|
| | | fanout.bindInterface = localhost
|
| | |
|
| | | # port for serving the Fanout PubSub service. <= 0 disables this service.
|
| | | # On Unix/Linux systems, ports < 1024 require root permissions.
|
| | | # Recommended value: 17000
|
| | | #
|
| | | # SINCE 1.2.1
|
| | | # RESTART REQUIRED
|
| | | fanout.port = 0
|
| | |
|
| | | # Use Fanout NIO service. If false, a multi-threaded socket service will be used.
|
| | | # Be advised, the socket implementation spawns a thread per connection plus the
|
| | | # connection acceptor thread. The NIO implementation is completely single-threaded.
|
| | | #
|
| | | # SINCE 1.2.1
|
| | | # RESTART REQUIRED
|
| | | fanout.useNio = true
|
| | |
|
| | | # Concurrent connection limit. <= 0 disables concurrent connection throttling.
|
| | | # If > 0, only the specified number of concurrent connections will be allowed
|
| | | # and all other connections will be rejected.
|
| | | #
|
| | | # SINCE 1.2.1
|
| | | # RESTART REQUIRED
|
| | | fanout.connectionLimit = 0
|
| | |
|
| | | #
|
| | | # Authentication Settings
|
| | | #
|
| | |
|
| | |
| | | - Git-notes display support
|
| | | - Submodule support
|
| | | - Push log based on a hidden, orphan branch refs/gitblit/pushes
|
| | | - Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use
|
| | | - gh-pages display support (Jekyll is not supported)
|
| | | - Branch metrics (uses Google Charts)
|
| | | - HEAD and Branch RSS feeds
|
| | |
| | |
|
| | | #### additions
|
| | |
|
| | | - Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)
|
| | | - Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.<br/>
|
| | | This service is disabled by default.<br/>
|
| | | **New:** *fanout.bindInterface = localhost*<br/>
|
| | | **New:** *fanout.port = 0*<br/>
|
| | | **New:** *fanout.useNio = true*<br/>
|
| | | **New:** *fanout.connectionLimit = 0*
|
| | | - Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)<br/>
|
| | | The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release.
|
| | | - Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183)
|
| | | - Added Dutch translation (github/kwoot)
|
| | |
|
| | |
| | | import com.gitblit.Constants.FederationToken;
|
| | | import com.gitblit.Constants.PermissionType;
|
| | | import com.gitblit.Constants.RegistrantType;
|
| | | import com.gitblit.fanout.FanoutNioService;
|
| | | import com.gitblit.fanout.FanoutService;
|
| | | import com.gitblit.fanout.FanoutSocketService;
|
| | | import com.gitblit.models.FederationModel;
|
| | | import com.gitblit.models.FederationProposal;
|
| | | import com.gitblit.models.FederationSet;
|
| | |
| | | private TimeZone timezone;
|
| | |
|
| | | private FileBasedConfig projectConfigs;
|
| | | |
| | | private FanoutService fanoutService;
|
| | |
|
| | | public GitBlit() {
|
| | | if (gitblit == null) {
|
| | |
| | | }
|
| | |
|
| | | ContainerUtils.CVE_2007_0450.test();
|
| | | |
| | | // startup Fanout PubSub service
|
| | | if (settings.getInteger(Keys.fanout.port, 0) > 0) {
|
| | | String bindInterface = settings.getString(Keys.fanout.bindInterface, null);
|
| | | int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);
|
| | | boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);
|
| | | int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);
|
| | | |
| | | if (useNio) {
|
| | | if (StringUtils.isEmpty(bindInterface)) {
|
| | | fanoutService = new FanoutNioService(port);
|
| | | } else {
|
| | | fanoutService = new FanoutNioService(bindInterface, port);
|
| | | }
|
| | | } else {
|
| | | if (StringUtils.isEmpty(bindInterface)) {
|
| | | fanoutService = new FanoutSocketService(port);
|
| | | } else {
|
| | | fanoutService = new FanoutSocketService(bindInterface, port);
|
| | | }
|
| | | }
|
| | | |
| | | fanoutService.setConcurrentConnectionLimit(limit);
|
| | | fanoutService.setAllowAllChannelAnnouncements(false);
|
| | | fanoutService.start();
|
| | | }
|
| | | }
|
| | |
|
| | | private void logTimezone(String type, TimeZone zone) {
|
| | |
| | | scheduledExecutor.shutdownNow();
|
| | | luceneExecutor.close();
|
| | | gcExecutor.close();
|
| | | if (fanoutService != null) {
|
| | | fanoutService.stop();
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.IOException;
|
| | | import java.net.InetAddress;
|
| | | import java.net.InetSocketAddress;
|
| | | import java.nio.ByteBuffer;
|
| | | import java.nio.channels.SelectionKey;
|
| | | import java.nio.channels.Selector;
|
| | | import java.nio.channels.SocketChannel;
|
| | | import java.nio.charset.Charset;
|
| | | import java.nio.charset.CharsetDecoder;
|
| | | import java.text.MessageFormat;
|
| | | import java.util.ArrayList;
|
| | | import java.util.Collections;
|
| | | import java.util.Date;
|
| | | import java.util.Iterator;
|
| | | import java.util.LinkedHashSet;
|
| | | import java.util.List;
|
| | | import java.util.Set;
|
| | | import java.util.concurrent.atomic.AtomicBoolean;
|
| | |
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | /**
|
| | | * Fanout client class.
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | public class FanoutClient implements Runnable {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
|
| | |
|
| | | private final int clientTimeout = 500;
|
| | | private final int reconnectTimeout = 2000;
|
| | | private final String host;
|
| | | private final int port;
|
| | | private final List<FanoutListener> listeners;
|
| | |
|
| | | private String id;
|
| | | private volatile Selector selector;
|
| | | private volatile SocketChannel socketCh;
|
| | | private Thread clientThread;
|
| | | |
| | | private final AtomicBoolean isConnected;
|
| | | private final AtomicBoolean isRunning;
|
| | | private final AtomicBoolean isAutomaticReconnect;
|
| | | private final ByteBuffer writeBuffer;
|
| | | private final ByteBuffer readBuffer;
|
| | | private final CharsetDecoder decoder;
|
| | | |
| | | private final Set<String> subscriptions;
|
| | | private boolean resubscribe;
|
| | | |
| | | public interface FanoutListener {
|
| | | public void pong(Date timestamp);
|
| | | public void announcement(String channel, String message);
|
| | | }
|
| | | |
| | | public static class FanoutAdapter implements FanoutListener {
|
| | | public void pong(Date timestamp) { }
|
| | | public void announcement(String channel, String message) { }
|
| | | }
|
| | |
|
| | | public static void main(String args[]) throws Exception {
|
| | | FanoutClient client = new FanoutClient("localhost", 2000);
|
| | | client.addListener(new FanoutAdapter() {
|
| | |
|
| | | @Override
|
| | | public void pong(Date timestamp) {
|
| | | System.out.println("Pong. " + timestamp);
|
| | | }
|
| | | |
| | | @Override
|
| | | public void announcement(String channel, String message) {
|
| | | System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
|
| | | }
|
| | | });
|
| | | client.start();
|
| | | |
| | | Thread.sleep(5000);
|
| | | client.ping();
|
| | | client.subscribe("james");
|
| | | client.announce("james", "12345"); |
| | | client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
|
| | | |
| | | while (true) {
|
| | | Thread.sleep(10000);
|
| | | client.ping();
|
| | | }
|
| | | }
|
| | |
|
| | | public FanoutClient(String host, int port) {
|
| | | this.host = host;
|
| | | this.port = port;
|
| | | readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
|
| | | writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
|
| | | decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
|
| | | listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
|
| | | subscriptions = new LinkedHashSet<String>();
|
| | | isRunning = new AtomicBoolean(false);
|
| | | isConnected = new AtomicBoolean(false);
|
| | | isAutomaticReconnect = new AtomicBoolean(true);
|
| | | }
|
| | |
|
| | | public void addListener(FanoutListener listener) {
|
| | | listeners.add(listener);
|
| | | }
|
| | |
|
| | | public void removeListener(FanoutListener listener) {
|
| | | listeners.remove(listener);
|
| | | }
|
| | | |
| | | public boolean isAutomaticReconnect() {
|
| | | return isAutomaticReconnect.get();
|
| | | }
|
| | | |
| | | public void setAutomaticReconnect(boolean value) {
|
| | | isAutomaticReconnect.set(value);
|
| | | }
|
| | |
|
| | | public void ping() {
|
| | | confirmConnection();
|
| | | write("ping");
|
| | | }
|
| | |
|
| | | public void status() {
|
| | | confirmConnection();
|
| | | write("status");
|
| | | }
|
| | | |
| | | public void subscribe(String channel) {
|
| | | confirmConnection();
|
| | | if (subscriptions.add(channel)) {
|
| | | write("subscribe " + channel);
|
| | | }
|
| | | }
|
| | | |
| | | public void unsubscribe(String channel) {
|
| | | confirmConnection();
|
| | | if (subscriptions.remove(channel)) {
|
| | | write("unsubscribe " + channel);
|
| | | }
|
| | | }
|
| | | |
| | | public void announce(String channel, String message) {
|
| | | confirmConnection();
|
| | | write("announce " + channel + " " + message);
|
| | | }
|
| | |
|
| | | private void confirmConnection() {
|
| | | if (!isConnected()) {
|
| | | throw new RuntimeException("Fanout client is disconnected!");
|
| | | }
|
| | | }
|
| | | |
| | | public boolean isConnected() {
|
| | | return isRunning.get() && socketCh != null && isConnected.get();
|
| | | }
|
| | | |
| | | /**
|
| | | * Start client connection and return immediately.
|
| | | */
|
| | | public void start() {
|
| | | if (isRunning.get()) {
|
| | | logger.warn("Fanout client is already running");
|
| | | return;
|
| | | }
|
| | | clientThread = new Thread(this, "Fanout client");
|
| | | clientThread.start();
|
| | | }
|
| | | |
| | | /**
|
| | | * Start client connection and wait until it has connected.
|
| | | */
|
| | | public void startSynchronously() {
|
| | | start();
|
| | | while (!isConnected()) { |
| | | try {
|
| | | Thread.sleep(100);
|
| | | } catch (Exception e) {
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * Stops client connection. This method returns when the connection has
|
| | | * been completely shutdown.
|
| | | */
|
| | | public void stop() {
|
| | | if (!isRunning.get()) {
|
| | | logger.warn("Fanout client is not running");
|
| | | return;
|
| | | }
|
| | | isRunning.set(false);
|
| | | try {
|
| | | if (clientThread != null) {
|
| | | clientThread.join();
|
| | | clientThread = null;
|
| | | }
|
| | | } catch (InterruptedException e1) {
|
| | | }
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void run() {
|
| | | resetState();
|
| | | |
| | | isRunning.set(true); |
| | | while (isRunning.get()) {
|
| | | // (re)connect
|
| | | if (socketCh == null) {
|
| | | try {
|
| | | InetAddress addr = InetAddress.getByName(host);
|
| | | socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
|
| | | socketCh.configureBlocking(false);
|
| | | selector = Selector.open();
|
| | | id = FanoutConstants.getLocalSocketId(socketCh.socket()); |
| | | socketCh.register(selector, SelectionKey.OP_READ);
|
| | | } catch (Exception e) {
|
| | | logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
|
| | | try {
|
| | | Thread.sleep(reconnectTimeout);
|
| | | } catch (InterruptedException x) {
|
| | | }
|
| | | continue;
|
| | | }
|
| | | }
|
| | | |
| | | // read/write
|
| | | try {
|
| | | selector.select(clientTimeout);
|
| | |
|
| | | Iterator<SelectionKey> i = selector.selectedKeys().iterator();
|
| | | while (i.hasNext()) {
|
| | | SelectionKey key = i.next();
|
| | | i.remove();
|
| | | |
| | | if (key.isReadable()) {
|
| | | // read message
|
| | | String content = read();
|
| | | String[] lines = content.split("\n");
|
| | | for (String reply : lines) {
|
| | | logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
|
| | | if (!processReply(reply)) {
|
| | | logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
|
| | | }
|
| | | }
|
| | | } else if (key.isWritable()) {
|
| | | // resubscribe
|
| | | if (resubscribe) {
|
| | | resubscribe = false;
|
| | | logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); |
| | | for (String subscription : subscriptions) {
|
| | | write("subscribe " + subscription);
|
| | | }
|
| | | }
|
| | | socketCh.register(selector, SelectionKey.OP_READ);
|
| | | }
|
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
|
| | | closeChannel(); |
| | | if (!isAutomaticReconnect.get()) {
|
| | | isRunning.set(false);
|
| | | continue;
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | | closeChannel();
|
| | | resetState();
|
| | | }
|
| | | |
| | | protected void resetState() {
|
| | | readBuffer.clear();
|
| | | writeBuffer.clear();
|
| | | isRunning.set(false);
|
| | | isConnected.set(false);
|
| | | }
|
| | | |
| | | private void closeChannel() {
|
| | | try {
|
| | | if (socketCh != null) {
|
| | | socketCh.close();
|
| | | socketCh = null;
|
| | | selector.close();
|
| | | selector = null;
|
| | | isConnected.set(false);
|
| | | }
|
| | | } catch (IOException x) {
|
| | | }
|
| | | }
|
| | |
|
| | | protected boolean processReply(String reply) {
|
| | | String[] fields = reply.split("!", 2);
|
| | | if (fields.length == 1) {
|
| | | try {
|
| | | long time = Long.parseLong(fields[0]);
|
| | | Date date = new Date(time);
|
| | | firePong(date);
|
| | | } catch (Exception e) { |
| | | }
|
| | | return true;
|
| | | } else if (fields.length == 2) {
|
| | | String channel = fields[0];
|
| | | String message = fields[1];
|
| | | if (FanoutConstants.CH_DEBUG.equals(channel)) {
|
| | | // debug messages are for internal use
|
| | | if (FanoutConstants.MSG_CONNECTED.equals(message)) {
|
| | | isConnected.set(true);
|
| | | resubscribe = subscriptions.size() > 0;
|
| | | if (resubscribe) {
|
| | | try {
|
| | | // register for async resubscribe
|
| | | socketCh.register(selector, SelectionKey.OP_WRITE);
|
| | | } catch (Exception e) {
|
| | | logger.error("an error occurred", e);
|
| | | }
|
| | | }
|
| | | }
|
| | | logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
|
| | | } else {
|
| | | fireAnnouncement(channel, message);
|
| | | }
|
| | | return true;
|
| | | } else {
|
| | | // unknown message
|
| | | return false;
|
| | | }
|
| | | }
|
| | |
|
| | | protected void firePong(Date timestamp) {
|
| | | logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
|
| | | for (FanoutListener listener : listeners) {
|
| | | try {
|
| | | listener.pong(timestamp);
|
| | | } catch (Throwable t) {
|
| | | logger.error("FanoutListener threw an exception!", t);
|
| | | }
|
| | | }
|
| | | }
|
| | | protected void fireAnnouncement(String channel, String message) {
|
| | | logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
|
| | | for (FanoutListener listener : listeners) {
|
| | | try {
|
| | | listener.announcement(channel, message);
|
| | | } catch (Throwable t) {
|
| | | logger.error("FanoutListener threw an exception!", t);
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | | protected synchronized String read() throws IOException {
|
| | | readBuffer.clear();
|
| | | long len = socketCh.read(readBuffer);
|
| | |
|
| | | if (len == -1) {
|
| | | logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
|
| | | socketCh.close();
|
| | | return null;
|
| | | } else {
|
| | | readBuffer.flip();
|
| | | String content = decoder.decode(readBuffer).toString();
|
| | | readBuffer.clear();
|
| | | return content;
|
| | | }
|
| | | }
|
| | | |
| | | protected synchronized boolean write(String message) {
|
| | | try {
|
| | | logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
|
| | | byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
|
| | | writeBuffer.clear();
|
| | | writeBuffer.put(bytes);
|
| | | if (bytes[bytes.length - 1] != 0xa) {
|
| | | writeBuffer.put((byte) 0xa);
|
| | | }
|
| | | writeBuffer.flip();
|
| | |
|
| | | // loop until write buffer has been completely sent
|
| | | long written = 0;
|
| | | long toWrite = writeBuffer.remaining();
|
| | | while (written != toWrite) {
|
| | | written += socketCh.write(writeBuffer);
|
| | | try {
|
| | | Thread.sleep(10);
|
| | | } catch (Exception x) {
|
| | | }
|
| | | }
|
| | | return true;
|
| | | } catch (IOException e) {
|
| | | logger.error("fanout client {0} error: {1}", id, e.getMessage());
|
| | | }
|
| | | return false;
|
| | | }
|
| | | } |
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.net.Socket;
|
| | |
|
/**
 * Constants and socket-id helpers shared by the fanout client and the
 * fanout services. This class only holds static members and is not meant
 * to be instantiated.
 */
public final class FanoutConstants {

    /** Name of the charset used to encode and decode fanout messages. */
    public final static String CHARSET = "ISO-8859-1";

    /** Capacity, in bytes, of the read and write buffers. */
    public final static int BUFFER_LENGTH = 512;

    /** Channel name "all". NOTE(review): presumably the broadcast channel - confirm against FanoutService. */
    public final static String CH_ALL = "all";

    /** Channel whose messages the client handles internally instead of passing them to listeners. */
    public final static String CH_DEBUG = "debug";

    /** Debug-channel message on which the client marks itself as connected. */
    public final static String MSG_CONNECTED = "connected...";

    /** Message text "busy". NOTE(review): presumably sent when a connection is rejected - confirm against FanoutService. */
    public final static String MSG_BUSY = "busy";

    private FanoutConstants() {
        // static members only
    }

    /**
     * Builds an id from the address the socket is connected to and the
     * remote port.
     *
     * @param socket
     *            a connected socket
     * @return the id in the form {@code address:port}
     */
    public static String getRemoteSocketId(Socket socket) {
        return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    }

    /**
     * Builds an id from the address the socket is connected to and the
     * local port. The address is the one reported by
     * {@link Socket#getInetAddress()}, not the socket's local address.
     *
     * @param socket
     *            a connected socket
     * @return the id in the form {@code address:localPort}
     */
    public static String getLocalSocketId(Socket socket) {
        return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();
    }
}
|
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.IOException;
|
| | | import java.net.InetSocketAddress;
|
| | | import java.nio.ByteBuffer;
|
| | | import java.nio.CharBuffer;
|
| | | import java.nio.channels.SelectionKey;
|
| | | import java.nio.channels.Selector;
|
| | | import java.nio.channels.ServerSocketChannel;
|
| | | import java.nio.channels.SocketChannel;
|
| | | import java.nio.charset.CharacterCodingException;
|
| | | import java.nio.charset.Charset;
|
| | | import java.nio.charset.CharsetDecoder;
|
| | | import java.text.MessageFormat;
|
| | | import java.util.ArrayList;
|
| | | import java.util.Arrays;
|
| | | import java.util.Collection;
|
| | | import java.util.HashMap;
|
| | | import java.util.Iterator;
|
| | | import java.util.List;
|
| | | import java.util.Map;
|
| | | import java.util.Set;
|
| | |
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | /**
|
| | | * A single-thread NIO implementation of https://github.com/travisghansen/fanout
|
| | | *
|
| | | * This implementation uses channels and selectors, which are the Java analog of
|
| | | * the Linux epoll mechanism used in the original fanout C code.
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | public class FanoutNioService extends FanoutService {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
|
| | |
|
| | | private volatile ServerSocketChannel serviceCh;
|
| | | private volatile Selector selector;
|
| | | |
| | | public static void main(String[] args) throws Exception {
|
| | | FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
|
| | | pubsub.setStrictRequestTermination(false);
|
| | | pubsub.setAllowAllChannelAnnouncements(false);
|
| | | pubsub.start();
|
| | | }
|
| | |
|
/**
 * Create a single-threaded fanout service without naming a bind interface.
 * Delegates to {@link #FanoutNioService(String, int)} with a null bind
 * interface.
 *
 * @param port
 *            the port for running the fanout PubSub service
 */
public FanoutNioService(int port) {
    this(null, port);
}
|
| | | |
| | | /**
|
| | | * Create a single-threaded fanout service.
|
| | | * |
| | | * @param bindInterface
|
| | | * the ip address to bind for the service, may be null
|
| | | * @param port
|
| | | * the port for running the fanout PubSub service
|
| | | * @throws IOException
|
| | | */
|
| | | public FanoutNioService(String bindInterface, int port) {
|
| | | super(bindInterface, port, "Fanout nio service");
|
| | | }
|
| | | |
| | | @Override
|
| | | protected boolean isConnected() {
|
| | | return serviceCh != null;
|
| | | }
|
| | |
|
| | | @Override
|
| | | protected boolean connect() {
|
| | | if (serviceCh == null) {
|
| | | try {
|
| | | serviceCh = ServerSocketChannel.open();
|
| | | serviceCh.configureBlocking(false);
|
| | | serviceCh.socket().setReuseAddress(true);
|
| | | serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
| | | selector = Selector.open();
|
| | | serviceCh.register(selector, SelectionKey.OP_ACCEPT);
|
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", |
| | | name, host == null ? "0.0.0.0" : host, port));
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", |
| | | name, name, host == null ? "0.0.0.0" : host, port), e);
|
| | | return false;
|
| | | }
|
| | | }
|
| | | return true;
|
| | | }
|
| | |
|
| | | @Override
|
| | | protected void disconnect() {
|
| | | try {
|
| | | if (serviceCh != null) {
|
| | | // close all active client connections
|
| | | Map<String, SocketChannel> clients = getCurrentClientSockets();
|
| | | for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
|
| | | closeClientSocket(client.getKey(), client.getValue());
|
| | | }
|
| | | |
| | | // close service socket channel
|
| | | logger.debug(MessageFormat.format("closing {0} socket channel", name));
|
| | | serviceCh.socket().close();
|
| | | serviceCh.close(); |
| | | serviceCh = null;
|
| | | selector.close();
|
| | | selector = null;
|
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
|
| | | }
|
| | | }
|
| | |
|
| | | @Override
|
| | | protected void listen() throws IOException {
|
| | | while (selector.select(serviceTimeout) > 0) {
|
| | | Set<SelectionKey> keys = selector.selectedKeys();
|
| | | Iterator<SelectionKey> keyItr = keys.iterator();
|
| | | while (keyItr.hasNext()) {
|
| | | SelectionKey key = (SelectionKey) keyItr.next();
|
| | | if (key.isAcceptable()) {
|
| | | // new fanout client connection
|
| | | ServerSocketChannel sch = (ServerSocketChannel) key.channel();
|
| | | try {
|
| | | SocketChannel ch = sch.accept();
|
| | | ch.configureBlocking(false);
|
| | | configureClientSocket(ch.socket());
|
| | |
|
| | | FanoutNioConnection connection = new FanoutNioConnection(ch);
|
| | | addConnection(connection);
|
| | |
|
| | | // register to send the queued message
|
| | | ch.register(selector, SelectionKey.OP_WRITE, connection);
|
| | | } catch (IOException e) {
|
| | | logger.error("error accepting fanout connection", e);
|
| | | }
|
| | | } else if (key.isReadable()) {
|
| | | // read fanout client request
|
| | | SocketChannel ch = (SocketChannel) key.channel();
|
| | | FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
|
| | | try {
|
| | | connection.read(ch, isStrictRequestTermination());
|
| | | int replies = 0;
|
| | | Iterator<String> reqItr = connection.requestQueue.iterator();
|
| | | while (reqItr.hasNext()) {
|
| | | String req = reqItr.next();
|
| | | String reply = processRequest(connection, req);
|
| | | reqItr.remove();
|
| | | if (reply != null) {
|
| | | replies++;
|
| | | }
|
| | | }
|
| | |
|
| | | if (replies > 0) {
|
| | | // register to send the replies to requests
|
| | | ch.register(selector, SelectionKey.OP_WRITE, connection);
|
| | | } else {
|
| | | // re-register for next read
|
| | | ch.register(selector, SelectionKey.OP_READ, connection);
|
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
|
| | | removeConnection(connection);
|
| | | closeClientSocket(connection.id, ch);
|
| | | }
|
| | | } else if (key.isWritable()) {
|
| | | // asynchronous reply to fanout client request
|
| | | SocketChannel ch = (SocketChannel) key.channel();
|
| | | FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
|
| | | try {
|
| | | connection.write(ch);
|
| | |
|
| | | if (hasConnection(connection)) {
|
| | | // register for next read
|
| | | ch.register(selector, SelectionKey.OP_READ, connection);
|
| | | } else {
|
| | | // Connection was rejected due to load or
|
| | | // some other reason. Close it.
|
| | | closeClientSocket(connection.id, ch);
|
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
|
| | | removeConnection(connection);
|
| | | closeClientSocket(connection.id, ch);
|
| | | }
|
| | | }
|
| | | keyItr.remove();
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | | protected void closeClientSocket(String id, SocketChannel ch) {
|
| | | try {
|
| | | ch.close();
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("fanout connection {0}", id), e);
|
| | | }
|
| | | }
|
| | | |
| | | protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
|
| | | super.broadcast(connections, channel, message);
|
| | | |
| | | // register queued write
|
| | | Map<String, SocketChannel> sockets = getCurrentClientSockets();
|
| | | for (FanoutServiceConnection connection : connections) {
|
| | | SocketChannel ch = sockets.get(connection.id);
|
| | | if (ch == null) {
|
| | | logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
|
| | | removeConnection(connection);
|
| | | continue;
|
| | | }
|
| | | try {
|
| | | ch.register(selector, SelectionKey.OP_WRITE, connection);
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | | protected Map<String, SocketChannel> getCurrentClientSockets() {
|
| | | Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
|
| | | for (SelectionKey key : selector.keys()) {
|
| | | if (key.channel() instanceof SocketChannel) {
|
| | | SocketChannel ch = (SocketChannel) key.channel();
|
| | | String id = FanoutConstants.getRemoteSocketId(ch.socket());
|
| | | sockets.put(id, ch);
|
| | | }
|
| | | }
|
| | | return sockets;
|
| | | }
|
| | | |
| | | /**
|
| | | * FanoutNioConnection handles reading/writing messages from a remote fanout
|
| | | * connection.
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | static class FanoutNioConnection extends FanoutServiceConnection {
|
| | | final ByteBuffer readBuffer;
|
| | | final ByteBuffer writeBuffer;
|
| | | final List<String> requestQueue;
|
| | | final List<String> replyQueue;
|
| | | final CharsetDecoder decoder;
|
| | |
|
| | | FanoutNioConnection(SocketChannel ch) {
|
| | | super(ch.socket());
|
| | | readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
|
| | | writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
|
| | | requestQueue = new ArrayList<String>();
|
| | | replyQueue = new ArrayList<String>();
|
| | | decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
|
| | | }
|
| | | |
| | | protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
|
| | | long bytesRead = 0;
|
| | | readBuffer.clear();
|
| | | bytesRead = ch.read(readBuffer);
|
| | | readBuffer.flip();
|
| | | if (bytesRead == -1) {
|
| | | throw new IOException("lost client connection, end of stream");
|
| | | }
|
| | | if (readBuffer.limit() == 0) {
|
| | | return;
|
| | | }
|
| | | CharBuffer cbuf = decoder.decode(readBuffer);
|
| | | String req = cbuf.toString();
|
| | | String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
|
| | | requestQueue.addAll(Arrays.asList(lines));
|
| | | }
|
| | | |
| | | protected void write(SocketChannel ch) throws IOException {
|
| | | Iterator<String> itr = replyQueue.iterator();
|
| | | while (itr.hasNext()) {
|
| | | String reply = itr.next();
|
| | | writeBuffer.clear();
|
| | | logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));
|
| | | byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);
|
| | | writeBuffer.put(bytes);
|
| | | if (bytes[bytes.length - 1] != 0xa) {
|
| | | writeBuffer.put((byte) 0xa);
|
| | | }
|
| | | writeBuffer.flip();
|
| | | |
| | | // loop until write buffer has been completely sent
|
| | | int written = 0;
|
| | | int toWrite = writeBuffer.remaining();
|
| | | while (written != toWrite) {
|
| | | written += ch.write(writeBuffer);
|
| | | try {
|
| | | Thread.sleep(10);
|
| | | } catch (Exception x) {
|
| | | }
|
| | | } |
| | | itr.remove();
|
| | | }
|
| | | writeBuffer.clear();
|
| | | }
|
| | |
|
| | | @Override
|
| | | protected void reply(String content) throws IOException {
|
| | | // queue the reply
|
| | | // replies are transmitted asynchronously from the requests
|
| | | replyQueue.add(content);
|
| | | }
|
| | | }
|
| | | } |
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.IOException;
|
| | | import java.net.Socket;
|
| | | import java.net.SocketException;
|
| | | import java.text.MessageFormat;
|
| | | import java.util.ArrayList;
|
| | | import java.util.Collection;
|
| | | import java.util.Date;
|
| | | import java.util.Iterator;
|
| | | import java.util.List;
|
| | | import java.util.Map;
|
| | | import java.util.Set;
|
| | | import java.util.concurrent.ConcurrentHashMap;
|
| | | import java.util.concurrent.ConcurrentSkipListSet;
|
| | | import java.util.concurrent.atomic.AtomicBoolean;
|
| | | import java.util.concurrent.atomic.AtomicInteger;
|
| | | import java.util.concurrent.atomic.AtomicLong;
|
| | |
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | /**
|
| | | * Base class for Fanout service implementations.
|
| | | * |
| | | * Subclass implementations can be used as a Sparkleshare PubSub notification
|
| | | * server. This allows Sparkleshare to be used in conjunction with Gitblit
|
| | | * behind a corporate firewall that restricts or prohibits client internet access
|
| | | * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | public abstract class FanoutService implements Runnable {
|
| | |
|
private static final Logger logger = LoggerFactory.getLogger(FanoutService.class);

public static final int DEFAULT_PORT = 17000;

// milliseconds to wait before retrying a failed connect()
protected static final int serviceTimeout = 5000;

protected final String host;
protected final int port;
protected final String name;

private Thread serviceThread;

// live connections keyed by connection id
private final Map<String, FanoutServiceConnection> connections =
        new ConcurrentHashMap<String, FanoutServiceConnection>();
// subscribed connections keyed by channel name
private final Map<String, Set<FanoutServiceConnection>> subscriptions =
        new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();

// service settings
protected final AtomicBoolean isRunning = new AtomicBoolean(false);
private final AtomicBoolean strictRequestTermination = new AtomicBoolean(false);
private final AtomicBoolean allowAllChannelAnnouncements = new AtomicBoolean(false);
private final AtomicInteger concurrentConnectionLimit = new AtomicInteger(0);

// runtime statistics
private final Date bootDate = new Date();
private final AtomicLong rejectedConnectionCount = new AtomicLong(0);
private final AtomicInteger peakConnectionCount = new AtomicInteger(0);
private final AtomicLong totalConnections = new AtomicLong(0);
private final AtomicLong totalAnnouncements = new AtomicLong(0);
private final AtomicLong totalMessages = new AtomicLong(0);
private final AtomicLong totalSubscribes = new AtomicLong(0);
private final AtomicLong totalUnsubscribes = new AtomicLong(0);
private final AtomicLong totalPings = new AtomicLong(0);

/**
 * Creates a stopped service and registers the ALL channel.
 *
 * @param host
 *            the interface to bind, null is reported as "all" in the
 *            service thread name
 * @param port
 *            the port of the service
 * @param name
 *            the display name used in log messages and the thread name
 */
protected FanoutService(String host, int port, String name) {
    this.host = host;
    this.port = port;
    this.name = name;

    subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
}
|
| | |
|
| | | /*
|
| | | * Abstract methods
|
| | | */
|
| | | |
| | | protected abstract boolean isConnected();
|
| | | |
| | | protected abstract boolean connect();
|
| | | |
| | | protected abstract void listen() throws IOException;
|
| | | |
| | | protected abstract void disconnect();
|
| | | |
| | | /**
|
| | | * Returns true if the service requires \n request termination.
|
| | | * |
| | | * @return true if request requires \n termination
|
| | | */
|
| | | public boolean isStrictRequestTermination() {
|
| | | return strictRequestTermination.get();
|
| | | }
|
| | |
|
| | | /**
|
| | | * Control the termination of fanout requests. If true, fanout requests must
|
| | | * be terminated with \n. If false, fanout requests may be terminated with
|
| | | * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
|
| | | * |
| | | * @param isStrictTermination
|
| | | */
|
| | | public void setStrictRequestTermination(boolean isStrictTermination) {
|
| | | strictRequestTermination.set(isStrictTermination);
|
| | | }
|
| | | |
| | | /**
|
| | | * Returns the maximum allowable concurrent fanout connections.
|
| | | * |
| | | * @return the maximum allowable concurrent connection count
|
| | | */
|
| | | public int getConcurrentConnectionLimit() {
|
| | | return concurrentConnectionLimit.get();
|
| | | }
|
| | | |
| | | /**
|
| | | * Sets the maximum allowable concurrent fanout connection count.
|
| | | * |
| | | * @param value
|
| | | */
|
| | | public void setConcurrentConnectionLimit(int value) {
|
| | | concurrentConnectionLimit.set(value);
|
| | | }
|
| | | |
| | | /**
|
| | | * Returns true if connections are allowed to announce on the all channel.
|
| | | * |
| | | * @return true if connections are allowed to announce on the all channel
|
| | | */
|
| | | public boolean allowAllChannelAnnouncements() {
|
| | | return allowAllChannelAnnouncements.get();
|
| | | }
|
| | | |
| | | /**
|
| | | * Allows/prohibits connections from announcing on the ALL channel.
|
| | | * |
| | | * @param value
|
| | | */
|
| | | public void setAllowAllChannelAnnouncements(boolean value) {
|
| | | allowAllChannelAnnouncements.set(value);
|
| | | }
|
| | | |
| | | /**
|
| | | * Returns the current connections
|
| | | * |
| | | * @param channel
|
| | | * @return map of current connections keyed by their id
|
| | | */
|
| | | public Map<String, FanoutServiceConnection> getCurrentConnections() {
|
| | | return connections;
|
| | | }
|
| | | |
| | | /**
|
| | | * Returns all subscriptions
|
| | | * |
| | | * @return map of current subscriptions keyed by channel name
|
| | | */
|
| | | public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
|
| | | return subscriptions;
|
| | | }
|
| | |
|
| | | /**
|
| | | * Returns the subscriptions for the specified channel
|
| | | * |
| | | * @param channel
|
| | | * @return set of subscribed connections for the specified channel
|
| | | */
|
| | | public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
|
| | | return subscriptions.get(channel);
|
| | | }
|
| | |
|
| | | /**
|
| | | * Returns the runtime statistics object for this service.
|
| | | * |
| | | * @return stats
|
| | | */
|
| | | public FanoutStats getStatistics() {
|
| | | FanoutStats stats = new FanoutStats();
|
| | | |
| | | // settings
|
| | | stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
|
| | | stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
|
| | | stats.strictRequestTermination = isStrictRequestTermination();
|
| | | |
| | | // runtime stats
|
| | | stats.bootDate = bootDate;
|
| | | stats.rejectedConnectionCount = rejectedConnectionCount.get();
|
| | | stats.peakConnectionCount = peakConnectionCount.get();
|
| | | stats.totalConnections = totalConnections.get();
|
| | | stats.totalAnnouncements = totalAnnouncements.get();
|
| | | stats.totalMessages = totalMessages.get();
|
| | | stats.totalSubscribes = totalSubscribes.get();
|
| | | stats.totalUnsubscribes = totalUnsubscribes.get();
|
| | | stats.totalPings = totalPings.get(); |
| | | stats.currentConnections = connections.size();
|
| | | stats.currentChannels = subscriptions.size();
|
| | | stats.currentSubscriptions = subscriptions.size() * connections.size();
|
| | | return stats;
|
| | | }
|
| | | |
| | | /**
|
| | | * Returns true if the service is ready.
|
| | | * |
| | | * @return true, if the service is ready
|
| | | */
|
| | | public boolean isReady() {
|
| | | if (isRunning.get()) {
|
| | | return isConnected();
|
| | | }
|
| | | return false;
|
| | | }
|
| | |
|
| | | /**
|
| | | * Start the Fanout service thread and immediatel return.
|
| | | * |
| | | */
|
| | | public void start() {
|
| | | if (isRunning.get()) {
|
| | | logger.warn(MessageFormat.format("{0} is already running", name));
|
| | | return;
|
| | | }
|
| | | serviceThread = new Thread(this);
|
| | | serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
|
| | | serviceThread.start();
|
| | | }
|
| | | |
| | | /**
|
| | | * Start the Fanout service thread and wait until it is accepting connections.
|
| | | * |
| | | */
|
| | | public void startSynchronously() {
|
| | | start();
|
| | | while (!isReady()) {
|
| | | try {
|
| | | Thread.sleep(100);
|
| | | } catch (Exception e) {
|
| | | }
|
| | | }
|
| | | }
|
| | | |
| | | /**
|
| | | * Stop the Fanout service. This method returns when the service has been
|
| | | * completely shutdown.
|
| | | */
|
| | | public void stop() {
|
| | | if (!isRunning.get()) {
|
| | | logger.warn(MessageFormat.format("{0} is not running", name));
|
| | | return;
|
| | | }
|
| | | logger.info(MessageFormat.format("stopping {0}...", name));
|
| | | isRunning.set(false);
|
| | | try {
|
| | | if (serviceThread != null) {
|
| | | serviceThread.join();
|
| | | serviceThread = null;
|
| | | }
|
| | | } catch (InterruptedException e1) {
|
| | | logger.error("", e1);
|
| | | }
|
| | | logger.info(MessageFormat.format("stopped {0}", name));
|
| | | }
|
| | | |
| | | /**
|
| | | * Main execution method of the service
|
| | | */
|
| | | @Override
|
| | | public final void run() {
|
| | | disconnect();
|
| | | resetState();
|
| | | isRunning.set(true);
|
| | | while (isRunning.get()) {
|
| | | if (connect()) {
|
| | | try {
|
| | | listen();
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("error processing {0}", name), e);
|
| | | isRunning.set(false);
|
| | | }
|
| | | } else {
|
| | | try {
|
| | | Thread.sleep(serviceTimeout);
|
| | | } catch (InterruptedException x) {
|
| | | }
|
| | | }
|
| | | }
|
| | | disconnect(); |
| | | resetState();
|
| | | }
|
| | | |
| | | protected void resetState() {
|
| | | // reset state data
|
| | | connections.clear();
|
| | | subscriptions.clear();
|
| | | rejectedConnectionCount.set(0);
|
| | | peakConnectionCount.set(0);
|
| | | totalConnections.set(0);
|
| | | totalAnnouncements.set(0);
|
| | | totalMessages.set(0);
|
| | | totalSubscribes.set(0);
|
| | | totalUnsubscribes.set(0);
|
| | | totalPings.set(0);
|
| | | }
|
| | |
|
| | | /**
|
| | | * Configure the client connection socket.
|
| | | * |
| | | * @param socket
|
| | | * @throws SocketException
|
| | | */
|
| | | protected void configureClientSocket(Socket socket) throws SocketException {
|
| | | socket.setKeepAlive(true); |
| | | socket.setSoLinger(true, 0); // immediately discard any remaining data
|
| | | }
|
| | | |
| | | /**
|
| | | * Add the connection to the connections map.
|
| | | * |
| | | * @param connection
|
| | | * @return false if the connection was rejected due to too many concurrent
|
| | | * connections
|
| | | */
|
| | | protected boolean addConnection(FanoutServiceConnection connection) { |
| | | int limit = getConcurrentConnectionLimit();
|
| | | if (limit > 0 && connections.size() > limit) {
|
| | | logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
|
| | | increment(rejectedConnectionCount);
|
| | | connection.busy();
|
| | | return false;
|
| | | }
|
| | | |
| | | // add the connection to our map
|
| | | connections.put(connection.id, connection);
|
| | |
|
| | | // track peak number of concurrent connections
|
| | | if (connections.size() > peakConnectionCount.get()) {
|
| | | peakConnectionCount.set(connections.size());
|
| | | }
|
| | |
|
| | | logger.info("fanout new connection " + connection.id);
|
| | | connection.connected();
|
| | | return true;
|
| | | }
|
| | | |
| | | /**
|
| | | * Remove the connection from the connections list and from subscriptions.
|
| | | * |
| | | * @param connection
|
| | | */
|
| | | protected void removeConnection(FanoutServiceConnection connection) {
|
| | | connections.remove(connection.id);
|
| | | Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();
|
| | | while (itr.hasNext()) {
|
| | | Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
|
| | | Set<FanoutServiceConnection> subscriptions = entry.getValue();
|
| | | subscriptions.remove(connection);
|
| | | if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {
|
| | | if (subscriptions.size() == 0) {
|
| | | itr.remove();
|
| | | logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
|
| | | }
|
| | | }
|
| | | }
|
| | | logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
|
| | | }
|
| | | |
| | | /**
|
| | | * Tests to see if the connection is being monitored by the service.
|
| | | * |
| | | * @param connection
|
| | | * @return true if the service is monitoring the connection
|
| | | */
|
| | | protected boolean hasConnection(FanoutServiceConnection connection) {
|
| | | return connections.containsKey(connection.id);
|
| | | }
|
| | | |
| | | /**
|
| | | * Reply to a connection on the specified channel.
|
| | | * |
| | | * @param connection
|
| | | * @param channel
|
| | | * @param message
|
| | | * @return the reply
|
| | | */
|
| | | protected String reply(FanoutServiceConnection connection, String channel, String message) { |
| | | if (channel != null && channel.length() > 0) {
|
| | | increment(totalMessages);
|
| | | }
|
| | | return connection.reply(channel, message);
|
| | | }
|
| | | |
| | | /**
|
| | | * Service method to broadcast a message to all connections.
|
| | | * |
| | | * @param message
|
| | | */
|
| | | public void broadcastAll(String message) {
|
| | | broadcast(connections.values(), FanoutConstants.CH_ALL, message);
|
| | | increment(totalAnnouncements);
|
| | | }
|
| | | |
| | | /**
|
| | | * Service method to broadcast a message to connections subscribed to the
|
| | | * channel.
|
| | | * |
| | | * @param message
|
| | | */
|
| | | public void broadcast(String channel, String message) {
|
| | | List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
|
| | | broadcast(connections, channel, message);
|
| | | increment(totalAnnouncements);
|
| | | }
|
| | | |
| | | /**
|
| | | * Broadcast a message to connections subscribed to the specified channel.
|
| | | * |
| | | * @param connections
|
| | | * @param channel
|
| | | * @param message
|
| | | */
|
| | | protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
|
| | | for (FanoutServiceConnection connection : connections) {
|
| | | reply(connection, channel, message);
|
| | | }
|
| | | }
|
| | | |
| | | /**
|
| | | * Process an incoming Fanout request.
|
| | | * |
| | | * @param connection
|
| | | * @param req
|
| | | * @return the reply to the request, may be null
|
| | | */
|
| | | protected String processRequest(FanoutServiceConnection connection, String req) {
|
| | | logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
|
| | | String[] fields = req.split(" ", 3);
|
| | | String action = fields[0];
|
| | | String channel = fields.length >= 2 ? fields[1] : null;
|
| | | String message = fields.length >= 3 ? fields[2] : null;
|
| | | try {
|
| | | return processRequest(connection, action, channel, message);
|
| | | } catch (IllegalArgumentException e) {
|
| | | // invalid action
|
| | | logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
|
| | | logger.error(asHexArray(req));
|
| | | }
|
| | | return null;
|
| | | }
|
| | | |
| | | /**
|
| | | * Process the Fanout request.
|
| | | * |
| | | * @param connection
|
| | | * @param action
|
| | | * @param channel
|
| | | * @param message
|
| | | * @return the reply to the request, may be null
|
| | | * @throws IllegalArgumentException
|
| | | */
|
| | | protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
|
| | | if ("ping".equals(action)) {
|
| | | // ping
|
| | | increment(totalPings);
|
| | | return reply(connection, null, "" + System.currentTimeMillis());
|
| | | } else if ("info".equals(action)) {
|
| | | // info
|
| | | String info = getStatistics().info();
|
| | | return reply(connection, null, info);
|
| | | } else if ("announce".equals(action)) {
|
| | | // announcement
|
| | | if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
|
| | | // prohibiting connection-sourced all announcements
|
| | | logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
|
| | | } else if ("debug".equals(channel)) {
|
| | | // prohibiting connection-sourced debug announcements
|
| | | logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
|
| | | } else {
|
| | | // acceptable announcement
|
| | | List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
|
| | | connections.remove(connection); // remove announcer
|
| | | broadcast(connections, channel, message);
|
| | | increment(totalAnnouncements);
|
| | | }
|
| | | } else if ("subscribe".equals(action)) {
|
| | | // subscribe
|
| | | if (!subscriptions.containsKey(channel)) {
|
| | | logger.info(MessageFormat.format("fanout new channel {0}", channel));
|
| | | subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());
|
| | | }
|
| | | subscriptions.get(channel).add(connection);
|
| | | logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
|
| | | increment(totalSubscribes);
|
| | | } else if ("unsubscribe".equals(action)) {
|
| | | // unsubscribe
|
| | | if (subscriptions.containsKey(channel)) {
|
| | | subscriptions.get(channel).remove(connection);
|
| | | if (subscriptions.get(channel).size() == 0) {
|
| | | subscriptions.remove(channel);
|
| | | }
|
| | | increment(totalUnsubscribes);
|
| | | }
|
| | | } else {
|
| | | // invalid action
|
| | | throw new IllegalArgumentException(action);
|
| | | }
|
| | | return null;
|
| | | }
|
| | | |
| | | private String asHexArray(String req) {
|
| | | StringBuilder sb = new StringBuilder();
|
| | | for (char c : req.toCharArray()) {
|
| | | sb.append(Integer.toHexString(c)).append(' ');
|
| | | }
|
| | | return "[ " + sb.toString().trim() + " ]";
|
| | | }
|
| | | |
| | | /**
|
| | | * Increment a long and prevent negative rollover.
|
| | | * |
| | | * @param counter
|
| | | */
|
| | | private void increment(AtomicLong counter) {
|
| | | long v = counter.incrementAndGet();
|
| | | if (v < 0) {
|
| | | counter.set(0);
|
| | | }
|
| | | }
|
| | | |
| | | @Override
|
| | | public String toString() {
|
| | | return name;
|
| | | }
|
| | | } |
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.IOException;
|
| | | import java.net.Socket;
|
| | |
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | /**
|
| | | * FanoutServiceConnection handles reading/writing messages from a remote fanout
|
| | | * connection.
|
| | | * |
| | | * @author James Moger
|
| | | * |
| | | */
|
| | | public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {
|
| | | |
| | | private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);
|
| | | |
| | | public final String id;
|
| | |
|
| | | protected FanoutServiceConnection(Socket socket) {
|
| | | this.id = FanoutConstants.getRemoteSocketId(socket);
|
| | | }
|
| | |
|
| | | protected abstract void reply(String content) throws IOException;
|
| | |
|
| | | /**
|
| | | * Send the connection a debug channel connected message.
|
| | | * |
| | | * @param message
|
| | | */
|
| | | protected void connected() {
|
| | | reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);
|
| | | }
|
| | | |
| | | /**
|
| | | * Send the connection a debug channel busy message.
|
| | | * |
| | | * @param message
|
| | | */
|
| | | protected void busy() {
|
| | | reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);
|
| | | }
|
| | | |
| | | /**
|
| | | * Send the connection a message for the specified channel.
|
| | | * |
| | | * @param channel
|
| | | * @param message
|
| | | * @return the reply
|
| | | */
|
| | | protected String reply(String channel, String message) {
|
| | | String content;
|
| | | if (channel != null) {
|
| | | content = channel + "!" + message;
|
| | | } else {
|
| | | content = message;
|
| | | }
|
| | | try {
|
| | | reply(content);
|
| | | } catch (Exception e) {
|
| | | logger.error("failed to reply to fanout connection " + id, e);
|
| | | }
|
| | | return content;
|
| | | }
|
| | |
|
| | | @Override
|
| | | public int compareTo(FanoutServiceConnection c) {
|
| | | return id.compareTo(c.id);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public boolean equals(Object o) {
|
| | | if (o instanceof FanoutServiceConnection) {
|
| | | return id.equals(((FanoutServiceConnection) o).id);
|
| | | }
|
| | | return false;
|
| | | }
|
| | |
|
| | | @Override
|
| | | public int hashCode() {
|
| | | return id.hashCode();
|
| | | }
|
| | |
|
| | | @Override
|
| | | public String toString() {
|
| | | return id;
|
| | | }
|
| | | } |
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.BufferedInputStream;
|
| | | import java.io.IOException;
|
| | | import java.io.OutputStream;
|
| | | import java.net.InetSocketAddress;
|
| | | import java.net.ServerSocket;
|
| | | import java.net.Socket;
|
| | | import java.net.SocketException;
|
| | | import java.net.SocketTimeoutException;
|
| | | import java.text.MessageFormat;
|
| | |
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | /**
|
| | | * A multi-threaded socket implementation of https://github.com/travisghansen/fanout
|
| | | *
|
| | | * This implementation creates a master acceptor thread which accepts incoming
|
| | | * fanout connections and then spawns a daemon thread for each accepted connection.
|
| | | * If there are 100 concurrent fanout connections, there are 101 threads.
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | public class FanoutSocketService extends FanoutService {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);
|
| | |
|
| | | private volatile ServerSocket serviceSocket;
|
| | |
|
| | | public static void main(String[] args) throws Exception {
|
| | | FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);
|
| | | pubsub.setStrictRequestTermination(false);
|
| | | pubsub.setAllowAllChannelAnnouncements(false);
|
| | | pubsub.start();
|
| | | }
|
| | | |
/**
 * Create a multi-threaded fanout service without a specific bind interface.
 *
 * @param port
 *            the port for running the fanout PubSub service
 */
public FanoutSocketService(int port) {
    this((String) null, port);
}
|
| | |
|
/**
 * Create a multi-threaded fanout service named "Fanout socket service".
 *
 * @param bindInterface
 *            the ip address to bind for the service, may be null
 * @param port
 *            the port for running the fanout PubSub service
 */
public FanoutSocketService(String bindInterface, int port) {
    super(bindInterface, port, "Fanout socket service");
}
|
| | | |
| | | @Override
|
| | | protected boolean isConnected() {
|
| | | return serviceSocket != null;
|
| | | }
|
| | | |
| | | @Override
|
| | | protected boolean connect() {
|
| | | if (serviceSocket == null) {
|
| | | try {
|
| | | serviceSocket = new ServerSocket();
|
| | | serviceSocket.setReuseAddress(true);
|
| | | serviceSocket.setSoTimeout(serviceTimeout);
|
| | | serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
| | | logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", |
| | | name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
|
| | | name, host == null ? "0.0.0.0" : host, port), e);
|
| | | return false;
|
| | | }
|
| | | }
|
| | | return true;
|
| | | }
|
| | |
|
| | | @Override
|
| | | protected void disconnect() {
|
| | | try {
|
| | | if (serviceSocket != null) {
|
| | | logger.debug(MessageFormat.format("closing {0} server socket", name));
|
| | | serviceSocket.close();
|
| | | serviceSocket = null;
|
| | | }
|
| | | } catch (IOException e) {
|
| | | logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * This accepts incoming fanout connections and spawns connection threads.
|
| | | */
|
| | | @Override
|
| | | protected void listen() throws IOException {
|
| | | try {
|
| | | Socket socket;
|
| | | socket = serviceSocket.accept();
|
| | | configureClientSocket(socket);
|
| | |
|
| | | FanoutSocketConnection connection = new FanoutSocketConnection(socket);
|
| | |
|
| | | if (addConnection(connection)) {
|
| | | // spawn connection daemon thread
|
| | | Thread connectionThread = new Thread(connection);
|
| | | connectionThread.setDaemon(true);
|
| | | connectionThread.setName("Fanout " + connection.id);
|
| | | connectionThread.start();
|
| | | } else {
|
| | | // synchronously close the connection and remove it
|
| | | removeConnection(connection);
|
| | | connection.closeConnection();
|
| | | connection = null;
|
| | | }
|
| | | } catch (SocketTimeoutException e) {
|
| | | // ignore accept timeout exceptions
|
| | | }
|
| | | }
|
| | | |
| | | /**
|
| | | * FanoutSocketConnection handles reading/writing messages from a remote fanout
|
| | | * connection.
|
| | | * |
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
|
| | | Socket socket;
|
| | | |
| | | FanoutSocketConnection(Socket socket) {
|
| | | super(socket);
|
| | | this.socket = socket;
|
| | | }
|
| | |
|
| | | /**
|
| | | * Connection thread read/write method.
|
| | | */
|
| | | @Override
|
| | | public void run() {
|
| | | try {
|
| | | StringBuilder sb = new StringBuilder();
|
| | | BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
|
| | | byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];
|
| | | int len = 0;
|
| | | while (true) {
|
| | | while (is.available() > 0) {
|
| | | len = is.read(buffer);
|
| | | for (int i = 0; i < len; i++) {
|
| | | byte b = buffer[i];
|
| | | if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {
|
| | | String req = sb.toString();
|
| | | sb.setLength(0);
|
| | | if (req.length() > 0) {
|
| | | // ignore empty request strings
|
| | | processRequest(this, req);
|
| | | }
|
| | | } else {
|
| | | sb.append((char) b);
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | if (!isRunning.get()) {
|
| | | // service has stopped, terminate client connection
|
| | | break;
|
| | | } else {
|
| | | Thread.sleep(500);
|
| | | }
|
| | | }
|
| | | } catch (Throwable t) {
|
| | | if (t instanceof SocketException) {
|
| | | logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
|
| | | } else if (t instanceof SocketTimeoutException) {
|
| | | logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
|
| | | } else {
|
| | | logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);
|
| | | }
|
| | | } finally {
|
| | | closeConnection();
|
| | | }
|
| | |
|
| | | logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
|
| | | }
|
| | | |
| | | @Override
|
| | | protected void reply(String content) throws IOException {
|
| | | // synchronously send reply
|
| | | logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));
|
| | | OutputStream os = socket.getOutputStream();
|
| | | byte [] bytes = content.getBytes(FanoutConstants.CHARSET);
|
| | | os.write(bytes);
|
| | | if (bytes[bytes.length - 1] != 0xa) {
|
| | | os.write(0xa);
|
| | | }
|
| | | os.flush();
|
| | | }
|
| | | |
| | | protected void closeConnection() {
|
| | | // close the connection socket
|
| | | try {
|
| | | socket.close();
|
| | | } catch (IOException e) {
|
| | | }
|
| | | socket = null;
|
| | | |
| | | // remove this connection from the service
|
| | | removeConnection(this);
|
| | | }
|
| | | }
|
| | | } |
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.fanout;
|
| | |
|
| | | import java.io.Serializable;
|
| | | import java.text.MessageFormat;
|
| | | import java.util.Date;
|
| | |
|
/**
 * Encapsulates the runtime stats of a fanout service.
 * <p>
 * Instances are plain serializable value holders with public fields;
 * {@link #info()} renders the current field values as a text report.
 *
 * @author James Moger
 *
 */
public class FanoutStats implements Serializable {

	private static final long serialVersionUID = 1L;

	// service settings, reported by info() alongside the counters
	public long concurrentConnectionLimit;
	public boolean allowAllChannelAnnouncements;
	public boolean strictRequestTermination;

	// reported as "boot date"; may be null if it was never assigned
	public Date bootDate;
	// reported as "concurrent limit rejected connections"
	public long rejectedConnectionCount;
	public int peakConnectionCount;
	public long currentChannels;
	public long currentSubscriptions;
	public long currentConnections;
	public long totalConnections;
	public long totalAnnouncements;
	public long totalMessages;
	public long totalSubscribes;
	public long totalUnsubscribes;
	public long totalPings;

	/**
	 * Renders the stats as a multi-line report with one
	 * <code>label: value</code> line per statistic, each terminated by a
	 * line feed.
	 * <p>
	 * Numeric values are formatted through {@link MessageFormat} with the
	 * <code>number,0</code> style. The "user-requested subscriptions" line is
	 * the current subscription count minus the current connection count, or
	 * 0 when there are no subscriptions.
	 *
	 * @return the formatted report; the boot date is shown as
	 *         <code>n/a</code> when {@link #bootDate} is null
	 */
	public String info() {
		// the template placeholders are numbered in the same order as the
		// arguments passed to MessageFormat.format below; keep both in sync
		int i = 0;
		StringBuilder sb = new StringBuilder();
		sb.append(infoStr(i++, "boot date"));
		sb.append(infoStr(i++, "strict request termination"));
		sb.append(infoStr(i++, "allow connection \"all\" announcements"));
		sb.append(infoInt(i++, "concurrent connection limit"));
		sb.append(infoInt(i++, "concurrent limit rejected connections"));
		sb.append(infoInt(i++, "peak connections"));
		sb.append(infoInt(i++, "current connections"));
		sb.append(infoInt(i++, "current channels"));
		sb.append(infoInt(i++, "current subscriptions"));
		sb.append(infoInt(i++, "user-requested subscriptions"));
		sb.append(infoInt(i++, "total connections"));
		sb.append(infoInt(i++, "total announcements"));
		sb.append(infoInt(i++, "total messages"));
		sb.append(infoInt(i++, "total subscribes"));
		sb.append(infoInt(i++, "total unsubscribes"));
		sb.append(infoInt(i++, "total pings"));
		String template = sb.toString();

		// a stats object whose boot date was never set must still be printable
		String boot = bootDate == null ? "n/a" : bootDate.toString();
		long userSubscriptions = currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections);

		return MessageFormat.format(template,
				boot,
				Boolean.toString(strictRequestTermination),
				Boolean.toString(allowAllChannelAnnouncements),
				concurrentConnectionLimit,
				rejectedConnectionCount,
				peakConnectionCount,
				currentConnections,
				currentChannels,
				currentSubscriptions,
				userSubscriptions,
				totalConnections,
				totalAnnouncements,
				totalMessages,
				totalSubscribes,
				totalUnsubscribes,
				totalPings);
	}

	/**
	 * Builds one template line whose value is inserted verbatim.
	 */
	private String infoStr(int index, String label) {
		return label + ": {" + index + "}\n";
	}

	/**
	 * Builds one template line whose value is formatted as a whole number
	 * without grouping separators.
	 */
	private String infoInt(int index, String label) {
		return label + ": {" + index + ",number,0}\n";
	}

}
|
New file |
| | |
| | | /*
|
| | | * Copyright 2013 gitblit.com.
|
| | | *
|
| | | * Licensed under the Apache License, Version 2.0 (the "License");
|
| | | * you may not use this file except in compliance with the License.
|
| | | * You may obtain a copy of the License at
|
| | | *
|
| | | * http://www.apache.org/licenses/LICENSE-2.0
|
| | | *
|
| | | * Unless required by applicable law or agreed to in writing, software
|
| | | * distributed under the License is distributed on an "AS IS" BASIS,
|
| | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| | | * See the License for the specific language governing permissions and
|
| | | * limitations under the License.
|
| | | */
|
| | | package com.gitblit.tests;
|
| | |
|
| | | import static org.junit.Assert.assertEquals;
|
| | |
|
| | | import java.text.MessageFormat;
|
| | | import java.util.Date;
|
| | | import java.util.Map;
|
| | | import java.util.concurrent.ConcurrentHashMap;
|
| | | import java.util.concurrent.atomic.AtomicInteger;
|
| | |
|
| | | import org.junit.Test;
|
| | |
|
| | | import com.gitblit.fanout.FanoutService;
|
| | | import com.gitblit.fanout.FanoutClient;
|
| | | import com.gitblit.fanout.FanoutClient.FanoutAdapter;
|
| | | import com.gitblit.fanout.FanoutNioService;
|
| | | import com.gitblit.fanout.FanoutService;
|
| | | import com.gitblit.fanout.FanoutSocketService;
|
| | |
|
| | | public class FanoutServiceTest {
|
| | | |
| | | int fanoutPort = FanoutService.DEFAULT_PORT;
|
| | | |
| | | @Test
|
| | | public void testNioPubSub() throws Exception {
|
| | | testPubSub(new FanoutNioService(fanoutPort));
|
| | | }
|
| | |
|
| | | @Test
|
| | | public void testSocketPubSub() throws Exception {
|
| | | testPubSub(new FanoutSocketService(fanoutPort));
|
| | | }
|
| | | |
| | | @Test
|
| | | public void testNioDisruptionAndRecovery() throws Exception {
|
| | | testDisruption(new FanoutNioService(fanoutPort));
|
| | | }
|
| | |
|
| | | @Test
|
| | | public void testSocketDisruptionAndRecovery() throws Exception {
|
| | | testDisruption(new FanoutSocketService(fanoutPort));
|
| | | }
|
| | | |
| | | protected void testPubSub(FanoutService service) throws Exception {
|
| | | System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
|
| | | service.startSynchronously();
|
| | | |
| | | final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
|
| | | FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
|
| | | clientA.addListener(new FanoutAdapter() {
|
| | | |
| | | @Override
|
| | | public void announcement(String channel, String message) {
|
| | | announcementsA.put(channel, message);
|
| | | }
|
| | | });
|
| | | |
| | | clientA.startSynchronously();
|
| | |
|
| | | final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
|
| | | FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
|
| | | clientB.addListener(new FanoutAdapter() {
|
| | | @Override
|
| | | public void announcement(String channel, String message) {
|
| | | announcementsB.put(channel, message);
|
| | | }
|
| | | });
|
| | | clientB.startSynchronously();
|
| | |
|
| | | |
| | | // subscribe clients A and B to the channels
|
| | | clientA.subscribe("a");
|
| | | clientA.subscribe("b");
|
| | | clientA.subscribe("c");
|
| | | |
| | | clientB.subscribe("a");
|
| | | clientB.subscribe("b");
|
| | | clientB.subscribe("c");
|
| | | |
| | | // give async messages a chance to be delivered
|
| | | Thread.sleep(1000);
|
| | | |
| | | clientA.announce("a", "apple");
|
| | | clientA.announce("b", "banana");
|
| | | clientA.announce("c", "cantelope");
|
| | | |
| | | clientB.announce("a", "avocado");
|
| | | clientB.announce("b", "beet");
|
| | | clientB.announce("c", "carrot");
|
| | |
|
| | | // give async messages a chance to be delivered
|
| | | Thread.sleep(2000);
|
| | |
|
| | | // confirm that client B received client A's announcements
|
| | | assertEquals("apple", announcementsB.get("a"));
|
| | | assertEquals("banana", announcementsB.get("b"));
|
| | | assertEquals("cantelope", announcementsB.get("c"));
|
| | |
|
| | | // confirm that client A received client B's announcements
|
| | | assertEquals("avocado", announcementsA.get("a"));
|
| | | assertEquals("beet", announcementsA.get("b"));
|
| | | assertEquals("carrot", announcementsA.get("c"));
|
| | | |
| | | clientA.stop();
|
| | | clientB.stop();
|
| | | service.stop(); |
| | | }
|
| | | |
| | | protected void testDisruption(FanoutService service) throws Exception {
|
| | | System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
|
| | | service.startSynchronously();
|
| | | |
| | | final AtomicInteger pongCount = new AtomicInteger(0);
|
| | | FanoutClient client = new FanoutClient("localhost", fanoutPort);
|
| | | client.addListener(new FanoutAdapter() {
|
| | | @Override
|
| | | public void pong(Date timestamp) {
|
| | | pongCount.incrementAndGet();
|
| | | }
|
| | | });
|
| | | client.startSynchronously();
|
| | | |
| | | // ping and wait for pong
|
| | | client.ping(); |
| | | Thread.sleep(500);
|
| | | |
| | | // restart client
|
| | | client.stop();
|
| | | Thread.sleep(1000);
|
| | | client.startSynchronously(); |
| | | |
| | | // ping and wait for pong
|
| | | client.ping(); |
| | | Thread.sleep(500);
|
| | | |
| | | assertEquals(2, pongCount.get());
|
| | | |
| | | // now disrupt service
|
| | | service.stop(); |
| | | Thread.sleep(2000);
|
| | | service.startSynchronously();
|
| | | |
| | | // wait for reconnect
|
| | | Thread.sleep(2000);
|
| | |
|
| | | // ping and wait for pong
|
| | | client.ping();
|
| | | Thread.sleep(500);
|
| | |
|
| | | // kill all
|
| | | client.stop();
|
| | | service.stop();
|
| | | |
| | | // confirm expected pong count
|
| | | assertEquals(3, pongCount.get());
|
| | | }
|
| | | } |
| | |
| | | MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class,
|
| | | DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class,
|
| | | GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class,
|
| | | GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class })
|
| | | GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class,
|
| | | FanoutServiceTest.class })
|
| | | public class GitBlitSuite {
|
| | |
|
| | | public static final File REPOSITORIES = new File("git");
|