前端 高性能无锁队列:disruptor

badcodez · December 16, 2019 · 18 hits

1. 简要描述

disruptor 是一个高性能的队列,最初是应用在LMAX架构中。我们可以把 disruptor 用在生产者 - 消费者问题以获取高效地处理和高吞吐量。现在 disruptor 版本已稳定在 3.0,请尽量使用版本 3.0,因为低版本的 api 和 3.0 几乎不兼容,并且版本 3.0 提供更多生产者 - 消费者设计
支持。

2. 我们不一样:阻塞队列 vs Disruptor

(稳) 稳健派:BlockingQueue

阻塞队列是一个稳健派的设计:阻塞队列是一个 FIFO 队列,生产者 (Producer) 往队列里发布 (publish) 一件物料时,消费者 (Consumer) 能获得通知;如果没有物料时,消费者被堵塞,直到生产者发布了新的物料,防止生产者生产速度过快,导致内存溢出(OOM)。

平常我们实现生产者 - 消费者模型时,一般会选用阻塞队列 ArrayBlockingQueue 来实现。当数据量上升到模型无法处理时,这时阻塞队列的 “稳健” 操作开始带来了负面影响:

锁是慢的

~ > 加锁时:线程会因为获取不到锁而被阻塞挂起;

~ > 锁释放时:阻塞的线程会被唤醒;而整个过程具有大量的时间消耗。

在 disruptor 论文中讲述一个实验:

  • 测试程序中调用了一个函数,该函数会对一个 64 位计数器循环自增 5 亿次;
方式 耗时(ms)
单线程无锁 300
单线程加锁 10000
两个线程加锁 224000

从实验中可以看出并发环境下锁保证了线程安全,却带来了糟糕的性能表现。

伪共享问题

image

(快) 别人家的孩子:Disruptor

Disruptor 不仅同样能完成阻塞队列的功能,而且 Disruptor 还能做的更多。

disruptor:

  • 无锁设计,通过 CPU 指令级别的 CAS 操作保证线程安全,同时大大提高处理速度;
  • 缓冲行填充解决伪共享问题;
  • 提供了良好的并发编程接口,适应多种场景:消费者既可以并行处理,也可以消费同一个,还可以相互依赖消费;
  • 底层的极度优化:缓冲行解决伪共享问题、预先分配内存空间等。

image

disruptor 使用 RingBuffer 实现,而 RingBuffer 相比于链表的优势有:

(1)RingBuffer 使用数组实现,访问比链表快,并且由于 CPU 缓冲行会加载相邻数据的特性,数组的元素在硬件级别是会被预加载的。但同时由于 CPU 缓冲行的这个特性从而导致了伪共享问题,不过 disruptor 使用了缓冲行填充技术解决伪共享。

(2)RingBuffer 可以预先分配堆内存空间,数组元素保证一直有效。当生产者生产物料,只需要拿到预先创建好的 RingBuffer 元素实例直接进行设置,从而减少 gc 的产生。

ArrayBlockingQueue vs Disruptor

在 disruptor 官方说明文档中,它做了一个对比实验:

  • 生产者生产物料后等待 1 微秒,然后消费者消费物料;重复执行 5000 万次
  • CPU : 2.2Ghz Core i7-2720QM
  • OS: Ubuntu 11.04
指标 Array Blocking Queue (ns) Disruptor (ns)
平均延迟 32,757 52
99% 的观测值低于 2,097,152 128
99.99% 的观测值低于 4,194,304 8,192

3. 该如何使用你,我的 disruptor?

这里简单介绍:如何使用 disruptor 实现 Single-Producer ~ Multi-Consumer 模型

模型实现

  • 自定义 Event,作为生产者 - 消费者模型中的物料;同时实现 EventFactory<?>接口,作为物料的生成工具。
  • 自定义生产者模型,批量从数据源获取” 原料” 组装放入物料中。
  • 自定义消费者模型,实现 WorkHandler<?>接口,作为自定义 Event 的处理器。
  • Disruptor 采用 WorkPool 的消费模式,保证消费者不会重复消费物料。

部分实现代码

// 定义物料
public class BatchIdEvent {

private List<Long> batch;

public BatchIdEvent() { this.batch = new ArrayList<>(); }

public List<Long> getBatch() { return batch; }

public void setBatch(List<Long> batch) { this.batch = batch; }

public static final EventFactory EVENT_FACTORY = () -> new BatchIdEvent(); }

// 自定义生产者模型 class DisruptorProducer { ...

public void onData(List<Long> batchIds) { long sequence = ringBuffer.next();

BatchIdEvent event = ringBuffer.get(sequence); event.setBatch(batchIds);

ringBuffer.publish(sequence);

}

}

// 实现WorkHandler<?> 接口,实现消费者模型 class EventConsumer implements WorkHandler<BatchIdEvent> { ...

@Override public void onEvent(BatchIdEvent batchIdEvent) throws Exception { doJob(batchIdEvent.getBatch()); ... } }

// 使用WorkPool的消费方式,并启动disruptor for (int i = 0; i < consumerNum; i++) { AtomicInteger count = new AtomicInteger(0); eventConsumers[i] = new EventConsumer(count); }

disruptor.handleEventsWithWorkerPool(eventConsumers); disruptor.start();

4. 推荐阅读

LMAX 架构
LMAX Disruptor——一个高性能、低延迟且简单的框架
高性能队列——Disruptor

No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.