锁实现分析:从glibc到futex(一)
之前在理解条件变量时在futex上误入歧途,导致犯下死锁的错误,并且写下了错误的博文,见浅析条件变量
所以打算重新理解一下futex
虽然初衷是futex,但是由于futex最初是为了优化锁的性能而提出,因此深入理解futex实际上是对锁概念的深入了理解
因此本篇的大纲
源码分析从c++的mutex开始,然后是glibc的nptl库pthread_mutex,接着是linux内核的futex实现
在源码分析完成以后,第二篇锁实现分析:从glibc到futex(二)尝试对锁这个概念进行深入的分析,说明futex的诞生原因
C++的std::mutex源码分析
先看下g++版本
1 | g++ --version |
然后在/usr/include/c++/9/bits/std_mutex.h
有如下代码,做了一些可读性调整
1 | class __mutex_base { |
可以看到c++的std::mutex几乎没有任何逻辑,就只是简单的对pthread的封装
持有了一个pthread_mutex_t类型的_M_mutex,并对其pthread_mutex_lock和pthread_mutex_unlock
glibc的mutex源码分析
pthread的实现实际上是glibc的NPTL代码
NPTL
是Native POSIX Thread
Library的缩写。这是Linux的一个线程库,它提供了一种实现POSIX线程(或称为Pthreads)的方法。NPTL被设计为可以提供高性能并发,特别是在多处理器和多核心系统中,并提供与POSIX标准更一致的行为。
NPTL是作为早期Linux线程实现LinuxThreads
的替代品在2002年由IBM开发的,现在已经become了Linux系统上的默认POSIX线程库。在NPTL中,每个线程都是一个系统调度的实体,有独立的进程ID,这允许更有效率更一致的线程同步和并发。
像pthread_create这样的Pthreads API函数,实际上就是在使用NPTL(除非在非常老的Linux系统上可能是使用LinuxThreads)
这部分代码可以在https://github.com/bminor/glibc/tree/glibc-2.25/nptl中查看
首先查看x86上的pthread_mutex_t结构,代码在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/x86/bits/pthreadtypes.h#L128
主要看x86_64的代码
1 | typedef union |
pthread_mutex_t是对__pthread_mutex_s的union跨平台封装
其中__size[__SIZEOF_PTHREAD_MUTEX_T]
,这是用于标识__pthread_mutex_s的大小的
1 | #ifdef __x86_64__ |
而long int __align
则是为了确保整个
pthread_mutex_t
结构体在内存中根据 long int
的对齐规则进行对齐,以提高cpu的运行效率
所以核心部分就在__pthread_mutex_s
上了
1 | struct __pthread_mutex_s |
__lock
字段是这部分的重点内容了不是简单的0,1表示是否上锁,还有个2状态表示发生了竞争,需要陷入内核进行裁决
__kind
字段的1-7位表示锁类型,例如默认的非递归锁PTHREAD_MUTEX_NORMAL(等价于PTHREAD_MUTEX_TIMED_NP,见这里),以及可重入锁PTHREAD_MUTEX_RECURSIVE。而8位表示互斥锁是被进程内的线程共享,还是跨进程共享。一般使用
pthread_mutexattr_setpshared
函数来设置该属性。如果是进程间的同步,需要开辟一个共享内存,将mutex放入共享内存中,然后不同进程才能操作同一个mutex。另外还有一些状态位是用于标识是否支持优先级继承等
pthread_mutex_lock
在https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L600可以看到
pthread_mutex_lock
是一个指向__pthread_mutex_lock
的强符号(这是一个gcc的概念,强符号声明可以覆盖弱符号,当不存在强符号时才会使用弱符号,详见Weak symbol)
1 |
|
在默认的锁类型PTHREAD_MUTEX_NORMAL(PTHREAD_MUTEX_TIMED_NP)下,https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L63所运行的流程如下
1 |
|
可以看到除了事务性内存看不太明白,这里大部分都是一些边缘逻辑,写入owner变量,设置探测点等等,核心逻辑全在LLL_MUTEX_LOCK指向的lll_lock中了
而关于其中的事务性内存,首先需要打开gcc选项-fgnu-tm
才能使用,其次相关的FORCE_ELISION宏的逻辑,最终执行的代码LLL_MUTEX_LOCK_ELISION实际上已经废弃
1 |
|
lll_lock
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L88中定义
1 | /* If FUTEX is 0 (not acquired), set to 1 (acquired with no waiters) and |
atomic_compare_and_exchange_bool_acq
是glibc的bool类型的CAS宏
原型大致是
1 | atomic_compare_and_exchange_bool_acq(type *ptr, type desired, type *expected) |
因此当__futex
- 如果为0
- 代表目前是未锁定状态
- 那么交换成功,返回0,函数继续运行
- 如果为1或者2(在
__lll_lock_wait_private
设置为2)- 代表目前是锁定状态
- 那么交换失败,返回1,函数进入
__lll_lock_wait(_private)
,进行进程(线程)等待 - 这是因为__futex为1或者2,已经有线程进入临界区,新线程需要进入锁定状态
__lll_lock_wait_private
在https://github.com/bminor/glibc/blob/glibc-2.25/nptl/lowlevellock.c#L26中定义
1 | void __lll_lock_wait_private (int *futex) |
其中lll_futex_wait
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L92中定义
1 |
lll_futex_wait就是对syscall调用futex的简单封装,通过传入LLL_PRIVATE来决定调用FUTEX_WAIT_PRIVATE还是FUTEX_WAIT
回到__lll_lock_wait_private函数
- 首先,代码检查
futex
所指向的值是否为2,如果是,那么它调用lll_futex_wait
函数,等待futex
所指向的值不再为2。 - 如果
futex
所指向的值不是2,函数就进入一个循环,在这个循环中,它试图通过原子操作atomic_exchange_acq
将futex
所指向的值设为2。原子操作atomic_exchange_acq
会返回futex
所指向的旧值。 - 如果
atomic_exchange_acq
返回的旧值不为0(意味着锁被占用),那么代码就会再次调用lll_futex_wait
函数,等待futex
所指向的值不再为2。
这里的逻辑要配合unlock一起看才能看得懂,见后续的时序分析
另外,由于虚假唤醒的存在,futex系统调用有可能返回EAGAIN,因此futex系统调用是必须用while循环包住的,这里巧妙的先不进行cas判断来让当前线程陷入futex_wait,从而提高一点性能
pthread_mutex_unlock
pthread_mutex_unlock
是一个指向__pthread_mutex_unlock
的强符号
1 | int __pthread_mutex_unlock (pthread_mutex_t *mutex) { |
lll_unlock
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L154中定义
1 | /* Unconditionally set FUTEX to 0 (not acquired), releasing the lock. If FUTEX |
lll_futex_wake和__lll_lock_wait_private不同,没有任何逻辑,在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L107中定义
1 |
- 首先将
futex
的值设置为 0,表示锁已释放。 - 然后检查如果原先的
futex
值大于 1,即表示锁原本是被获取的(可能有等待者),那么唤醒任何一个等待者
时序分析
对于复杂的多线程情况,进行时序分析是最有助于理解问题的
- 首先是没有发生线程争用的情况
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex旧值为0,因此返回0,函数直接退出,进入锁保护的临界区 |
|
2 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 此时futex为1,因此返回1,函数直接退出 |
|
3 | 同1 | |
4 | 同2 |
- 接着是发生了线程争用的情况
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex旧值为0,因此返回0,函数直接退出,进入锁保护的临界区 |
|
2 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex旧值为1,因此返回1,进入lll_lock_wait_private |
|
3 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,futex的旧值1被返回,进入lll_futex_wait 休眠直到futex不为2 |
|
4 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake 唤醒任何一个休眠线程 |
|
5 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区 |
|
6 | 同线程1的时序4,但是有点奇怪的是,这里会造成一次无效唤醒(没有任何线程可以被唤醒) 虽然进入futex以后由于等待列表是空的也没有什么性能损耗 |
- 伪线程争用
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex旧值为0,因此返回0,函数直接退出,进入锁保护的临界区 |
|
2 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex旧值为1,因此返回1,进入lll_lock_wait_private |
|
3 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为1,因此返回1,进入lll_futex_wake 唤醒任何一个休眠线程,这里也是一次无效唤醒 |
|
4 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区 |
|
5 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake 唤醒任何一个休眠线程,这里也是一次无效唤醒 |
线程争用和伪线程争用比较,可以看出当unlock的时候futex=2才是glibc争用的情况
但是也有可能是发生了伪线程争用,没有syscall就直接退出了
小结
看到这里大致模模糊糊有个概念,锁实际上是抢的一个内存地址的值
glibc通过CAS抢1来预先判断一下有没有争用,如果没有直接退出
如果抢1失败了,就设置为2,再由futex决断是否为2来休眠和唤醒
因此简化版的锁可以通过让futex决断是否为1来休眠和唤醒,如下
1 |
|
futex源码分析
futex有两个api
futex_wait(int *uaddr, int val)
- 只有当
*uaddr
等于val
时,才进行等待。 - 这保证了在发生上下文切换之前,值没有被另一个线程更改。如果值被改变,
futex_wait
将直接返回,不会有任何等待。
- 只有当
futex_wake(int *uaddr, int n)
- 用于唤醒在
*uaddr
上等待的n
个进程或线程。
- 用于唤醒在
总览
线程(进程)休眠后的唤醒,主要是需要定位到线程(进程)的唯一标识
当存在大量线程(进程)的时候,红黑树显然不能满足需要,futex使用哈希表来定位,并且使用开链法来处理碰撞
开链法所需的链表表头,锁,以及当前线程的task信息都会存储在futex_q的结构上
如下图所示,线程1和线程2都在futex_q A上休眠,线程3,4,5在futex_q B上休眠
futex_q A和futex_q B由于哈希冲突,在wait时放在了同一个哈希桶下面
当例如唤醒futex_q B时,先通过哈希函数定位到哈希桶1,然后遍历哈希桶1下的链表节点,直到遇到futex_q B,就唤醒第一个,也就是线程4
graph LR
HB1[("哈希桶 1 (hash bucket1)")]
PLIST_HEAD["plist_head(链表头)"]
FQA1["futex_q A(线程1)"]
FQA2["futex_q A(线程2)"]
FQB1["futex_q B(线程3)"]
FQB2["futex_q B(线程4)"]
FQB3["futex_q B(线程5)"]
HB1 --> PLIST_HEAD
PLIST_HEAD --> FQA1
FQA1 --> FQA2
FQA2 --> FQB1
FQB1 --> FQB2
FQB2 --> FQB3
HB2[("哈希桶 2 (hash bucket 2)")]
PLIST_HEAD2["plist_head(链表头)"]
FQC1["futex_q C(线程6)"]
FQD1["futex_q D(线程7)"]
HB2 --> PLIST_HEAD2
PLIST_HEAD2 --> FQC1
FQC1 --> FQD1
分析
数据结构
futex依赖的核心数据结构futex_q在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L122定义
1 | struct futex_q { |
哈希桶的定义futex_hash_bucket在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L145
1 | struct futex_hash_bucket { |
futex_q和futex_hash_bucket通过futex_q中的futex_key相关联https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L155
1 | static struct futex_hash_bucket *hash_futex(union futex_key *key) |
futex_key
futex_key是一个联合体,用于区分进程和线程的情况
https://elixir.bootlin.com/linux/v2.6.39.4/source/include/linux/futex.h#L156
1 | union futex_key { |
private标识线程间同步,同一个进程的多个线程,address是相同的,所以只需要虚拟地址address + offset就可以标识这个进程的futex,由于futex的整个哈希桶是全部进程共用的,因此还需要mm指针来区分不同进程
shared标识进程间同步,需要futex实际的物理地址才能标识跨进程的futex
使用private的流程会更简单,是性能优化的一部分
futex_wait
sequenceDiagram
participant User
participant F as futex_wait
participant FS as futex_wait_setup
participant FQ as futex_wait_queue_me
User ->>+ F :
F ->> F : 初始化定时器to = &timeout<br>
F ->>+ FS : [q, hb] = futex_wait_setup(uaddr, val, flag)
FS ->> FS : q->key = get_futex_key(uaddr)
FS ->> FS : hb = hash_futex(&q->key)<br>q->lock_ptr = &hb->lock<br>spin_lock(&hb->lock)
FS ->> FS : uval = get_futex_value_locked(uaddr)
alt uval != uaddr
FS ->>- F : 在休眠前存在futex_wake唤醒<br>spin_unlock(&hb->lock)<br>return -EWOULDBLOCK
F ->>- User : return -EWOULDBLOCK
else uval == uaddr
FS ->> F : return 0
end
F ->>+ FQ : futex_wait_queue_me(hb, &q, to);
FQ ->> FQ : set_current_state(TASK_INTERRUPTIBLE);
FQ ->> FQ : plist_node_init(&q->list, prio)<br>plist_add(&q->list, &hb->chain)<br>q->task = current<br>spin_unlock(&hb->lock);
alt !plist_node_empty(&q->list) 并且 休眠没有超时
FQ ->>+ 内核调度器 : schedule()
内核调度器 ->>- FQ : 唤醒
end
FQ ->> FQ : set_current_state(TASK_RUNNING)
FQ ->>- F : return
F ->> F : 出队列,并且减少q.key的引用计数<br>根据定时器和信号情况判断返回值
F ->> F : restart_block流程
F ->> User : return
1 | static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, |
从上述代码可以看出,大致可以分成futex_wait_setup,futex_wait_queue_me,restart_block流程
futex_wait_setup
1 | /** |
1 | static inline struct futex_hash_bucket *queue_lock(struct futex_q *q) |
1 | static int get_futex_value_locked(u32 *dest, u32 __user *from) |
futex_wait_queue_me
1 | static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q, |
1 | static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb) |
restart_block流程
sequenceDiagram
participant P as 进程
participant K as 内核
participant FW as futex_wait_restart函数
participant F as futex_wait函数
participant S as 信号处理do_signal函数
P->>K: 调用futex_wait
K->>+F: 进入futex_wait
F->>F: 等待futex地址uaddr
F ->>- K: 返回内核
K->>K: 内核调度别的进程
alt 信号到达
K->>+F : 唤醒
F->>F: restart = ¤t_thread_info()->restart_block<br>restart->fn = futex_wait_restart<br>restart->futex.uaddr = uaddr;
F->>-K: return -ERESTART_RESTARTBLOCK
K->>+S: 检查信号
S->>S: 执行信号处理程序<br>检查中断的系统调用返回值如果是-ERESTART_RESTARTBLOCK<br>那么regs->ax = NR_restart_syscall
S->>-K: 返回内核
alt 系统调用需要重启
K->>P: 返回到用户空间
P->>+K: 根据刚才设置的指令寄存器ip和ax<br>调用ax中的restart_syscall
K->>K: restart = ¤t_thread_info()->restart_block
K ->>+ FW : restart->fn(restart)
FW ->>+ F : futex_wait(restart->futex.uaddr, ...)
F ->>- FW : futex_wait完成
FW ->>- K : return
K->>-P : return
else 系统调用不需要重启
K->>P: 返回错误代码 (-EINTR, 等等)
end
else 未收到信号
F->>P: futex_wait完成
end
下面是一些具体的代码
在信号处理函数中https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/kernel/signal.c#L691
1 | static void do_signal(struct pt_regs *regs) |
是系统调用是-ERESTART_RESTARTBLOCK返回值,那么ax寄存器被设置为NR_restart_syscall
在返回用户空间以后会根据ip和ax寄存器调用NR_restart_syscall
https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/include/asm/unistd_64.h#L503
1 |
|
https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/signal.c#L2081
1 | SYSCALL_DEFINE0(restart_syscall) |
restart_block被指向了futex_wait_restart
https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L1902
1 | static long futex_wait_restart(struct restart_block *restart) |
从而重新进入futex_wait
futex_wake
1 | static int |
1 | static void wake_futex(struct futex_q *q) |
参考资料
-
2019-03-14
本篇博客会深入分析条件变量实现来理解两个问题:
- 为什么条件变量要配合锁来使用 — 信号丢失问题
- 为什么条件变量需要用while循环来判断条件,而不是if — 虚假唤醒(spurious wakeup)问题
这两个问题不仅是C会遇到,所有语言的封装几乎都不可避免
先区分一下条件和条件变量,条件是指常用情况下,signal线程修改的值,使得cond_wait判断该值后不再阻塞
-
2023-08-05
上一篇对glibc和futex进行了源码分析,是我对具体实现的梳理,没有总结性的内容,可以略过不看直接看这一篇总结
本篇总结尝试深入一些,继续挖掘锁这个概念
分析内核在实现锁的过程中是如何解决无效唤醒问题的
为了解决无效唤醒问题,纯用户态的互斥锁性能不够好;纯内核态又在非竞争的条件时需要陷入内核,从而诞生了futex