A Rough First Look at Disruptor

 

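I. Overview

1. Disruptor

Disruptor is a high-performance asynchronous processing framework built on the producer-consumer model.

2. RingBuffer

RingBuffer is a circular data structure. It keeps a sequence number that points to the next slot, and it is used to pass data between threads.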
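
To make the "sequence number that points to the next slot" idea concrete, here is a minimal single-threaded sketch. It is not Disruptor's actual RingBuffer; the class `RingIndexSketch` and its methods are hypothetical names invented for illustration. The only assumption is that the buffer size is a power of two, so an ever-increasing sequence can be mapped to a slot with a bit mask.

```java
// Hypothetical illustration only; this is not Disruptor's RingBuffer.
class RingIndexSketch {
    private final long[] slots;     // pre-allocated storage, reused on every lap
    private final int mask;         // size - 1, valid because size is a power of two
    private long nextSequence = 0;  // sequence of the next slot to be written

    RingIndexSketch(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        this.slots = new long[size];
        this.mask = size - 1;
    }

    // Maps an ever-increasing sequence onto a slot index in the ring.
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    // Writes a value into the next slot and returns the sequence it used.
    long put(long value) {
        long sequence = nextSequence++;
        slots[indexOf(sequence)] = value;
        return sequence;
    }

    // Reads whatever currently sits in the slot that the sequence maps to.
    long get(long sequence) {
        return slots[indexOf(sequence)];
    }

    public static void main(String[] args) {
        RingIndexSketch ring = new RingIndexSketch(4);
        for (long v = 0; v < 6; v++) {
            long seq = ring.put(v * 10);
            System.out.println("sequence " + seq + " -> slot " + ring.indexOf(seq));
        }
    }
}
```

With a size of 4, sequences 4 and 5 wrap around to slots 0 and 1. Unlike this sketch, a real ring buffer must also keep a producer from overwriting slots that consumers have not finished reading.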

3. Event

In the Disruptor framework, the data that a producer produces is called an Event.

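II. Basic Components of a Disruptor Application

1. MyEvent: a user-defined object that serves as the data in the producer-consumer model (LongEvent in the demo below).
2. MyEventFactory: implements the EventFactory interface and creates the event objects (LongEventFactory in the demo).
3. MyEventProducerWithTranslator: stores data in the event object and publishes it (LongEventProducerWithTranslator in the demo).
4. MyEventHandler: the user-defined consumer (LongEventHandler in the demo).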

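III. Demo

This is my first contact with Disruptor, so my understanding is still superficial, fragmentary, and vague. I am recording a simple example here as a starting point for deeper study later.

1. Custom event class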

package com.disruptor.basic;

public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

}

2. Event factory (creates the event objects)

package com.disruptor.basic;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
        // Called by Disruptor to pre-fill each slot of the ring buffer with an event instance
        return new LongEvent();
    }

}

3. Data source (fills the event object and publishes it)

package com.disruptor.basic;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        /**
         * event: the pre-allocated event object to fill; sequence: the RingBuffer sequence
         * number assigned to that event; bb: the buffer holding the data to copy into the event
         */
        @Override
        public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
            event.setValue(bb.getLong(0)); // copy the value into the event
        }
    };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb); // publish: claim a slot, fill it via TRANSLATOR, and make it visible to consumers
    }

}
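
The translator above reads with `bb.getLong(0)`, and the test class further below writes with `bb.putLong(0, a)`. Both are absolute-index ByteBuffer operations, which is why one 8-byte buffer can be reused on every loop iteration without calling `flip()` or `clear()`. The following is a small sketch of that behavior using only the JDK; the class `AbsoluteAccessSketch` and its `roundTrip` method are hypothetical names for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of absolute ByteBuffer access; uses only the JDK.
class AbsoluteAccessSketch {

    // Writes a long at index 0 and reads it back, without moving the buffer position.
    static long roundTrip(ByteBuffer bb, long value) {
        bb.putLong(0, value);   // absolute write: position is left untouched
        return bb.getLong(0);   // absolute read: position is left untouched
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long a = 0; a < 3; a++) {
            long read = roundTrip(bb, a);
            System.out.println("wrote " + a + ", read " + read + ", position " + bb.position());
        }
    }
}
```

The relative variants `putLong(value)` and `getLong()` would advance the position instead, so reusing the same 8-byte buffer with them would require resetting it between iterations.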

4. Consumer

package com.disruptor.basic;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        // Invoked on the handler thread for every event published to the ring buffer
        System.out.println("Consumed value=" + event.getValue());
    }

}

5. Test class

package com.disruptor.basic;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventTest {

    @SuppressWarnings({ "unchecked", "deprecation" })
    @Test
    public void test01() throws InterruptedException {
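        // The event handler threads are taken from this executor
        ExecutorService executor = Executors.newCachedThreadPool();
        EventFactory<LongEvent> factory = new LongEventFactory();
        // Ring buffer size; must be a power of two
        int bufferSize = 1024;
        // This Executor-based constructor is deprecated in later 3.x releases (hence the
        // "deprecation" suppression above), which offer a ThreadFactory-based constructor instead
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        // Register the consumer, then start the handler threads
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();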

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // LongEventProducer producer = new
        // LongEventProducer(ringBuffer);
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        // long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            bb.putLong(0, a);
            producer.onData(bb);
            /*if (a == 99) {
                long endTime = System.currentTimeMillis();
                System.out.println("useTime=" + (endTime - startTime));
            }*/
            Thread.sleep(100);
        }
        /*long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));*/
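        // Wait until every published event has been handled, then stop the handlers
        disruptor.shutdown();
        // Release the threads that backed the handlers
        executor.shutdown();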
    }

    /*@Test
    public void test02() {
        long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            System.out.println(a);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));
    }*/

}

 
