首页 > 代码库 > 编写一个简易的Java NIO Reactor库

编写一个简易的Java NIO Reactor库

开源地址

https://github.com/sea-boat/net-reactor

源码设计

接收器Acceptor

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This Acceptor provides a NIO mode to accept client sockets.</p>
 */
public final class Acceptor extends Thread {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Acceptor.class);
    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private long acceptCount;
    private static final AcceptIdGenerator IdGenerator = new AcceptIdGenerator();
    private ReactorPool reactorPool;

    public Acceptor(ReactorPool reactorPool, String name, String bindIp,
            int port) throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
        this.serverChannel.bind(new InetSocketAddress(bindIp, port), 100);
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        this.reactorPool = reactorPool;
    }

    public int getPort() {
        return port;
    }

    public long getAcceptCount() {
        return acceptCount;
    }

    @Override
    public void run() {
        final Selector selector = this.selector;
        for (;;) {
            ++acceptCount;
            try {
                selector.select(1000L);
                Set<SelectionKey> keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Throwable e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

    /**
     * Accept client sockets.
     */
    private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            channel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
            channel.setOption(StandardSocketOptions.SO_SNDBUF, 1024);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            reactorPool.getNextReactor().postRegister(
                    new FrontendConnection(channel, IdGenerator.getId()));
        } catch (Throwable e) {
            closeChannel(channel);
            LOGGER.warn(getName(), e);
        }
    }

    /**
     * Close a channel.
     * 
     * @param channel
     */
    private static void closeChannel(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        Socket socket = channel.socket();
        if (socket != null) {
            try {
                socket.close();
                LOGGER.info("channel close.");
            } catch (IOException e) {
                LOGGER.warn("IOException happens when closing socket : ", e);
            }
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.warn("IOException happens when closing channel : ", e);
        }
    }

    /**
     * ID Generator.
     */
    private static class AcceptIdGenerator {
        private static final long MAX_VALUE = http://www.mamicode.com/0xffffffffL;
        private long acceptId = 0L;
        private final Object lock = new Object();

        private long getId() {
            synchronized (lock) {
                if (acceptId >= MAX_VALUE) {
                    acceptId = 0L;
                }
                return ++acceptId;
            }
        }
    }
}

Reactor类

/**
 * A reactor thread: owns one {@link Selector}, registers the connections
 * posted to it, and dispatches read and write readiness events for them.
 *
 * <p>On a readable key the connection's {@code read()} is called followed by
 * the {@link Handler}; on a writable key the connection's {@code write()} is
 * called.</p>
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public final class Reactor extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
    private final Selector selector;
    private final ConcurrentLinkedQueue<FrontendConnection> queue;
    /** Incremented once per pass of the selection loop in {@link #run()}. */
    private long doCount;
    private Handler handler;

    /**
     * @param name    thread name for this reactor; when {@code null} the
     *                default thread name is kept
     * @param handler callback invoked after data has been read from a connection
     * @throws IOException if the selector cannot be opened
     */
    public Reactor(String name, Handler handler) throws IOException {
        if (name != null) {
            // Make the name visible in logs and thread dumps.
            setName(name);
        }
        this.selector = Selector.open();
        this.queue = new ConcurrentLinkedQueue<FrontendConnection>();
        this.handler = handler;
    }

    /**
     * Queues a connection for registration and wakes the selector so that
     * the reactor thread picks it up on its next loop pass.
     *
     * @param frontendConnection connection to register with this reactor
     */
    final void postRegister(FrontendConnection frontendConnection) {
        queue.offer(frontendConnection);
        this.selector.wakeup();
    }

    /**
     * Runs the reactor loop forever: select (at most 500 ms), register queued
     * connections, dispatch every selected key, then clear the selected set.
     */
    @Override
    public void run() {
        final Selector selector = this.selector;
        Set<SelectionKey> keys = null;
        for (;;) {
            ++doCount;
            try {
                selector.select(500L);
                register(selector);
                keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    dispatch(key);
                }
            } catch (Throwable e) {
                LOGGER.warn("exception happens selecting : ", e);
            } finally {
                if (keys != null) {
                    keys.clear();
                }
            }
        }
    }

    /**
     * Handles one selected key. A failure on this key is logged here and never
     * propagates, so the remaining selected keys are still processed.
     *
     * @param key a key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey key) {
        Object attach = key.attachment();
        if (attach == null || !key.isValid()) {
            key.cancel();
            return;
        }
        FrontendConnection connection = (FrontendConnection) attach;
        try {
            if (key.isReadable()) {
                connection.read();
                handler.handle(connection);
            }
            // The handler may have closed the connection, so re-check validity.
            if (key.isValid() && key.isWritable()) {
                connection.write();
            }
        } catch (IOException e) {
            LOGGER.warn("IOException happens : ", e);
            closeQuietly(connection);
        } catch (Throwable e) {
            LOGGER.warn("Throwable happens : ", e);
        }
    }

    /**
     * Registers every queued connection with the given selector. A connection
     * whose registration fails is closed so that its socket is not leaked.
     *
     * @param selector selector owned by this reactor
     */
    private void register(Selector selector) {
        FrontendConnection c;
        while ((c = queue.poll()) != null) {
            try {
                c.register(selector);
            } catch (Throwable e) {
                LOGGER.warn("ClosedChannelException happens : ", e);
                closeQuietly(c);
            }
        }
    }

    /**
     * Closes a connection, logging instead of propagating any failure.
     *
     * @param connection connection to close
     */
    private static void closeQuietly(FrontendConnection connection) {
        try {
            connection.close();
        } catch (IOException | RuntimeException e) {
            LOGGER.warn("exception happens closing connection : ", e);
        }
    }

    /**
     * @return the live queue of connections waiting to be registered
     */
    final Queue<FrontendConnection> getRegisterQueue() {
        return queue;
    }

    /**
     * @return the number of selection-loop passes executed so far
     */
    final long getReactCount() {
        return doCount;
    }

}

Reactor池

/**
 * A fixed-size pool of {@link Reactor} threads. New connections are spread
 * over the reactors in round-robin order via {@link #getNextReactor()}.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class ReactorPool {
    private static final String NAME_PREFIX = "reactor";

    private final Reactor[] reactors;
    /** Index of the reactor handed out most recently; guarded by {@code this}. */
    private int nextReactor;

    /**
     * Creates {@code poolSize} reactors named {@code reactor-0 .. reactor-(n-1)}
     * and starts them.
     *
     * @param poolSize number of reactor threads; must be positive
     * @param handler  handler passed to every reactor
     * @throws IllegalArgumentException if {@code poolSize} is not positive
     * @throws IOException              if a reactor cannot be created
     */
    public ReactorPool(int poolSize, Handler handler) throws IOException {
        if (poolSize <= 0) {
            throw new IllegalArgumentException(
                    "poolSize must be positive: " + poolSize);
        }
        reactors = new Reactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            reactors[i] = new Reactor(NAME_PREFIX + "-" + i, handler);
        }
        // Start only after every reactor was created, so a failing constructor
        // does not leave already-running threads behind.
        for (Reactor reactor : reactors) {
            reactor.start();
        }
    }

    /**
     * Returns the next reactor in round-robin order. The method is
     * synchronized so that concurrent callers can never observe an index
     * outside the array.
     *
     * @return the reactor following the one returned by the previous call
     */
    public synchronized Reactor getNextReactor() {
        nextReactor = (nextReactor + 1) % reactors.length;
        return reactors[nextReactor];
    }
}

前端连接抽象

/**
 * Wraps one client {@link SocketChannel} together with its selection key,
 * the buffer filled by the last read, and a queue of buffers to be written.
 *
 * <p>{@link #write()} keeps the partially written buffer in a plain field,
 * so it must not be called from several threads at once. In this file it is
 * only invoked on the reactor thread (directly or through the handler).</p>
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class FrontendConnection {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(FrontendConnection.class);
    private static final int BUFFER_SIZE = 1024;

    private final long id;
    private final SocketChannel channel;
    private SelectionKey selectionKey;
    private ByteBuffer readBuffer;
    /** Buffer taken from the queue (already flipped) that is not fully written yet. */
    private ByteBuffer pendingWrite;
    protected ConcurrentLinkedQueue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<ByteBuffer>();

    /**
     * @param channel accepted client channel
     * @param id      identifier assigned to this connection
     */
    public FrontendConnection(SocketChannel channel, long id) {
        this.id = id;
        this.channel = channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public long getId() {
        return id;
    }

    /**
     * Reads from the channel into a freshly allocated buffer, which is then
     * available through {@link #getReadBuffer()}.
     *
     * @throws EOFException if the channel has reached end-of-stream, i.e. the
     *                      peer closed the connection
     * @throws IOException  if the read fails
     */
    public void read() throws IOException {
        readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        if (channel.read(readBuffer) < 0) {
            // Without this the key stays readable forever after the peer closes.
            throw new EOFException("connection " + id + " closed by peer");
        }
    }

    /**
     * Closes the underlying channel.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        channel.close();
    }

    /**
     * Writes the queued buffers to the channel in queue order. Each queued
     * buffer is flipped once before its first write, so producers must enqueue
     * buffers with the position at the end of the data.
     *
     * <p>If the socket accepts no more bytes, the current buffer is kept,
     * {@code OP_WRITE} interest is enabled and the method returns; the next
     * call resumes with the same buffer. Once everything is written,
     * {@code OP_WRITE} interest is removed.</p>
     *
     * @throws EOFException if the channel reports a negative write count
     * @throws IOException  if the write fails
     */
    public void write() throws IOException {
        ByteBuffer buffer;
        while ((buffer = nextOutgoing()) != null) {
            while (buffer.hasRemaining()) {
                int len = channel.write(buffer);
                if (len < 0) {
                    throw new EOFException();
                }
                if (len == 0) {
                    // Socket send buffer is full: keep the remainder and wait
                    // for the selector to report the channel writable again.
                    setWriteInterest(true);
                    return;
                }
            }
            pendingWrite = null;
        }
        setWriteInterest(false);
    }

    /**
     * @return the unfinished buffer if there is one, otherwise the next queued
     *         buffer (flipped for reading), or {@code null} if nothing is left
     */
    private ByteBuffer nextOutgoing() {
        if (pendingWrite == null) {
            ByteBuffer queued = writeQueue.poll();
            if (queued != null) {
                queued.flip();
            }
            pendingWrite = queued;
        }
        return pendingWrite;
    }

    /**
     * Adds or removes {@code OP_WRITE} from the key's interest set; does
     * nothing when the connection is not registered or the key is invalid.
     *
     * @param enabled {@code true} to add the interest (and wake the selector),
     *                {@code false} to remove it
     */
    private void setWriteInterest(boolean enabled) {
        SelectionKey key = selectionKey;
        if (key == null || !key.isValid()) {
            return;
        }
        int ops = key.interestOps();
        if (enabled) {
            key.interestOps(ops | SelectionKey.OP_WRITE);
            key.selector().wakeup();
        } else {
            key.interestOps(ops & ~SelectionKey.OP_WRITE);
        }
    }

    /**
     * @return the buffer filled by the most recent {@link #read()}, or
     *         {@code null} if nothing has been read yet
     */
    public ByteBuffer getReadBuffer() {
        return readBuffer;
    }

    /**
     * @return the live queue of buffers waiting to be written
     */
    public ConcurrentLinkedQueue<ByteBuffer> getWriteQueue() {
        return writeQueue;
    }

    /**
     * Registers the channel with the selector for {@code OP_READ}, attaching
     * this connection to the resulting key.
     *
     * @param selector selector to register with
     * @throws Throwable if the registration fails
     */
    public void register(Selector selector) throws Throwable {
        selectionKey = channel.register(selector, SelectionKey.OP_READ, this);
    }

}

处理

/**
 * Callback through which an application plugs its own logic into the reactor.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public interface Handler {

    /**
     * Processes the data most recently read from a connection.
     *
     * @param connection connection whose read buffer has just been filled
     */
    void handle(FrontendConnection connection);

}

定义自己的处理

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Demo.</p>
 */
public class MyHandler implements Handler {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MyHandler.class);
    private long readSize;

    /**
     * The logic to deal with the received data.
     *  
     * It means that reactor will trigger this function once the data is received.
     */
    public void handle(FrontendConnection connection) {
        Buffer buff = connection.getReadBuffer();
        readSize = +readSize + buff.position();
        LOGGER.info(connection.getId() + " connection has receive " + readSize);
        if (readSize % 5 == 0) {
            ByteBuffer sendBuffer = ByteBuffer.allocate(10);;
            sendBuffer.wrap("hello".getBytes());
            connection.getWriteQueue().add(sendBuffer);
            try {
                connection.write();
            } catch (IOException e) {
                LOGGER.warn("IOException", e);
            }
        }
    }

}

启动

/**
 * Entry point of the demo: wires a {@link MyHandler}, a {@link ReactorPool}
 * and an {@link Acceptor} together and keeps the main thread alive.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class Bootstrap {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Bootstrap.class);
    private static final String ACCEPTOR_NAME = "acceptor-thread";
    private static final String HOST = "localhost";
    private static final int PORT = 6789;
    /** Length of one idle nap of the main thread, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 300 * 1000;

    /**
     * Starts the server and then sleeps forever. Any failure is logged and
     * terminates the JVM with exit status -1.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            LOGGER.info("starting up ......");
            startServer();
            LOGGER.info("started up successfully.");
            for (;;) {
                Thread.sleep(IDLE_SLEEP_MILLIS);
            }
        } catch (Throwable e) {
            LOGGER.error(" launch error", e);
            System.exit(-1);
        }
    }

    /**
     * Creates the handler, a pool with one reactor per available processor,
     * and the acceptor thread, which is started immediately.
     *
     * @throws Exception if the pool or the acceptor cannot be created
     */
    private static void startServer() throws Exception {
        int reactorCount = Runtime.getRuntime().availableProcessors();
        Handler handler = new MyHandler();
        ReactorPool reactorPool = new ReactorPool(reactorCount, handler);
        Acceptor acceptor = new Acceptor(reactorPool, ACCEPTOR_NAME, HOST, PORT);
        acceptor.start();
    }
}

net-reactor

net-reactor is a simple, easy-to-use network framework built on Java NIO.

how-to

Usage is as simple as:

// The application's callback; the reactors invoke it after reading data.
Handler handler = new MyHandler();
// One reactor per available processor; the pool constructor starts the reactor threads.
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
// The Acceptor constructor binds host:port; start() launches its accept loop.
new Acceptor(reactorPool, acceptorName, host, port).start();
<script type="text/javascript">
    $(function () {
        $('pre.prettyprint code').each(function () {
            var lines = $(this).text().split('\n').length;
            var $numbering = $('<ul/>').addClass('pre-numbering').hide();
            $(this).addClass('has-numbering').parent().append($numbering);
            for (var i = 1; i <= lines; i++) {
                $numbering.append($('<li/>').text(i));
            }
            $numbering.fadeIn(1700);
        });
    });
</script>

    编写一个简易的Java NIO Reactor库