锁实现分析:从glibc到futex
之前在理解条件变量时在futex上误入歧途,导致犯下死锁的错误,并且写下了错误的博文,见浅析条件变量
所以打算重新理解一下futex
虽然初衷是futex,但是由于futex最初是为了优化锁的性能而提出,因此深入理解futex实际上是对锁概念的深入了理解
因此本篇的大纲
源码分析从c++的mutex开始,然后是glibc的nptl库pthread_mutex,接着是linux内核的futex实现
在源码分析完成以后,尝试对锁这个概念进行深入的分析,说明futex的诞生原因
C++的std::mutex源码分析
先看下g++版本
1 | g++ --version |
然后在/usr/include/c++/9/bits/std_mutex.h
有如下代码,做了一些可读性调整
1 | class __mutex_base { |
可以看到c++的std::mutex几乎没有任何逻辑,就只是简单的对pthread的封装
持有了一个pthread_mutex_t类型的_M_mutex,并对其pthread_mutex_lock和pthread_mutex_unlock
glibc的mutex源码分析
pthread的实现实际上是glibc的NPTL代码
NPTL
是Native POSIX Thread
Library的缩写。这是Linux的一个线程库,它提供了一种实现POSIX线程(或称为Pthreads)的方法。NPTL被设计为可以提供高性能并发,特别是在多处理器和多核心系统中,并提供与POSIX标准更一致的行为。
NPTL是作为早期Linux线程实现LinuxThreads
的替代品在2002年由IBM开发的,现在已经become了Linux系统上的默认POSIX线程库。在NPTL中,每个线程都是一个系统调度的实体,有独立的进程ID,这允许更有效率更一致的线程同步和并发。
像pthread_create这样的Pthreads API函数,实际上就是在使用NPTL(除非在非常老的Linux系统上可能是使用LinuxThreads)
这部分代码可以在https://github.com/bminor/glibc/tree/glibc-2.25/nptl中查看
首先查看x86上的pthread_mutex_t结构,代码在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/x86/bits/pthreadtypes.h#L128
主要看x86_64的代码
1 | typedef union |
pthread_mutex_t是对__pthread_mutex_s的union跨平台封装
其中__size[__SIZEOF_PTHREAD_MUTEX_T]
,这是用于标识__pthread_mutex_s的大小的
1 | #ifdef __x86_64__ |
而long int __align
则是为了确保整个
pthread_mutex_t
结构体在内存中根据 long int
的对齐规则进行对齐,以提高cpu的运行效率
所以核心部分就在__pthread_mutex_s
上了
1 | struct __pthread_mutex_s |
__lock
字段是这部分的重点内容了不是简单的0,1表示是否上锁,还有个2状态表示发生了竞争,需要陷入内核进行裁决
__kind
字段的1-7位表示锁类型,例如默认的非递归锁PTHREAD_MUTEX_NORMAL(等价于PTHREAD_MUTEX_TIMED_NP,见这里),以及可重入锁PTHREAD_MUTEX_RECURSIVE。而8位表示互斥锁是被进程内的线程共享,还是跨进程共享。一般使用
pthread_mutexattr_setpshared
函数来设置该属性。如果是进程间的同步,需要开辟一个共享内存,将mutex放入共享内存中,然后不同进程才能操作同一个mutex。另外还有一些状态位是用于标识是否支持优先级继承等
pthread_mutex_lock
在https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L600可以看到
pthread_mutex_lock
是一个指向__pthread_mutex_lock
的强符号(这是一个gcc的概念,强符号声明可以覆盖弱符号,当不存在强符号时才会使用弱符号,详见Weak symbol)
1 |
|
在默认的锁类型PTHREAD_MUTEX_NORMAL(PTHREAD_MUTEX_TIMED_NP)下,https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L63所运行的流程如下
1 |
|
可以看到除了事务性内存看不太明白,这里大部分都是一些边缘逻辑,写入owner变量,设置探测点等等,核心逻辑全在LLL_MUTEX_LOCK指向的lll_lock中了
而关于其中的事务性内存,首先需要打开gcc选项-fgnu-tm
才能使用,其次相关的FORCE_ELISION宏的逻辑,最终执行的代码LLL_MUTEX_LOCK_ELISION实际上已经废弃
1 |
|
lll_lock
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L88中定义
1 | /* If FUTEX is 0 (not acquired), set to 1 (acquired with no waiters) and |
atomic_compare_and_exchange_bool_acq
是glibc的bool类型的CAS宏
原型大致是
1 | atomic_compare_and_exchange_bool_acq(type *ptr, type desired, type *expected) |
因此当__futex
- 如果为0
- 代表目前是未锁定状态
- 那么交换成功,返回0,函数继续运行
- __futex的值为1,说明进入锁定状态
- 如果为1或者2(在
__lll_lock_wait_private
设置为2)- 代表目前是锁定状态
- 那么交换失败,返回1,函数进入
__lll_lock_wait(_private)
,进行进程(线程)等待 - 这是因为__futex为1或者2,已经有线程进入临界区,新线程需要进入锁定状态
__lll_lock_wait_private
在https://github.com/bminor/glibc/blob/glibc-2.25/nptl/lowlevellock.c#L26中定义
1 | void __lll_lock_wait_private (int *futex) |
其中lll_futex_wait
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L92中定义
1 |
lll_futex_wait就是对syscall调用futex的简单封装,通过传入LLL_PRIVATE来决定调用FUTEX_WAIT_PRIVATE还是FUTEX_WAIT
回到__lll_lock_wait_private函数
- 首先,代码检查
futex
所指向的值是否为2,如果是,那么它调用lll_futex_wait
函数,等待futex
所指向的值不再为2。 - 如果
futex
所指向的值不是2,函数就进入一个循环,在这个循环中,它试图通过原子操作atomic_exchange_acq
将futex
所指向的值设为2。原子操作atomic_exchange_acq
会返回futex
所指向的旧值。 - 如果
atomic_exchange_acq
返回的旧值不为0(意味着锁被占用),那么代码就会再次调用lll_futex_wait
函数,等待futex
所指向的值不再为2。
这里的逻辑要配合unlock一起看才能看得懂,见后续的时序分析
另外,由于虚假唤醒的存在,futex系统调用有可能返回EAGAIN,因此futex系统调用是必须用while循环包住的,这里巧妙的先不进行cas判断来让当前线程陷入futex_wait,从而提高一点性能
pthread_mutex_unlock
pthread_mutex_unlock
是一个指向__pthread_mutex_unlock
的强符号
1 | int __pthread_mutex_unlock (pthread_mutex_t *mutex) { |
lll_unlock
在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L154中定义
1 | /* Unconditionally set FUTEX to 0 (not acquired), releasing the lock. If FUTEX |
lll_futex_wake和__lll_lock_wait_private不同,没有任何逻辑,在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L107中定义
1 |
- 首先将
futex
的值设置为 0,表示锁已释放。 - 然后检查如果原先的
futex
值大于 1,即表示锁原本是被获取的(可能有等待者),那么唤醒任何一个等待者
时序分析
对于复杂的多线程情况,进行时序分析是最有助于理解问题的
- 首先是没有发生线程争用的情况
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区 |
|
2 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 此时futex为1,因此返回1,函数直接退出 |
|
3 | 同1 | |
4 | 同2 |
- 接着是发生了线程争用的情况
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区 |
|
2 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex为1,因此返回0,进入lll_lock_wait_private |
|
3 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,futex的旧值1被返回,进入lll_futex_wait 休眠直到futex不为2 |
|
4 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake 唤醒任何一个休眠线程 |
|
5 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区 |
|
6 | 同2,但是有点奇怪的是,这里会造成一次虚假唤醒 虽然进入futex以后由于等待列表是空的也没有什么性能损耗 |
- 伪线程争用
线程1 | 线程2 | |
---|---|---|
1 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区 |
|
2 | lll_lock: if (atomic_compare_and_exchange_bool_acq (futex, 1, 0)) lll_lock_wait_private (futex); 此时futex为1,因此返回0,进入lll_lock_wait_private |
|
3 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为1,因此返回1,进入lll_futex_wake 唤醒任何一个休眠线程,这里也是一次虚假唤醒 |
|
4 | lll_lock_wait_private: while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区 |
|
5 | lll_unlock: if (atomic_exchange_rel (futex, 0) > 1) lll_futex_wake (futex, 1, private); 离开临界区 设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake 唤醒任何一个休眠线程,这里也是一次虚假唤醒 |
线程争用和伪线程争用比较,可以看出当unlock的时候futex=2才是glibc争用的情况
但是也有可能是发生了伪线程争用,没有syscall就直接退出了
小结
看到这里大致模模糊糊有个概念,锁实际上是抢的一个内存地址的值
glibc通过CAS抢1来预先判断一下有没有争用,如果没有直接退出
如果抢1失败了,就设置为2,再由futex决断是否为2来休眠和唤醒
因此简化版的锁可以通过让futex决断是否为1来休眠和唤醒,如下
1 |
|
futex源码分析
futex有两个api
futex_wait(int *uaddr, int val)
- 只有当
*uaddr
等于val
时,才进行等待。 - 这保证了在发生上下文切换之前,值没有被另一个线程更改。如果值被改变,
futex_wait
将直接返回,不会有任何等待。
- 只有当
futex_wake(int *uaddr, int n)
- 用于唤醒在
*uaddr
上等待的n
个进程或线程。
- 用于唤醒在
总览
线程(进程)休眠后的唤醒,主要是需要定位到线程(进程)的唯一标识
当存在大量线程(进程)的时候,红黑树显然不能满足需要,futex使用哈希表来定位,并且使用开链法来处理碰撞
开链法所需的链表表头,锁,以及当前线程的task信息都会存储在futex_q的结构上
如下图所示,线程1和线程2都在futex_q A上休眠,线程3,4,5在futex_q B上休眠
futex_q A和futex_q B由于哈希冲突,在wait时放在了同一个哈希桶下面
当例如唤醒futex_q B时,先通过哈希函数定位到哈希桶1,然后遍历哈希桶1下的链表节点,直到遇到futex_q B,就唤醒第一个,也就是线程4
graph LR
HB1[("哈希桶 1 (hash bucket1)")]
PLIST_HEAD["plist_head(链表头)"]
FQA1["futex_q A(线程1)"]
FQA2["futex_q A(线程2)"]
FQB1["futex_q B(线程3)"]
FQB2["futex_q B(线程4)"]
FQB3["futex_q B(线程5)"]
HB1 --> PLIST_HEAD
PLIST_HEAD --> FQA1
FQA1 --> FQA2
FQA2 --> FQB1
FQB1 --> FQB2
FQB2 --> FQB3
HB2[("哈希桶 2 (hash bucket 2)")]
PLIST_HEAD2["plist_head(链表头)"]
FQC1["futex_q C(线程6)"]
FQD1["futex_q D(线程7)"]
HB2 --> PLIST_HEAD2
PLIST_HEAD2 --> FQC1
FQC1 --> FQD1
分析
数据结构
futex依赖的核心数据结构futex_q在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L122定义
1 | struct futex_q { |
哈希桶的定义futex_hash_bucket在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L145
1 | struct futex_hash_bucket { |
futex_q和futex_hash_bucket通过futex_q中的futex_key相关联https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L155
1 | static struct futex_hash_bucket *hash_futex(union futex_key *key) |
futex_key
futex_key是一个联合体,用于区分进程和线程的情况
https://elixir.bootlin.com/linux/v2.6.39.4/source/include/linux/futex.h#L156
1 | union futex_key { |
private标识线程间同步,同一个进程的多个线程,address是相同的,所以只需要虚拟地址address + offset就可以标识这个进程的futex,由于futex的整个哈希桶是全部进程共用的,因此还需要mm指针来区分不同进程
shared标识进程间同步,需要futex实际的物理地址才能标识跨进程的futex
使用private的流程会更简单,是性能优化的一部分
futex_wait
sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
participant User
participant F as futex_wait
participant FS as futex_wait_setup
participant FQ as futex_wait_queue_me
User ->>+ F :
F ->> F : 初始化定时器to = &timeout<br>
F ->>+ FS : [q, hb] = futex_wait_setup(uaddr, val, flag)
FS ->> FS : q->key = get_futex_key(uaddr)
FS ->> FS : hb = hash_futex(&q->key)<br>q->lock_ptr = &hb->lock<br>spin_lock(&hb->lock)
FS ->> FS : uval = get_futex_value_locked(uaddr)
alt uval != uaddr
FS ->>- F : 在休眠前存在futex_wake唤醒<br>spin_unlock(&hb->lock)<br>return -EWOULDBLOCK
F ->>- User : return -EWOULDBLOCK
else uval == uaddr
FS ->> F : return 0
end
F ->>+ FQ : futex_wait_queue_me(hb, &q, to);
FQ ->> FQ : set_current_state(TASK_INTERRUPTIBLE);
FQ ->> FQ : plist_node_init(&q->list, prio)<br>plist_add(&q->list, &hb->chain)<br>q->task = current<br>spin_unlock(&hb->lock);
alt !plist_node_empty(&q->list) 并且 休眠没有超时
FQ ->>+ 内核调度器 : schedule()
内核调度器 ->>- FQ : 唤醒
end
FQ ->> FQ : set_current_state(TASK_RUNNING)
FQ ->>- F : return
F ->> F : 出队列,并且减少q.key的引用计数<br>根据定时器和信号情况判断返回值
F ->> F : restart_block流程
F ->> User : return
1 | static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, |
从上述代码可以看出,大致可以分成futex_wait_setup,futex_wait_queue_me,restart_block流程
futex_wait_setup
1 | /** |
1 | static inline struct futex_hash_bucket *queue_lock(struct futex_q *q) |
1 | static int get_futex_value_locked(u32 *dest, u32 __user *from) |
futex_wait_queue_me
1 | static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q, |
1 | static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb) |
restart_block流程
sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
participant P as 进程
participant K as 内核
participant FW as futex_wait_restart函数
participant F as futex_wait函数
participant S as 信号处理do_signal函数
P->>K: 调用futex_wait
K->>+F: 进入futex_wait
F->>F: 等待futex地址uaddr
F ->>- K: 返回内核
K->>K: 内核调度别的进程
alt 信号到达
K->>+F : 唤醒
F->>F: restart = ¤t_thread_info()->restart_block<br>restart->fn = futex_wait_restart<br>restart->futex.uaddr = uaddr;
F->>-K: return -ERESTART_RESTARTBLOCK
K->>+S: 检查信号
S->>S: 执行信号处理程序<br>检查中断的系统调用返回值如果是-ERESTART_RESTARTBLOCK<br>那么regs->ax = NR_restart_syscall
S->>-K: 返回内核
alt 系统调用需要重启
K->>P: 返回到用户空间
P->>+K: 根据刚才设置的指令寄存器ip和ax<br>调用ax中的restart_syscall
K->>K: restart = ¤t_thread_info()->restart_block
K ->>+ FW : restart->fn(restart)
FW ->>+ F : futex_wait(restart->futex.uaddr, ...)
F ->>- FW : futex_wait完成
FW ->>- K : return
K->>-P : return
else 系统调用不需要重启
K->>P: 返回错误代码 (-EINTR, 等等)
end
else 未收到信号
F->>P: futex_wait完成
end
下面是一些具体的代码
在信号处理函数中https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/kernel/signal.c#L691
1 | static void do_signal(struct pt_regs *regs) |
是系统调用是-ERESTART_RESTARTBLOCK返回值,那么ax寄存器被设置为NR_restart_syscall
在返回用户空间以后会根据ip和ax寄存器调用NR_restart_syscall
https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/include/asm/unistd_64.h#L503
1 |
|
https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/signal.c#L2081
1 | SYSCALL_DEFINE0(restart_syscall) |
restart_block被指向了futex_wait_restart
https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L1902
1 | static long futex_wait_restart(struct restart_block *restart) |
从而重新进入futex_wait
futex_wake
1 | static int |
1 | static void wake_futex(struct futex_q *q) |
什么是锁
所谓的锁,在计算机里本质上就是一块内存空间。当这个空间被赋值为1的时候表示加锁了,被赋值为0的时候表示解锁了。
在多线程场景下,当多个线程抢一个锁,就是抢着要把这块内存赋值为1。
抢失败的线程就被休眠(或者死循环),这样就不会和抢成功的线程一起操作。
当抢成功的线程释放锁,抢失败的的线程就被唤醒抢锁(或者死循环抢锁)。
怎么实现锁
从刚才锁的本质分析可以看出来,锁分为休眠和死循环两种,也就是互斥锁和自旋锁。
自旋锁
自旋锁只依赖抢内存空间赋值成功,不需要唤醒抢失败者,因此实现是非常简单的。
1 | spinlock() { |
互斥锁
最基本的互斥锁
互斥锁的数据结构和算法就显得复杂了很多,包含了一块待抢的内存结构,和一个抢失败线程的等待唤醒列表。
1 | lock(){ |
理论上的互斥锁设计
互斥锁涉及到的数据结构(例如wait_list链表)的本身的线程安全是由自旋锁来保证的
无效唤醒问题
但是依然存在无效唤醒的问题,内核的无效唤醒问题包含两个方面
TASK_RUNNING被覆盖
在当前线程添加wait_list
和修改线程状态为TASK_INTERRUPTIBLE
中间执行了一整个unlock()
逻辑,会导致unlock()
中设置的TASK_RUNNING被lock()
中设置的TASK_INTERRUPTIBLE覆盖
从而导致线程无限休眠
移除阻塞线程的时间过早
时序 | 线程1 | 线程2 |
---|---|---|
1 | 从wait_list中移除任意一个阻塞线程 wait_list为空,直接返回,不唤醒任何线程 |
|
2 | 当前线程添加wait_list | |
3 | 休眠 |
线程2的唤醒操作实际上没有唤醒到任何线程,导致了线程1无线休眠
无效唤醒的解决办法
整体加锁
1 | lock(){ |
这种方式最为简单粗暴,顺序一致性(Sequential consistency),但是临界区太大了
异步队列
时序 | 线程A | 线程B |
---|---|---|
1 | 如果条件不满足,才能进入休眠 | |
2 | 入待操作队列: 让条件满足 从wait_list中移除任意一个阻塞线程 wake_up_process(A) set A TASK_RUNNING |
|
3 | 当前线程添加wait_list 修改线程状态为TASK_INTERRUPTIBLE |
|
4 | schedule() if A == TASK_INTERRUPTIBLE suspend() |
|
5 | 调度器处理完一轮调度,准备执行待操作队列: | |
6 | 执行待操作队列: 让条件满足 从wait_list中移除任意一个阻塞线程 wake_up_process(A) set A TASK_RUNNING |
异步队列也是常见的顺序一致性(Sequential consistency)手段
线程1如果休眠,一定是不满足条件以后,线程2才开始投递操作队列
这使得线程2的执行一定强制晚于线程1,从而避免了无效唤醒
这种方式虽然没有临界区,但是会导致延迟唤醒
PS
异步队列可以只异步化
set A TASK_RUNNING
来解决TASK_RUNNING被覆盖,从而实现纯用户态的互斥锁tars的协程实现也是只异步化了
set A TASK_RUNNING
来解决TASK_RUNNING被覆盖问题
调整顺序
通过精心设计的顺序来避免无效唤醒的两个问题
TASK_RUNNING被覆盖
时序 | 线程A | 线程B |
---|---|---|
1 | 修改线程状态为TASK_INTERRUPTIBLE | |
2 | 如果条件不满足,才能进入休眠 | |
3 | 让条件满足 | |
4 | wake_up_process(A) set A TASK_RUNNING |
|
5 | schedule()内部 if A == TASK_INTERRUPTIBLE suspend() |
休眠线程必须先设置TASK_INTERRUPTIBLE状态,再根据条件进入schedule
如果条件不满足
一定 happens-before
让条件满足
修改线程状态为TASK_INTERRUPTIBLE
sequenced-before 如果条件不满足
让条件满足
sequenced-before
set A TASK_RUNNING
所以修改线程状态为TASK_INTERRUPTIBLE
happens-before set A TASK_RUNNING
从而避免了TASK_RUNNING被覆盖导致的无效唤醒
回顾pthread_mutex的实现如何解决这个问题可以看到
1 | static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, |
加锁时wait中修改状态为TASK_INTERRUPTIBLE然后判断条件,解锁时先让条件满足,再修改状态为TASK_RUNNING
和我分析的逻辑是一致的
移除阻塞线程的时间过早
时序 | 线程1 | 线程2 |
---|---|---|
1 | int current_val = lock_val | |
2 | lock_val = lock_val + 1 | |
3 | lock(wait_list) 从wait_list中移除任意一个阻塞线程 unlock(wait_list) |
|
4 | lock(wait_list) | |
5 | if (current_val != lock_val) return | |
6 | 当前线程添加wait_list | |
7 | unlock(wait_list) |
通过引入了一个额外的原子变量,并且通过wait_list的锁来保证这个变量和操作wait_list的原子性
移除阻塞线程的时间过早 =
从wait_list中移除任意一个阻塞线程
happens-before
当前线程添加wait_list
的情况下
也相当于从wait_list中移除任意一个阻塞线程
happens-before
if (current_val != lock_val) return
的情况下
因为lock_val = lock_val + 1
sequenced-before
从wait_list中移除任意一个阻塞线程
所以lock_val = lock_val + 1
happens-before
if (current_val != lock_val) return
所以if (current_val != lock_val) return
在这种情况下一定会直接返回从而避免无效唤醒
不过这里也导致了虚假唤醒的问题,只要检测到lock_val = lock_val + 1
的事件,就会直接唤醒
回顾pthread_mutex的实现如何解决这个问题可以看到
1 | int pthread_mutex_lock (pthread_mutex_t *mutex) { |
解锁时将futex设为0,然后再出队列
加锁时期望val为2,随后在哈希桶的链表锁中查看futex是否符合val,如果不符合判断虚假唤醒
和我分析的逻辑是一致的
futex的诞生
纯内核态的互斥锁
最早期的互斥锁是完全的内核态的来实现的,这种实现无论有没有发生竞争都需要陷入到内核中去
这里给出一个整体加锁的伪代码实现
1 | lock(){ |
在多核下schedule()操作线程(进程)可运行队列时还需要获取这个队列的自旋锁
在线程A的schedule()入口进行加锁,关中断,关抢占,然后调用switchTo()执行线程B的时候进行解锁,开中断,开抢占
以保证schedule()操作线程状态时不会被wake_up_process()影响
纯用户态的互斥锁
通过在无效唤醒的解决办法中分析,如果纯用户态想解决无效唤醒问题
移除阻塞线程的时间过早的问题
可以通过一样的方式让pthread库和系统调用配合实现
-
由于无法直接操作线程状态,所以只能通过异步队列的方式来解决
在老版本glibc中的pthread_lock和pthread_unlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43void internal_function __pthread_lock(struct _pthread_fastlock * lock,
pthread_descr self)
{
//省略
if (!successful_seizure) {
for (;;) {
suspend(self); //最后调用系统调用sigsuspend
if (self->p_nextlock != NULL) {
/* Count resumes that don't belong to us. */
spurious_wakeup_count++;
continue;
}
break;
}
goto again;
}
}
int __pthread_unlock(struct _pthread_fastlock * lock)
{
//省略
/* Find thread in waiting queue with maximal priority */
ptr = (pthread_descr *) &lock->__status;
thr = (pthread_descr) (oldstatus & ~1L);
maxprio = 0;
maxptr = ptr;
while (thr != 0) {
if (thr->p_priority >= maxprio) {
maxptr = ptr;
maxprio = thr->p_priority;
}
ptr = &(thr->p_nextlock);
/* Prevent reordering of the load of lock->__status above and the
load of *ptr below, as well as reordering of *ptr between
several iterations of the while loop. Some processors (e.g.
multiprocessor Alphas) could perform such reordering even though
the loads are dependent. */
READ_MEMORY_BARRIER();
thr = *ptr;
}
/* Wake up the selected waiting thread */
thr->p_nextlock = NULL;
restart(thr); //最后调用系统调用kill
}这种方式下异步队列复用了信号队列的机制,将unlock的TASK_RUNNING设置异步到查看信号队列的时刻
这种方式显然不够高效
futex的诞生:用户态+内核态的互斥锁
纯内核态mutex非竞争下低效
纯用户态mutex异步队列低效
从而催生了用户态+内核态的互斥锁futex
futex全称fast userspace mutex
,顾名思义,是用户态优化性能的锁
由 Rusty Russell、Hubertus Franke 和 Mathew Kirkwood 在 2.5.7 中引入,是一种用于用户态应用程序的快速、轻量级内核辅助锁定原语。它提供非常快速的无竞争锁获取和释放。
futex 状态存储在用户空间变量中(在所有平台上都是无符号 32 位整数)。
futex 状态相当于把state状态从内核态剥离到了用户态,通过用户态协作,来规避没有竞争时陷入内核
参考资料
互斥锁(mutex)的底层原理是什么? 操作系统具体是怎么实现的?? - 躺平程序猿的回答 - 知乎
本文的起因是这一篇回答,但是他的回答有些不够精准,在关键问题的描述上模棱两可,也没有实际的源码参考例子
甚至引入了一个问答环节,让人理解起来很痛苦,当然作为一个问答论坛,引入这个环节也无可厚非
这一篇只能作为闲聊科普,不能作为很好的参考文献
Linux唤醒抢占----Linux进程的管理与调度(二十三)
这一篇文章的不仅给出了描述,还给出了实际的源码参考例子,让我少走了一些弯路
-
2019-03-14
本篇博客会深入分析条件变量实现来理解两个问题:
- 为什么条件变量要配合锁来使用 — 信号丢失问题
- 为什么条件变量需要用while循环来判断条件,而不是if — 虚假唤醒(spurious wakeup)问题
这两个问题不仅是C会遇到,所有语言的封装几乎都不可避免
先区分一下条件和条件变量,条件是指常用情况下,signal线程修改的值,使得cond_wait判断该值后不再阻塞