博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
disruptor架构三:更加复杂的使用场景
阅读量:5768 次
发布时间:2019-06-18

本文共 14831 字,大约阅读时间需要 49 分钟。

先c1和c2并行消费生产者产生的数据,然后c3再消费该数据

 下面用代码来实现:既可以通过Disruptor实例来实现,也可以不创建Disruptor实例,直接调用RingBuffer的API,借助WorkerPool和BatchEventProcessor来辅助实现消费者(不清楚的话可以参考上一节的用法)。

上面需要使用的场景很复杂,只能使用Disruptor实例来实现线程通信,简单场景就直接使用RingBuffer就可以了

我们来看下程序的代码:

案例一:

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * Consumer C1: stamps every trade with the name {@code "h1"}.
 *
 * <p>Implements both {@link EventHandler} and {@link WorkHandler} so the same
 * class can be registered either way; the {@code EventHandler} callback simply
 * delegates to the {@code WorkHandler} one.
 */
public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {

    /**
     * {@link EventHandler} entry point; the sequence and end-of-batch flag are
     * not used.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    /**
     * Logs a marker line, sets the trade name to {@code "h1"} and then sleeps
     * for one second.
     */
    @Override
    public void onEvent(Trade event) throws Exception {
        System.out.println("handler1: set name");
        event.setName("h1");
        Thread.sleep(1000);
    }
}
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

/**
 * Consumer C2: sets the price of every trade to {@code 17.0}.
 */
public class Handler2 implements EventHandler<Trade> {

    /**
     * Logs a marker line, overwrites the trade price with {@code 17.0} and then
     * sleeps for one second. The sequence and end-of-batch flag are not used.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler2: set price");
        event.setPrice(17.0);
        Thread.sleep(1000);
    }
}
package bhz.generate2;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;

/**
 * Consumer C3: prints the name and price that earlier handlers wrote onto the
 * trade, followed by the trade's {@code toString()} value.
 */
public class Handler3 implements EventHandler<Trade> {

    /**
     * Prints one summary line per event. The sequence and end-of-batch flag are
     * not used.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler3: name: " + event.getName()
                + " , price: " + event.getPrice()
                + "; instance: " + event.toString());
    }
}

 

我们来看下主函数

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Diamond topology demo: Handler1 (C1) and Handler2 (C2) consume each event in
 * parallel, and Handler3 (C3) consumes it only after both have finished.
 */
public class Main {

    /**
     * Wires the handlers, starts the disruptor, runs a single
     * {@link TradePublisher}, waits for it to finish, shuts everything down and
     * prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the producer
     */
    public static void main(String[] args) throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 1024;
        ExecutorService executor = Executors.newFixedThreadPool(8);

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
            @Override
            public Trade newInstance() {
                return new Trade();
            }
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

        // Diamond: create the consumer group C1, C2 ...
        EventHandlerGroup<Trade> handlerGroup =
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        // ... and run C3 only after both C1 and C2 are done with an event
        // (the original comment describes this step as the JMS message send).
        handlerGroup.then(new Handler3());

        // Alternative 1 - sequential chain:
        // disruptor.handleEventsWith(new Handler1())
        //         .handleEventsWith(new Handler2())
        //         .handleEventsWith(new Handler3());

        // Alternative 2 - hexagon:
        // Handler1 h1 = new Handler1();
        // Handler2 h2 = new Handler2();
        // Handler3 h3 = new Handler3();
        // Handler4 h4 = new Handler4();
        // Handler5 h5 = new Handler5();
        // disruptor.handleEventsWith(h1, h2);
        // disruptor.after(h1).handleEventsWith(h4);
        // disruptor.after(h2).handleEventsWith(h5);
        // disruptor.after(h4, h5).handleEventsWith(h3);

        disruptor.start(); // start the consumers

        CountDownLatch latch = new CountDownLatch(1);
        // Hand the producer to the same executor.
        executor.submit(new TradePublisher(latch, disruptor));
        latch.await(); // wait until the producer has finished

        disruptor.shutdown();
        executor.shutdown();
        System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
    }
}

生产者的代码:
package bhz.generate2;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

/**
 * Producer task: publishes {@code LOOP} trade events through the disruptor and
 * then counts the latch down so the waiting caller can proceed.
 *
 * <p>NOTE(review): the published listing was truncated between the loop header
 * and the translator class header; the loop condition, publish call, latch
 * release and {@code TradeEventTranslator} declaration are reconstructed from
 * the surrounding code - confirm against the original source.
 */
public class TradePublisher implements Runnable {

    Disruptor<Trade> disruptor;
    private CountDownLatch latch;

    /** Number of simulated trades to publish. */
    private static final int LOOP = 10;

    /**
     * @param latch     latch counted down once when publishing has ended
     * @param disruptor disruptor the trades are published to
     */
    public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
        this.disruptor = disruptor;
        this.latch = latch;
    }

    /**
     * Publishes {@code LOOP} events using a single shared translator.
     */
    @Override
    public void run() {
        TradeEventTranslator tradeTransloator = new TradeEventTranslator();
        try {
            for (int i = 0; i < LOOP; i++) {
                disruptor.publishEvent(tradeTransloator);
            }
        } finally {
            // Always release the latch; otherwise a failure while publishing
            // would leave the caller blocked in latch.await() forever.
            latch.countDown();
        }
    }
}

/**
 * Fills a ring-buffer slot with trade data: assigns a random price in
 * {@code [0, 9999)}.
 */
class TradeEventTranslator implements EventTranslator<Trade> {

    private Random random = new Random();

    /** Populates the given event in place; the sequence is not used. */
    @Override
    public void translateTo(Trade event, long sequence) {
        this.generateTrade(event);
    }

    /**
     * Sets a random price on the given trade.
     *
     * @return the same trade instance that was passed in
     */
    private Trade generateTrade(Trade trade) {
        trade.setPrice(random.nextDouble() * 9999);
        return trade;
    }
}
 

handler1设置name,handler2设置价格,handler3将名字和价格打印出来

我们来看下程序运行的结果:

handler2: set price

handler1: set name
handler2: set price
handler1: set name
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2aa2f9e6
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7d6c848f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5f73089d
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@793aac5f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7b0acf26
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2a606e6
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@620ee765
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@4079ca2e
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@7bc8b313
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@3564e3e2
总耗时:10027

上面对应一个生产者,三个消费者,一个生产者提交了10个任务,上面每一个hashcode都是不同的

上面这种模式是一个生产者,多个消费者的情况

 案例二(六边形操作):c1和c2并行消费生产者产生的数据,c1完成后执行c4,c2完成后执行c5,c4和c5都完成后再由c3消费该数据

 

 

 

 我们来看下程序的代码:

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * Consumer C4: prints the current trade name and appends {@code "h4"} to it.
 *
 * <p>Implements both {@link EventHandler} and {@link WorkHandler}; the
 * {@code EventHandler} callback delegates to the {@code WorkHandler} one.
 */
public class Handler4 implements EventHandler<Trade>, WorkHandler<Trade> {

    /**
     * {@link EventHandler} entry point; the sequence and end-of-batch flag are
     * not used.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    /** Logs the name already on the trade, then suffixes it with {@code "h4"}. */
    @Override
    public void onEvent(Trade event) throws Exception {
        System.out.println("handler4: get name : " + event.getName());
        event.setName(event.getName() + "h4");
    }
}

 

package bhz.generate2;

import java.util.UUID;

import bhz.generate1.Trade;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * Consumer C5: prints the current trade price and increases it by {@code 3.0}.
 *
 * <p>Implements both {@link EventHandler} and {@link WorkHandler}; the
 * {@code EventHandler} callback delegates to the {@code WorkHandler} one.
 */
public class Handler5 implements EventHandler<Trade>, WorkHandler<Trade> {

    /**
     * {@link EventHandler} entry point; the sequence and end-of-batch flag are
     * not used.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    /** Logs the price already on the trade, then adds {@code 3.0} to it. */
    @Override
    public void onEvent(Trade event) throws Exception {
        System.out.println("handler5: get price : " + event.getPrice());
        event.setPrice(event.getPrice() + 3.0);
    }
}

 

主函数:

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Hexagon topology demo: h1 and h2 consume each event in parallel, h4 runs
 * after h1, h5 runs after h2, and h3 runs only after both h4 and h5.
 */
public class Main {

    /**
     * Wires the handlers, starts the disruptor, runs a single
     * {@link TradePublisher}, waits for it to finish, shuts everything down and
     * prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the producer
     */
    public static void main(String[] args) throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 1024;
        ExecutorService executor = Executors.newFixedThreadPool(8);

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
            @Override
            public Trade newInstance() {
                return new Trade();
            }
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

        // Alternative 1 - diamond: C1 and C2 in parallel, then C3.
        // EventHandlerGroup<Trade> handlerGroup =
        //         disruptor.handleEventsWith(new Handler1(), new Handler2());
        // handlerGroup.then(new Handler3());

        // Alternative 2 - sequential chain:
        // disruptor.handleEventsWith(new Handler1())
        //         .handleEventsWith(new Handler2())
        //         .handleEventsWith(new Handler3());

        // Hexagon.
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);

        disruptor.start(); // start the consumers

        CountDownLatch latch = new CountDownLatch(1);
        // Hand the producer to the same executor.
        executor.submit(new TradePublisher(latch, disruptor));
        latch.await(); // wait until the producer has finished

        disruptor.shutdown();
        executor.shutdown();
        System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
    }
}

 

程序运行的效果:

handler2: set price

handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler1: set name
handler2: set price
handler2: set price
handler1: set name
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler5: get price : 17.0
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler4: get name : h1
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2f30e44c
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@32dc51c8
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@1e7d3b8e
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2fd0f745
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@4019eb89
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@65b6903f
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@34b84c44
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2f971f66
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@6a8e79f2
handler3: name: h1h4 , price: 20.0; instance: bhz.generate1.Trade@2fe83585
总耗时:10368

3、案例三:如何实现按顺序操作呢?c1执行完了执行c2,c2执行完了执行c3

package bhz.generate2;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bhz.generate1.Trade;
import bhz.generate1.TradeHandler;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Sequential topology demo: Handler1 (C1) consumes each event first, then
 * Handler2 (C2), then Handler3 (C3).
 */
public class Main {

    /**
     * Wires the handlers, starts the disruptor, runs a single
     * {@link TradePublisher}, waits for it to finish, shuts everything down and
     * prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the producer
     */
    public static void main(String[] args) throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 1024;
        ExecutorService executor = Executors.newFixedThreadPool(8);

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
            @Override
            public Trade newInstance() {
                return new Trade();
            }
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

        // Alternative 1 - diamond: C1 and C2 in parallel, then C3.
        // EventHandlerGroup<Trade> handlerGroup =
        //         disruptor.handleEventsWith(new Handler1(), new Handler2());
        // handlerGroup.then(new Handler3());

        // Sequential chain: each stage waits for the previous one.
        disruptor.handleEventsWith(new Handler1())
                .handleEventsWith(new Handler2())
                .handleEventsWith(new Handler3());

        // Alternative 2 - hexagon:
        // Handler1 h1 = new Handler1();
        // Handler2 h2 = new Handler2();
        // Handler3 h3 = new Handler3();
        // Handler4 h4 = new Handler4();
        // Handler5 h5 = new Handler5();
        // disruptor.handleEventsWith(h1, h2);
        // disruptor.after(h1).handleEventsWith(h4);
        // disruptor.after(h2).handleEventsWith(h5);
        // disruptor.after(h4, h5).handleEventsWith(h3);

        disruptor.start(); // start the consumers

        CountDownLatch latch = new CountDownLatch(1);
        // Hand the producer to the same executor.
        executor.submit(new TradePublisher(latch, disruptor));
        latch.await(); // wait until the producer has finished

        disruptor.shutdown();
        executor.shutdown();
        System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
    }
}

程序执行的效果如下所示:

handler1: set name

handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler1: set name
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler2: set price
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@51532e9f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@2c19f26f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5b7b6c07
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@3f16e7d
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@5f8cd290
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@16541cf2
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@6c385f51
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@68f9f658
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@729c413f
handler3: name: h1 , price: 17.0; instance: bhz.generate1.Trade@707fc9e1
总耗时:20049

转载于:https://www.cnblogs.com/kebibuluan/p/7660407.html

你可能感兴趣的文章
极光推送(一)集成
查看>>
MySQL 8.0 压缩包版安装方法
查看>>
@Transient注解输出空间位置属性
查看>>
Ansible-playbook 条件判断when、pause(学习笔记二十三)
查看>>
5种你未必知道的JavaScript和CSS交互的方法(转发)
查看>>
线程进程间通信机制
查看>>
galera mysql 多主复制启动顺序及命令
查看>>
JS prototype 属性
查看>>
中位数性质——数列各个数到中位数的距离和最小
查看>>
WebApp之Meta标签
查看>>
添加Java文档注释
查看>>
Python3批量爬取网页图片
查看>>
iphone-common-codes-ccteam源代码 CCEncoding.m
查看>>
微信公众平台开发(96) 多个功能整合
查看>>
[转]MVC4项目中验证用户登录一个特性就搞定
查看>>
用Perl编写Apache模块续二 - SVN动态鉴权实现SVNAuth 禅道版
查看>>
Android 阴影,圆形的Button
查看>>
C++概述
查看>>
卡特兰数
查看>>
006_mac osx 应用跨屏幕
查看>>