首页 > 代码库 > Netty实现时间服务示例

Netty实现时间服务示例


相关知识点:
[1] ChannelGroup是一个容纳打开的通道实例的线程安全的集合,方便我们统一施加操作。所以在使用的过程中可以将一些相关的Channel归类为一个有意义的集合,关闭的通道会自动从集合中移除,而且一个Channel可以属于多个ChannelGroup。常见的应用场景是 向一组通道广播消息;简化一组通道的关闭流程。

[2] 因为在Channel中流通的是ChannelBuffer (可以联系NIO ByteBuffer进行理解),而在内部流水线中处理的可能是一个个对象,所以在收到或要发送到对端的时候就需要一个解码,编码的过程:在实际有意义的对象和字节数组间转换。

[3] 在客户端和服务器端流水线中handler执行顺序是不同的,在客户端中执行顺序和ChannelPipeline的增加顺序一致;而服务器端则恰好是相反的,这一点要注意,所以看到编解码类的位置是那样的。

具体代码:
TimeClient.java
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class TimeClient {
	public static void main(String[] args) {
		String host = "localhost";
		int port = 8080;
		
		if(args.length == 2){
			host = args[0];
			port = Integer.parseInt(args[1]);
		}
		
		ChannelFactory factory = new NioClientSocketChannelFactory(
				Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
		
		ClientBootstrap bootstrap = new ClientBootstrap(factory);
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new TimeDecoder2(),
										 new TimeClientHandler4());
			}
		});
		
		ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));  //
		future.awaitUninterruptibly();
		if(!future.isSuccess()){
			future.getCause().printStackTrace();
		}
		future.getChannel().getCloseFuture().awaitUninterruptibly();
		factory.releaseExternalResources();
	}

}

TimeDecoder2.java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class TimeDecoder2 extends FrameDecoder{

	@Override
	protected Object decode(ChannelHandlerContext ctx, Channel channel,
			ChannelBuffer buffer) throws Exception {
		if(buffer.readableBytes() < 4)
			return null;
		return new UnixTime(buffer.readInt());
	} 
}

TimeClientHandler4.java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeClientHandler4 extends SimpleChannelHandler{
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		//the decoder put UnixTime obj,so get just what we need
		UnixTime cur = (UnixTime)e.getMessage();
		System.out.println(cur);
		e.getChannel().close();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		Channel c = e.getChannel();
		c.close();
	}
}

TimeServer.java
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class TimeServer {
	//[1]
	public static ChannelGroup allChannels = new DefaultChannelGroup("time-server");
	
	public static void main(String[] args) {
		ChannelFactory factory = new NioServerSocketChannelFactory(
				Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool());
		
		ServerBootstrap bootstrap = new ServerBootstrap(factory);
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new TimeServerHandler2(),
										 new TimeEncoder());
			}
		});
		
		bootstrap.setOption("reuseAddr", true);
		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);
		
		Channel channel = bootstrap.bind(new InetSocketAddress(8080));
		
		allChannels.add(channel);
		//waitForShutdownCommand();  this is a imaginary logic:for instance 
		//when there is accepted connection we close this server ;
		if(allChannels.size() >=2){
			ChannelGroupFuture f = allChannels.close();
			f.awaitUninterruptibly();
			factory.releaseExternalResources();
		}
	}
}

TimeServerHandler2.java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeServerHandler2 extends SimpleChannelHandler{
	@Override
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		TimeServer.allChannels.add(e.getChannel());
	}
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		UnixTime time = new UnixTime((int) (System.currentTimeMillis() / 1000));
		ChannelFuture f = e.getChannel().write(time);
		f.addListener(ChannelFutureListener.CLOSE);
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		Channel c = e.getChannel();
		c.close();
	}
}

TimeEncoder.java
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeEncoder extends SimpleChannelHandler{
	@Override
	public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		UnixTime time = (UnixTime)e.getMessage();
		ChannelBuffer buf = ChannelBuffers.buffer(4);
		buf.writeInt(time.getValue());
		Channels.write(ctx, e.getFuture(), buf);
	}
}

UnixTime.java
import java.util.Date;

public class UnixTime {
	private int value;
	public UnixTime(int value){
		this.value = http://www.mamicode.com/value;>