首页 > 代码库 > Disruptor3.0的实现细节
Disruptor3.0的实现细节
本文旨在介绍Disruptor3.0的实现细节,首先从整体上描述了Disruptor3.0的核心类图,Disruptor3.0 DSL(领域专用语言)的实现类图,并以Disruptor官方列举的几大特性作为行文思路,看看Disruptor3.0是如何实现这些特性的:内存预加载、消除‘伪共享’、序号栅栏和序号配合使用来消除锁和CAS、批处理效应的具体实现等。
核心类图
- RingBuffer——Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
- Sequencer——序号管理器,负责消费者/生产者各自序号、序号栅栏的管理和协调;
- Sequence——序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况;
- SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理;
- EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
- EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
- Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。
DSL类图
以下是Disruptor3.0 DSL(domain specific language 特定领域语言)的类图,可以大致知道第三方如何继承Disruptor3.0实现具体业务逻辑。
- Disruptor——对外暴露的门面类,提供start(),stop(),消费者事件注册,生产者事件发布等api;
- RingBuffer——对生产者提供下一序号获取、entry元素获取、entry数据更改等api;
- EventHandler——消费者的接口定义,提供onEvent()方法,负责具体业务逻辑实现;
- EventHandlerGroup——业务处理器分组,管理多个业务处理器的依赖关系,提供then()、before()、after()等api。
以下给出代码demo阐述第三方如何简单继承Disruptor3.0:
public static void main(String[] args) throws Exception { // The ThreadFactory for create producer thread. ThreadFactory producerFactory = new ProducerFactory(); // The factory for the event LongEventFactory eventFactory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 8; // Construct the Disruptor,创建Disruptor组件 Disruptor<LongEvent> disruptor = new Disruptor<>( eventFactory, bufferSize, producerFactory, ProducerType.SINGLE, new BlockingWaitStrategy() ); // Connect the handler,绑定消费者事件,可以是多个 disruptor.handleEventsWith(new LongEventHandler()); disruptor.handleEventsWith(new LogEventHandler()); // Start the Disruptor, starts all threads running,启动Disruptor,启动所有线程,主要是消费者对应的EventProcessor侦听线程,消费者事件处理器开始侦听RingBuffer中的消息 disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l);
//生产者向RingBuffer中写入消息 producer.onData(bb); Thread.sleep(10); } }
关键时序图
下图展示了Disruptor3.0整个运行过程的时序图,包括:始化、启动、处理过程。
内存预分配
RingBuffer使用数组Object[] entries作为存储元素,如下图所示,初始化RingBuffer时,会将所有的entries的每个元素指定为特定的Event,这时候event中的detail属性是null;后面生产者向RingBuffer中写入消息时,RingBuffer不是直接将enties[7]指向其他的event对象,而是先获取event对象,然后更改event对象的detail属性;消费者在消费时,也是从RingBuffer中读取出event,然后取出其detail属性。可以看出,生产/消费过程中,RingBuffer的entities[7]元素并未发生任何变化,未产生临时对象,entities及其元素对象一直存活,知道RingBuffer消亡。故而可以最小化GC的频率,提升性能。
注:图中对象Entry写错,应当为Event。
以下是RingBuffer.java类中初始化enties数组的源码:
private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); //使用工厂方法初始化enties元素 } }
消费者写入数据到entry中:
//消费者实现EventHandler接口
public class LongEventHandler implements EventHandler<LongEvent> {
//event为从RingBuffer entry中读取的事件内容,消费者从event中读取数据,并完成业务逻辑处理 public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(Thread.currentThread().getName() + " say : process LONG Event: " + event); }}
生产者从entry中读取数据:
public class LongEventProducer{
//生产者持有RingBuffer实例,可以直接向RingBuffer实例中的entry写入数据 private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try {
//从ringBuffer实例中获取entry LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence
//生产者将数据写入entry event.set(bb.getLong(0)); // Fill with data } finally {
//生产者向ringBuffer提交数据变更 ringBuffer.publish(sequence); } }}
可以看出:生产者未更改ringBuffer实例中entry对象,只是更改了entry中的数据,避免了过多创建临时entry对象带来的GC,进而降低了性能损耗。
消除‘伪共享’
如果两个不同的并发变量位于同一个缓存行,则在并发情况下,会互相影响到彼此的缓存有效性,进而影响到性能,这叫着‘伪共享’。为了避开‘伪共享’,Disruptor3.0在Sequence.java中使用多个long变量填充,从而确保一个序号独占一个缓存行。关于缓存行和‘伪共享’请参考:伪共享(False Sharing)。
具体实现代码如下:
//在序号实际value变量(long型)左边填充7个long变量
class LhsPadding{ protected long p1, p2, p3, p4, p5, p6, p7;}class Value extends LhsPadding{ protected volatile long value;}
//在序号实际value变量(long型)右边填充7个long变量
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = http://www.mamicode.com/-1L;
public Sequence() {
this(INITIAL_VALUE);
}
......
}
Sequence实际value变量的左右均被填充了7个long型变量,其自身也是long型变量,一个long型变量占据8个字节,所以序号与他上一个/下一个序号之间的最小内存分布距离为:7*8=56byte,加上自身的8个byte,可以确保序号变量独占长度为64byte(通常的一个缓存行长度)缓存行。
序号栅栏和序号配合使用来消除锁和CAS
Disruptor3.0中,序号栅栏(SequenceBarrier)和序号(Sequence)搭配使用,协调和管理消费者与生产者的工作节奏,避免了锁和CAS的使用。在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件:
- 消费者序号数值必须小于生产者序号数值;
- 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值;
- 生产者序号数值不能大于消费者中最小的序号数值,以避免生产者速度过快,将还未来得及消费的消息覆盖。
上述前两点是在SequenceBarrier的waitFor()方法中完成的,源码如下:
public long waitFor(final long sequence) //sequence参数是该消费者期望获取的下一个序号值 throws AlertException, InterruptedException, TimeoutException { checkAlert(); //根据配置的waitStrategy策略,等待期望的下一序号值变得可用
//这里并不保证返回值availableSequence一定等于 given sequence,他们的大小关系取决于采用的WaitStrategy。
//eg. 1、YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence
// 2、BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
//如果当前可用的序号小于期望获取的下一个序号,则返回availableSequence,这将导致调用者EventProcessor继续wait if (availableSequence < sequence) { return availableSequence; } //这一句是‘批处理’的精妙所在,放在后面讲 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
上面第三点是针对生产者建立的Barrier,逻辑判定发生在生产者从ringBuffer获取下一个可用的entry时,RingBuffer会将获取下一个可用的entry委托给Sequencer。我们以最简单的单生产者SingleProducerSequencer的next()实现来说明。SingleProducerSequencer.next()的源码如下:
public long next(int n) { if (n < 1) //n表示此次生产者期望获取多少个序号,通常是1 { throw new IllegalArgumentException("n must be > 0"); } long nextValue = http://www.mamicode.com/this.nextValue; long nextSequence = nextValue + n; //生产者当前序号值+期望获取的序号数量后达到的序号值 long wrapPoint = nextSequence - bufferSize; //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’ long cachedGatingSequence = this.cachedValue; //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段是,被赋值的当时的‘消费者中最小序号值’ //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’ //(cachedGatingSequence > nextValue) : 生产者和消费者均为顺序递增的,且生产者的seq“先于”消费者的seq,注意是‘先于’而不是‘大于’。当nextValue>Long.MAXVALUE时,nextValue+1就会变成负数,wrapPoint也会变成负数,这时候必然会是:cachedGatingSequence > nextValue // 这个变化的过程会持续bufferSize个序号,这个区间,由于getMinimumSequence()得到的虽然是名义上的‘消费者中最小序号值’,但是不代表是走在‘最后面’的消费者 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; //生产者停下来,等待消费者消费,知道‘覆盖’现象清除。 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue =http://www.mamicode.com/ minSequence; } this.nextValue =http://www.mamicode.com/ nextSequence; return nextSequence; }
批处理效应
当生产者节奏快于消费者,消费者可以通过‘批处理效应’快速追赶,即:消费者可以一次性从RingBuffer中获取多个已经准备好的enties,从而提高效率。代码实现如下:
SequenceBarrier的waitFor()方法:
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } //获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。 //当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。 //最小值为:sequence-1 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
源代码
LMAX-Exchange源码github地址:https://github.com/LMAX-Exchange/disruptor
带中文注释的源码github地址:https://github.com/daoqidelv/disruptor
Disruptor3.0的实现细节