并发场景下锁的最佳实践

前文分析了一波锁的原理:

锁实现分析:从glibc到futex(一) 锁实现分析:从glibc到futex(二)

并实现了协程下的锁:

在C++中实现协程

也该总结一下并发场景下锁的用法了

在这之前回顾一下之前总结的锁的原理,以对其性能有足够了解

锁性能

核心思想:一个标记位+一个队列

可以映射成一群人排队上一个厕所

  • 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
    • 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
    • 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
  • 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
    • 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。

也可以用条件变量去理解锁:锁相当于保护条件为只有当前进程/协程可用的条件变量

因此锁在无竞争状态下,仅有一次对标记位CAS操作

避免锁

在处理并发场景的情况下,最优解永远是避免锁:

当数据可以被分区处理,或者源数据可以被简单复制的情况下,可以引入线程私有变量来避免并发场景

  1. 数据分区
    • 对原始数据按照一定规则(如Hash、范围或模)拆分成多个相互独立的子集,每个线程只处理自己专属的数据子集,从而无需共享数据,把并发竞争完全消除。
    • 例如:
      • 多线程求数组求和,可将数组划分多个区间,每个线程处理单独的区间,最后汇总各线程结果即可。
      • TCMalloc的每个线程都有一个私有的buffer用于分配
  2. 数据复制
    • 当原数据是只读性质,允许每个线程生成一份原数据的副本供自身独立读
    • 例如:
      • 当每个线程初次访问数据时,通过一些复杂计算生成线程私有的、永久静态的结果缓存,以后所有访问都直接访问本线程的缓存
      • 并发场景下的日期格式化(如 SimpleDateFormat 每线程缓存一份)

乐观锁

内存原子CAS

CAS(Compare-And-Swap,比较并交换)是一种乐观锁的实现方式。

常见的用法是:

  1. 先读取一个共享变量的当前值(旧值);
  2. 使用读取到的这个旧值,自己先做一些计算,算出一个新值;
  3. 准备将新值回写到共享变量之前,检查一下当前共享变量的值是不是还是刚才读取的那个旧值:
    • 如果还是旧值,说明没人改过,可以放心地更新成功;
    • 如果发现值被别人改掉了,更新失败,不要立刻放弃,而是重新从第一步再来一遍(再读一次旧值,然后重新做计算再试着更新)。

这种方式的好处是避免锁竞争带来的性能问题,因为它“猜测”在它运算的时候别人不会动那块数据。如果猜得不对,就重头再算一次,直至正确更新为止。

伪代码大致有如下结构

1
2
3
4
5
do {
auto oldValue = value;
//用oldValue计算出newValue
auto newValue = func(oldValue);
} while (!cas(value, oldValue, newValue)); //cas返回false,失败重试

应用:令牌桶

在前文设计令牌桶的时候就涉及到了这个算法:限流器:令牌桶和漏桶

为了省略令牌桶算法本身的大量细节(有兴趣可以读我那一篇博文),列出伪代码大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
double zeroTime = 0;
// 简化版令牌桶consume函数伪代码
double consume(double tokenAddRatePerSec, double bucketMaxSize, double now, double num) {
do {
double lastRefillTime = atomic_load(zeroTime);
double currentTokens = min((now - lastRefillTime) * tokenAddRatePerSec, bucketMaxSize);

if (currentTokens < num) {
return 0.0; // 令牌不足,无法消费
}

double newTokens = currentTokens - num;
double newRefillTime = now - newTokens / tokenAddRatePerSec;

// 并发安全更新
} while (!atomic_cas(zeroTime, lastRefillTime, newRefillTime));

return num; // 成功消费 num 个令牌
}

应用:无锁队列

要说无锁队列的实现,就不得不提陈皓老师(R.I.P.)的经典博文无锁队列的实现,我在上学的时候就拜读过,给我印象深刻

虽然有一部分实现是错的,但是作为入门博文通俗易懂。

以入队操作为例说明CAS的应用:

用CAS来实现队列入队,就是用链表进行两步操作:

  • 第一步:当前队尾结点本来指向NULL(空),现在新来了一个节点,要让队尾的next指向这个新节点。
  • 第二步:把队列的尾部指针(tail)也更新一下,使它指向新添加的这个结点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
EnQueue(Q, data) //进队列
{
//准备新加入的结点数据
n = new node();
n->value = data;
n->next = NULL;

do {
p = Q->tail; //取链表尾指针的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while条件注释:如果没有把结点链在尾指针上,再试

CAS(Q->tail, p, n); //置尾结点 tail = n;
}

由于可能同时有多个线程在往队尾插入节点,我们需要确保安全,方法如下:

  • 每个线程先记住当前的tail位置,尝试去操作tail->next:
  • 如果tail->next确实是NULL,表示没人抢先操作,我们就用CAS把新节点链接上去。
  • 如果tail->next不是NULL,表示已经有别的线程领先一步完成了插入,那请求就会失败,不得不重新再来一次。

这种方法就像多人同时拨打一个热门热线电话,只有拨通最快的那个人能成功连接,其余人都听到占线,只能不停地重拨。

不过我们会看到,第二步(更新tail指针)却不判断CAS是否成功,原因是:

  • 假设线程T1已经在第一步CAS成功了,它已经把新节点链在了队尾。
  • 此时其他线程(T2, T3, ...)再用CAS操作tail->next时,一定会失败,因为tail->next已经不为NULL了,于是它们反复重试,卡在第一步等待。
  • 等到T1线程走到第二步开始更新tail指针时,它是不可能与任何人竞争的,目前只有它独占了成功链接的机会。因此,“更新tail”这一次操作的CAS是不需要做循环检测的,一定一次成功。
  • 等T1成功更新了tail指针,其余线程才能进入下一轮操作,进行下一次竞争尾节点的链接。

通俗地说就是:

第一个CAS操作就像排队抢座位,大家会争来争去。

第二个CAS操作则像已经坐下的人安心挪座位,肯定没人跟他抢。

ABA问题

所谓ABA问题(见维基百科的ABA词条),问题基本是这个样子:

  1. 进程P1在共享变量中读到值为A
  2. P1被抢占了,进程P2执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
  4. P1回来看到共享变量里的值没有被改变,于是继续执行。

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的值。很明显,值是很容易又变成原样的。

维基百科上给了一个活生生的例子:

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

解决办法是double CAS

回到无锁队列问题中来,完全的正确的算法是Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms

我根据这个实现了https://github.com/tedcy/algorithm_test/blob/master/concurrent_linked_queue.cpp,并且压测通过

其中有这么一段代码

1
2
3
4
5
6
7
8
9
10
11
class alignas(16) pointer {
Node *p_ = nullptr;
uint64_t count_;
bool compare_exchange_weak(pointer &expected, const pointer &desired) {
return __atomic_compare_exchange_n(
reinterpret_cast<__int128*>(this),
reinterpret_cast<__int128*>(&expected),
*reinterpret_cast<const __int128*>(&desired),
true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
};

这便是通过gcc的扩展实现的double CAS,也就是128位的CAS,完成了64位指针和64位版本号的一起更新,从而解决了ABA问题

这里有个小坑是数据结构必须标注alignas(16),否则毫无原因的core掉

分布式CAS

分布式CAS主要是在关系式数据库上基于版本号机制,对于redis这样的kv结构,有专门的watch机制或者lua脚本来支持

mysql方式

假设你的表结构里已经有一个自增id,也就是版本号version。更新时候要确保version不变,才可以写入成功:

1
2
3
4
5
6
7
8
SELECT id, data_field, version FROM table WHERE id = xx;

#客户端逻辑处理...
#new_value = do_some_calc_logic(data_field);

UPDATE table
SET data_field = new_value, version = version + 1
WHERE id = xx AND version = old_version;

这个方法思想上和CAS相似:

  • 旧值读取 (select version as old_version)
  • 客户端做一些逻辑计算
  • 提交更新,如果version此时不变,则写成功;如果别的事务先行更新version,则影响行数=0可知更新失败,此时可重试更新或放弃。

redis方式

redis中可通过WATCH机制轻松实现类似CAS的乐观锁:

1
2
3
4
5
6
7
WATCH key_version              # 监控version key
GET key_version # 拿到旧版本号old_version
# 客户端逻辑处理...
MULTI
SET key_data new_value # 设置新数据
INCR key_version # 更新版本以用于后续监控
EXEC # 如果有别的client改了key,就会失败

实现思路跟上面提到的mysql类似:

  • 监视key,在事务提交前如果key变了,则整个更新放弃,需要重新计算数据再提交。

并发场景

全量更新

多读一写(双Buffer)

写线程修改备份,修改完了再切换flag,让读线程读备份,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <iostream>

struct Data {
std::vector<int> vec;
};

// 双Buffer数据结构
Data buffers[2];

// 当前读的buffer的index
std::atomic<int> read_index(0);

// 写线程,更新备份buffer再切换flag
void writer_thread() {
//初始化为备用Buffer的index,这个变量只有写线程用,无需同步手段
int current_write_idx = !read_index;

while (true) {
// 模拟写数据(写入备份buffer)
buffers[current_write_idx].vec.clear();
for (int i = 0; i < 10; ++i) {
buffers[current_write_idx].vec.push_back(i);
}

// 写完切换flag给读线程
read_index.store(current_write_idx, std::memory_order_release);

// 一个周期后再写另一个buffer
current_write_idx = !current_write_idx;

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

// 读线程
void reader_thread() {
while (true) {
// 开始读取flag
int cur_read_idx = read_index.load(std::memory_order_acquire);

// 模拟读操作(读时间过长可能造成跨越期间数据被修改)
for (auto val : buffers[cur_read_idx].vec) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); //模拟长耗时读(此时可能会被写线程翻转)
}

std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

int main() {
// 启动读写线程
std::thread w(writer_thread);
std::vector<std::thread> rs;
for (int i = 0;i < 10;i++) {
std::thread r(reader_thread);
rs.push_back(std::move(r));
}

w.join();
for (auto &r : rs) {
r.join();
}

return 0;
}

正常来说读写不会操作同一份数据,发生冲突,需要读操作耗时特别大,超过了一次切换时间才有可能

1 read_index = 0
2 正在读取buffer[read_index = 0]的数据
3 第N轮写入,write_index = 1
4 写入buffer[write_index = 1]数据
5 ...
6 写完buffer[write_index = 1]数据
7 第N+1轮写入,write_index = 0
8 还在读取buffer[read_index = 0]的数据 写入buffer[write_index = 0]数据

多读少写(RCU,Read-Copy-Update)

根据wiki的定义,https://en.wikipedia.org/wiki/Read-copy-update:

In computer science, read-copy-update (RCU) is a synchronization mechanism that avoids the use of lock primitives while multiple threads concurrently read and update elements that are linked through pointers and that belong to shared data structures (e.g., linked lists, trees, hash tables).[1]

在计算机科学中,读-复制-更新 (Read-Copy-Update,RCU) 是一种同步机制,用于在多线程并发地读取和修改共享数据结构(例如链表、树、哈希表)中通过指针连接的元素时,避免使用锁原语(lock primitives)。

Whenever a thread is inserting or deleting elements of data structures in shared memory, all readers are guaranteed to see and traverse either the older or the new structure, therefore avoiding inconsistencies (e.g., dereferencing null pointers).[1]

当某个线程正在向共享内存中的数据结构插入或删除元素时,所有读取线程都保证看到并遍历或旧的结构,或新的结构,因此能避免数据不一致的问题(例如使用空指针导致的段错误)。

It is used when performance of reads is crucial and is an example of space–time tradeoff, enabling fast operations at the cost of more space. This makes all readers proceed as if there were no synchronization involved, hence they will be fast, but also making updates more difficult.

RCU机制适用于那些读取性能至关重要的场景,这是空间与时间进行权衡的一个典型例子,它的高速运行是以占用更多内存空间为代价的。所有读线程的执行过程都如同不含任何同步机制一样,因此读速度非常快;但与之相对,这种设计也使得更新操作变得更为复杂和困难。

顾名思义,“读-拷贝-更新”,简单来说,就是读线程不加锁,随意读,但更新数据的时候,需要先复制一份副本,在副本上完成修改,再一次性地替换旧数据

举一个链表的例子,假设初始链表:

1
2
3
[1,2,3] --> [5,6,7] --> [11,4,8] --> NULL
^
p

Step 1. 为更新节点分配新内存空间(copy):

1
2
3
4
5
6
7
[1,2,3] --> [5,6,7] --> [11,4,8] --> NULL
^
p

[5,6,7] (copy)
^
q (新节点copy, 尚未链接)

Step 2. copy数据及关系(p节点的数据和next关系拷贝到q):

1
2
3
4
5
6
7
[1,2,3] --> [5,6,7] --> [11,4,8] --> NULL
^
p

[5,6,7] --> [11,4,8]
^
q

Step 3. 对copy后的数据进行修改 (写操作):

1
2
3
4
5
6
7
[1,2,3] --> [5,6,7] --> [11,4,8] --> NULL
^
p (原节点未变,读者仍可能访问)

[5,2,3] --> [11,4,8]
^
q (修改后新的节点尚未发布, reader不可见)

Step 4. publish (发布新节点, 更新前一个节点的next指针,使reader可见):

1
2
3
4
5
6
7
[1,2,3] --> [5,2,3] --> [11,4,8] --> NULL
^
q (publish后,新reader访问此新数据)

[5,6,7] --> [11,4,8]
^
p (旧reader仍可持有该节点引用使用旧数据)
  • 新reader(在publish之后开始)看到 [5,2,3]
  • 旧reader(publish之前开始)看到 [5,6,7]

Step 5. 等待所有旧reader结束读取后, writer释放旧节点([5,6,7]):

1
2
3
4
5
[1,2,3] --> [5,2,3] --> [11,4,8] --> NULL
^
q

(节点 p [5,6,7] 被安全释放)
  • 每一步publish阶段之前读者看到旧节点数据,publish阶段之后新的读者看到更新后的新节点数据。
  • 只有当所有旧reader完成时,才释放旧节点数据,这就是RCU的Grace Period(宽限期)。
内核态方案

核心设计思想是:

  • 读操作不加锁,只需临时关闭抢占(preempt)后读取数据。一旦读操作完成,CPU重新恢复抢占并可能发生进程切换。因此,如果某个CPU发生了一次进程切换(称为进入一次"静默状态",quiescent state),就说明在此之前开始的读操作必定已全部结束。
  • 写操作则需要等待,直到确认所有可能使用旧数据的CPU都发生过一次进程切换,即所有读者离开了可能访问被修改数据的临界区,才能真正释放旧数据内存空间。

为了实现此机制,Linux内核引入了两个关键元素:

  • 一个表示当前等待状态的标记(用于表示写者从什么时候开始进入挂起状态);
  • 每个CPU上对应的一个变量(per-CPU变量),表示这个CPU是否经历过一次"静默状态"。

这样,当写线程完成对数据副本的更改,准备释放旧数据时:

  1. 所有CPU对应的 per-CPU 变量(初始状态)都被统一重置为0;
  2. 随着每个CPU经历一次进程切换(即经历一个quiescent state),它就会把自己的 per-CPU 变量标记为1;
  3. 当确认所有CPU的 per-CPU 变量全部都为1时,说明所有旧数据的读操作已完成(确保没有读者再引用旧数据),此时就可以安全地释放旧数据,并唤醒被挂起的写者线程继续运行。

细节可以看参考资料:

Linux 内核:RCU机制与使用

Linux中的RCU机制[一] - 原理与使用方法

无锁编程—RCU

Read-Copy Update,向无锁编程进发!

用户态方案
C版本

最出名的就是liburcu库了,他有很多种方案,其中性能最高的是qsbr,这也是对代码侵入性最高的方式

为了实现Grace Period,在qsbr里面,Grace Period是用一个全局的unsigned long(64 bits)的counter——rcu_gp来表示。 每新开始一个Grace Period,就往这个counter上加一。所以这个数值我们可以称之为gp号。

而对于每个读线程,都会有一个rcu_reader结构,这个结构里面存着最近一次的gp号缓存,以及一些额外的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct rcu_gp {
unsigned long ctr;
int32_t futex;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
extern struct rcu_gp rcu_gp;

struct rcu_reader {
/* Data used by both reader and synchronize_rcu() */
unsigned long ctr;
struct cds_list_head node
__attribute__((aligned(CAA_CACHE_LINE_SIZE)));
int waiting;
pthread_t tid;
unsigned int registered:1;
};
extern DECLARE_URCU_TLS(struct rcu_reader, rcu_reader);

在qsbr里面,read_lockread_unlock都不会改变本线程的gp缓存,只有在rcu_quiescent_state()调用的时候,会从全局的rcu_gp里面获取最新的gp号,更新到本线程缓存。

当写线程执行到synchronize_rcu()的时候,实际上就会先把rcu_gp加一,然后等待所有的读线程的gp缓存都等于最新的gp号,然后才返回。这也就是qsbr实现的Grace Period机制。

因此qsbr的使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
struct Foo { int a, b, c, d; };

void ReadThreadFunc() {
struct Foo* foo = NULL;
int sum = 0;
rcu_register_thread();
for (int i = 0; i < 100000000; ++i) {
for (int j = 0; j < 1000; ++j) {
rcu_read_lock();
foo = rcu_dereference(gs_foo);
if (foo) {
sum += foo->a + foo->b + foo->c + foo->d;
}
rcu_read_unlock();
}
rcu_quiescent_state();
}
rcu_unregister_thread();
}

void WriteThreadFunc() {
while (!gs_is_end) {
for (int i = 0; i < 1000; ++i) {
struct Foo* foo =
(struct Foo*) malloc(sizeof(struct Foo));
foo->a = 2; foo->b = 3;
foo->c = 4; foo->d = 5;
rcu_xchg_pointer(&gs_foo, foo);
synchronize_rcu();
if (foo) {
free(foo);
}
}
}
}

这里可以看到几个关键点:

  • 对于读者
    1. 线程开始的时候需要调用rcu_register_thread()进行注册,线程结束的时候需要调用rcu_unregister_thread()进行注销。
    2. 对于共享数据区的访问需要用rcu_read_lock()rcu_read_unlock()来表示临界区。
    3. 对于共享数据的指针,需要用rcu_dereference()来获取。
    4. 线程时不时需要调用rcu_quiescent_state()来声明线程在quiescent state。
  • 对于写者
    1. 新的数据初始化需要在替换指针之前就完成。
    2. 指针替换需要调用rcu_xchg_pointer()来完成。
    3. 替换完数据之后,需要调用synchronize_rcu()来等待Grace Period的结束。
    4. synchronize_rcu()结束之后,我们就可以放心的删除旧数据了。

参考资料:

liburcu,一个用户态的RCU实现

C++版本

本质上,RCU是写者在必要的时候才对旧数据进行内存回收,在C++,这就是智能指针的常见使用场景,很容易实现以下类完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#pragma once
#include <memory>
#include <type_traits>

/**
* SharedPtrSwapInstance 是一个线程安全的工具类,用于实现对象 data 的安全读写切换。
* 使用 shared_ptr 共享指针,配合atomic操作,让reader无需加锁即可安全地并发读取数据。
*
* 模板参数:
* T - 实例类型,通常为用户定义的结构体或类
* isNeedInit - 是否在构造函数中自动初始化一个默认空实例,如果为false则内部pointer初始化为nullptr
*
*/
template<typename T, bool isNeedInit = true>
class SharedPtrSwapInstance
{
std::shared_ptr<const T> v; // 内部指向数据实例的共享指针
public:
/**
* 构造函数,根据isNeedInit决定是否初始化空实例
*/
SharedPtrSwapInstance() : v(initV<isNeedInit>::get()){
}

/**
* 获取当前数据实例(只读),线程安全。
* @return 当前数据实例的共享指针
*/
std::shared_ptr<const T> get() const
{
std::shared_ptr<const T> oldV = std::atomic_load(&v);
return oldV;
}

/**
* 通过右值引用移动语义设置新的实例对象,线程安全。
* 仅当T具有移动语义构造函数时(即可移动构造)才启用此方法。
* @param t 待设置的新实例(rvalue引用)
*/
template <typename T1 = T>
void set(T && t,
typename std::enable_if<std::is_move_constructible<T1>::value>::type* = 0)
{
auto newV = std::make_shared<const T>(std::move(t));
std::atomic_store(&v, newV);
}

/**
* 通过shared_ptr直接设置新的实例对象,线程安全。
* @param newV 待设置的新实例的shared_ptr (const实例)
*/
void set(std::shared_ptr<const T> &newV) {
std::atomic_store(&v, newV);
}

private:
/**
* 根据isNeedInit的模板特化决定内部pointer是否初始化为空实例
*/
template <bool, typename dummy = void>
struct initV {
static std::shared_ptr<const T> get() {
return std::make_shared<const T>();
}
};
template <typename dummy>
struct initV<false, dummy> {
static std::shared_ptr<const T> get() {
return nullptr;
}
};
};

读方(reader)用法:

1
2
3
4
5
auto data = instance.get(); 
// 获得数据实例的shared_ptr,之后即可安全地访问data内数据
if(data){
// 使用data->访问实例成员,例如data->value
}

写方(writer)用法:

1
2
3
4
5
6
7
// 方法一:通过移动赋值
MyData newData{/*构造参数*/};
instance.set(std::move(newData));

// 方法二:通过shared_ptr直接赋值
auto newDataPtr = std::make_shared<MyData>(/*构造参数*/);
instance.set(newDataPtr);

需要注意的是,这个简单case用的C++标准的atomic函数操作的智能指针,实际上是有锁的

在极端性能场景下,需要使用facebook实现的原子智能指针:https://github.com/facebook/folly/blob/main/folly/concurrency/AtomicSharedPtr.h:

在64位(x64)系统中,内存地址实际上只有48个有效位,其余高16位通常未使用(操作系统与硬件结构所决定)。因此,一个64位指针表示时,可利用高位或低位的某些bit存储额外的信息,这种技术被称为"指针打包"(pointer packing)或"寄生标记"(pointer tagging)。

folly中的atomic_shared_ptr正是利用了这一点:

  • std::shared_ptr通常要两个8字节field(总16字节),分别存放指针与控制块引用计数。但folly巧妙地将整个结构压缩至8个字节(64位)。
  • folly利用了x64架构指针位数的特点:有效指针地址只占用48位,还剩余16位空间可以利用。
  • 在folly的atomic_shared_ptr实现中,其中1个额外的bit用来实现folly::PicoSpinLock——一种空间成本极小的自旋锁(spinlock)。
  • 该自旋锁用于极短暂且不频繁的写操作,从而保证写入时的线程安全。
  • 同时,由于是寄生在已有的8字节里,因此整体结构依然只有一个机器字(64位),无需额外空间开销。

总结来说,folly的atomic_shared_ptr之所以能做到只有8字节,是通过巧妙利用x64指针高位闲置的bit,塞入额外的同步控制信息(spinlock bit),实现高效且低内存占用的原子共享指针方案。

参考资料:

为什么C++标准库中atomic shared_ptr不是lockfree实现?

Go和C++基于 RCU 实现无锁并发读写

Go版本

由于 Go 语言自带 GC,因此我们无需操心数据副本,可以用比C++更简单的方式实现无锁 RCU:

定义数据:

1
2
3
4
5
6
7
import "sync/atomic"

type Data struct {
Value int
}

var current atomic.Pointer[Data]

写方:

1
2
3
// 新建数据对象并原子更新
newData := &Data{Value: 42}
current.Store(newData)

读方:

1
2
3
4
5
// 无锁读取,直接获取当前数据副本
data := current.Load()
if data != nil {
fmt.Println(data.Value)
}

要点说明:

  • 借助于 Go 自带的 GC,RCU 的废弃数据副本无需手动释放。
  • 写方新建数据后调用 atomic.Store()替换老数据。
  • 读方调用atomic.Load()安全地读取当前数据,无需锁、零成本并发读取。

少读多写,多读多写(注意读写锁陷阱)

这就基本没什么好办法了

只能用互斥锁或者读写锁

有一个误区是这种情况下读写锁一定比互斥锁好

实际上,读写锁应该慎用,在写较多的场景下性能很差,应该只有写优先的需求才应该使用

网上也有这样的讨论:

https://stackoverflow.com/questions/50972345/when-is-stdshared-timed-mutex-slower-than-stdmutex-and-when-not-to-use-it

https://stackoverflow.com/questions/14306797/c11-equivalent-to-boost-shared-mutex/45580208#45580208

例如我测试读写比例1:1的场景下,读写锁和互斥锁相同计算量下的总耗时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>
#include <chrono>

const int LOOP_TIMES = 100000;
const int THREAD_NUM = 200;

int data, tmp;

std::mutex mtx_m;
std::shared_mutex mtx_sm;

void mutexRead() {
for (int i = 0; i < LOOP_TIMES; ++i) {
std::lock_guard<std::mutex> lock(mtx_m);
tmp = data;
}
}

void mutexWrite() {
for (int i = 0; i < LOOP_TIMES; ++i) {
std::lock_guard<std::mutex> lock(mtx_m);
++data;
}
}

void sharedMutexRead() {
for (int i = 0; i < LOOP_TIMES; ++i) {
std::shared_lock<std::shared_mutex> lock(mtx_sm);
tmp = data;
}
}

void sharedMutexWrite() {
for (int i = 0; i < LOOP_TIMES; ++i) {
std::unique_lock<std::shared_mutex> lock(mtx_sm);
++data;
}
}

template <typename Func>
void test(const char *info, Func read, Func write) {
auto t1 = std::chrono::high_resolution_clock::now();
std::vector<std::thread> workers;
for (int i = 0; i < THREAD_NUM / 2; i++) {
workers.emplace_back(read);
workers.emplace_back(write);
}
for (auto &th : workers) {
th.join();
}
auto t2 = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
std::cout << info << " takes " << duration << " ms" << std::endl;
}

int main() {
test("Mutex operation", mutexRead, mutexWrite);
test("Shared Mutex operation", sharedMutexRead, sharedMutexWrite);
return 0;
}

编译运行:

1
2
3
4
~ g++ -g -Wall -std=c++17 -O2 mutex_vs_shared_mutex.cpp
~ ./a.out
Mutex operation takes 4371 ms
Shared Mutex operation takes 29615 ms

这主要是因为读写锁的内部实现机制通常较复杂。除了维护互斥(mutex)之外,还需额外维护读者数量等计数器。

部分更新(索引数据结构)

要做到部分更新,是需要某个key来定位到某个value的,因此一般用于红黑树或者哈希表这样的索引数据结构

多读多写(定时淘汰或不删除)

只锁容器本身,取出数据就可以不用加锁了,因为一旦更新了时间,要过很久才有可能删除,对单条数据再应用乐观锁或悲观锁

例如我帮业务实现的一个用于统计内存泄露的计数类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 本实现采用了「只锁容器本身」的策略:对容器整体的增删(如map插入新条目)等操作时才需要加锁。
// 一旦某个数据项被访问者读取出来后,由于数据本身会被长时间保留,只要拿到了数据项的引用,后续对该单条数据的读写操作,就无需再额外对容器加锁了。
// 单条数据本身再通过乐观或悲观锁(例如std::atomic或std::mutex)来保证单独项的并发安全。
// 因此整体设计中,只有当对容器增加或删除条目时才需短暂对容器本身加锁,访问单个数据条目的大多数操作都能做到完全无锁,高效安全。
#include <map>
#include <string>
#include <atomic>
#include <mutex>
#include <thread>
#include <iostream>
#include <unistd.h>
#include <vector>

using namespace std;

class ClassCounterGlobalInstance {
map<string, atomic<int64_t>> m_;
mutex mtx_;
public:
ClassCounterGlobalInstance() {
thread t([this]() {
for(;;) {
map<string, int64_t> m;
{
unique_lock<mutex> lock(mtx_);
for (auto &kv : m_) {
m[kv.first] = kv.second;
}
}
for (auto &kv : m) {
cout << kv.first << ":" << kv.second << endl;
}
sleep(1);
}
});
t.detach();
}
atomic<int64_t>* getByClassName(const string& name) {
unique_lock<mutex> lock(mtx_);
return &m_[name];
}
static ClassCounterGlobalInstance* getInstance() {
static ClassCounterGlobalInstance instance;
return &instance;
}
};

template <typename T>
class ClassCounter {
static atomic<int64_t>& getCounter() {
static atomic<int64_t> *counter =
ClassCounterGlobalInstance::getInstance()->
getByClassName(typeid(T).name());
return *counter;
}
public: //三五零法则,防止子类的默认成员函数被删除
ClassCounter() {
getCounter()++;
}
virtual ~ClassCounter() {
getCounter()--;
}
ClassCounter(const ClassCounter&) {
getCounter()++;
}
ClassCounter(ClassCounter&&) noexcept {
getCounter()++;
}
ClassCounter& operator=(const ClassCounter&) noexcept {
return *this;
}
ClassCounter& operator=(ClassCounter&&) noexcept {
return *this;
}
};

使用方法:

  1. 每个主要类继承这个Counter类

    1
    2
    3
    4
    5
    class Foo : public ClassCounter<Foo> {
    };

    class Foo1 : public ClassCounter<Foo1> {
    };
  2. 随后正常使用这个类就行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    int main() {
    vector<thread> threads;

    for(int i = 0; i < 10; ++i) {
    threads.emplace_back([](){
    vector<Foo> fs;
    for(int j = 0; j < 1000000; ++j) {
    for (int k = 0;k < rand() % 1000;k++) {
    fs.push_back(Foo{});
    }
    usleep(1000);
    fs.clear();
    }
    });
    }

    for(int i = 0; i < 10; ++i) {
    threads.emplace_back([](){
    vector<Foo1> fs;
    for(int j = 0; j < 1000000; ++j) {
    for (int k = 0;k < rand() % 1000;k++) {
    fs.push_back(Foo1{});
    }
    usleep(1000);
    fs.clear();
    }
    });
    }

    for(auto& t : threads) {
    t.join();
    }
    }

多读一写(内存缓存定时更新)

缓存和数据库配合使用的场景经常出现在各类后台服务。比如服务程序启动时从数据库里加载大量数据到内存里定时更新,供后续频繁快速地读取,这叫做“内存缓存”。

还有两种常见的缓存的问题:

  • cpu缓存和内存

    和该问题相比,因为服务间rpc代价过高(包括失败代价),所以不可能通过类似MESI协议来进行同步

  • 独立缓存(redis)和数据库

    和该问题相比,多个服务节点的内存缓存场景下写入数据库以后,还是因为服务间rpc代价过高(包括失败代价),没办法操作全部服务的内存缓存,只能让内存缓存自己做定时加载

缓存通过定时加载到内存的方案,如果有的查询缓存里没找到怎么办呢?通常有两种做法:

  • 不允许“穿透”到数据库:没有就说明不存在,不再去数据库查询。

    那么使用全量更新提到的双Buffer是最佳的,RCU也不是不行

  • 允许“穿透”到数据库:缓存里没有就去数据库里查,查到的数据再存进缓存供下次查询使用。

    这就是本节重点,要根据并发情况单独分析了

单key低并发情况

这种情况下实现比较简单,可以做到更高性能

当缓存查不到,我们就:

  1. 先加个锁(防止多个线程重复去数据库取相同的数据),再去临时缓存里看看这个数据之前是否已经有人从数据库拿过了;
  2. 如果临时缓存也没数据(说明确实没人查过),那就解除锁,然后去数据库取;
  3. 数据库取回来后再加个锁,存回临时缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//多个读线程
v = getBuffer(key)
if (!v) {
lock
if v = getTempCache(key); v {
unlock
return v;
}
unlock
//并发高的时候,会有很大rpc穿透
if v = getRemote(key); v {
lock
insertTempCache(key, v);
unlock
return v;
}
}

//一个定时写线程
while(1) {
construct all map
swap to Buffer
lock
cleanTempCache();
unlock
}

这个方案是有锁的,每用到一个锁第一时间要先想下能不能用thread_local避免锁

显然这里是可以的,但是这样就不能由定时写线程去清理tempCache了

我们可以这样优化:

  • 每个线程自己保存一个小的临时缓存,这样就无需加锁;
  • 当全局缓存里数据刷新后且能查到某个key了,我们就再从线程临时缓存中删掉对应的数据(即,线程缓存中数据被全局缓存取代了);
  • 如果全局缓存也找不到,再去自己线程的临时缓存中找;
  • 自己线程临时缓存也没有,就去数据库取回来,再保存到线程临时缓存,下次再用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 假设定义了线程局部的临时缓存
thread_local TempCache thread_temp_cache;

auto data = double_buffer_get(key);
if (data) {
thread_temp_cache.erase(key); //双buffer能拿到,清掉线程局部缓存
return data;
}

data = thread_temp_cache.get(key);
if (data) {
return data;
}

//并发高的时候,会有很大rpc穿透
auto rpc_data = rpc_call_and_construct(key);
if (rpc_data) {
thread_temp_cache.insert(key, rpc_data);
}
return rpc_data;

//一个定时写线程
while(1) {
construct all map
swap to Buffer
}
单key高并发情况

如果一个key同时有大量线程请求,那么上面那个方案可能会导致大量重复的数据库请求(多个线程同时去数据库里取同样的数据,浪费资源)。

这种情况下我们需要引入更加细粒度的“key级锁”,理想情况下是引入“协程锁”,这样才能让多个同时访问同一key的请求只有一个请求去数据库取数据,其他请求则等待该请求完成后直接使用结果。

更通俗地讲,这个方案是:

  • 每个key单独给它弄一个锁(协程锁,性能比线程锁更高);
  • 只有真正第一次访问key且缓存中没有数据的时候,我们才对这个key上锁,去数据库取数据;
  • 后面所有其他同时访问该key的请求,会等待锁释放之后直接读到缓存数据,因此都不需要再去数据库请求数据;
  • 因此整个方案的锁竞争几乎只有第一次,后续大量请求都是没有锁(或短暂等待锁即可)并且不会重复请求数据库。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct CacheEntry {
coroutine_mutex co_mtx; // 协程mutex而非线程mutex
shared_ptr<const RspT> data;
};

mutex globalMutex; // 这里仍使用普通mutex保护map结构
unordered_map<KeyT, shared_ptr<CacheEntry>> globalMap;

shared_ptr<const RspT> coroutine_get(const KeyT &key) {
shared_ptr<CacheEntry> entry;

{
lock_guard<mutex> lock(globalMutex);
auto it = globalMap.find(key);
if (it == globalMap.end()) {
entry = make_shared<CacheEntry>();
globalMap[key] = entry;
} else {
entry = it->second;
}
}

coroutine_unique_lock lock(entry->co_mtx); // 协程锁
if (entry->data) {
return entry->data;
}

// 缓存未命中,直接在协程锁环境下rpc
bool ok = false;
auto rsp = co_await rpc_func(key, ok); // rpc_func 是协程式非阻塞异步rpc
if (ok) {
entry->data = make_shared<const RspT>(move(rsp));
}

return entry->data;
}

允许穿透的场景下可以做到无锁或低粒度锁

不允许穿透的场景下,不能使用双buffer了,但是全局锁的获取只有一瞬间,其他都是用的key锁

这种情况下不能用双Buffer本质上是因为读的地方发现不存在要马上写入,所以并不是多读一写的场景

多读少写(旧版sync.Map,已淘汰)

核心思想:

read好比整个sync.Map的一个“高速缓存”,当goroutine从sync.Map中读数据时,sync.Map会首先查看read这个缓存层是否有用户需要的数据(key是否命中),如果有(key命中),则通过原子操作将数据读取并返回,这是sync.Map推荐的快路径(fast path),也是sync.Map的读性能极高的原因。

  • 新增key操作:直接写入dirty(负责写的map)

  • 更新已经存在的key:

    先读read(负责读操作的map),读到的value通过原子操作写入

    没有再读dirty(负责写操作的map),读到的value通过原子操作写入

  • 读操作:先读read(负责读操作的map),没有再读dirty(负责写操作的map)

再具体一些:

两个map的value都是指向同一个atomic变量指针

读read的Map不加锁,因为不更新map本身,只用cas更新其中的字段

读写dirty的Map加锁,用于更新map本身来新增和删除key,当miss到一定程度的时候,直接cas赋值给read的Map

但是这需要dirty的Map拥有全量数据,因此dirty被赋值过以后,第一次写入需要全量拷贝给read的Map

  • p的状态

    由于read的Map不能删除,因此需要删数据的时候得给它标记状态nil

引入p复杂状态原因:

  • expunged是为了实现nil不得已引入的状态,实现nil状态是为了在同一个key先删后写的场景下能直接cas搞定

  • 考虑不设计nil状态,那么有两种情况

    • keyA存在dirty,不存在read

      先删除,需要加锁操作dirty

      后写入,需要加锁操作dirty

    • keyA同时存在两者

      先删除,需要加锁操作dirty和read

      后写入,需要加锁操作dirty

  • 在nil状态设计下

    先删除:

    • 只要read存在,直接cas就行
    • read不存在,才需要操作锁

    后写入:

    • keyA是nil状态的时候:

      要么read和dirty同时为nil,要么read为nil,dirty不存在

      不管什么情况,都直接cas就搞定了

    • keyA是expunged才需要操作锁

参考资料:

Golang sync.Map 实现原理

Golang sync.Map 原理(两个map实现 读写分离、适用读多写少场景)

缺点

sync.Map设计得比较复杂,它为了提高效率同时维护了两个内部数据结构(一个叫readOnly只读map,一个叫dirty可读写map)。

这种设计的核心思想是:

  • 大部分时候,读操作都只访问readOnly,这个操作无锁,性能很高;
  • 当发现readOnly没数据时候,才会访问dirty,读取dirty的时候必须加全局锁,性能较差。

但问题是,Go 的官方并 没有明确的建议 告诉我们,“读写比例”在什么情况下用sync.Map更合适。

尤其像上文提到的多读一写(内存缓存定时更新)这种场景里,情况会更麻烦:

比如:

  • 当定时线程刷新缓存数据时,miss程度不够而没有更新readOnly进行提升。
  • 而此时readOnly里的数据可能是不完整或者旧数据(amend标记为true)。
  • 此后有大量的读操作查找数据,都会因为数据缺失而“穿透”到dirty map中去读取(加全局锁)。
  • 因此性能就会迅速下降。有大量的请求被迫串行地等待全局锁,直到下一次“提升”发生为止。

更简单直白一点说:

sync.Map设计得不好预测,正常时非常快,但一旦它的优化机制没及时触发,就会导致短时间内性能严重下降。

在对性能要求更高、更平稳的系统中,谨慎考虑使用sync.Map,需要时,应结合你的具体业务场景先充分进行压力测试。

通用场景(新版sync.Map)

go官方应该很早就注意到这个问题了,在最近终于优化了这个问题

在2025年2月11日的1.24的release中,对map有两个改进

sync.Map的改动是本节重点:完整的实现是https://github.com/golang/go/blob/master/src/internal/sync/hashtriemap.go

HashTrieMap 结合了哈希表和前缀树(Trie)的特点,构建了一个多层级的树状结构。其核心思想是将键的哈希值分段,逐层导航至目标节点,从而实现高效的并发访问。

结构组成:

  • 内部节点(Indirect Nodes):每个内部节点包含一个子节点数组(golang实现为2^4 = 16),这些节点根据哈希值的不同部分,将键值对分散到不同的路径上。
  • 叶子节点(Entry Nodes):叶子节点存储实际的键值对。如果多个键的哈希值在当前层级冲突,它们会被存储在同一个叶子节点的溢出列表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
根 (root) indirect node [hash shift: 64 → 60]

├─ indirect node [hash shift: 60 → 56](第1个子节点)
│ ├─ ...(更多 indirect node,直到第15层)
│ │
│ └─ indirect node [第15层, hash shift: 4 → 0]
│ ├─ entry node(key1, val1) → overflow链(处理key hash完全相同的冲突节点)
│ ├─ entry node(key2, val2)
│ └─ 更多 entry nodes (或为空)
├─ ....(更多indirect node)

└─ indirect node [hash shift: 60 → 56](第16个子节点)
└─ ... (类似上面结构)

操作流程:

  • 查找(Load):计算键的哈希值,按 5 位一组,从根节点开始逐层查找对应的子节点,直到找到目标键或确定其不存在。
  • 插入(Store):与查找类似,定位到目标位置后,插入新的键值对。如果发生哈希冲突,新的键值对将被添加到对应叶子节点的溢出列表中。
  • 删除(Delete):定位到目标键所在的叶子节点,移除对应的键值对。如果删除操作导致某些节点变为空,会触发结构的收缩,以释放资源。

原理比起旧版sync.Map还是很容易理解的

性能测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package main

import (
"strconv"
"sync"
"testing"
"sync/atomic"
)

// 测试的数据总数目
const testSize = 10000000

// Benchmark sync.Map与Mutex+map模式性能对比
func BenchmarkMap(b *testing.B) {
ratios := []struct {
read, write int
}{
{10, 1},
{10, 10},
{1, 10},
}

for _, r := range ratios {
read, write := r.read, r.write
b.Run("sync.Map_"+strconv.Itoa(read)+"_"+strconv.Itoa(write), func(b *testing.B) {
benchmarkSyncMap(b, read, write)
})
b.Run("MutexMap_"+strconv.Itoa(read)+"_"+strconv.Itoa(write), func(b *testing.B) {
benchmarkMutexMap(b, read, write)
})
}
}

// 测试sync.Map
func benchmarkSyncMap(b *testing.B, readRatio, writeRatio int) {
var m sync.Map
for i := 0; i < testSize; i++ {
m.Store(i, i)
}

var wg sync.WaitGroup
total := readRatio + writeRatio

b.ResetTimer()
for i := 0; i < total; i++ {
if i < readRatio {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
key := j % testSize
m.Load(key)
}
}()
} else {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
key := j % testSize
m.Store(key, j)
}
}()
}
}
wg.Wait()
}

// 测试Mutex+普通map
func benchmarkMutexMap(b *testing.B, readRatio, writeRatio int) {
m := make(map[int]int, testSize)
var mu sync.RWMutex

for i := 0; i < testSize; i++ {
m[i] = i
}

var wg sync.WaitGroup
total := readRatio + writeRatio

b.ResetTimer()
for i := 0; i < total; i++ {
if i < readRatio {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
key := j % testSize
mu.RLock()
_ = m[key]
mu.RUnlock()
}
}()
} else {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
key := j % testSize
mu.Lock()
m[key] = j
mu.Unlock()
}
}()
}
}
wg.Wait()
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
~ go test -bench=.
goos: linux
goarch: amd64
pkg: github.com/tedcy/go_test
cpu: Intel(R) Xeon(R) Platinum 8369B CPU @ 2.90GHz
BenchmarkMap/sync.Map_10_1-14 2790790 474.1 ns/op
BenchmarkMap/MutexMap_10_1-14 820602 1664 ns/op
BenchmarkMap/sync.Map_10_10-14 1000000 1099 ns/op
BenchmarkMap/MutexMap_10_10-14 358938 3389 ns/op
BenchmarkMap/sync.Map_1_10-14 1741180 656.4 ns/op
BenchmarkMap/MutexMap_1_10-14 458814 2671 ns/op
PASS
ok github.com/tedcy/go_test 94.393s

再试试关闭新版sync.Map

1
2
3
4
~ GOEXPERIMENT=nosynchashtriemap go test -bench=.
BenchmarkMap/sync.Map_10_1-14 451330 3086 ns/op
BenchmarkMap/sync.Map_10_10-14 193680 6930 ns/op
BenchmarkMap/sync.Map_1_10-14 352400 3725 ns/op

总结一下图表:

测试场景(读:写) sync.Map (默认新版开启HashTrie) sync.Map (旧版, 关闭HashTrie) Mutex+Map
10 : 1 474.1 ns/op 3086 ns/op (慢6.5倍) 1664 ns/op
10 : 10 1099 ns/op 6930 ns/op (慢6.3倍) 3389 ns/op
1 : 10 656.4 ns/op 3725 ns/op (慢5.7倍) 2671 ns/op

结论:

  • 新版sync.Map(HashTrie)显著优于其他所有实现,综合看,在绝大部分并发场景中,应使用新版默认的sync.Map (HashTrie)。
  • 旧版sync.Map性能大幅下降,相比默认开启的HashTrie慢约6倍,在所有场合下,甚至不如普通Mutex