首页 > 代码库 > Disruptor快速入门

Disruptor快速入门

在JDK的多线程与并发库一文中, 提到了BlockingQueue实现了生产者-消费者模型

BlockingQueue是基于锁实现的, 而锁的效率通常较低. 有没有使用CAS机制实现的生产者-消费者?

Disruptor就是这样.

disruptor使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue

 

1.生产者-消费者

1.1使用Disruptor类

RingBuffer通过Disruptor实例获得

public class Client {

    public static void main(String[] args) throws Exception {
        
        //1.配置并获得Disruptor
        ExecutorService  executor = Executors.newCachedThreadPool(); 
        LongEventFactory factory = new LongEventFactory();
        // 设置RingBuffer大小, 需为2的N次方(能将求模运算转为位运算提高效率 ), 否则影响性能
        int ringBufferSize = 1024 * 1024; 

        //创建disruptor, 泛型参数:传递的事件的类型
        // 第一个参数: 产生Event的工厂类, Event封装生成-消费的数据
        // 第二个参数: RingBuffer的缓冲区大小
        // 第三个参数: 线程池
        // 第四个参数: SINGLE单个生产者, MULTI多个生产者
        // 第五个参数: WaitStrategy 当消费者阻塞在SequenceBarrier上, 消费者如何等待的策略. 
            //BlockingWaitStrategy 使用锁和条件变量, 效率较低, 但CPU的消耗最小, 在不同部署环境下性能表现比较一致
            //SleepingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度; 多次调度后仍不成功, 睡眠纳秒级别的时间再尝试. 平衡了延迟和CPU资源占用, 但延迟不均匀.
            //YieldingWaitStrategy 多次循环尝试不成功后, 让出CPU, 等待下次调度. 平衡了延迟和CPU资源占用, 延迟也比较均匀.
            //BusySpinWaitStrategy 自旋等待,类似自旋锁. 低延迟但同时对CPU资源的占用也多.
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 注册事件消费处理器, 也即消费者. 可传入多个EventHandler ...
        disruptor.handleEventsWith(new LongEventHandler());
        // 启动
        disruptor.start();
        
        //2.将数据装入RingBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 创建生产者, 以下方式一使用原始api, 方式二使用新API
        //LongEventProducer producer = new LongEventProducer(ringBuffer); 
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        
        ByteBuffer byteBuffer = ByteBuffer.allocate(8); // 这里只是笔者实验, 不是必须要用ByteBuffer保存long数据
        for(int i = 0; i < 100; ++i){
            byteBuffer.putLong(0, i);
            producer.produceData(byteBuffer);
        }

        disruptor.shutdown(); //关闭 disruptor  阻塞直至所有事件都得到处理
        executor.shutdown(); // 需关闭 disruptor使用的线程池, 上一步disruptor关闭时不会连带关闭线程池        
    }
}
// Event封装要传递的数据 
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
    public void setValue(long value) { 
        this.value =http://www.mamicode.com/ value; 
    } 
} 
// 产生Event的工厂
public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 
public class LongEventHandler implements EventHandler<LongEvent>  {
    // 消费逻辑
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println(longEvent.getValue());         
    }
}
//生产者实现一
public class LongEventProducer {
    // 生产者持有RingBuffer的引用
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    
    public void produceData(ByteBuffer bb){
        // 获得下一个Event槽的下标
        long sequence = ringBuffer.next();
        try {
            // 给Event填充数据
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(bb.getLong(0));
        } finally {
            // 发布Event, 激活观察者去消费, 将sequence传递给该消费者
            //publish应该放在 finally块中以确保一定会被调用->如果某个事件槽被获取但未提交, 将会堵塞后续的publish动作。
            ringBuffer.publish(sequence);
        }
    }
}
//生产者实现二
public class LongEventProducerWithTranslator {

    // 使用EventTranslator, 封装 获取Event的过程
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
            event.setValue(buffer.getLong(0));
        }
    };
    
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    public void produceData(ByteBuffer buffer){
        // 发布
        ringBuffer.publishEvent(TRANSLATOR, buffer);
    }
}

 

1.2 直接使用RingBuffer

给出了两种方式:EventProcessor与WorkPool(可处理多消费者)

public class ClientForEventProcessor {  
   
    public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE = 1024;
        int THREAD_NUMBERS = 4;
        
        // 这里直接获得RingBuffer. createSingleProducer创建一个单生产者的RingBuffer
        
        // 第一个参数为EventFactory,产生数据Trade的工厂类
        // 第二个参数是RingBuffer的大小,需为2的N次方     
        // 第三个参数是WaitStrategy, 消费者阻塞时如何等待生产者放入Event
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade(UUID.randomUUID().toString());
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //SequenceBarrier, 协调消费者与生产者, 消费者链的先后顺序. 阻塞后面的消费者(没有Event可消费时)
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //创建事件处理器 (消费者): 处理ringBuffer, 用TradeHandler的方法处理(实现EventHandler), 用sequenceBarrier协调生成-消费
        //如果存在多个消费者(老api, 可用workpool解决) 那重复执行 创建事件处理器-注册进度-提交消费者的过程, 把其中TradeHandler换成其它消费者类  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler());  
        //把消费者的消费进度情况注册给RingBuffer结构(生产者)    !如果只有一个消费者的情况可以省略 
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //创建线程池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        //把消费者提交到线程池, !说明EventProcessor实现了callable接口  
        executors.submit(transProcessor);  
        
        // 生产者, 这里新建线程不是必要的
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for (int i = 0; i < 10; i++) {
                    seq = ringBuffer.next();
                    ringBuffer.get(seq).setPrice(Math.random() * 9999);
                    ringBuffer.publish(seq);
                } 
                return null;  
            }  
        }); 

        Thread.sleep(1000); //等上1秒,等待消费完成  
        transProcessor.halt(); //通知事件处理器  可以结束了(并不是马上结束!)  
        executors.shutdown(); 
    }  
}  
public class ClientForWorkPool {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE = 1024;
        int THREAD_NUMBERS = 4;
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade(UUID.randomUUID().toString());
            }  
        }, BUFFER_SIZE);  
       
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
        
        // 第三个参数: 异常处理器, 这里用ExceptionHandler; 第四个参数WorkHandler的实现类, 可为数组(即传入多个消费者)
        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), new TradeHandler());  
          
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        workerPool.start(executors);  
          
        // 生产10个数据
        for (int i = 0; i < 8; i++) {
            long seq = ringBuffer.next();
            ringBuffer.get(seq).setPrice(Math.random() * 9999);
            ringBuffer.publish(seq);
        }
          
        Thread.sleep(1000);  
        workerPool.halt();  
        executors.shutdown();  
    }  
}  
// 封装交易数据
public class Trade {  
    private String id; // 订单ID  
    private String name;
    private double price; // 金额  
    
    public Trade(String id) {
        this.id = id;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
} 
// 消费者, 这里实现一个接口就行, 写两个是为了同时测试EventProcessor和WorkPool
public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //具体的消费逻辑  
        System.out.println(event.getId());  
    }  
}  

 

1.3 多生产者-多消费者

一个Event只能被某一个消费者处理

public static void main(String[] args) throws Exception {
        //创建RingBuffer
        RingBuffer<Order> ringBuffer = 
                RingBuffer.create(ProducerType.MULTI, 
                        new EventFactory<Order>() {  
                            @Override  
                            public Order newInstance() {  
                                return new Order();  
                            }  
                        }, 
                        1024 * 1024, new YieldingWaitStrategy());
        
        SequenceBarrier barriers = ringBuffer.newBarrier();
        
        Consumer[] consumers = new Consumer[3];
        for(int i = 0; i < consumers.length; i++){
            consumers[i] = new Consumer("ct" + i);
        }
        // 3个消费者
        WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new MyExceptionHandler(), consumers);
        
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
        ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        workerPool.start(executors);  
        // 10个生产者, 每个生成者生产20个数据
        for (int i = 0; i < 10; i++) {  
            final Producer p = new Producer(ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j = 0; j < 2; j++){
                        p.produceData(UUID.randomUUID().toString());
                    }
                }
            }).start();
        } 
        
        System.out.println("----开始生产----");
        Thread.sleep(1000);  // 等待消费完成
        System.out.println("总共消费数量:" + consumers[0].getCount() );
        
        workerPool.halt(); 
        executors.shutdown();
    }
    
    static class MyExceptionHandler implements ExceptionHandler {  
        public void handleEventException(Throwable ex, long sequence, Object event) {}  
        public void handleOnStartException(Throwable ex) {}  
        public void handleOnShutdownException(Throwable ex) {}  
    } 
}
public class Consumer implements WorkHandler<Order>{
    
    private String consumerId;
    // 消费计数器
    private static AtomicInteger count = new AtomicInteger(0);
    
    public Consumer(String consumerId){
        this.consumerId = consumerId;
    }

    @Override
    public void onEvent(Order order) throws Exception {
        System.out.println("当前消费者: " + this.consumerId + ", 消费信息: " + order.getId());
        count.incrementAndGet();
    }
    
    public int getCount(){
        return count.get();
    }
}
public class Producer {

    private final RingBuffer<Order> ringBuffer;
    public Producer(RingBuffer<Order> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    public void produceData(String data){
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(data);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
public class Order {  
    private String id;
    private String name;
    private double price;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
}  

 

2. 并行处理

除了实现生产者-消费者模型, Disruptor还可以进行多路并行处理(一个Event可以进入多个路径同时进行处理, 因为不同路径操作的是同一个Event, 路径可以汇合)

public class Client {  
    public static void main(String[] args) throws InterruptedException {  
       
        long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(7);  // 注意: 线程数>=handler数+1

        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    @Override
                    public Trade newInstance() {
                        return new Trade(UUID.randomUUID().toString());
                    }
                }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
        // 菱形操作   
        /*
        // 创建消费者组(含H1,H2)   H1,H2并行执行无先后顺序
        EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
        // C1,C2都完成后执行C3, 像JMS传递消息
        handlerGroup.then(new Handler3());
        */
        
        // 顺序操作
        /*
        disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());
        */
        
        // 六边形操作. H1, H4串行执行; H2, H5串行执行; 而H1,H4 与 H2,H5 并行执行
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        
        disruptor.start(); 
// 启动生产线程  
        executor.submit(new TradePublisher(disruptor));
        Thread.sleep(1000); // 等待消费完成
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  
public class TradePublisher implements Runnable {

    private Disruptor<Trade> disruptor;
private static final int LOOP = 100;// 模拟百次交易的发生 public TradePublisher(Disruptor<Trade> disruptor) { this.disruptor = disruptor; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for (int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } } } class TradeEventTranslator implements EventTranslator<Trade> { private Random random = new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade) { trade.setPrice(random.nextDouble() * 9999); return trade; } }
public class Handler1 implements EventHandler<Trade> {
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch)
            throws Exception {
        System.out.println("handler1: set name");
        event.setName("h1");
    }
}
public class Handler2 implements EventHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler2: set price");
        event.setPrice(17.0);
    }  
}  
public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.getId());
    }  
}
public class Handler4 implements EventHandler<Trade> {  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        System.out.println("handler4: append name");
        event.setName(event.getName() + "h4");
    }  
}  
public class Handler5 implements EventHandler<Trade> {    
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        System.out.println("handler5: add price");
        event.setPrice(event.getPrice() + 3.0);
    }  
}  

 

Disruptor快速入门