并发场景下锁的最佳实践

前文分析了一波锁的原理:

锁实现分析:从glibc到futex(一) 锁实现分析:从glibc到futex(二)

并实现了协程下的锁:

在C++中实现协程

也该总结一下并发场景下锁的用法了

在这之前回顾一下之前总结的锁的原理,以对其性能有足够了解

锁性能

核心思想:一个标记位+一个队列

可以映射成一群人排队上一个厕所

  • 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
    • 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
    • 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
  • 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
    • 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。

也可以用条件变量去理解锁:锁相当于保护条件为只有当前进程/协程可用的条件变量

因此锁在无竞争状态下,仅有一次对标记位CAS操作

避免锁

在处理并发场景的情况下,最优解永远是避免锁:

当数据可以被分区处理,或者源数据可以被简单复制的情况下,可以引入线程私有变量来避免并发场景

  1. 数据分区
    • 对原始数据按照一定规则(如Hash、范围或模)拆分成多个相互独立的子集,每个线程只处理自己专属的数据子集,从而无需共享数据,把并发竞争完全消除。
    • 例如:
      • 多线程求数组求和,可将数组划分多个区间,每个线程处理单独的区间,最后汇总各线程结果即可。
      • TCMalloc的每个线程都有一个私有的buffer用于分配
  2. 数据复制
    • 当原数据是只读性质,允许每个线程生成一份原数据的副本供自身独立读
    • 例如:
      • 当每个线程初次访问数据时,通过一些复杂计算生成线程私有的、永久静态的结果缓存,以后所有访问都直接访问本线程的缓存
      • 并发场景下的日期格式化(如 SimpleDateFormat 每线程缓存一份)

乐观锁

内存原子CAS

CAS(Compare-And-Swap,比较并交换)是一种乐观锁的实现方式。

常见的用法是:

  1. 先读取一个共享变量的当前值(旧值);
  2. 使用读取到的这个旧值,自己先做一些计算,算出一个新值;
  3. 准备将新值回写到共享变量之前,检查一下当前共享变量的值是不是还是刚才读取的那个旧值:
    • 如果还是旧值,说明没人改过,可以放心地更新成功;
    • 如果发现值被别人改掉了,更新失败,不要立刻放弃,而是重新从第一步再来一遍(再读一次旧值,然后重新做计算再试着更新)。

这种方式的好处是避免锁竞争带来的性能问题,因为它“猜测”在它运算的时候别人不会动那块数据。如果猜得不对,就重头再算一次,直至正确更新为止。

伪代码大致有如下结构

1
2
3
4
5
do {
auto oldValue = value;
//用oldValue计算出newValue
auto newValue = func(oldValue);
} while (!cas(value, oldValue, newValue)); //cas返回false,失败重试

应用:令牌桶

在前文设计令牌桶的时候就涉及到了这个算法:限流器:令牌桶和漏桶

为了省略令牌桶算法本身的大量细节(有兴趣可以读我那一篇博文),列出伪代码大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
double zeroTime = 0;
// 简化版令牌桶consume函数伪代码
double consume(double tokenAddRatePerSec, double bucketMaxSize, double now, double num) {
do {
double lastRefillTime = atomic_load(zeroTime);
double currentTokens = min((now - lastRefillTime) * tokenAddRatePerSec, bucketMaxSize);

if (currentTokens < num) {
return 0.0; // 令牌不足,无法消费
}

double newTokens = currentTokens - num;
double newRefillTime = now - newTokens / tokenAddRatePerSec;

// 并发安全更新
} while (!atomic_cas(zeroTime, lastRefillTime, newRefillTime));

return num; // 成功消费 num 个令牌
}

应用:无锁队列

要说无锁队列的实现,就不得不提陈皓老师(R.I.P.)的经典博文无锁队列的实现,我在上学的时候就拜读过,给我印象深刻

虽然有一部分实现是错的,但是作为入门博文通俗易懂。

以入队操作为例说明CAS的应用:

用CAS来实现队列入队,就是用链表进行两步操作:

  • 第一步:当前队尾结点本来指向NULL(空),现在新来了一个节点,要让队尾的next指向这个新节点。
  • 第二步:把队列的尾部指针(tail)也更新一下,使它指向新添加的这个结点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
EnQueue(Q, data) //进队列
{
//准备新加入的结点数据
n = new node();
n->value = data;
n->next = NULL;

do {
p = Q->tail; //取链表尾指针的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while条件注释:如果没有把结点链在尾指针上,再试

CAS(Q->tail, p, n); //置尾结点 tail = n;
}

由于可能同时有多个线程在往队尾插入节点,我们需要确保安全,方法如下:

  • 每个线程先记住当前的tail位置,尝试去操作tail->next:
  • 如果tail->next确实是NULL,表示没人抢先操作,我们就用CAS把新节点链接上去。
  • 如果tail->next不是NULL,表示已经有别的线程领先一步完成了插入,那请求就会失败,不得不重新再来一次。

这种方法就像多人同时拨打一个热门热线电话,只有拨通最快的那个人能成功连接,其余人都听到占线,只能不停地重拨。

不过我们会看到,第二步(更新tail指针)却不判断CAS是否成功,原因是:

  • 假设线程T1已经在第一步CAS成功了,它已经把新节点链在了队尾。
  • 此时其他线程(T2, T3, ...)再用CAS操作tail->next时,一定会失败,因为tail->next已经不为NULL了,于是它们反复重试,卡在第一步等待。
  • 等到T1线程走到第二步开始更新tail指针时,它是不可能与任何人竞争的,目前只有它独占了成功链接的机会。因此,“更新tail”这一次操作的CAS是不需要做循环检测的,一定一次成功。
  • 等T1成功更新了tail指针,其余线程才能进入下一轮操作,进行下一次竞争尾节点的链接。

通俗地说就是:

第一个CAS操作就像排队抢座位,大家会争来争去。

第二个CAS操作则像已经坐下的人安心挪座位,肯定没人跟他抢。

ABA

所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

  1. 进程P1在共享变量中读到值为A
  2. P1被抢占了,进程P2执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
  4. P1回来看到共享变量里的值没有被改变,于是继续执行。

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的值。很明显,值是很容易又变成原样的。

维基百科上给了一个活生生的例子:

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

解决办法是double CAS

回到无锁队列问题中来,完全的正确的算法是Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms

我根据这个实现了https://github.com/tedcy/algorithm_test/blob/master/concurrent_linked_queue.cpp,并且压测通过

其中有这么一段代码

1
2
3
4
5
6
7
8
9
10
11
class alignas(16) pointer {
Node *p_ = nullptr;
uint64_t count_;
bool compare_exchange_weak(pointer &expected, const pointer &desired) {
return __atomic_compare_exchange_n(
reinterpret_cast<__int128*>(this),
reinterpret_cast<__int128*>(&expected),
*reinterpret_cast<const __int128*>(&desired),
true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
};

这便是通过gcc的扩展实现的double CAS,也就是128位的CAS,完成了64位指针和64位版本号的一起更新,从而解决了ABA问题

这里有个小坑是数据结构必须标注alignas(16),否则毫无原因的core掉

分布式CAS

分布式CAS主要是在关系式数据库上基于版本号机制,对于redis这样的kv结构,有专门的watch机制或者lua脚本来支持

mysql方式:

假设你的表结构里已经有一个自增id,也就是版本号version。更新时候要确保version不变,才可以写入成功:

1
2
3
4
5
6
7
8
SELECT id, data_field, version FROM table WHERE id = xx;

#客户端逻辑处理...
#new_value = do_some_calc_logic(data_field);

UPDATE table
SET data_field = new_value, version = version + 1
WHERE id = xx AND version = old_version;

这个方法思想上和CAS相似:

  • 旧值读取 (select version as old_version)
  • 客户端做一些逻辑计算
  • 提交更新,如果version此时不变,则写成功;如果别的事务先行更新version,则影响行数=0可知更新失败,此时可重试更新或放弃。

redis方式:

redis中可通过WATCH机制轻松实现类似CAS的乐观锁:

1
2
3
4
5
6
7
WATCH key_version              # 监控version key
GET key_version # 拿到旧版本号old_version
# 客户端逻辑处理...
MULTI
SET key_data new_value # 设置新数据
INCR key_version # 更新版本以用于后续监控
EXEC # 如果有别的client改了key,就会失败

实现思路跟上面提到的mysql类似:

  • 监视key,在事务提交前如果key变了,则整个更新放弃,需要重新计算数据再提交。

悲观锁

全量更新

多读一写(双Buffer)

写线程修改备份,修改完了再切换flag,让读线程读备份,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <iostream>

struct Data {
std::vector<int> vec;
};

// 双Buffer数据结构
Data buffers[2];

// 当前读的buffer的index
std::atomic<int> read_index(0);

// 写线程,更新备份buffer再切换flag
void writer_thread() {
//初始化为备用Buffer的index,这个变量只有写线程用,无需同步手段
int current_write_idx = !read_index;

while (true) {
// 模拟写数据(写入备份buffer)
buffers[current_write_idx].vec.clear();
for (int i = 0; i < 10; ++i) {
buffers[current_write_idx].vec.push_back(i);
}

// 写完切换flag给读线程
read_index.store(current_write_idx, std::memory_order_release);

// 一个周期后再写另一个buffer
current_write_idx = !current_write_idx;

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

// 读线程
void reader_thread() {
while (true) {
// 开始读取flag
int cur_read_idx = read_index.load(std::memory_order_acquire);

// 模拟读操作(读时间过长可能造成跨越期间数据被修改)
for (auto val : buffers[cur_read_idx].vec) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); //模拟长耗时读(此时可能会被写线程翻转)
}

std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

int main() {
// 启动读写线程
std::thread w(writer_thread);
std::vector<std::thread> rs;
for (int i = 0;i < 10;i++) {
std::thread r(reader_thread);
rs.push_back(std::move(r));
}

w.join();
for (auto &r : rs) {
r.join();
}

return 0;
}

正常来说读写不会操作同一份数据,发生冲突,需要读操作耗时特别大,超过了一次切换时间才有可能

1 read_index = 0
2 正在读取buffer[read_index = 0]的数据
3 第N轮写入,write_index = 1
4 写入buffer[write_index = 1]数据
5 ...
6 写完buffer[write_index = 1]数据
7 第N+1轮写入,write_index = 0
8 还在读取buffer[read_index = 0]的数据 写入buffer[write_index = 0]数据

多读少写(RCU)

https://zhuanlan.zhihu.com/p/582271718

RCU可以用引用计数的简单版本

部分更新(索引数据结构)

要做到部分更新,是需要某个key来定位到某个value的,因此一般用于红黑树或者哈希表这样的索引数据结构