首页 > 代码库 > disruptor的并行用法
disruptor的并行用法
实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:
public class DTaskFactory implements EventFactory<DTask> { @Override public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象 return new DTask(); } }
生产消费的对象类型:
public class DTask { public String getName1() { return name1; } public void setName1(String name1) { this.name1 = name1; } public String getName2() { return name2; } public void setName2(String name2) { this.name2 = name2; } public String getName3() { return name3; } public void setName3(String name3) { this.name3 = name3; } String name1; String name2; String name3; }
disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):
public class DTaskHandle implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("开始最后消费"); System.out.println(dTask.getName1()); System.out.println(dTask.getName2()); System.out.println(dTask.getName3()); System.out.println("结束最后消费"); } } public class DTaskHandle1 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle1-----"); dTask.setName1("name1"); } } public class DTaskHandle2 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle2-----"); dTask.setName2("name2"); } } public class DTaskHandle3 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle3-----"); dTask.setName3("name3"); } }
测试执行类:
public class DisruptorTest { public void exec() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(), 1024 * 1024, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); DTaskHandle dTaskHandle = new DTaskHandle(); DTaskHandle1 dTaskHandle1 = new DTaskHandle1(); DTaskHandle2 dTaskHandle2 = new DTaskHandle2(); DTaskHandle3 dTaskHandle3 = new DTaskHandle3(); disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行 disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle // disruptor. disruptor.start(); CountDownLatch latch = new CountDownLatch(1); //生产者准备 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生产者完事. disruptor.shutdown(); executor.shutdown(); } }
disruptor的并行用法
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。