首页 > 代码库 > 开源一个基于nio的java网络程序

开源一个基于nio的java网络程序

       因为最近要从公司离职,害怕用nio写的网络程序没有人能看懂(或许是因为写的不好吧),就调整成了mina(这样大家接触起来非常方便,即使没有socket基础,用起来也不难),所以之前基于nio写的网络程序就开放出来好了!

      写的比较挫,大家见谅!

      首先是PollServer类,主要处理select,做网络事件的监听和基于FutureTask的数据发送,代码如下:

  
package gs.gate;import gs.gate.handle.ClientHandle;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Set;import java.util.Vector;import java.util.concurrent.FutureTask;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import org.apache.log4j.Logger;import door.HeartTimer;public class PollServer implements Runnable{    private Logger log = Logger.getLogger(getClass());    private Selector select = null;    private ServerSocketChannel serverSocketChannel = null;    private HeartTimer writerExpire = null;    private volatile boolean run = true;        private List<FutureTask<Integer>> writeTasks =             new Vector<FutureTask<Integer>>();        public PollServer(String host,int port) throws IOException    {        select = Selector.open();        serverSocketChannel = ServerSocketChannel.open();        serverSocketChannel.socket().setReuseAddress(true);        serverSocketChannel.configureBlocking(false);        serverSocketChannel.socket().bind(new InetSocketAddress(host, port));        serverSocketChannel.register(select, SelectionKey.OP_ACCEPT);    }        public Selector getSelector()    {        return this.select;    }        public void stop()    {        this.run = false;    }        public void run()    {        if(writerExpire == null)        {            writerExpire = new HeartTimer(50);        }        while(this.run)        {                        try            {                this.listen();            }            catch (Exception e)            {                log.info("PollServer listen() err! " + e.toString());             }                        try            {                List<FutureTask<Integer>> writeTasks_ = null;                synchronized (writeTasks)                {                    writeTasks_ = new ArrayList<FutureTask<Integer>>(this.writeTasks);                    this.writeTasks.clear();                }                                for(FutureTask<Integer> task : writeTasks_)                {                    task.run();                }            }            catch (Exception e)            {                log.error("PollServer processOutput() err" ,e);            }        }    }        public void listen() throws IOException    {                select.select(10);        Set<SelectionKey> readyKeys = select.selectedKeys();        Iterator<SelectionKey> itr = readyKeys.iterator();                //处理接受        while(itr.hasNext())        {            SelectionKey key = itr.next();            itr.remove();                        if(key.isAcceptable())            {                SocketChannel newConnection = serverSocketChannel.accept();                this.addClient(newConnection);            }            else if(key.isReadable())            {                ClientHandle handle = (ClientHandle)key.attachment();                try                {                    if(handle.handleRead() <= 0)                    {                        log.info("if handleRead < 0");                        this.removeClient(handle);                    }                }                catch (Exception e)                {                    log.error("exception",e);                    this.removeClient(handle);                }            }            else if(key.isWritable())            {                ClientHandle handle = (ClientHandle)key.attachment();                try                {                    handle.handleWrite();                    if(handle.hasRemaining() == false)                    {                        key.cancel();                    }                }                catch (Exception e)                {                    this.removeClient(handle);                    log.error("if handleWrite error",e);                }            }                        }    }            public void addWriteTask(FutureTask<Integer> future)    {        this.writeTasks.add(future);    }        public void addClient(SocketChannel socket)    {        ClientHandle handle = new ClientHandle(socket,this);        try        {            socket.socket().setTcpNoDelay(run);            socket.configureBlocking(false);            socket.register(select, SelectionKey.OP_READ,handle);        }        catch (Exception e)        {            try            {                log.error("create client err",e);                socket.close();            }            catch (Exception  err)            {}                    }    }        public void removeClient(ClientHandle handle)    {        if(handle == null)        {            return ;        }                log.info(" remove Client ");        handle.handleDisConnected();    }    }
View Code

       主要函数: listen();作用:基于网络事件处理接受新链接和消息的接收! 

      主要函数: processOutput(); 作用: 做统一的发送处理,在这篇 浅谈游戏服务器的发送数据处理 中有讲解!每个连接在发送的时候,将数据和连接封装成FutureTask,然后投递到Pollserver中的安全队列中,在这里统一将安全队列中的任务执行完毕! 如果有数据没有发送完毕,就监听写时间,直到这个链接成为可写事件(即:写缓冲区中有空闲)。

     

      下面是ClientHandle类的代码,做每个连接的处理,比如拆包分包,代码如下

package gs.gate.handle;import gs.gate.PollServer;import java.io.IOException;import java.net.InetAddress;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.concurrent.FutureTask;import org.apache.log4j.Logger;import dc.control.DCThread;import dc.util.DcTask;import door.IPlayer;import senv.server.ServerKit;import slib.net.ISession;import slib.util.ByteBuffer;public class ClientHandle implements ISession{    private Logger log = Logger.getLogger(getClass());    public final static int RW_BUFFER_SIZE = 1024;    private SocketChannel socket = null;        private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(8*RW_BUFFER_SIZE);    private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(10*RW_BUFFER_SIZE);        private volatile IPlayer player = null;        private PollServer poll = null;        private boolean active = false;        private boolean tgwProcessed = false;        private String tgw = "tgw_l7_forward\r\nHost:" + ServerKit.ip+ ":" + ServerKit.port + "\r\n\r\n";        public ClientHandle(SocketChannel socket,PollServer poll)    {        this.socket = socket;        this.writer.limit(this.writer.capacity());        this.active = true;        this.poll = poll;                try        {            this.socket.socket().setSendBufferSize(10*RW_BUFFER_SIZE);            this.socket.socket().setReceiveBufferSize(8*RW_BUFFER_SIZE);            this.socket.socket().setTcpNoDelay(true);            this.socket.socket().setSoLinger(true, 3600);        }        catch (Exception e)        {            log.error("err",e);        }            }        public SocketChannel getSocketChannel()    {        return this.socket;    }            public int handleWrite() throws IOException    {        if (!this.isActive())        {            return -1;        }                this.writer.flip();        this.socket.write(writer);        if(this.writer.hasRemaining())        {            this.writer.compact();        }        else        {            this.writer.clear();        }                return 0;    }            public boolean hasRemaining()    {        return this.writer.position() > 0;    }        public Integer writeData(ByteBuffer data) throws IOException    {        if (!this.isActive())        {            return -1;        }        if(data =http://www.mamicode.com/= null)        {            return 0;        }                this.writer.putInt(data.length());        this.writer.put(data.toByteArray(), 0, data.length());        this.writer.flip();                int result = this.socket.write(this.writer);                if(this.writer.hasRemaining())        {            this.writer.compact();        }        else        {            this.writer.clear();        }        return result;    }        @SuppressWarnings("deprecation")    public int handleRead() throws IOException     {        if(socket == null)        {            return -1;        }        if(!this.isActive())        {            return -1;        }                int r = this.socket.read(this.reader);        if(r <= 0)        {            return r;        }                if(this.tgwProcessed == false)        {            //腾讯平台 你mb            this.reader.flip();            byte bytes[] = new byte[tgw.length()];            this.reader.get(bytes);                        String vali = new String(bytes,"UTF-8");            if(vali.equals(tgw))            {                log.info("tgw 校验成功");            }            else            {                log.info("tgw 校验失败");            }            this.tgwProcessed = true;            this.reader.compact();        }                while(true)        {            this.reader.flip();            ByteBuffer data = this.createBuffer();            if(data =http://www.mamicode.com/= null)            {                break;            }                        this.reader.get(data.getByteArray(), data.top(), data.capacity());            this.processData(data);                        if(this.reader.hasRemaining())            {                this.reader.compact();            }            else            {                this.reader.clear();                break;            }        }        return 1;    }        public void processData(ByteBuffer data)    {        if(player == null)        {            DcTask task = new DcTask();            task.object = this;            task.data = data;            DCThread.getInstance().insertTask(task);        }        else        {            player.insertData(data);        }    }        public void handleDisConnected()    {        if(!this.isActive())        {            return ;        }        if(player != null)        {            this.player.setSession(null);            this.player.logOut();        }        this.close();        this.player = null;    }        private ByteBuffer createBuffer()    {        if(reader.remaining() < 4)        {            return null;        }                int len = reader.getInt();        if(len > reader.remaining())        {            this.reader.rewind();            this.reader.compact();            return null;        }                if (len > 0 && len <= 10 * 1024)        {            return new ByteBuffer(len);        }                return null;    }    @Override    public void close()    {        setActive(false);        try        {            log.error("socket close !");            this.socket.close();            this.socket.keyFor(this.poll.getSelector()).cancel();        }        catch (Exception e)        {            log.error("err" , e);         }    }    @Override    public long getActiveTime()    {        return 0;    }    @Override    public InetAddress getAddress()    {        return null;    }    @Override    public String getCode()    {        return null;    }    @Override    public int getPing()    {        return 0;    }    @Override    public long getPingTime()    {        return 0;    }    @Override    public int getPort()    {        return 0;    }    @Override    public int getServerId()    {        return 0;    }    @Override    public int getSessionId()    {        return 0;    }    @Override    public Object getSource()    {        return this.player;    }    @Override    public int getTimeout()    {        return 0;    }    @Override    public boolean isActive()    {        return this.active;    }    @Override    public void send(ByteBuffer data)    {        WriteTask task = new WriteTask(this,data);        FutureTask<Integer> future = new FutureTask<Integer>(task);        poll.addWriteTask(future);    }    @Override    public void send(ByteBuffer arg0, ByteBuffer arg1)    {            }        @Override    public void setCode(String arg0)    {            }    @Override    public void setPing(int arg0)    {            }    @Override    public void setPingTime(long arg0)    {            }        public void enableWriteEvent()    {        try        {            this.socket.register(this.poll.getSelector(), SelectionKey.OP_WRITE, this);        }        catch (Exception e)        {            e.printStackTrace();        }    }        public void shutdownWriteEvent()    {            }    @Override    public void setSource(Object arg0)    {        this.player = (IPlayer) arg0;    }        protected void setActive(boolean b)    {        this.active = b;    }    }
View Code

        重要的几个函数:  send(ByteBuffer data) 将发送处理包装成FutureTask,投递到PollServer中进行处理,就是PollServer::processOutput中处理

                                handleRead() 这里处理接受数据事件,做了拆包,将二进制数据,按照 长度-内容 的格式进行解析,拆分成一个个ByteBuffer(定义见下文)包,然后进行处理。

       ClientHandle继承自ISession接口,其实这个无所谓,大家可以自己定义。我这里因为要和之前的系统兼容,所以才继承了这个。这里一不小心居然用到了适配器模式,我以为这辈子只会用到创建者模式呢? 个人还是觉得,这些设计模式还是为了解决问题用到的,而不是为了多变的需求而想太多用到的;设计模式用得多用的频繁,反而增加代码的可读性!

       最后看下WriteTask的封装

package gs.gate.handle;import java.util.concurrent.Callable;import slib.util.ByteBuffer;public class WriteTask implements Callable<Integer>{        private ClientHandle client = null;        private ByteBuffer data = http://www.mamicode.com/null;        public WriteTask(ClientHandle handle,ByteBuffer data)    {        client = handle;        this.data =http://www.mamicode.com/ data;    }        @Override    public Integer call() throws Exception    {        return this.client.writeData(data);    }}
View Code

       这个就不多解析了!

      缺陷1: 没有做空闲连接的处理,后来的mina库,提供了这个功能!有兴趣的同学自己写个吧!

      缺陷2: 自定义的消息包,用了ByteBuffer类,和nio提供的ByteBuffer 重复!

      给出自定义的ByteBuffer的处理:

package slib.util;/** * 类说明:字节缓存类,字节操作高位在前,低位在后 *  * @version 1.0 * @author fxxxysh <hanshuang@linekong.com> */public class ByteBuffer{    /* static fields */    /** 默认的初始容量大小 */    public static final int CAPACITY = 32;    /** 默认的动态数据或文字的最大长度,400k */    public static final int MAX_DATA_LENGTH = 400 * 1024;    /* fields */    /** 字节数组 */    byte[] bytes;    /** 字节缓存的长度 */    int top;    /** 字节缓存的偏移量 */    int offset;    /* constructors */    /** 按默认的大小构造一个字节缓存对象 */    public ByteBuffer()    {        this(CAPACITY);    }    /** 按指定的大小构造一个字节缓存对象 */    public ByteBuffer(int capacity)    {        if (capacity < 1)            throw new IllegalArgumentException(getClass().getName()                    + " <init>, invalid capatity:" + capacity);        bytes = new byte[capacity];        top = 0;        offset = 0;    }    /** 按指定的字节数组构造一个字节缓存对象 */    public ByteBuffer(byte[] data)    {        if (data =http://www.mamicode.com/= null)            throw new IllegalArgumentException(getClass().getName()                    + " <init>, null data");        bytes = data;        top = data.length;        offset = 0;    }    /** 按指定的字节数组构造一个字节缓存对象 */    public ByteBuffer(byte[] data, int index, int length)    {        if (data =http://www.mamicode.com/= null)            throw new IllegalArgumentException(getClass().getName()                    + " <init>, null data");        if (index < 0 || index > data.length)            throw new IllegalArgumentException(getClass().getName()                    + " <init>, invalid index:" + index);        if (length < 0 || data.length < index + length)            throw new IllegalArgumentException(getClass().getName()                    + " <init>, invalid length:" + length);        bytes = data;        top = index + length;        offset = index;    }    /* properties */    /** 得到字节缓存的容积 */    public int capacity()    {        return bytes.length;    }    /** 设置字节缓存的容积,只能扩大容积 */    public void setCapacity(int len)    {        int c = bytes.length;        if (len <= c)            return;        for (; c < len; c = (c << 1) + 1)            ;        byte[] temp = new byte[c];        System.arraycopy(bytes, 0, temp, 0, top);        bytes = temp;    }    /** 得到字节缓存的长度 */    public int top()    {        return top;    }    /** 设置字节缓存的长度 */    public void setTop(int top)    {        if (top < offset)            throw new IllegalArgumentException(this + " setTop, invalid top:"                    + top);        if (top > bytes.length)            setCapacity(top);        this.top = top;    }    /** 得到字节缓存的偏移量 */    public int offset()    {        return offset;    }    /** 设置字节缓存的偏移量 */    public void setOffset(int offset)    {        if (offset < 0 || offset > top)            throw new IllegalArgumentException(this                    + " setOffset, invalid offset:" + offset);        this.offset = offset;    }    /** 得到字节缓存的使用长度 */    public int length()    {        return top - offset;    }    /** 得到字节缓存的字节数组,一般使用toArray()方法 */    public byte[] getByteArray()    {        return bytes;    }    /* methods */    /* byte methods */    /** 得到指定偏移位置的字节 */    public byte read(int pos)    {        return bytes[pos];    }    /** 设置指定偏移位置的字节 */    public void write(int b, int pos)    {        bytes[pos] = (byte) b;    }    /* read methods */    /**     * 按当前偏移位置读入指定的字节数组     *      * @param data     *            指定的字节数组     * @param pos     *            指定的字节数组的起始位置     * @param len     *            读入的长度     */    public void read(byte[] data, int pos, int len)    {        System.arraycopy(bytes, offset, data, pos, len);        offset += len;    }    /** 读出一个布尔值 */    public boolean readBoolean()    {        return (bytes[offset++] != 0);    }    /** 读出一个字节 */    public byte readByte()    {        return bytes[offset++];    }    /** 读出一个无符号字节 */    public int readUnsignedByte()    {        return bytes[offset++] & 0xff;    }    /** 读出一个字符 */    public char readChar()    {        return (char) readUnsignedShort();    }    /** 读出一个短整型数值 */    public short readShort()    {        return (short) readUnsignedShort();    }    /** 读出一个无符号的短整型数值 */    public int readUnsignedShort()    {        int pos = offset;        offset += 2;        return (bytes[pos + 1] & 0xff) + ((bytes[pos] & 0xff) << 8);    }    /** 读出一个整型数值 */    public int readInt()    {        int pos = offset;        offset += 4;        return (bytes[pos + 3] & 0xff) + ((bytes[pos + 2] & 0xff) << 8)                + ((bytes[pos + 1] & 0xff) << 16) + ((bytes[pos] & 0xff) << 24);    }    /** 读出一个浮点数值 */    public float readFloat()    {        return Float.intBitsToFloat(readInt());    }    /** 读出一个长整型数值 */    public long readLong()    {        int pos = offset;        offset += 8;        return (bytes[pos + 7] & 0xffL) + ((bytes[pos + 6] & 0xffL) << 8)                + ((bytes[pos + 5] & 0xffL) << 16)                + ((bytes[pos + 4] & 0xffL) << 24)                + ((bytes[pos + 3] & 0xffL) << 32)                + ((bytes[pos + 2] & 0xffL) << 40)                + ((bytes[pos + 1] & 0xffL) << 48)                + ((bytes[pos] & 0xffL) << 56);    }    /** 读出一个双浮点数值 */    public double readDouble()    {        return Double.longBitsToDouble(readLong());    }    /**     * 读出动态长度, 数据大小采用动态长度,整数类型下,最大为512M 1xxx,xxxx表示(0~0x80) 0~128B     * 01xx,xxxx,xxxx,xxxx表示(0~0x4000) 0~16K     * 001x,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx表示(0~0x20000000) 0~512M     */    public int readLength()    {        int n = bytes[offset] & 0xff;        if (n >= 0x80)        {            offset++;            return n - 0x80;        }        else if (n >= 0x40)            return readUnsignedShort() - 0x4000;        else if (n >= 0x20)            return readInt() - 0x20000000;        else            throw new IllegalArgumentException(this                    + " readLength, invalid number:" + n);    }    /** 读出一个指定长度的字节数组,可以为null */    public byte[] readData()    {        int len = readLength() - 1;        if (len < 0)            return null;        if (len > MAX_DATA_LENGTH)            throw new IllegalArgumentException(this                    + " readData, data overflow:" + len);        byte[] data = http://www.mamicode.com/new byte[len];        read(data, 0, len);        return data;    }    /** 读出一个短字节数组,长度不超过254 */    public byte[] readShortData()    {        int len = readUnsignedByte();        if (len == 255)            return null;        byte[] data = http://www.mamicode.com/new byte[len];        if (len != 0)            read(data, 0, len);        return data;    }    /** 读出一个指定长度的字符串 */    public String readString(int len)    {        byte[] data = http://www.mamicode.com/new byte[len];        if (len == 0)            return "";        read(data, 0, len);        return new String(data);    }    /** 读出一个短字符串,长度不超过254 */    public String readShortString()    {        int len = readUnsignedByte();        if (len == 255)            return null;        return readString(len);    }    /** 读出一个字符串,长度不超过65534 */    public String readString()    {        int len = readUnsignedShort();        if (len == 65535)            return null;        return readString(len);    }    /** 读出一个指定长度和编码类型的字符串 */    public String readUTF(String charsetName)    {        int len = readLength() - 1;        if (len < 0)            return null;        if (len > MAX_DATA_LENGTH)            throw new IllegalArgumentException(this                    + " readUTF, data overflow:" + len);        byte[] data = http://www.mamicode.com/new byte[len];        read(data, 0, len);        if (charsetName == null)            return new String(data);        try        {            return new String(data, charsetName);        }        catch (Exception e)        {            throw new IllegalArgumentException(this                    + " readUTF, invalid charsetName:" + charsetName);        }    }    /** 读出一个指定长度的utf字符串 */    public String readUTF()    {        int len = readLength() - 1;        if (len < 0)            return null;        if (len == 0)            return "";        if (len > MAX_DATA_LENGTH)            throw new IllegalArgumentException(this                    + " readUTF, data overflow:" + len);        StringBuffer sb = new StringBuffer(len);        int pos = ByteKit.readUTF(bytes, offset, len, sb);        if (pos > 0)            throw new IllegalArgumentException(this                    + " readUTF, format err, len=" + len + ", pos:" + pos);        offset += len;        return sb.toString();    }    /* write methods */    /**     * 写入指定字节数组     *      * @param data     *            指定的字节数组     * @param pos     *            指定的字节数组的起始位置     * @param len     *            写入的长度     */    public void write(byte[] data, int pos, int len)    {        if (bytes.length < top + len)            setCapacity(top + len);        System.arraycopy(data, pos, bytes, top, len);        top += len;    }    /** 写入一个布尔值 */    public void writeBoolean(boolean b)    {        if (bytes.length < top + 1)            setCapacity(top + CAPACITY);        bytes[top++] = (byte) (b ? 1 : 0);    }    /** 写入一个字节 */    public void writeByte(int b)    {        if (bytes.length < top + 1)            setCapacity(top + CAPACITY);        bytes[top++] = (byte) b;    }    /** 写入一个字符 */    public void writeChar(int c)    {        writeShort(c);    }    /** 写入一个短整型数值 */    public void writeShort(int s)    {        int pos = top;        if (bytes.length < pos + 2)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) (s >>> 8);        bytes[pos + 1] = (byte) s;        top += 2;    }    /** 在指定位置写入一个短整型数值,length不变 */    public void writeShort(int s, int pos)    {        if (bytes.length < pos + 2)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) (s >>> 8);        bytes[pos + 1] = (byte) s;    }    /** 写入一个整型数值 */    public void writeInt(int i)    {        int pos = top;        if (bytes.length < pos + 4)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) (i >>> 24);        bytes[pos + 1] = (byte) (i >>> 16);        bytes[pos + 2] = (byte) (i >>> 8);        bytes[pos + 3] = (byte) i;        top += 4;    }    /** 在指定位置写入一个整型数值,length不变 */    public void writeInt(int i, int pos)    {        if (bytes.length < pos + 4)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) (i >>> 24);        bytes[pos + 1] = (byte) (i >>> 16);        bytes[pos + 2] = (byte) (i >>> 8);        bytes[pos + 3] = (byte) i;    }    /** 写入一个浮点数值 */    public void writeFloat(float f)    {        writeInt(Float.floatToIntBits(f));    }    /** 写入一个长整型数值 */    public void writeLong(long l)    {        int pos = top;        if (bytes.length < pos + 8)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) (l >>> 56);        bytes[pos + 1] = (byte) (l >>> 48);        bytes[pos + 2] = (byte) (l >>> 40);        bytes[pos + 3] = (byte) (l >>> 32);        bytes[pos + 4] = (byte) (l >>> 24);        bytes[pos + 5] = (byte) (l >>> 16);        bytes[pos + 6] = (byte) (l >>> 8);        bytes[pos + 7] = (byte) l;        top += 8;    }    /** 写入一个双浮点数值 */    public void writeDouble(double d)    {        writeLong(Double.doubleToLongBits(d));    }    /** 写入动态长度 */    public void writeLength(int len)    {        if (len >= 0x20000000 || len < 0)            throw new IllegalArgumentException(this                    + " writeLength, invalid len:" + len);        if (len >= 0x4000)            writeInt(len + 0x20000000);        else if (len >= 0x80)            writeShort(len + 0x4000);        else            writeByte(len + 0x80);    }    /** 写入一个字节数组,可以为null */    public void writeData(byte[] data)    {        writeData(data, 0, (data != null) ? data.length : 0);    }    /** 写入一个字节数组,可以为null */    public void writeData(byte[] data, int pos, int len)    {        if (data =http://www.mamicode.com/= null)        {            writeLength(0);            return;        }        writeLength(len + 1);        write(data, pos, len);    }    /** 写入一个字符串,可以为null */    public void writeString(String s)    {        if (s != null)        {            byte[] temp = s.getBytes();            if (temp.length > 65534)                throw new IllegalArgumentException(getClass().getName()                        + " writeString, invalid s:" + s);            writeShort(temp.length);            if (temp.length != 0)                write(temp, 0, temp.length);        }        else            writeShort(65535);    }    /** 写入一个字符串,以指定的字符进行编码 */    public void writeUTF(String str, String charsetName)    {        if (str == null)        {            writeLength(0);            return;        }        byte[] data;        if (charsetName != null)        {            try            {                data = str.getBytes(charsetName);            }            catch (Exception e)            {                throw new IllegalArgumentException(this                        + " writeUTF, invalid charsetName:" + charsetName);            }        }        else            data = str.getBytes();        writeLength(data.length + 1);        write(data, 0, data.length);    }    /** 写入一个utf字符串,可以为null */    public void writeUTF(String str)    {        writeUTF(str, 0, (str != null) ? str.length() : 0);    }    /** 写入一个utf字符串中指定的部分,可以为null */    public void writeUTF(String str, int index, int length)    {        if (str == null)        {            writeLength(0);            return;        }        int len = ByteKit.getUTFLength(str, index, length);        writeLength(len + 1);        int pos = top;        if (bytes.length < pos + len)            setCapacity(pos + len);        ByteKit.writeUTF(str, index, length, bytes, pos);        top += len;    }    /** 检查是否为相同类型的实例 */    public boolean checkClass(Object obj)    {        return (obj instanceof ByteBuffer);    }    /** 在指定位置写入一个字节,length不变 */    public void writeByte(int b, int pos)    {        if (bytes.length < pos + 1)            setCapacity(pos + CAPACITY);        bytes[pos] = (byte) b;    }    /** 得到字节缓存当前长度的字节数组 */    public byte[] toByteArray()    {        byte[] data = http://www.mamicode.com/new byte[top - offset];        System.arraycopy(bytes, offset, data, 0, data.length);        return data;    }    /** 清除字节缓存对象 */    public void clear()    {        top = 0;        offset = 0;    }    /* common methods */    public int hashCode()    {        int hash = 17;        for (int i = top - 1; i >= 0; i--)            hash = 65537 * hash + bytes[i];        return hash;    }    public boolean equals(Object obj)    {        if (this == obj)            return true;        if (!checkClass(obj))            return false;        ByteBuffer bb = (ByteBuffer) obj;        if (bb.top != top)            return false;        if (bb.offset != offset)            return false;        for (int i = top - 1; i >= 0; i--)        {            if (bb.bytes[i] != bytes[i])                return false;        }        return true;    }    public String toString()    {        return super.toString() + "[" + top + "," + offset + "," + bytes.length                + "] ";    }}
View Code

       相应的ByteKit类代码:

/** * Copyright 2001 by seasky <www.seasky.cn>. */package slib.util;/** * 类说明: 字节及字节数组的方法操作库 *  * @version 1.0 * @author zminleo <zmin@seasky.cn> */public final class ByteKit{    /* static fields */    /** 库信息 */    public static final String toString=ByteKit.class.getName();    /* static methods */    /** 在字节数组中指定位置读出一个布尔值 */    public static boolean readBoolean(byte[] bytes,int pos)    {        return bytes[pos]!=0;    }    /** 在字节数组中指定位置读出一个字节 */    public static byte readByte(byte[] bytes,int pos)    {        return bytes[pos];    }    /** 在字节数组中指定位置读出一个无符号字节 */    public static int readUnsignedByte(byte[] bytes,int pos)    {        return bytes[pos]&0xff;    }    /** 在字节数组中指定位置读出一个字符 */    public static char readChar(byte[] bytes,int pos)    {        return (char)readUnsignedShort(bytes,pos);    }    /** 在字节数组中指定位置读出一个字符,低位在前,高位在后 */    public static char readChar_(byte[] bytes,int pos)    {        return (char)readUnsignedShort_(bytes,pos);    }    /** 在字节数组中指定位置读出一个短整型数值 */    public static short readShort(byte[] bytes,int pos)    {        return (short)readUnsignedShort(bytes,pos);    }    /** 在字节数组中指定位置读出一个短整型数值,低位在前,高位在后 */    public static short readShort_(byte[] bytes,int pos)    {        return (short)readUnsignedShort_(bytes,pos);    }    /** 在字节数组中指定位置读出一个无符号短整型数值 */    public static int readUnsignedShort(byte[] bytes,int pos)    {        return (bytes[pos+1]&0xff)+((bytes[pos]&0xff)<<8);    }    /** 在字节数组中指定位置读出一个无符号短整型数值,低位在前,高位在后 */    public static int readUnsignedShort_(byte[] bytes,int pos)    {        return ((bytes[pos+1]&0xff)<<8)+(bytes[pos]&0xff);    }    /** 在字节数组中指定位置读出一个整型数值 */    public static int readInt(byte[] bytes,int pos)    {        return ((bytes[pos+3]&0xff))+((bytes[pos+2]&0xff)<<8)            +((bytes[pos+1]&0xff)<<16)+((bytes[pos]&0xff)<<24);    }    /** 在字节数组中指定位置读出一个整型数值,低位在前,高位在后 */    public static int readInt_(byte[] bytes,int pos)    {        return ((bytes[pos+3]&0xff)<<24)+((bytes[pos+2]&0xff)<<16)            +((bytes[pos+1]&0xff)<<8)+((bytes[pos]&0xff));    }    /** 在字节数组中指定位置读出一个浮点数值 */    public static float readFloat(byte[] bytes,int pos)    {        return Float.intBitsToFloat(readInt(bytes,pos));    }    /** 在字节数组中指定位置读出一个浮点数值,低位在前,高位在后 */    public static float readFloat_(byte[] bytes,int pos)    {        return Float.intBitsToFloat(readInt_(bytes,pos));    }    /** 在字节数组中指定位置读出一个长整型数值 */    public static long readLong(byte[] bytes,int pos)    {        return (bytes[pos+7]&0xffL)+((bytes[pos+6]&0xffL)<<8)            +((bytes[pos+5]&0xffL)<<16)+((bytes[pos+4]&0xffL)<<24)            +((bytes[pos+3]&0xffL)<<32)+((bytes[pos+2]&0xffL)<<40)            +((bytes[pos+1]&0xffL)<<48)+((bytes[pos]&0xffL)<<56);    }    /** 在字节数组中指定位置读出一个长整型数值,低位在前,高位在后 */    public static long readLong_(byte[] bytes,int pos)    {        return ((bytes[pos+7]&0xffL)<<56)+((bytes[pos+6]&0xffL)<<48)            +((bytes[pos+5]&0xffL)<<40)+((bytes[pos+4]&0xffL)<<32)            +((bytes[pos+3]&0xffL)<<24)+((bytes[pos+2]&0xffL)<<16)            +((bytes[pos+1]&0xffL)<<8)+(bytes[pos]&0xffL);    }    /** 在字节数组中指定位置读出一个双浮点数值 */    public static double readDouble(byte[] bytes,int pos)    {        return Double.longBitsToDouble(readLong(bytes,pos));    }    /** 在字节数组中指定位置读出一个双浮点数值,低位在前,高位在后 */    public static double readDouble_(byte[] bytes,int pos)    {        return Double.longBitsToDouble(readLong_(bytes,pos));    }    /** 写入一个布尔值在字节数组中指定位置 */    public static void writeBoolean(boolean b,byte[] bytes,int pos)    {        bytes[pos]=(byte)(b?1:0);    }    /** 写入一个字节在字节数组中指定位置 */    public static void writeByte(int b,byte[] bytes,int pos)    {        bytes[pos]=(byte)b;    }    /** 在字节数组中指定位置写入一个字符 */    public static void writeChar(int c,byte[] bytes,int pos)    {        writeShort(c,bytes,pos);    }    /** 写入一个字符在字节数组中指定位置,低位在前,高位在后 */    public static void writeChar_(int c,byte[] bytes,int pos)    {        writeShort_(c,bytes,pos);    }    /** 写入一个短整型数值在字节数组中指定位置 */    public static void writeShort(int s,byte[] bytes,int pos)    {        bytes[pos]=(byte)(s>>>8);        bytes[pos+1]=(byte)s;    }    /** 写入一个短整型数值在字节数组中指定位置,低位在前,高位在后 */    public static void writeShort_(int s,byte[] bytes,int pos)    {        bytes[pos]=(byte)s;        bytes[pos+1]=(byte)(s>>>8);    }    /** 写入一个整型数值在字节数组中指定位置 */    public static void writeInt(int i,byte[] bytes,int pos)    {        bytes[pos]=(byte)(i>>>24);        bytes[pos+1]=(byte)(i>>>16);        bytes[pos+2]=(byte)(i>>>8);        bytes[pos+3]=(byte)i;    }    /** 在字节数组中指定位置写入一个整型数值,低位在前,高位在后 */    public static void writeInt_(int i,byte[] bytes,int pos)    {        bytes[pos]=(byte)i;        bytes[pos+1]=(byte)(i>>>8);        bytes[pos+2]=(byte)(i>>>16);        bytes[pos+3]=(byte)(i>>>24);    }    /** 写入一个浮点数值在字节数组中指定位置 */    public static void writeFloat(float f,byte[] bytes,int pos)    {        writeInt(Float.floatToIntBits(f),bytes,pos);    }    /** 写入一个浮点数值在字节数组中指定位置,低位在前,高位在后 */    public static void writeFloat_(float f,byte[] bytes,int pos)    {        writeInt_(Float.floatToIntBits(f),bytes,pos);    }    /** 写入一个长整型数值在字节数组中指定位置 */    public static void writeLong(long l,byte[] bytes,int pos)    {        bytes[pos]=(byte)(l>>>56);        bytes[pos+1]=(byte)(l>>>48);        bytes[pos+2]=(byte)(l>>>40);        bytes[pos+3]=(byte)(l>>>32);        bytes[pos+4]=(byte)(l>>>24);        bytes[pos+5]=(byte)(l>>>16);        bytes[pos+6]=(byte)(l>>>8);        bytes[pos+7]=(byte)l;    }    /** 写入一个长整型数值在字节数组中指定位置,低位在前,高位在后 */    public static void writeLong_(long l,byte[] bytes,int pos)    {        bytes[pos]=(byte)l;        bytes[pos+1]=(byte)(l>>>8);        bytes[pos+2]=(byte)(l>>>16);        bytes[pos+3]=(byte)(l>>>24);        bytes[pos+4]=(byte)(l>>>32);        bytes[pos+5]=(byte)(l>>>40);        bytes[pos+6]=(byte)(l>>>48);        bytes[pos+7]=(byte)(l>>>56);    }    /** 写入一个双浮点数值在字节数组中指定位置 */    public static void writeDouble(double d,byte[] bytes,int pos)    {        writeLong(Double.doubleToLongBits(d),bytes,pos);    }    /** 写入一个双浮点数值在字节数组中指定位置,低位在前,高位在后 */    public static void writeDouble_(double d,byte[] bytes,int pos)    {        writeLong_(Double.doubleToLongBits(d),bytes,pos);    }    /** 将指定的字节数据转换为ISO-8859-1格式的字符串 */    public static String readISO8859_1(byte[] data)    {        return readISO8859_1(data,0,data.length);    }    /** 将指定的字节数据转换为ISO-8859-1格式的字符串 */    public static String readISO8859_1(byte[] data,int pos,int len)    {        char[] array=new char[len];        for(int i=pos+len-1,j=array.length-1;i>=pos;i--,j--)            array[j]=(char)data[i];        return new String(array);    }    /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */    public static byte[] writeISO8859_1(String str)    {        return writeISO8859_1(str,0,str.length());    }    /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */    public static byte[] writeISO8859_1(String str,int index,int len)    {        byte[] data=http://www.mamicode.com/new byte[len];        writeISO8859_1(str,index,len,data,0);        return data;    }    /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */    public static void writeISO8859_1(String str,int index,int len,        byte[] data,int pos)    {        int c;        for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--)        {            c=str.charAt(i);            data[j]=(c>256)?63:(byte)c;        }    }    /** 将指定的字符数组转换为ISO-8859-1格式的字节数据 */    public static void writeISO8859_1(char[] chars,int index,int len,        byte[] data,int pos)    {        int c;        for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--)        {            c=chars[i];            data[j]=(c>256)?63:(byte)c;        }    }    /** 将指定的UTF8格式的字节数据转换为字符串,返回null表示失败 */    public static String readUTF(byte[] data)    {        StringBuffer sb=new StringBuffer(data.length);        int pos=readUTF(data,0,data.length,sb);        return (pos==0)?sb.toString():null;    }    /**     * 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置     */    public static int readUTF(byte[] data,StringBuffer sb)    {        return readUTF(data,0,data.length,sb);    }    /**     * 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置     */    public static int readUTF(byte[] data,int pos,int len,StringBuffer sb)    {        int i,c,cc,ccc;        int end=pos+len;        while(pos<end)        {            c=data[pos]&0xff;            i=c>>4;            if(i<8)            {                // 0xxx xxxx                pos++;                sb.append((char)c);            }            else if(i==12||i==13)            {                // 110x xxxx 10xx xxxx                pos+=2;                if(pos>end) return pos;                cc=data[pos-1];                if((cc&0xC0)!=0x80) return pos;                sb.append((char)(((c&0x1f)<<6)|(cc&0x3f)));            }            else if(i==14)            {                // 1110 xxxx 10xx xxxx 10xx                // xxxx                pos+=3;                if(pos>end) return pos;                cc=data[pos-2];                ccc=data[pos-1];                if(((cc&0xC0)!=0x80)||((ccc&0xC0)!=0x80)) return pos;                sb.append((char)(((c&0x0f)<<12)|((cc&0x3f)<<6)|(ccc&0x3f)));            }            else                // 10xx xxxx 1111 xxxx                return pos;        }        return 0;    }    /** 获得指定的字符串转换为UTF8格式的字节数据的长度 */    public static int getUTFLength(String str,int index,int len)    {        int utfLen=0;        int c;        for(int i=index;i<len;i++)        {            c=str.charAt(i);            if((c>=0x0001)&&(c<=0x007f))                utfLen++;            else if(c>0x07ff)                utfLen+=3;            else                utfLen+=2;        }        return utfLen;    }    /** 在字节数组中指定位置写入一个短整型数值 */    public static void writeShort(byte[] bytes,int pos,int s)    {        bytes[pos]=(byte)(s>>>8);        bytes[pos+1]=(byte)s;    }    /** 在字节数组中指定位置写入一个字节 */    public static void writeByte(byte[] bytes,int pos,int b)    {        bytes[pos]=(byte)b;    }    /** 获得指定的字符数组转换为UTF8格式的字节数据的长度 */    public static int getUTFLength(char[] chars,int index,int len)    {        int utfLen=0;        int c;        for(int i=index;i<len;i++)        {            c=chars[i];            if((c>=0x0001)&&(c<=0x007f))                utfLen++;            else if(c>0x07ff)                utfLen+=3;            else                utfLen+=2;        }        return utfLen;    }    /** 将指定的字符串转换为UTF8格式的字节数据 */    public static byte[] writeUTF(String str)    {        return writeUTF(str,0,str.length());    }    /** 将指定的字符串转换为UTF8格式的字节数据 */    public static byte[] writeUTF(String str,int index,int len)    {        byte[] data=http://www.mamicode.com/new byte[getUTFLength(str,index,len)];        writeUTF(str,index,len,data,0);        return data;    }    /** 将指定的字符串转换为UTF8格式的字节数据 */    public static void writeUTF(String str,int index,int len,byte[] data,        int pos)    {        int c;        for(int i=index;i<len;i++)        {            c=str.charAt(i);            if((c>=0x0001)&&(c<=0x007f))            {                data[pos++]=(byte)c;            }            else if(c>0x07ff)            {                data[pos++]=(byte)(0xe0|((c>>12)&0x0f));                data[pos++]=(byte)(0x80|((c>>6)&0x3f));                data[pos++]=(byte)(0x80|(c&0x3f));            }            else            {                data[pos++]=(byte)(0xc0|((c>>6)&0x1f));                data[pos++]=(byte)(0x80|(c&0x3f));            }        }    }    /** 将指定的字符数组转换为UTF8格式的字节数据 */    public static void writeUTF(char[] chars,int index,int len,byte[] data,        int pos)    {        int c;        for(int i=index;i<len;i++)        {            c=chars[i];            if((c>=0x0001)&&(c<=0x007f))            {                data[pos++]=(byte)c;            }            else if(c>0x07ff)            {                data[pos++]=(byte)(0xe0|((c>>12)&0x0f));                data[pos++]=(byte)(0x80|((c>>6)&0x3f));                data[pos++]=(byte)(0x80|(c&0x3f));            }            else            {                data[pos++]=(byte)(0xc0|((c>>6)&0x1f));                data[pos++]=(byte)(0x80|(c&0x3f));            }        }    }    /* constructors */    private ByteKit()    {    }}
View Code

       

       使用方法:

for(Map.Entry<String, Map<String, String>> entry : gateConfig.entrySet()){    String host = entry.getValue().get("host");    String port = entry.getValue().get("port");        PollServer poll = new PollServer(host,Integer.parseInt(port));    Thread gateThread = new Thread(poll);    gateThread.setName(entry.getKey());    gateThread.start();}

      

       可能调整成mina库,还有其他的一个原因,就是在服务器端会无辜收到一个rst标识导致服务器断开。起初以为是代码问题,后来经过很长时间的排查和咨询,发现服务器用的是南方电信的网络,而一些北方网通的客户端在访问的时候,就会随机出现rst 连接复位现象!查询了好久,最后还是运维的大哥给的思维!当然在起初解决这个问题的时候,我还是本着代码的问题;顺便还去专门研究了Tcp/ip协议详解,翻出了大学里面学的计算机网络这本书。无论如何解决了就好!欢迎拍砖!

 

开源一个基于nio的java网络程序