Disruptor是一个用于在线程间通信的高效低延时的消息组件,它像个增强的队列,并且它是让LMAX Exchange跑的如此之快的一个关键创新。关于什么是Disruptor、为何它很重要以及它的工作原理方面的信息都呈爆炸性增长 —— 这些文章很适合开始学习Disruptor,还可跟着LMAX BLOG深入学习。这里还有一份更详细的白皮书。
虽然disruptor模式使用起来很简单,但是建立多个消费者以及它们之间的依赖关系需要的样板代码太多了。为了能快速又简单适用于99%的场景,我为Disruptor模式准备了一个简单的领域特定语言。例如,为建立一个消费者的“四边形模式”:
(从Trisha Gee’s excellent series explaining the disruptor pattern偷来的图片)
在这种情况下,只要生产者(P1)将元素放到ring buffer上,消费者C1和C2就可以并行处理这些元素。但是消费者C3必须一直等到C1和C2处理完之后,才可以处理。在现实世界中的对应的案例就像:在处理实际的业务逻辑(C3)之前,需要校验数据(C1),以及将数据写入磁盘(C2)。
用原生的Disruptor语法来创建这些消费者的话代码如下:
01 |
Executor executor = Executors.newCachedThreadPool(); |
02 |
BatchHandler handler1 = new MyBatchHandler1();
|
03 |
BatchHandler handler2 = new MyBatchHandler2();
|
04 |
BatchHandler handler3 = new MyBatchHandler3()
|
05 |
RingBuffer ringBuffer = new RingBuffer(ENTRY_FACTORY, RING_BUFFER_SIZE);
|
06 |
ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier(); |
07 |
BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);
|
08 |
BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2);
|
09 |
ConsumerBarrier consumerBarrier2 = |
10 |
ringBuffer.createConsumerBarrier(consumer1, consumer2); |
11 |
BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);
|
12 |
executor.execute(consumer1); |
13 |
executor.execute(consumer2); |
14 |
executor.execute(consumer3); |
15 |
ProducerBarrier producerBarrier = |
16 |
ringBuffer.createProducerBarrier(consumer3); |
在以上这段代码中,我们不得不创建那些个handler(就是那些个MyBatchHandler实例),外加消费者屏障,BatchConsumer实例,然后在他们各自的线程中处理这些消费者。DSL能帮我们完成很多创建工作,最终的结果如下:
1 |
Executor executor = Executors.newCachedThreadPool(); |
2 |
BatchHandler handler1 = new MyBatchHandler1();
|
3 |
BatchHandler handler2 = new MyBatchHandler2();
|
4 |
BatchHandler handler3 = new MyBatchHandler3();
|
5 |
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
|
6 |
RING_BUFFER_SIZE, executor);
|
7 |
dw.consumeWith(handler1, handler2).then(handler3); |
8 |
ProducerBarrier producerBarrier = dw.createProducerBarrier(); |
我们甚至可以在一个更复杂的六边形模式中构建一个并行消费者链:
1 |
dw.consumeWith(handler1a, handler2a); |
2 |
dw.after(handler1a).consumeWith(handler1b); |
3 |
dw.after(handler2a).consumeWith(handler2b); |
4 |
dw.after(handler1b, handler2b).consumeWith(handler3); |
5 |
ProducerBarrier producerBarrier = dw.createProducerBarrier(); |
这个领域特定语言刚刚诞生不久,欢迎任何反馈,也欢迎大家从github上fork并改进它。
分享到:
相关推荐
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。
LMAX是一种新型零售金融交 易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理 器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件...
LMAX Disruptor 最新版本 源码+API+驱动包
Disruptor概述这是LMAX Disruptor进入Go编程语言的端口。 它保留了Disruptor的本质和精神,并使用了许多相同的抽象和概念,但并没有保持相同的Disruptor概述。这是LMAX Disruptor移植到Go编程语言中的移植。 它保留...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他...Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
显然,为了实现这一目标,我们需要做一些特殊的事情,以通过我们的Java平台实现极低的延迟和高吞吐量。 性能测试表明,使用队列在系统各阶段之间传递数据会引入延迟,因此我们专注于优化该区域。 Disruptor是我们...
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...
match-trade超高效的交易所撮合引擎,采用伦敦外汇交易所LMAX开源的Disruptor框架,分布式内存存取,以及原子性操作。使用数据流的方式进行计算撮合序列,才用价格水平独立撮合逻辑,实现高效大数据撮合
并发框架Disruptor介绍Martin Fowler在自己网站上写了一篇...下载剖析Disruptor为什么会这么快Disruptor如何工作和使用(五) Disruptor(无锁并发框架)-发布(六) LMAX Disruptor 一个高性能、低延迟且简单的框架(七) Di
Disruptor是一个高效的线程间交换数据的基础组件,它使用栅栏(barrier)+序号(Sequencing)机制协调生产者与消费者,从而避免使用锁和CAS,同时还组合使用预分配内存机制、缓存行机制(cache line)、批处理效应...
:warning: 免责声明:此板条箱有一个已知的安全漏洞,请参阅防锈剂该项目是到Rust的端口。特征 单一生产者 批量消费者 阻塞等待策略 旋转等待策略 多制片人 工人池 DSL 文献资料基准测试从生产者向消费者发送i32大小...
最大值 LMAX Disruptor使Rx Java最大化
Dispatcher 是一个以 LMAX Disruptor 为核心的事件处理器。 使用批量写入,可以轻松获得超过每秒 100 万个事件的速率,在 MacBook Pro 上的上限仅为每秒 2000 万个事件(每个事件约 50ns)。 发出单个事件的性能仍然...
Disruptor是一个高性能的线程间消息传递框架。 该项目是的.NET端口。 可以将Disruptor简洁地定义为具有可配置使用者序列的循环队列。 主要功能是: 初始设置后,内存分配为零(事件已预先分配)。 基于推送的消费...
LMAX Disruptor环形缓冲区示例 使用LMAX破坏者框架的示例: 受此博客文章的启发: 这是“钻石配置”的极其简化的版本,如以下内容所述: 在此实现中,日志记录和复制步骤同时发生,并且都必须成功才能执行发布...
Disruptor-cpp 总览 Disruptor-cpp是功能齐全的C ++端口。 实现java Disruptor v3.3.7中可用的所有功能。 建造 编译器 Clang 3.8或更高版本 GCC 5.0或更高版本 Microsoft Visual C ++ 2015或更高版本 Linux 必须在...
IOS应用源码——第一个我们小组的IPHONE程序LMAX郎迈.zip
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
最高效的交易所撮合引擎,采用伦敦外汇交易所LMAX开源的Disruptor框架,用Hazelcast进行分布式内存存