锁实现分析:从glibc到futex(二)

本文续接锁实现分析:从glibc到futex(一)

上一篇对glibc和futex进行了源码分析,是我对具体实现的梳理,没有总结性的内容,可以略过不看直接看这一篇总结

本篇总结尝试深入一些,继续挖掘锁这个概念

分析内核在实现锁的过程中是如何解决无效唤醒问题的

为了解决无效唤醒问题,纯用户态的互斥锁性能不够好;纯内核态又在非竞争的条件时需要陷入内核,从而诞生了futex

什么是锁

所谓的锁,在计算机里本质上就是一块内存空间。当这个空间被赋值为1的时候表示加锁了,被赋值为0的时候表示解锁了。

在多线程场景下,当多个线程抢一个锁,就是抢着要把这块内存赋值为1。

抢失败的线程就被休眠(或者死循环),这样就不会和抢成功的线程一起操作。

当抢成功的线程释放锁,抢失败的的线程就被唤醒抢锁(或者死循环抢锁)。

怎么实现锁

从刚才锁的本质分析可以看出来,锁分为休眠和死循环两种,也就是互斥锁和自旋锁。

自旋锁

自旋锁只依赖抢内存空间赋值成功,不需要唤醒抢失败者,因此实现是非常简单的。

1
2
3
4
5
6
7
spinlock() {
while ((CAS set state = 1 when state = 0) is failed) {
}
}
spinUnlock() {
CAS set state = 0
}

互斥锁

最基本的互斥锁

互斥锁的数据结构和算法就显得复杂了很多,包含了一块待抢的内存结构,和一个抢失败线程的等待唤醒列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
lock(){
if (state == 0) {
set state == 1
当前线程添加进wait_list
修改线程状态为TASK_INTERRUPTIBLE
执行schedule()
}
}

unlock(){
set state = 0
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
}

理论上的互斥锁设计

互斥锁涉及到的数据结构(例如wait_list链表)的本身的线程安全是由自旋锁来保证的

无效唤醒问题

但是依然存在无效唤醒的问题,这个问题包含两个方面

TASK_RUNNING被覆盖
时序 线程A 线程B
1 当前线程添加进wait_list
2 set state = 0
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
3 修改线程状态为TASK_INTERRUPTIBLE

当前线程添加进wait_list修改线程状态为TASK_INTERRUPTIBLE中间执行了一整个unlock()逻辑,会导致unlock()中设置的TASK_RUNNING被lock()中设置的TASK_INTERRUPTIBLE覆盖

从而导致线程无限休眠

移除阻塞线程的时间过早
时序 线程A 线程B
1 从wait_list中移除任意一个阻塞线程
wait_list为空,直接返回,不唤醒任何线程
2 当前线程添加进wait_list
3 休眠

线程B的唤醒操作实际上没有唤醒到任何线程,导致了线程A无线休眠

无效唤醒的解决办法

整体加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
lock(){
spin_lock()
if (条件不满足) {
当前线程添加进wait_list
修改线程状态为TASK_INTERRUPTIBLE
spin_unlock()
执行schedule()
}
spin_unlock()
}

unlock(){
spin_lock()
让条件满足
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
spin_unlock()
}

这种方式最为简单粗暴,顺序一致性(Sequential consistency),但是临界区太大了

异步队列
时序 线程A 线程B
1 如果条件不满足,才能进入休眠
2 入待操作队列:
让条件满足
从wait_list中移除任意一个阻塞线程
wake_up_process(A)
set A TASK_RUNNING
3 当前线程添加进wait_list
修改线程状态为TASK_INTERRUPTIBLE
4 schedule()
if A == TASK_INTERRUPTIBLE
suspend()
5 调度器处理完一轮调度,准备执行待操作队列:
6 执行待操作队列:
让条件满足
从wait_list中移除任意一个阻塞线程
wake_up_process(A)
set A TASK_RUNNING

异步队列也是常见的顺序一致性(Sequential consistency)手段

线程A如果休眠,一定是不满足条件以后,线程B才开始投递操作队列

这使得线程B的执行一定强制晚于线程A,从而避免了无效唤醒

这种方式虽然没有临界区,但是会导致延迟唤醒

调整顺序

通过精心设计的顺序来避免无效唤醒的两个问题

TASK_RUNNING被覆盖
时序 线程A 线程B
1 修改线程状态为TASK_INTERRUPTIBLE
2 如果条件不满足,才能进入休眠
3 让条件满足
4 wake_up_process(A)
set A TASK_RUNNING
5 schedule()内部
if A == TASK_INTERRUPTIBLE
suspend()

休眠线程必须先设置TASK_INTERRUPTIBLE状态,再根据条件进入schedule

如果条件不满足一定 happens-before 让条件满足

修改线程状态为TASK_INTERRUPTIBLE sequenced-before 如果条件不满足

让条件满足 sequenced-before set A TASK_RUNNING

所以修改线程状态为TASK_INTERRUPTIBLE happens-before set A TASK_RUNNING

从而避免了TASK_RUNNING被覆盖导致的无效唤醒

回顾pthread_mutex的实现如何解决这个问题可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
//具体在futex_wait_queue_me()中实现

set_current_state(TASK_INTERRUPTIBLE);
//list为空的条件不满足
if (likely(!plist_node_empty(&q->list))) {
schedule();
}
}
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
//具体在wake_futex()中实现

//让list为空的条件满足
__unqueue_futex(q);
wake_up_state(p, TASK_NORMAL);
}

加锁时wait中修改状态为TASK_INTERRUPTIBLE然后判断条件,解锁时先让条件满足,再修改状态为TASK_RUNNING

和我分析的逻辑是一致的

移除阻塞线程的时间过早
时序 线程A 线程B
1 int current_val = lock_val
2 lock_val = lock_val + 1
3 lock(wait_list)
从wait_list中移除任意一个阻塞线程
unlock(wait_list)
4 lock(wait_list)
5 if (current_val != lock_val) return
6 当前线程添加进wait_list
7 unlock(wait_list)

通过引入了一个额外的原子变量,并且通过wait_list的锁来保证这个变量和操作wait_list的原子性

移除阻塞线程的时间过早 = 从wait_list中移除任意一个阻塞线程 happens-before 当前线程添加进wait_list的情况下

也相当于从wait_list中移除任意一个阻塞线程 happens-before if (current_val != lock_val) return的情况下

因为lock_val = lock_val + 1 sequenced-before 从wait_list中移除任意一个阻塞线程

所以lock_val = lock_val + 1happens-before if (current_val != lock_val) return

在这种情况下if (current_val != lock_val) return一定会直接返回从而避免无效唤醒

不过这里也导致了虚假唤醒的问题,只要检测到lock_val = lock_val + 1的事件,就会直接唤醒

回顾pthread_mutex的实现如何解决这个问题可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int pthread_mutex_lock (pthread_mutex_t *mutex) {
//省略...

lll_futex_wait (futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */
}

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
//具体在futex_wait_setup()中实现

*hb = queue_lock(q); // 锁定队列,并返回对应的 hash_bucket

ret = get_futex_value_locked(&uval, uaddr); // 从用户空间地址获取futex的值

//省略...

//如果读取到的值和期望值不同,说明在这中间有唤醒,那么直接解锁返回
if (uval != val) {
return -EWOULDBLOCK;
}

//在futex_wait_queue_me()中实现
plist_node_init(&q->list, prio);
plist_add(&q->list, &hb->chain);
spin_unlock(&hb->lock);
}

int pthread_mutex_unlock (pthread_mutex_t *mutex) {
//省略...

int __oldval = atomic_exchange_rel (__futex, 0);
lll_futex_wake (__futex, 1, __private)
}

static void futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
__unqueue_futex(q);
}

解锁时将futex设为0,然后再出队列

加锁时期望val为2,随后在哈希桶的链表锁中查看futex是否符合val,如果不符合判断虚假唤醒

和我分析的逻辑是一致的

schedule原子性

调整顺序这一节的所有讨论都是基于schedule()函数是原子的前提下,进入schedule()函数后就不会被wake_up_process()影响

在多核下schedule()操作线程(进程)可运行队列时需要获取这个队列的自旋锁

在线程A的schedule()入口进行加锁,关中断,关抢占,然后调用switchTo()执行线程B的时候进行解锁,开中断,开抢占

以保证schedule()操作线程状态时不会被wake_up_process()影响

futex的诞生

纯内核态的互斥锁

最早期的互斥锁是完全的内核态的来实现的,这种实现无论有没有发生竞争都需要陷入到内核中去

这里给出一个整体加锁的伪代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
lock(){
spinlock()
if (state == 0) {
set state == 1
当前线程添加进wait_list
修改线程状态为TASK_INTERRUPTIBLE
spinUnlock()
执行schedule()
}
spinUnlock()
}

unlock(){
spinlock()
set state = 0
从wait_list中移除任意一个阻塞线程
接着调用wake_up_process()修改该线程状态为TASK_RUNNING,然后重新添加到可运行队列中
spinUnlock()
}

纯用户态的互斥锁

通过在无效唤醒的解决办法中分析,如果纯用户态想解决无效唤醒问题

  • 移除阻塞线程的时间过早的问题

    可以通过一样的方式让pthread库和系统调用配合实现

  • TASK_RUNNING被覆盖的问题

    由于无法直接操作线程状态,所以只能通过异步队列的方式来解决

    在老版本glibc中的pthread_lockpthread_unlock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    void internal_function __pthread_lock(struct _pthread_fastlock * lock,
    pthread_descr self)
    {
    //省略
    if (!successful_seizure) {
    for (;;) {
    suspend(self); //最后调用系统调用sigsuspend
    if (self->p_nextlock != NULL) {
    /* Count resumes that don't belong to us. */
    spurious_wakeup_count++;
    continue;
    }
    break;
    }
    goto again;
    }
    }
    int __pthread_unlock(struct _pthread_fastlock * lock)
    {
    //省略
    /* Find thread in waiting queue with maximal priority */
    ptr = (pthread_descr *) &lock->__status;
    thr = (pthread_descr) (oldstatus & ~1L);
    maxprio = 0;
    maxptr = ptr;
    while (thr != 0) {
    if (thr->p_priority >= maxprio) {
    maxptr = ptr;
    maxprio = thr->p_priority;
    }
    ptr = &(thr->p_nextlock);
    /* Prevent reordering of the load of lock->__status above and the
    load of *ptr below, as well as reordering of *ptr between
    several iterations of the while loop. Some processors (e.g.
    multiprocessor Alphas) could perform such reordering even though
    the loads are dependent. */
    READ_MEMORY_BARRIER();
    thr = *ptr;
    }
    /* Wake up the selected waiting thread */
    thr->p_nextlock = NULL;
    restart(thr); //最后调用系统调用kill
    }

    这种方式下异步队列复用了信号队列的机制,将unlock的TASK_RUNNING设置异步到查看信号队列的时刻

    这种方式显然不够高效

futex的诞生:用户态+内核态的互斥锁

纯内核态mutex非竞争下低效

纯用户态mutex异步队列低效

从而催生了用户态+内核态的互斥锁futex

futex全称fast userspace mutex ,顾名思义,是用户态优化性能的锁

由 Rusty Russell、Hubertus Franke 和 Mathew Kirkwood 在 2.5.7 中引入,是一种用于用户态应用程序的快速、轻量级内核辅助锁定原语。它提供非常快速的无竞争锁获取和释放。

futex 状态存储在用户空间变量中(在所有平台上都是无符号 32 位整数)。

futex 状态相当于把state状态从内核态剥离到了用户态,通过用户态协作,来规避没有竞争时陷入内核

参考资料