首页 > 代码库 > 基于netty的异步http请求

基于netty的异步http请求

 

 

技术分享
package com.pt.utils;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.http.*;import io.netty.util.concurrent.DefaultEventExecutorGroup;import io.netty.util.concurrent.EventExecutorGroup;import java.net.URI;import java.util.Map;/** * @author panteng * @description * @date 17-3-20. */public class NonBlockHttpClient {    public static EventLoopGroup workerGroup = new NioEventLoopGroup(1);    public static Bootstrap b = new Bootstrap();    public static final EventExecutorGroup executor = new DefaultEventExecutorGroup(2);    static {        b.group(workerGroup);        b.channel(NioSocketChannel.class);        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);    }    public static Object lock = new Object();    /**     * 异步GET请求     *      * @param url     * @param head     * @param handler     * @return     */    public static Boolean get(String url, Map<String, String> head, final HttpHandler handler) {        try {            URI uri = new URI(url);            String domain = uri.getHost();            Integer port = uri.getPort() < 0 ? 80 : uri.getPort();            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());            if (head == null) {                request.headers().add("Host", domain);                request.headers().add("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:44.0) Gecko/20100101 Firefox/44.0");                request.headers().add("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");                request.headers().add("Accept-Language", "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3");                request.headers().add("Connection", "keep-alive");                request.headers().add("Cache-Control", "max-age=0");            } else {                for (Map.Entry entry : head.entrySet()) {                    request.headers().add((String) entry.getKey(), entry.getValue());                }            }            ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>() {                @Override                protected void initChannel(SocketChannel socketChannel) throws Exception {                    // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码                    socketChannel.pipeline().addLast(new HttpResponseDecoder());                    // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码                    socketChannel.pipeline().addLast(new HttpRequestEncoder());                    socketChannel.pipeline().addLast(executor, new GeneralHandler(handler));                }            };            ChannelFuture f;            synchronized (lock) {                b.handler(channelInitializer);                f = b.connect(domain, port).sync();            }            f.channel().writeAndFlush(request);        } catch (Exception e) {            e.printStackTrace();        }        return false;    }    public static void close() {        try {            executor.shutdownGracefully();            workerGroup.shutdownGracefully();        } catch (Exception e) {            e.printStackTrace();        }    }}
核心类1

 

技术分享
package com.pt.utils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.http.HttpContent;import io.netty.handler.codec.http.HttpResponse;import java.util.HashMap;import java.util.Map;/** * @author panteng * @description * @date 17-3-20. */public class GeneralHandler extends ChannelInboundHandlerAdapter {    com.pt.utils.HttpHandler httpHandler;    Integer respLength = Integer.MAX_VALUE; // 响应报文长度    Map<String, String> head = new HashMap<String, String>();    String respContent = "";    public GeneralHandler(com.pt.utils.HttpHandler handler) {        this.httpHandler = handler;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof HttpResponse) {            HttpResponse response = (HttpResponse) msg;            for (Map.Entry entry : response.headers().entries()) {                head.put((String) entry.getKey(), (String) entry.getValue());            }            if (response.headers().get("Content-Length") != null) {                respLength = Integer.parseInt(response.headers().get("Content-Length"));            }        }        if (msg instanceof HttpContent) {            HttpContent content = (HttpContent) msg;            ByteBuf buf = content.content();            respContent += buf.toString(httpHandler.getCharset());            ((HttpContent) msg).release();            if (respContent.getBytes().length >= respLength || !buf.isReadable()) {                ctx.channel().close();                httpHandler.handler(head, respContent);            }        }    }}
核心类2

 

技术分享
package com.pt.utils;import java.nio.charset.Charset;import java.util.Map;/** * @author panteng * @description http响应的异步回调 * @date 17-3-20. */public interface HttpHandler {    public void handler(Map<String, String> headMap, String body);    public Charset getCharset();}
用户自定义处理接口

 

使用用例:

技术分享
package com.pt.utils.test;import com.pt.utils.HttpHandler;import java.nio.charset.Charset;import java.util.Map;/** * @author panteng * @description * @date 17-3-20. */public class MyHandler implements HttpHandler {    boolean isFinish = false;    String id;    public MyHandler(String id) {        this.id = id;    }    public void handler(Map<String, String> headMap, String body) {        try {            Thread.sleep(3000);        } catch (Exception e) {            e.printStackTrace();        }        System.out.println(id + "自己处理:" + body);        this.setIsFinish(true);    }    public Charset getCharset() {        return Charset.forName("UTF-8");    }    public boolean isFinish() {        return isFinish;    }    public void setIsFinish(boolean isFinish) {        this.isFinish = isFinish;    }    public String getId() {        return id;    }    public void setId(String id) {        this.id = id;    }}
用户定义处理的实现

 

package com.pt.utils.test;import com.pt.utils.NonBlockHttpClient;/** * @author panteng * @description * @date 17-3-22. */public class NonBlockHttpClientTest {    public static void main(String[] arges) {        MyHandler myHandler = new MyHandler("A");        MyHandler myHandler1 = new MyHandler("B");        MyHandler myHandler2 = new MyHandler("C");        MyHandler myHandler3 = new MyHandler("D");        NonBlockHttpClient                .get(url1,                        null, myHandler);        NonBlockHttpClient                .get(url2,                        null, myHandler1);        NonBlockHttpClient                .get(url3,                        null, myHandler2);        NonBlockHttpClient                .get(url4,                        null, myHandler3);        System.out.println("做别的事情");        try {            Thread.sleep(2000);        } catch (Exception e) {            e.printStackTrace();        }        while (!(myHandler.isFinish() && myHandler1.isFinish() && myHandler2.isFinish() && myHandler3.isFinish())) {            try {                Thread.sleep(10);            } catch (Exception e) {                e.printStackTrace();            }        }        NonBlockHttpClient.close();        System.out.println("退出主函数... ...");    }}

 

基于netty的异步http请求