锁实现分析:从glibc到futex

之前在理解条件变量时在futex上误入歧途,导致犯下死锁的错误,并且写下了错误的博文,见浅析条件变量

所以打算重新理解一下futex

虽然初衷是futex,但是由于futex最初是为了优化锁的性能而提出,因此深入理解futex实际上是对锁概念的深入了理解

因此本篇的大纲

源码分析从c++的mutex开始,然后是glibc的nptl库pthread_mutex,接着是linux内核的futex实现

在源码分析完成以后,尝试对锁这个概念进行深入的分析,说明futex的诞生原因

C++的std::mutex源码分析

先看下g++版本

1
2
$ g++ --version
g++ (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0

然后在/usr/include/c++/9/bits/std_mutex.h有如下代码,做了一些可读性调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class __mutex_base {
protected:
typedef __gthread_mutex_t __native_type;
__native_type _M_mutex = __GTHREAD_MUTEX_INIT;

constexpr __mutex_base() noexcept = default;
...
};
class mutex : private __mutex_base {
public:
typedef __native_type* native_handle_type;
...
void lock() {
int __e = __gthread_mutex_lock(&_M_mutex);

// EINVAL, EAGAIN, EBUSY, EINVAL, EDEADLK(may)
if (__e)
__throw_system_error(__e);
}
void unlock() {
// XXX EINVAL, EAGAIN, EPERM
__gthread_mutex_unlock(&_M_mutex);
}
...
};
typedef pthread_mutex_t __gthread_mutex_t;
#define __GTHREAD_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
# define PTHREAD_MUTEX_INITIALIZER \
{ { 0, 0, 0, 0, 0, __PTHREAD_SPINS, { 0, 0 } } }
# define __PTHREAD_SPINS 0, 0

//__gthread_mutex_lock 差不多是pthread_mutex_lock,__gthread_mutex_unlock同理

可以看到c++的std::mutex几乎没有任何逻辑,就只是简单的对pthread的封装

持有了一个pthread_mutex_t类型的_M_mutex,并对其pthread_mutex_lock和pthread_mutex_unlock

glibc的mutex源码分析

pthread的实现实际上是glibc的NPTL代码

NPTL是Native POSIX Thread Library的缩写。这是Linux的一个线程库,它提供了一种实现POSIX线程(或称为Pthreads)的方法。NPTL被设计为可以提供高性能并发,特别是在多处理器和多核心系统中,并提供与POSIX标准更一致的行为。

NPTL是作为早期Linux线程实现LinuxThreads的替代品在2002年由IBM开发的,现在已经become了Linux系统上的默认POSIX线程库。在NPTL中,每个线程都是一个系统调度的实体,有独立的进程ID,这允许更有效率更一致的线程同步和并发。

像pthread_create这样的Pthreads API函数,实际上就是在使用NPTL(除非在非常老的Linux系统上可能是使用LinuxThreads)

这部分代码可以在https://github.com/bminor/glibc/tree/glibc-2.25/nptl中查看

首先查看x86上的pthread_mutex_t结构,代码在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/x86/bits/pthreadtypes.h#L128

主要看x86_64的代码

1
2
3
4
5
6
7
8
9
typedef union
{
struct __pthread_mutex_s
{
...
} __data;
char __size[__SIZEOF_PTHREAD_MUTEX_T];
long int __align;
} pthread_mutex_t;

pthread_mutex_t是对__pthread_mutex_s的union跨平台封装

其中__size[__SIZEOF_PTHREAD_MUTEX_T],这是用于标识__pthread_mutex_s的大小的

1
2
3
4
5
6
7
8
#ifdef __x86_64__
# if __WORDSIZE == 64
# define __SIZEOF_PTHREAD_MUTEX_T 40
# else
# define __SIZEOF_PTHREAD_MUTEX_T 32
#else
# define __SIZEOF_PTHREAD_MUTEX_T 24
#endif

long int __align则是为了确保整个 pthread_mutex_t 结构体在内存中根据 long int 的对齐规则进行对齐,以提高cpu的运行效率

所以核心部分就在__pthread_mutex_s上了

1
2
3
4
5
6
7
8
9
10
11
struct __pthread_mutex_s
{
int __lock; //表示互斥锁的状态,0表示未上锁,1表示已上锁,2表示发生了竞争。
unsigned int __count; //表示重复获取此锁的次数,可用于实现可递归的互斥锁。
int __owner; //表示当前持有该锁的线程的ID。
unsigned int __nusers; //表示正在使用或等待此互斥锁的线程数。
int __kind; //表示互斥锁的类型。
short __spins; //一般在使用该锁的线程多于一个时,新请求锁的线程会自旋等待这个字段的次数,然后进入内核态等待,减少了线程上下文切换。
short __elision; //对应硬件事务内存(htm)中的展开次数,如果为非零,新事务会直接展开,而不是使用锁。
__pthread_list_t __list; //用于维护处于等待状态的线程列表。如果有多个线程在等待这个锁,它们将被添加到这个链表当中。
} __data;
  • __lock字段是这部分的重点内容了

    不是简单的0,1表示是否上锁,还有个2状态表示发生了竞争,需要陷入内核进行裁决

  • __kind字段的1-7位表示锁类型,例如默认的非递归锁PTHREAD_MUTEX_NORMAL(等价于PTHREAD_MUTEX_TIMED_NP,见这里),以及可重入锁PTHREAD_MUTEX_RECURSIVE。

    而8位表示互斥锁是被进程内的线程共享,还是跨进程共享。一般使用pthread_mutexattr_setpshared函数来设置该属性。如果是进程间的同步,需要开辟一个共享内存,将mutex放入共享内存中,然后不同进程才能操作同一个mutex。

    另外还有一些状态位是用于标识是否支持优先级继承等

pthread_mutex_lock

在https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L600可以看到

pthread_mutex_lock是一个指向__pthread_mutex_lock的强符号(这是一个gcc的概念,强符号声明可以覆盖弱符号,当不存在强符号时才会使用弱符号,详见Weak symbol

1
2
3
4
#ifndef __pthread_mutex_lock
strong_alias (__pthread_mutex_lock, pthread_mutex_lock)
hidden_def (__pthread_mutex_lock)
#endif

在默认的锁类型PTHREAD_MUTEX_NORMAL(PTHREAD_MUTEX_TIMED_NP)下,https://github.com/bminor/glibc/blob/glibc-2.25/nptl/pthread_mutex_lock.c#L63所运行的流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#define PTHREAD_MUTEX_TYPE_ELISION(m) \ //取mutex第8位获取锁类型
(atomic_load_relaxed (&((m)->__data.__kind)) \
& (127 | PTHREAD_MUTEX_ELISION_NP))

#ifndef LLL_MUTEX_LOCK
# define LLL_MUTEX_LOCK(mutex) \
lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))
#endif

int __pthread_mutex_lock (pthread_mutex_t *mutex) {
assert (sizeof (mutex->__size) >= sizeof (mutex->__data)); //检查size数组有没有bug
unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex); //获取锁类型
LIBC_PROBE (mutex_entry, 1, mutex); //用于给SystemTap工具跟踪和分析锁相关的性能,标记开始加锁的探测点
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP)) {
FORCE_ELISION (mutex, goto elision); //尝试硬件事务性内存来完成锁消除
/* Normal mutex. */
LLL_MUTEX_LOCK (mutex); //核心逻辑
assert (mutex->__data.__owner == 0); //检查bug,是不是还没有owner持有
}
pid_t id = THREAD_GETMEM (THREAD_SELF, tid); //获取当前的线程id
/* Record the ownership. */
mutex->__data.__owner = id; //将id写入owner变量
LIBC_PROBE (mutex_acquired, 1, mutex); //标记开始加锁成功,所有权转移之后的探测点
return 0;
}

可以看到除了事务性内存看不太明白,这里大部分都是一些边缘逻辑,写入owner变量,设置探测点等等,核心逻辑全在LLL_MUTEX_LOCK指向的lll_lock中了

而关于其中的事务性内存,首先需要打开gcc选项-fgnu-tm才能使用,其次相关的FORCE_ELISION宏的逻辑,最终执行的代码LLL_MUTEX_LOCK_ELISION实际上已经废弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#ifdef HAVE_ELISION
else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
{
elision: __attribute__((unused))
/* This case can never happen on a system without elision,
as the mutex type initialization functions will not
allow to set the elision flags. */
/* Don't record owner or users for elision case. This is a
tail call. */
return LLL_MUTEX_LOCK_ELISION (mutex);
}
#endif

/* Not actually elided so far. Needed? */
#define LLL_MUTEX_LOCK_ELISION(mutex) \
({ lll_cond_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)); 0; })

lll_lock

https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L88中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* If FUTEX is 0 (not acquired), set to 1 (acquired with no waiters) and
return. Otherwise, ensure that it is >1 (acquired, possibly with waiters)
and then block until we acquire the lock, at which point FUTEX will still be
>1. The lock is always acquired on return. */
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))
#define lll_unlock(futex, private) \
__lll_unlock (&(futex), private)

atomic_compare_and_exchange_bool_acq是glibc的bool类型的CAS宏

原型大致是

1
2
3
4
5
6
atomic_compare_and_exchange_bool_acq(type *ptr, type desired, type *expected)
type *ptr: 这是指向要比较和可能交换值的指针
type *expected:这是指向存储 ptr 所指向的预期值的位置的指针
type desired:如果 ptr所指向的实际值等于 expected的值,则该值将被存储到 ptr 指向的位置

返回值则始终是ptr指向的旧值

因此当__futex

  • 如果为0
    • 代表目前是未锁定状态
    • 那么交换成功,返回0,函数继续运行
    • __futex的值为1,说明进入锁定状态
  • 如果为1或者2(在__lll_lock_wait_private设置为2)
    • 代表目前是锁定状态
    • 那么交换失败,返回1,函数进入__lll_lock_wait(_private),进行进程(线程)等待
    • 这是因为__futex为1或者2,已经有线程进入临界区,新线程需要进入锁定状态

__lll_lock_wait_private

https://github.com/bminor/glibc/blob/glibc-2.25/nptl/lowlevellock.c#L26中定义

1
2
3
4
5
6
7
8
void __lll_lock_wait_private (int *futex)
{
if (*futex == 2)
lll_futex_wait (futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */

while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */
}

其中lll_futex_waithttps://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L92中定义

1
2
3
4
5
6
7
#define lll_futex_wait(futexp, val, private) \
lll_futex_timed_wait (futexp, val, NULL, private)

#define lll_futex_timed_wait(futexp, val, timeout, private) \
lll_futex_syscall (4, futexp, \
__lll_private_flag (FUTEX_WAIT, private), \
val, timeout)

lll_futex_wait就是对syscall调用futex的简单封装,通过传入LLL_PRIVATE来决定调用FUTEX_WAIT_PRIVATE还是FUTEX_WAIT

回到__lll_lock_wait_private函数

  • 首先,代码检查 futex 所指向的值是否为2,如果是,那么它调用 lll_futex_wait 函数,等待 futex 所指向的值不再为2。
  • 如果 futex 所指向的值不是2,函数就进入一个循环,在这个循环中,它试图通过原子操作 atomic_exchange_acqfutex 所指向的值设为2。原子操作 atomic_exchange_acq 会返回 futex 所指向的旧值。
  • 如果 atomic_exchange_acq 返回的旧值不为0(意味着锁被占用),那么代码就会再次调用 lll_futex_wait 函数,等待 futex 所指向的值不再为2。

这里的逻辑要配合unlock一起看才能看得懂,见后续的时序分析

另外,由于虚假唤醒的存在,futex系统调用有可能返回EAGAIN,因此futex系统调用是必须用while循环包住的,这里巧妙的先不进行cas判断来让当前线程陷入futex_wait,从而提高一点性能

pthread_mutex_unlock

pthread_mutex_unlock是一个指向__pthread_mutex_unlock的强符号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int __pthread_mutex_unlock (pthread_mutex_t *mutex) {
return __pthread_mutex_unlock_usercnt (mutex, 1);
}
int internal_function attribute_hidden
__pthread_mutex_unlock_usercnt (pthread_mutex_t *mutex, int decr) {
int type = PTHREAD_MUTEX_TYPE_ELISION (mutex); //获取锁类型
if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
== PTHREAD_MUTEX_TIMED_NP) {
/* Always reset the owner field. */
mutex->__data.__owner = 0; //对应pthread_mutex_lock,解锁前先设置owner为空
if (decr) //从__pthread_mutex_unlock进来,这个值为1,必定执行该条件
/* One less user. */
--mutex->__data.__nusers; //减少一个等待该线程的线程数
/* Unlock. */
lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex)); //核心逻辑
LIBC_PROBE (mutex_release, 1, mutex); //标记解锁成功的探测点
return 0;
}
}

lll_unlock

https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/nptl/lowlevellock.h#L154中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Unconditionally set FUTEX to 0 (not acquired), releasing the lock.  If FUTEX
was >1 (acquired, possibly with waiters), then wake any waiters. The waiter
that acquires the lock will set FUTEX to >1.
Evaluate PRIVATE before releasing the lock so that we do not violate the
mutex destruction requirements. Specifically, we need to ensure that
another thread can destroy the mutex (and reuse its memory) once it
acquires the lock and when there will be no further lock acquisitions;
thus, we must not access the lock after releasing it, or those accesses
could be concurrent with mutex destruction or reuse of the memory. */
#define __lll_unlock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
int __private = (private); \
int __oldval = atomic_exchange_rel (__futex, 0); \
if (__glibc_unlikely (__oldval > 1)) \
lll_futex_wake (__futex, 1, __private); \
}))
#define lll_unlock(futex, private) \
__lll_unlock (&(futex), private)

lll_futex_wake和__lll_lock_wait_private不同,没有任何逻辑,在https://github.com/bminor/glibc/blob/glibc-2.25/sysdeps/unix/sysv/linux/lowlevellock-futex.h#L107中定义

1
2
3
#define lll_futex_wake(futexp, nr, private)                             \
lll_futex_syscall (4, futexp, \
__lll_private_flag (FUTEX_WAKE, private), nr, 0)
  • 首先将 futex 的值设置为 0,表示锁已释放。
  • 然后检查如果原先的 futex 值大于 1,即表示锁原本是被获取的(可能有等待者),那么唤醒任何一个等待者

时序分析

对于复杂的多线程情况,进行时序分析是最有助于理解问题的

  • 首先是没有发生线程争用的情况
线程1 线程2
1 lll_lock
if (atomic_compare_and_exchange_bool_acq (futex, 1, 0))
lll_lock_wait_private (futex);
此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区
2 lll_unlock
if (atomic_exchange_rel (futex, 0) > 1)
lll_futex_wake (futex, 1, private);
此时futex为1,因此返回1,函数直接退出
3 同1
4 同2
  • 接着是发生了线程争用的情况
线程1 线程2
1 lll_lock
if (atomic_compare_and_exchange_bool_acq (futex, 1, 0))
lll_lock_wait_private (futex);
此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区
2 lll_lock
if (atomic_compare_and_exchange_bool_acq (futex, 1, 0))
lll_lock_wait_private (futex);
此时futex为1,因此返回0,进入lll_lock_wait_private
3 lll_lock_wait_private
while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE);
futex被设置为2,futex的旧值1被返回,进入lll_futex_wait
休眠直到futex不为2
4 lll_unlock
if (atomic_exchange_rel (futex, 0) > 1)
lll_futex_wake (futex, 1, private);
离开临界区
设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake
唤醒任何一个休眠线程
5 lll_lock_wait_private
while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE);
futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区
6 同2,但是有点奇怪的是,这里会造成一次虚假唤醒
虽然进入futex以后由于等待列表是空的也没有什么性能损耗
  • 伪线程争用
线程1 线程2
1 lll_lock
if (atomic_compare_and_exchange_bool_acq (futex, 1, 0))
lll_lock_wait_private (futex);
此时futex为0,因此返回1,函数直接退出,进入锁保护的临界区
2 lll_lock
if (atomic_compare_and_exchange_bool_acq (futex, 1, 0))
lll_lock_wait_private (futex);
此时futex为1,因此返回0,进入lll_lock_wait_private
3 lll_unlock
if (atomic_exchange_rel (futex, 0) > 1)
lll_futex_wake (futex, 1, private);
离开临界区
设置futex为0,由于旧值futex为1,因此返回1,进入lll_futex_wake
唤醒任何一个休眠线程,这里也是一次虚假唤醒
4 lll_lock_wait_private
while (atomic_exchange_acq (futex, 2) != 0)
lll_futex_wait (futex, 2, LLL_PRIVATE);
futex被设置为2,旧值0被返回,函数直接退出,进入锁保护的临界区
5 lll_unlock
if (atomic_exchange_rel (futex, 0) > 1)
lll_futex_wake (futex, 1, private);
离开临界区
设置futex为0,由于旧值futex为2,因此返回2,进入lll_futex_wake
唤醒任何一个休眠线程,这里也是一次虚假唤醒

线程争用和伪线程争用比较,可以看出当unlock的时候futex=2才是glibc争用的情况

但是也有可能是发生了伪线程争用,没有syscall就直接退出了

小结

看到这里大致模模糊糊有个概念,锁实际上是抢的一个内存地址的值

glibc通过CAS抢1来预先判断一下有没有争用,如果没有直接退出

如果抢1失败了,就设置为2,再由futex决断是否为2来休眠和唤醒

因此简化版的锁可以通过让futex决断是否为1来休眠和唤醒,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <atomic>
#include <thread>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <assert.h>

constexpr int UNLOCKED = 0;
constexpr int LOCKED = 1;

class MyMutex {
public:
void lock() {
while(1) {
uint32_t expected = UNLOCKED;
if (m_lock.compare_exchange_strong(expected, LOCKED)) {
break;
}
int res = syscall(SYS_futex, &m_lock, FUTEX_WAIT_PRIVATE, LOCKED, nullptr, nullptr, 0);
if (res == -1 && errno != EAGAIN) {
std::cout << errno << std::endl;
assert(false);
}
}
}

void unlock() {
m_lock = UNLOCKED;
int res = syscall(SYS_futex, &m_lock, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
if (res == -1) {
std::cout << errno << std::endl;
assert(false);
}
}

std::atomic<uint32_t> m_lock{UNLOCKED};
};

MyMutex g_mutex;

int64_t gIndex = 0;

void thread_function() {
for (int64_t i = 0;i < 10000000;i++) {
g_mutex.lock();
gIndex++;
g_mutex.unlock();
}
}

int main() {
std::thread t1(thread_function);
std::thread t2(thread_function);
std::thread t3(thread_function);

t1.join();
t2.join();
t3.join();

std::cout << gIndex << std::endl;

return 0;
}

futex源码分析

futex有两个api

  • futex_wait(int *uaddr, int val)
    • 只有当 *uaddr 等于 val 时,才进行等待。
    • 这保证了在发生上下文切换之前,值没有被另一个线程更改。如果值被改变,futex_wait 将直接返回,不会有任何等待。
  • futex_wake(int *uaddr, int n)
    • 用于唤醒在 *uaddr 上等待的 n 个进程或线程。

总览

线程(进程)休眠后的唤醒,主要是需要定位到线程(进程)的唯一标识

当存在大量线程(进程)的时候,红黑树显然不能满足需要,futex使用哈希表来定位,并且使用开链法来处理碰撞

开链法所需的链表表头,锁,以及当前线程的task信息都会存储在futex_q的结构上

如下图所示,线程1和线程2都在futex_q A上休眠,线程3,4,5在futex_q B上休眠

futex_q A和futex_q B由于哈希冲突,在wait时放在了同一个哈希桶下面

当例如唤醒futex_q B时,先通过哈希函数定位到哈希桶1,然后遍历哈希桶1下的链表节点,直到遇到futex_q B,就唤醒第一个,也就是线程4

graph LR
    HB1[("哈希桶 1 (hash bucket1)")]

    PLIST_HEAD["plist_head(链表头)"]
    FQA1["futex_q A(线程1)"]
    FQA2["futex_q A(线程2)"]
    FQB1["futex_q B(线程3)"]
    FQB2["futex_q B(线程4)"]
    FQB3["futex_q B(线程5)"]

    HB1 --> PLIST_HEAD
    PLIST_HEAD --> FQA1
    FQA1 --> FQA2
    FQA2 --> FQB1
    FQB1 --> FQB2
    FQB2 --> FQB3

    HB2[("哈希桶 2 (hash bucket 2)")]

    PLIST_HEAD2["plist_head(链表头)"]
    FQC1["futex_q C(线程6)"]
    FQD1["futex_q D(线程7)"]

    HB2 --> PLIST_HEAD2
    PLIST_HEAD2 --> FQC1
    FQC1 --> FQD1

分析

数据结构

futex依赖的核心数据结构futex_q在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L122定义

1
2
3
4
5
6
7
8
struct futex_q {
struct plist_node list; //用于将当前futex_q插入到哈希表的链表中

struct task_struct *task; //指向正在等待在此futex上的任务的指针,在休眠的时候会进行赋值
spinlock_t *lock_ptr; //保护哈希桶链表的自旋锁指针(指向下面futex_hash_bucket中的lock)
union futex_key key; //哈希表用来做哈希的key
...
};

哈希桶的定义futex_hash_bucket在https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L145

1
2
3
4
5
6
struct futex_hash_bucket {
spinlock_t lock; //保护哈希桶链表的自旋锁
struct plist_head chain; //哈希表的链表表头
};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS]; //哈希桶数组

futex_q和futex_hash_bucket通过futex_q中的futex_key相关联https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L155

1
2
3
4
5
6
7
static struct futex_hash_bucket *hash_futex(union futex_key *key)
{
u32 hash = jhash2((u32*)&key->both.word,
(sizeof(key->both.word)+sizeof(key->both.ptr))/4,
key->both.offset);
return &futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)];
}

futex_key

futex_key是一个联合体,用于区分进程和线程的情况

https://elixir.bootlin.com/linux/v2.6.39.4/source/include/linux/futex.h#L156

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
union futex_key {
struct {
unsigned long pgoff; //futex变量在文件页面的偏移量,识别映射到内存的文件部分
struct inode *inode; //futex变量所在文件的索引节点的指针,对于基于文件的共享映射,指向文件系统中文件的唯一标识符
int offset; //futex变量页内的偏移量
} shared;
struct {
unsigned long address; //futex变量在用户空间的虚拟地址
struct mm_struct *mm; //futex变量所在进程的内存空间所有信息指针
int offset; //futex变量在用户空间虚拟地址的偏移量
} private;
struct {
unsigned long word;
void *ptr;
int offset;
} both;
};

private标识线程间同步,同一个进程的多个线程,address是相同的,所以只需要虚拟地址address + offset就可以标识这个进程的futex,由于futex的整个哈希桶是全部进程共用的,因此还需要mm指针来区分不同进程

shared标识进程间同步,需要futex实际的物理地址才能标识跨进程的futex

使用private的流程会更简单,是性能优化的一部分

futex_wait

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
participant User
participant F as futex_wait
participant FS as futex_wait_setup
participant FQ as futex_wait_queue_me

User ->>+ F : 
F ->> F : 初始化定时器to = &timeout<br>
F ->>+ FS : [q, hb] = futex_wait_setup(uaddr, val, flag)
FS ->> FS : q->key = get_futex_key(uaddr)
FS ->> FS : hb = hash_futex(&q->key)<br>q->lock_ptr = &hb->lock<br>spin_lock(&hb->lock)
FS ->> FS : uval = get_futex_value_locked(uaddr)
alt uval != uaddr
FS ->>- F : 在休眠前存在futex_wake唤醒<br>spin_unlock(&hb->lock)<br>return -EWOULDBLOCK
F ->>- User : return -EWOULDBLOCK
else uval == uaddr
FS ->> F : return 0
end
F ->>+ FQ : futex_wait_queue_me(hb, &q, to);
FQ ->> FQ : set_current_state(TASK_INTERRUPTIBLE);
FQ ->> FQ : plist_node_init(&q->list, prio)<br>plist_add(&q->list, &hb->chain)<br>q->task = current<br>spin_unlock(&hb->lock);
alt !plist_node_empty(&q->list) 并且 休眠没有超时
FQ ->>+ 内核调度器 : schedule()
内核调度器 ->>- FQ : 唤醒
end
FQ ->> FQ : set_current_state(TASK_RUNNING)
FQ ->>- F : return
F ->> F : 出队列,并且减少q.key的引用计数<br>根据定时器和信号情况判断返回值
F ->> F : restart_block流程
F ->> User : return

futex_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;

if (!bitset)
return -EINVAL;
q.bitset = bitset;

//初始化定时器用于超时判断,后续通过判断to可以知道有没有设置定时器
if (abs_time) {
to = &timeout;

hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}

retry:
//准备在uaddr上wait,如果成功,持有哈希桶的锁,并且增加q.key的引用计数
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;

//放入哈希桶上的链表里面,并且阻塞wait直到被wake唤醒,超时或者是信号唤醒
futex_wait_queue_me(hb, &q, to);

//如果唤醒成功并且出队列成功,那么就成功了,返回ret = 0
ret = 0;
//出队列,并且减少q.key的引用计数,如果失败了,说明可能超时唤醒或者信号唤醒
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
//如果设置了to,也就是开启了定时器,并且to->task已经为空,说明定时器的对应任务已经被超时唤醒
if (to && !to->task)
goto out;

//查看有没有待处理的信号,如果没有,说明是一次虚假唤醒,进入retry重新休眠
if (!signal_pending(current))
goto retry;

//有待处理信号,如果没有设置超时,那么直接返回ERESTARTSYS即可
ret = -ERESTARTSYS;
if (!abs_time)
goto out;

//如果设置了超时,需要给当前线程的restart_block设置重启标记
//在信号函数结束以后,内核会通过调用restart->fn,也就是futex_wait_restart来重新进入futex_wait进入休眠
//因此需要重新计算休眠时间
restart = &current_thread_info()->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = abs_time->tv64;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

ret = -ERESTART_RESTARTBLOCK;

out:
//如果设置了定时器,那么清理定时器
if (to) {
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}

从上述代码可以看出,大致可以分成futex_wait_setup,futex_wait_queue_me,restart_block流程

futex_wait_setup

futex_wait_setup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* futex_wait_setup() - 准备在一个 futex 上等待
* @uaddr: futex 在用户空间的地址
* @val: 期望值
* @flags: futex 标志(FLAGS_SHARED 等)
* @q: 相关的 futex_q
* @hb: 用于返回给调用者的 hash_bucket 指针的存储空间
*
* 设置 futex_q 并找到 hash_bucket。获取 futex 的值并与期望值比较。
* 内部处理原子操作错误。成功返回且持有 hb 锁和一个 q.key 引用,
* 失败则解锁且没有 q.key 引用。
*
* 返回值:
* 0 - uaddr 包含 val 并且 hb 已经被锁定
* <1 - -EFAULT 或 -EWOULDBLOCK(uaddr 不包含 val)并且 hb 未锁定
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval; // 用于存储从用户空间地址上获取的futex值
int ret;

/*
* 在锁定 hash-bucket 之后再访问这里。
* 顺序很重要:
*
* 用户空间等待者:val = var; if (cond(val)) futex_wait(&var, val);
* 用户空间唤醒者:if (cond(var)) { var = new; futex_wake(&var); }
*
* futex 的基本逻辑保证是,仅当在阻塞时 cond(var) 已知是 true 时才会阻塞,
* 对于任何 cond。如果我们在测试 *uaddr 之后锁定 hash-bucket,
* 那么可能出现竞争条件,我们可能会因为 cond(var) 为 false 而无限阻塞,
* 这将违反保证。
*
* 另一方面,我们只有在测试 *uaddr 之后才插入 q 并释放 hash-bucket。
* 这保证了 futex_wait() 在系统调用执行期间如果 *uaddr 与所需值不匹配,
* 则不会吸收唤醒信号。
*/
retry:
// 获取futex的键,对futex_q增加引用计数
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key);
if (unlikely(ret != 0))
return ret;

retry_private:
*hb = queue_lock(q); // 锁定队列,并返回对应的 hash_bucket

ret = get_futex_value_locked(&uval, uaddr); // 从用户空间地址获取futex的值

if (ret) {
//读取失败,那么解锁尝试再读一遍
queue_unlock(q, *hb);

ret = get_user(uval, uaddr);

//如果读取失败,那么直接退出,否则重试
if (ret)
goto out;

if (!(flags & FLAGS_SHARED))
goto retry_private;

put_futex_key(&q->key);
goto retry;
}

//如果读取到的值和期望值不同,说明在这中间有唤醒,那么直接解锁返回
if (uval != val) {
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}

out:
if (ret)
put_futex_key(&q->key);
return ret;
}

queue_lock

1
2
3
4
5
6
7
8
9
10
11
static inline struct futex_hash_bucket *queue_lock(struct futex_q *q)
__acquires(&hb->lock)
{
struct futex_hash_bucket *hb;

hb = hash_futex(&q->key);
q->lock_ptr = &hb->lock;

spin_lock(&hb->lock);
return hb;
}

get_futex_value_locked

1
2
3
4
5
6
7
8
9
10
11
static int get_futex_value_locked(u32 *dest, u32 __user *from)
{
int ret;

pagefault_disable();
ret = __copy_from_user_inatomic(dest, from, sizeof(u32));
pagefault_enable();

return ret ? -EFAULT : 0;
}

futex_wait_queue_me

futex_wait_queue_me

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
//确保在其他任务尝试唤醒之前设置任务状态。set_current_state() 函数通过 set_mb()(带有内存屏障的设置操作)来实现设置
//并且 queue_me() 在完成时会调用 spin_unlock(),强制的内存屏障和锁这两者保证对哈希列表的访问序列化
//标记为TASK_INTERRUPTIBLE状态等待schedule中实际休眠
set_current_state(TASK_INTERRUPTIBLE);

//将当前的 futex_q 结构 q 放入到与 futex 对应的哈希桶 hb,使得这个任务可以在该 futex 被释放时被唤醒。
queue_me(q, hb);

//启动定时器
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}

//如果已经从哈希表的链表中移除,则意味着有另一个任务尝试唤醒我们,我们可以跳过调用 schedule()。
if (likely(!plist_node_empty(&q->list))) {
//如果定时器已经到期,当前任务将会被标记为需要重新调度。仅在没有设置超时,或者定时器尚未到期时调用 schedule 函数。
if (!timeout || timeout->task)
schedule();
}

//唤醒以后标记为TASK_RUNNING状态
__set_current_state(TASK_RUNNING);
}

queue_me

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
__releases(&hb->lock)
{
int prio;

//用来注册此元素的优先级是
//- 实时线程的实际线程优先级(即,优先级低于 MAX_RT_PRIO 的线程)
//- 对于非实时线程,使用 MAX_RT_PRIO
//因此,所有实时线程会按优先级先被唤醒,其他线程则按先进先出(FIFO)顺序最后被唤醒。
prio = min(current->normal_prio, MAX_RT_PRIO);

plist_node_init(&q->list, prio);
plist_add(&q->list, &hb->chain);
q->task = current;
spin_unlock(&hb->lock);
}

restart_block流程

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
    participant P as 进程
    participant K as 内核
    participant FW as futex_wait_restart函数
    participant F as futex_wait函数
    participant S as 信号处理do_signal函数

    P->>K: 调用futex_wait
    K->>+F: 进入futex_wait
    F->>F: 等待futex地址uaddr
    F ->>- K: 返回内核
    K->>K: 内核调度别的进程
    alt 信号到达
        K->>+F : 唤醒
        F->>F: restart = &current_thread_info()->restart_block<br>restart->fn = futex_wait_restart<br>restart->futex.uaddr = uaddr;
        F->>-K: return -ERESTART_RESTARTBLOCK
        K->>+S: 检查信号
        S->>S: 执行信号处理程序<br>检查中断的系统调用返回值如果是-ERESTART_RESTARTBLOCK<br>那么regs->ax = NR_restart_syscall
        S->>-K: 返回内核
        alt 系统调用需要重启
            K->>P: 返回到用户空间
            P->>+K: 根据刚才设置的指令寄存器ip和ax<br>调用ax中的restart_syscall
            K->>K: restart = &current_thread_info()->restart_block
            K ->>+ FW : restart->fn(restart)
            FW ->>+ F : futex_wait(restart->futex.uaddr, ...)
            F ->>- FW : futex_wait完成
            FW ->>- K : return
            K->>-P : return
        else 系统调用不需要重启
            K->>P: 返回错误代码 (-EINTR, 等等)
        end
    else 未收到信号
        F->>P: futex_wait完成
    end

下面是一些具体的代码

在信号处理函数中https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/kernel/signal.c#L691

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void do_signal(struct pt_regs *regs)
{
...

/* Did we come from a system call? */
if (syscall_get_nr(current, regs) >= 0) {
/* Restart the system call - no handlers present */
switch (syscall_get_error(current, regs)) {
...

case -ERESTART_RESTARTBLOCK:
regs->ax = NR_restart_syscall;
regs->ip -= 2;
break;
}
}
}

是系统调用是-ERESTART_RESTARTBLOCK返回值,那么ax寄存器被设置为NR_restart_syscall

在返回用户空间以后会根据ip和ax寄存器调用NR_restart_syscall

https://elixir.bootlin.com/linux/v2.6.39.4/source/arch/x86/include/asm/unistd_64.h#L503

1
2
#define __NR_restart_syscall            219
__SYSCALL(__NR_restart_syscall, sys_restart_syscall)

https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/signal.c#L2081

1
2
3
4
5
SYSCALL_DEFINE0(restart_syscall)
{
struct restart_block *restart = &current_thread_info()->restart_block;
return restart->fn(restart);
}

restart_block被指向了futex_wait_restart

https://elixir.bootlin.com/linux/v2.6.39.4/source/kernel/futex.c#L1902

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static long futex_wait_restart(struct restart_block *restart)
{
u32 __user *uaddr = restart->futex.uaddr;
ktime_t t, *tp = NULL;

if (restart->futex.flags & FLAGS_HAS_TIMEOUT) {
t.tv64 = restart->futex.time;
tp = &t;
}
restart->fn = do_no_restart_syscall;

return (long)futex_wait(uaddr, restart->futex.flags,
restart->futex.val, tp, restart->futex.bitset);
}

从而重新进入futex_wait

futex_wake

futex_wake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;

if (!bitset)
return -EINVAL;

//通过uaddr获取到哈希的key
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key);
if (unlikely(ret != 0))
goto out;

//通过哈希的key定位到哈希桶
hb = hash_futex(&key);
//加哈希桶的锁
spin_lock(&hb->lock);
head = &hb->chain;

//遍历哈希桶的链表
plist_for_each_entry_safe(this, next, head, list) {
//如果当前节点和通过uaddr获取到哈希的key匹配
if (match_futex (&this->key, &key)) {

//如果 futex_q 实例有优先级反转管理(pi_state) 或者实时等待者(rt_waiter) ,futex_wake不应该去唤醒
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}

//这里检查当前 futex_q 实例的 bitset 和调用函数提供的 bitset 是否有共同的位
if (!(this->bitset & bitset))
continue;

//唤醒当前节点
wake_futex(this);

//唤醒指定数量以后不再唤醒
if (++ret >= nr_wake)
break;
}
}

spin_unlock(&hb->lock);
put_futex_key(&key);
out:
return ret;
}

wake_futex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void wake_futex(struct futex_q *q)
{
struct task_struct *p = q->task;

//我们在唤醒任务之前将 q->lock_ptr 设置为 NULL
//如果在另一个 CPU 上发生了一个非 futex 的唤醒,那么任务可能会退出,并且 p 会解引用一个不存在的任务结构体
//为了防止这种情况,在唤醒过程中持有 p 的引用。
get_task_struct(p);

__unqueue_futex(q);

//等待的任务可以在 q->lock_ptr 被设置为 NULL 后立即释放 futex_q,无需获取任何锁
//这里需要一个内存屏障,以防止对 lock_ptr 的存储操作超前于 plist_del 的执行。
smp_wmb();
q->lock_ptr = NULL;

//wake_up_state是调度器唤醒的入口
wake_up_state(p, TASK_NORMAL);
put_task_struct(p);
}

什么是锁

所谓的锁,在计算机里本质上就是一块内存空间。当这个空间被赋值为1的时候表示加锁了,被赋值为0的时候表示解锁了。

在多线程场景下,当多个线程抢一个锁,就是抢着要把这块内存赋值为1。

抢失败的线程就被休眠(或者死循环),这样就不会和抢成功的线程一起操作。

当抢成功的线程释放锁,抢失败的的线程就被唤醒抢锁(或者死循环抢锁)。

怎么实现锁

从刚才锁的本质分析可以看出来,锁分为休眠和死循环两种,也就是互斥锁和自旋锁。

自旋锁

自旋锁只依赖抢内存空间赋值成功,不需要唤醒抢失败者,因此实现是非常简单的。

1
2
3
4
5
6
7
spinlock() {
while ((CAS set state = 1 when state = 0) is failed) {
}
}
spinUnlock() {
CAS set state = 0
}

互斥锁

最基本的互斥锁

互斥锁的数据结构和算法就显得复杂了很多,包含了一块待抢的内存结构,和一个抢失败线程的等待唤醒列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
lock(){
if (state == 0) {
set state == 1
当前线程添加wait_list
修改线程状态为TASK_INTERRUPTIBLE
执行schedule()
}
}

unlock(){
set state = 0
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
}

理论上的互斥锁设计

互斥锁涉及到的数据结构(例如wait_list链表)的本身的线程安全是由自旋锁来保证的

无效唤醒问题

但是依然存在无效唤醒的问题,内核的无效唤醒问题包含两个方面

TASK_RUNNING被覆盖

当前线程添加wait_list修改线程状态为TASK_INTERRUPTIBLE中间执行了一整个unlock()逻辑,会导致unlock()中设置的TASK_RUNNING被lock()中设置的TASK_INTERRUPTIBLE覆盖

从而导致线程无限休眠

移除阻塞线程的时间过早
时序 线程1 线程2
1 从wait_list中移除任意一个阻塞线程
wait_list为空,直接返回,不唤醒任何线程
2 当前线程添加wait_list
3 休眠

线程2的唤醒操作实际上没有唤醒到任何线程,导致了线程1无线休眠

无效唤醒的解决办法

整体加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
lock(){
spin_lock()
if (条件不满足) {
当前线程添加wait_list
修改线程状态为TASK_INTERRUPTIBLE
spin_unlock()
执行schedule()
}
spin_unlock()
}

unlock(){
spin_lock()
让条件满足
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
spin_unlock()
}

这种方式最为简单粗暴,顺序一致性(Sequential consistency),但是临界区太大了

异步队列
时序 线程A 线程B
1 如果条件不满足,才能进入休眠
2 入待操作队列:
让条件满足
从wait_list中移除任意一个阻塞线程
wake_up_process(A)
set A TASK_RUNNING
3 当前线程添加wait_list
修改线程状态为TASK_INTERRUPTIBLE
4 schedule()
if A == TASK_INTERRUPTIBLE
suspend()
5 调度器处理完一轮调度,准备执行待操作队列:
6 执行待操作队列:
让条件满足
从wait_list中移除任意一个阻塞线程
wake_up_process(A)
set A TASK_RUNNING

异步队列也是常见的顺序一致性(Sequential consistency)手段

线程1如果休眠,一定是不满足条件以后,线程2才开始投递操作队列

这使得线程2的执行一定强制晚于线程1,从而避免了无效唤醒

这种方式虽然没有临界区,但是会导致延迟唤醒

调整顺序

通过精心设计的顺序来避免无效唤醒的两个问题

TASK_RUNNING被覆盖
时序 线程A 线程B
1 修改线程状态为TASK_INTERRUPTIBLE
2 如果条件不满足,才能进入休眠
3 让条件满足
4 wake_up_process(A)
set A TASK_RUNNING
5 schedule()内部
if A == TASK_INTERRUPTIBLE
suspend()

休眠线程必须先设置TASK_INTERRUPTIBLE状态,再根据条件进入schedule

如果条件不满足一定 happens-before 让条件满足

修改线程状态为TASK_INTERRUPTIBLE sequenced-before 如果条件不满足

让条件满足 sequenced-before set A TASK_RUNNING

所以修改线程状态为TASK_INTERRUPTIBLE happens-before set A TASK_RUNNING

从而避免了TASK_RUNNING被覆盖导致的无效唤醒

回顾pthread_mutex的实现如何解决这个问题可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
//具体在futex_wait_queue_me()中实现

set_current_state(TASK_INTERRUPTIBLE);
//list为空的条件不满足
if (likely(!plist_node_empty(&q->list))) {
schedule();
}
}
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
//具体在wake_futex()中实现

//让list为空的条件满足
__unqueue_futex(q);
wake_up_state(p, TASK_NORMAL);
}

加锁时wait中修改状态为TASK_INTERRUPTIBLE然后判断条件,解锁时先让条件满足,再修改状态为TASK_RUNNING

和我分析的逻辑是一致的

移除阻塞线程的时间过早
时序 线程1 线程2
1 int current_val = lock_val
2 lock_val = lock_val + 1
3 lock(wait_list)
从wait_list中移除任意一个阻塞线程
unlock(wait_list)
4 lock(wait_list)
5 if (current_val != lock_val) return
6 当前线程添加wait_list
7 unlock(wait_list)

通过引入了一个额外的原子变量,并且通过wait_list的锁来保证这个变量和操作wait_list的原子性

移除阻塞线程的时间过早 = 从wait_list中移除任意一个阻塞线程 happens-before 当前线程添加wait_list的情况下

也相当于从wait_list中移除任意一个阻塞线程 happens-before if (current_val != lock_val) return的情况下

因为lock_val = lock_val + 1 sequenced-before 从wait_list中移除任意一个阻塞线程

所以lock_val = lock_val + 1happens-before if (current_val != lock_val) return

所以if (current_val != lock_val) return在这种情况下一定会直接返回从而避免无效唤醒

不过这里也导致了虚假唤醒的问题,只要检测到lock_val = lock_val + 1的事件,就会直接唤醒

回顾pthread_mutex的实现如何解决这个问题可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int pthread_mutex_lock (pthread_mutex_t *mutex) {
//省略...

lll_futex_wait (futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */
}

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
//具体在futex_wait_setup()中实现

*hb = queue_lock(q); // 锁定队列,并返回对应的 hash_bucket

ret = get_futex_value_locked(&uval, uaddr); // 从用户空间地址获取futex的值

//省略...

//如果读取到的值和期望值不同,说明在这中间有唤醒,那么直接解锁返回
if (uval != val) {
return -EWOULDBLOCK;
}

//在futex_wait_queue_me()中实现
plist_node_init(&q->list, prio);
plist_add(&q->list, &hb->chain);
spin_unlock(&hb->lock);
}

int pthread_mutex_unlock (pthread_mutex_t *mutex) {
//省略...

int __oldval = atomic_exchange_rel (__futex, 0);
lll_futex_wake (__futex, 1, __private)
}

static void futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
__unqueue_futex(q);
}

解锁时将futex设为0,然后再出队列

加锁时期望val为2,随后在哈希桶的链表锁中查看futex是否符合val,如果不符合判断虚假唤醒

和我分析的逻辑是一致的

futex的诞生

纯内核态的互斥锁

最早期的互斥锁是完全的内核态的来实现的,这种实现无论有没有发生竞争都需要陷入到内核中去

这里给出一个整体加锁的伪代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
lock(){
spinlock()
if (state == 0) {
set state == 1
当前线程添加wait_list
修改线程状态为TASK_INTERRUPTIBLE
spinUnlock()
执行schedule()
}
spinUnlock()
}

unlock(){
spinlock()
set state = 0
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
spinUnlock()
}

在多核下schedule()操作线程(进程)可运行队列时还需要获取这个队列的自旋锁

在线程A的schedule()入口进行加锁,关中断,关抢占,然后调用switchTo()执行线程B的时候进行解锁,开中断,开抢占

以保证schedule()操作线程状态时不会被wake_up_process()影响

纯用户态的互斥锁

通过在无效唤醒的解决办法中分析,如果纯用户态想解决无效唤醒问题

  • 移除阻塞线程的时间过早的问题

    可以通过一样的方式让pthread库和系统调用配合实现

  • TASK_RUNNING被覆盖的问题

    由于无法直接操作线程状态,所以只能通过异步队列的方式来解决

    在老版本glibc中的pthread_lockpthread_unlock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    void internal_function __pthread_lock(struct _pthread_fastlock * lock,
    pthread_descr self)
    {
    //省略
    if (!successful_seizure) {
    for (;;) {
    suspend(self); //最后调用系统调用sigsuspend
    if (self->p_nextlock != NULL) {
    /* Count resumes that don't belong to us. */
    spurious_wakeup_count++;
    continue;
    }
    break;
    }
    goto again;
    }
    }
    int __pthread_unlock(struct _pthread_fastlock * lock)
    {
    //省略
    /* Find thread in waiting queue with maximal priority */
    ptr = (pthread_descr *) &lock->__status;
    thr = (pthread_descr) (oldstatus & ~1L);
    maxprio = 0;
    maxptr = ptr;
    while (thr != 0) {
    if (thr->p_priority >= maxprio) {
    maxptr = ptr;
    maxprio = thr->p_priority;
    }
    ptr = &(thr->p_nextlock);
    /* Prevent reordering of the load of lock->__status above and the
    load of *ptr below, as well as reordering of *ptr between
    several iterations of the while loop. Some processors (e.g.
    multiprocessor Alphas) could perform such reordering even though
    the loads are dependent. */
    READ_MEMORY_BARRIER();
    thr = *ptr;
    }
    /* Wake up the selected waiting thread */
    thr->p_nextlock = NULL;
    restart(thr); //最后调用系统调用kill
    }

    这种方式下异步队列复用了信号队列的机制,将unlock的TASK_RUNNING设置异步到查看信号队列的时刻

    这种方式显然不够高效

futex的诞生:用户态+内核态的互斥锁

纯内核态mutex非竞争下低效

纯用户态mutex异步队列低效

从而催生了用户态+内核态的互斥锁futex

futex全称fast userspace mutex ,顾名思义,是用户态优化性能的锁

由 Rusty Russell、Hubertus Franke 和 Mathew Kirkwood 在 2.5.7 中引入,是一种用于用户态应用程序的快速、轻量级内核辅助锁定原语。它提供非常快速的无竞争锁获取和释放。

futex 状态存储在用户空间变量中(在所有平台上都是无符号 32 位整数)。

futex 状态相当于把state状态从内核态剥离到了用户态,通过用户态协作,来规避没有竞争时陷入内核

参考资料