基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程、一个写线程的情况下,以及只有一个写线程、两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现。但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现。本文实现了一种基于循环数组和原子运算的无锁队列。采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率。之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可以达到较好的效率。本文的实现参考了论文Implementing Lock-Free Queues所提到的方法。但是,在这篇论文中,作者并没有给出具体的实现。本文的实现也和论文中的方法有所不同。本文的实现基于Windows操作系统,代码在Visual Studio 2010下测试通过。

本文的实现基于compare and swap这个原子操作,简写为CAS。其原型为

long CAS(long* ptr, long old_value, long new_value).

其操作为,如果ptr所指向的变量和old_value相等,则将其置为new_value. 否则什么也不做。返回值为ptr所指向的变量。

本文提出的方法的基本原理如下。代码会在其后附上。

首先,我们约定,有一个特殊的数值,叫做EMPTY,存入数组的所有数都不能是这个数值。在一开始,将整个buffer的值都初始化为EMPTY.

第二,以两个变量,readCnt和writeCnt,来记录读和写的次数。这两个数模数组长度,就可以得到下一次读和写的位置。如果readCnt和writeCnt模buffer大小相等,则说明当前队列为控;而如果在写的位置不是EMPTY,则说明buffer已满。

第三,也就是最重要的,线程同步的原理。在进行写buffer操作时,首先通过CAS操作将buffer对应位置置为指定值,而writeCnt不变,因此其他线程无法在同样的位置进行写操作,这样防止了其他写线程的覆盖。而由于此时writeCnt还没有变,读线程此时也无法读该位置的数据,这样防止了其他读线程的冲突。接下来,第二步,则是通过CAS操作,将writeCnt加一。可以看到,代码中在第一个CAS操作失败的情况下,会执行一个增大writeCnt的原子操作。这样做的目的是避免某个线程停掉导致其他所有线程都停掉。

而在进行写操作时,首先通过判断readCnt和writeCnt是否相等来判断当前队列是否已满。这样做的原因如前所述,是为了避免和写线程之间的冲突。接下来,如果当前位置可以读,则通过一个CAS操作将readCnt加一。这样,其他读线程就无法再读这个位置。而由于这个位置值还不是EMPTY,其他写线程也无法写这个位置。接下来,在读走数值之后,再通过CAS操作将buffer中的这个位置置为EMPTY. 此时,写线程可以在这个位置写入数据。

但是,这个实现其实还有点问题。如果读线程比较多,例如,读线程个数和数组长度一样,就可能两个读线程同时读同一个位置。这样的读冲突这个程序是无法避免的。另外,如果读数据过程中有一个读线程停掉了,那么其他线程在读或者写到这个位置时,也会被阻塞。

代码如下。

 1 #include <windows.h>
 2 #include <stdlib.h>
 3
 4 #define CAS(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval))
 5
 6 static const long EMPTY=-1;
 7
 8 class NBQueue {
 9 public:
10   NBQueue()
11     : queueSize(0)
12     , buffer(NULL)
13     , readCnt(0)
14     , writeCnt(0){}
15
16   ~NBQueue() {
17     uninit();
18   }
19
20   bool init(int size) {
21     queueSize = size;
22     try {
23       buffer = new long[queueSize];
24     }
25     catch (...) {
26       queueSize = 0;
27       buffer = NULL;
28       return false;
29     }
30     for (int i = 0; i < queueSize; i++) {
31       buffer[i] = EMPTY;
32     }
33     return true;
34   }
35   void uninit() {
36     if (buffer != NULL) {
37       delete[] buffer;
38       buffer = NULL;
39     }
40     queueSize = 0;
41   }
42
43   bool put(long v) {
44     bool succ;
45     long tail;
46     long index;
47     do {
48       tail = writeCnt;
49       index = tail % queueSize;
50       if (buffer[index] != EMPTY) {
51         return false;
52       }
53       long oldval = CAS(&buffer[index], EMPTY, v);
54       succ = oldval == EMPTY;
55       if (!succ) {
56         CAS(&writeCnt, tail, tail + 1);
57       }
58     } while (!succ);
59
60     CAS(&writeCnt, tail, tail + 1);
61
62     return true;
63   }
64
65   bool get(long* v) {
66     long head;
67     bool succ;
68     do {
69       head = readCnt;
70       if (head == writeCnt % queueSize) {
71         return false;
72       }
73       long oldval = CAS(&readCnt, head, head + 1);
74       succ = oldval == head;
75     } while (!succ);
76
77     long index = head % queueSize;
78     *v = buffer[index];
79     CAS(&buffer[index], *v, EMPTY);
80
81     return true;
82   }
83
84 private:
85   long* buffer;
86   int queueSize;
87   long readCnt;
88   long writeCnt;
89 };
时间: 01-10

基于循环数组的无锁队列的相关文章

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

转载:无锁队列的实现(CAS同步)

转自:http://coolshell.cn/articles/8239.html 关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术.下面开始正文. 关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.

原子操作实现无锁队列

关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构. 这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果

无锁队列的环形数组实现

对无锁队列的最初兴趣来自梁斌同志的一个英雄帖:http://coderpk.com/. 第一次看到这个题目的时候还不知道CAS,FAA等所谓的“原子操作”,但直觉上感觉,通过对读写操作的性能优化来达到大幅提高队列性能的方法是行不通的,就算读写操作全用汇编来写,也不会和正常的read及 write有数量级上的区别.后来搜索了一下lock free data structure,才知道了关于原子操作的一些东西,同时也纠正了自己由来已久的一个错误观点:C++中的++操作和--操作都不是原子操作.这篇笔

基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池线程使用c++11库和线程池之间的消息通讯使用一个简单的无锁消息队列适用于linux平台,gcc 4.6以上 标签: <无> 代码片段(6)[全屏查看所有代码] 1. [代码]lckfree.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // lckfree.h //

环形无锁队列

环形无锁队列 Table of Contents 1 环形无锁队列的实现 2 死锁及饥饿 3 一些优化 1 环形无锁队列的实现 数据结构定义: template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; } 由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调.这其中会用到CAS操作: 入队操作伪码:

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

锁、CAS操作和无锁队列的实现

https://blog.csdn.net/yishizuofei/article/details/78353722 锁的机制 锁和人很像,有的人乐观,总会想到好的一方面,所以只要越努力,就会越幸运:有的人悲观,总会想到不好的一方面,患得患失,所以经常会做不好事.我一直把前一个当作为我前进的动力和方向,快乐充实的过好每一天. 常用的锁机制也有两种: 1.乐观锁:假设不会发生并发冲突,每次不加锁而去完成某项操作,只在提交操作时,检查是否违反数据完整性.如果因为冲突失败就继续重试,直到成功为止.而乐

【DPDK】【ring】从DPDK的ring来看x86无锁队列的实现

[前言] 队列是众多数据结构中最常见的一种之一.曾经有人和我说过这么一句话,叫做“程序等于数据结构+算法”.因此在设计模块.写代码时,队列常常作为一个很常见的结构出现在模块设计中.DPDK不仅是一个加速网络IO的框架,其内部还提供众多的功能组件,rte_ring就是DPDK内部提供的一种无锁队列,本篇文章将从使用的角度出发阐述DPDK的ring怎么用?在怎么用的角度上再来阐述ring无锁的实现,最后将探讨实现无锁队列的关键以及在不通平台上如何实现,本文将会探讨x86平台下无锁队列的实现. 权当抛