在C++中实现协程
我在整理tars和taf的协程逻辑的时候,也实现了一个纯净版的协程框架以便于理解
是基于我6年前编写的网络框架sheep_cpp修改的,虽然当年在魅族也是生产环境的网络框架,但是以我目前的经验来看,还是有很多不足的地方
仅作为演示协程实现的代码应该还是绰绰有余的,协程和协程套件的单元测试全部正常运行,用例在https://github.com/tedcy/sheep_cpp/blob/master/test/coroutine/coroutinue_test.cpp
网络框架
网络框架部分最大的问题还是单线程模型了,成熟的框架都是网络线程和工作线程分离的
客户端部分的话,服务发现主要靠域名解析,这个倒还不算太大的问题
类图
classDiagram
direction TB
class EventLoop {
+void wait()
+void stop()
+void addPoller(uint64_t type, shared_ptr~Poller~ poller)
+weak_ptr<Poller> getPoller(uint64_t pollerType)\n在Connector和Acceptor中用来给TcpConnection创建Event的时候指定Poller
-map<uint64_t, shared_ptr<Poller>> pollers_;
}
class Poller {
<<interface>>
+vector<weak_ptr<Event>> poll(string&)
+void addOrUpdateEvent(shared_ptr<Event> event)
+void removeEvent(Event *event)
}
class Epoller {
+vector<weak_ptr<Event>> poll(string&)
+void addOrUpdateEvent(shared_ptr~Event~ event)
+void removeEvent(Event *event)
}
class Event {
+Event(weak_ptr~Poller~ poller, int64_t fd)
+void setReadEvent(function<void#40;#41;>)
+void setWriteEvent(function<void#40;#41;>)
+void enableReadNotify()
+void enableWriteNotify()
+void disableReadNotify()
+void disableWriteNotify()
-weak_ptr~Poller~ poller_;
}
Epoller ..|> Poller
EventLoop ..> Poller
Epoller ..> Event
Event ..> Poller
class Socket {
+void bind(const string &addr, int port) Acceptor使用
+void listen() Acceptor使用
+void accept() Acceptor使用
+void connect(const string &addr, int port) Connector使用
+int read(char* buf, int len) TcpConnection使用
+int write(char* buf, int len) TcpConnection使用
+void setNoblock()
}
class Buffer {
+void push(const char *buf, uint64_t len)
+uint64_t popHead(char *buf, uint64_t len)
-void write(const char *buf, uint64_t len) TcpConnection使用
-uint64_t Read(char *buf, uint64_t len) TcpConnection使用
}
class TcpConnection {
+TcpConnection(unique_ptr~Socket~ &&socket, shared_ptr~Event~ &event)
+void asyncRead(uint64_t expectSize, function<void#40;const string &errMsg#41;> func)
+void asyncReadAny(function<void#40;const string &errMsg#41;>)
+void asyncWrite(function<void#40;const string &errMsg#41;>)
+void finish(const string &errMsg)
-void initAccepted(string &errMsg)
-void initConnected(string &errMsg)
-void setFinishHandler(function<void#40;const string &errMsg,\nshared_ptr<TcpConnection>)>)
-Buffer readBuffer_;
-Buffer writeBuffer_;
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
class Acceptor {
+Acceptor(EventLoop &loop, const string &addr, int port)
+void listen(string &errMsg)
+void setNewConnectionHandler(function<void#40;int fd#41;>)
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
class Connector {
Connector(EventLoop &loop, const string &addr, int port)
+void connect(string &errMsg)
+void setNewConnectionHandler(function<void#40;unique_ptr<Socket>&,\nshared_ptr<Event>)>)
+void setConnectFailedHandler(function<void#40;const string &errMsg#41;>)
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
Socket --* TcpConnection
Buffer --o TcpConnection
Event --o TcpConnection
Socket --* Acceptor
Event --o Acceptor
Socket --* Connector
Event --o Connector
class Server {
+Server(EventLoop &loop, const string &addr, int port)
+void serve(string &errMsg)
+void setConnectedHandler(function<void#40;const string &errMsg,shared_ptr<TcpConnection>)>)\n连接成功以后,在connected回调的参数来获取连接读取数据
+void setDisconnectedHandler(function<void#40;const string &errMsg#41;>)
-Acceptor acceptor_;
set~shared_ptr~TcpConnection~~ connections_;
}
TcpConnection --o Server
Acceptor --* Server
Client --> EventLoop
class Client {
+Client(EventLoop &loop, const string &addr, int port)
+void asyncConnect(string &errMsg)
+void setConnectedHandler(function<void#40;const string &errMsg#41;>)
+void setDisconnectedHandler(function<void#40;const string &errMsg#41;>)
+shared_ptr<TcpConnection>& getTcpConnection()\n连接成功以后,在connected回调中调用来获取连接读取数据
-Connector connector_;
-shared_ptr~TcpConnection~ connection_;
}
Connector --* Client
TcpConnection --o Client
Server --> EventLoop
服务端流程图
sequenceDiagram
participant main
participant EventLoop
participant Server
participant Acceptor
participant Socket
participant Event
participant Poller
participant Epoller
main ->> Server : 注册服务端连接事件<br>setConnectedHandler()<br>main::onConnect()赋值给Server::connectedHandler_
main ->> Server : 注册服务端断开连接事件<br>setDisconnectedHandler()<br>main::onDisConnect()赋值给Server::disconnectedHandler_
main ->>+ Server : serve()
Server ->>+ Acceptor : Server::newConnectionHandler()<br>注册进新连接事件:<br>acceptor_-><br>setNewConnectionHandler()<br>acceptor_->listen()
Acceptor ->>+ Socket : socket_->bind()<br>socket_->setNoBlock()<br>socket_->setNoDelay()
Socket ->>- Acceptor : return
Acceptor ->>+ Epoller : event_ = make_shared<Event><br>(Epoller::getInstance())
Epoller ->>- Acceptor : return
Acceptor ->>+ Event : Acceptor::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Event ->>+ Poller : poller_->addOrUpdateEvent()
Poller ->>+ Epoller : addOrUpdateEvent()
Epoller ->>- Poller : return
Poller ->>- Event : return
Event ->>- Acceptor : return
Acceptor ->>- Server : return
Server ->>- main : return
EventLoop ->>+ EventLoop : wait()
loop while(true)
EventLoop ->>+ EventLoop : runOnce()
EventLoop ->>+ Epoller : auto events = poll()
Epoller ->> Epoller : ::epoll_wait(epollfd_, &(*pollEvents_.begin()), maxSize_)
Epoller ->>- EventLoop : return
loop for event in events
alt 服务端监听描述符
EventLoop ->>+ Event : do()
Event ->>+ Acceptor : readHandler()
Acceptor ->>+ Socket : auto fd = socket_->accept();
Socket ->>+ Socket : ::accept()
Socket ->>- Socket : return
Socket ->>- Acceptor : return
Acceptor ->>+ Server : newConnectionHandler(fd)
Server ->>+ Connection : auto connection = make_shared<TcpConnection>(fd)<br>Server::disconnectedHandler_注册进断开链接事件<br>connection->setFinishHandler():<br>connection->initAccepted()
Connection ->>+ Socket : socket_->setNoBlock()<br>socket_->setNoDelay()<br>TcpConnection::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Socket ->>- Connection : return
Connection ->>- Server : return
Server ->>+ Server : connectedHandler_()
Server ->>+ main : onConnect()
main ->>+ main : 注册可读事件connection.asyncRead<br>(expectSize, main::onConnectionRead)
main ->>- main : return
main ->>- Server : return
Server ->>- Acceptor : return
Acceptor ->>- Event : return
Event ->>+ EventLoop : return
end
EventLoop ->>+ Event : do()
alt 服务端被建立链接描述符可读
Event ->>+ Connection : readHandler()
Connection ->>+ Connection : char buf[1024]<br>socket_->read(buf)<br>append to readBuffer_
alt readedSize_ >= expectSize_
Connection ->>+ main : onConnectionRead()
main ->>+ main : 读取Connection的readBuffer_<br>写入读取Connection的writeBuffer_
main ->>+ Connection : 注册写完事件connection.asyncWrite<br>(main::finishWrite)
Connection ->>+ Connection : TcpConnection::writeHandler()<br>注册进写事件:<br>event_->setWriteEvent()<br>event_->enableWriteNotify()
Connection ->>- Connection : return
Connection ->>- main : return
main ->>- main : return
main ->>- Connection : return
end
Connection ->>- Connection : return
Connection ->>- Event : return
end
alt 服务端被建立链接描述符可写
Event ->>+ Connection : writeHandler()
Connection ->>+ Connection : socket_->Write()
Connection ->>- Connection : return
alt 全部写完
Connection ->>+ main : main::finishWrite()
main ->> Connection : return
end
Connection ->> Event : return
end
Event ->>+ EventLoop : return
end
EventLoop ->>- EventLoop : return
end
EventLoop ->>- EventLoop : return
客户端流程图
略
协程框架
协程实现在这里https://github.com/tedcy/sheep_cpp/tree/master/src/coroutine
协程套件在https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/coroutine_mutex.h
由于网络线程和工作线程没有隔离,所以协程框架的循环run()
需要内置EventLoop,定时的进行调用,这个不是很合适
1 | void run() { |
如果不延迟loop_->runOnce()
,那么唤醒会变得很慢
如果延迟,那么网络事件处理会变慢,对accept处理延迟很容易造成系统的连接队列爆掉
类图
classDiagram
direction LR
namespace 协程框架 {
class CoroutineInfo协程 {
+void registerFunc()
+void switchTo(CoroutineInfo *to) 切换到目标协程
+void switchBack() 切换回主协程
}
class CoroutineScheduler协程调度器 {
+void start() 用于启动这个协程调度器在独立线程里
+void addCoroutine(F && func) 用于其他线程给这个线程添加协程任务
+void yield() 用于挂起协程,让给其他协程执行,空闲时再转回自己
+void suspend() 用于挂起协程,必须通过resume唤醒
+void resume(CoroutineInfo*) 用于唤醒suspend挂起的协程
+void sleep() 用于定时休眠后唤醒,可以用suspend和resume封装
}
class SharedState["SharedState<T>"] {
+void setValue(T && t)
+void setException(std::exception_ptr p)
+void wait(int ms)
}
class Promise["Promise<T>"] {
+Future<T> getFuture()
+void setValue(T && t)
+void setException(std::exception_ptr p)
-shared_ptr~SharedState~T~~ state : \n和Future共享同一个SharedState
}
class Future["Future<T>"] {
+T wait(int ms = -1)
-shared_ptr~SharedState~T~~ state : \n和Promise共享同一个SharedState
}
class ASyncer {
+Future<T> async(F && func) 异步执行func投递给CoroutineScheduler池,\n并获取一个Future,func是一个T类型返回值的lambda\nasync会对这个func封装一层,用于返回值或异常赋值给promise
}
class CoMutex {
+void lock()
+void unlock()
}
class Mutex {
+void lock() 锁是条件变量的特化情况:\n条件就是"当前没有其他线程/协程占用该互斥锁"
+void unlock()
}
class ConditionVariable {
+void wait(unique_lock<mutex>& lock, int ms = -1) 在线程或协程内,使用各自版本的cv
+void notify_one() 优先唤醒协程的,如果找不到,再去唤醒一个线程的
+void notify_all()
-CoConditionVariable coCv_;
-condition_variable cv_;
}
class CoConditionVariable {
+void wait(int ms = -1)
+void notify_one()
+void notify_all()
}
}
CoroutineInfo协程 "N" --* CoroutineScheduler协程调度器
SharedState ..> ConditionVariable
Mutex ..> ConditionVariable
ConditionVariable ..> CoConditionVariable
Promise ..> SharedState
Future ..> SharedState
ASyncer ..> Future
CoMutex ..> CoroutineScheduler协程调度器
CoConditionVariable ..> CoroutineScheduler协程调度器
ASyncer ..> CoroutineScheduler协程调度器
namespace 网络框架 {
class EventLoop事件循环 {
-void runOnce() CoroutineScheduler协程调度器\n每隔10ms调用一次\nresume是为了多线程安全是异步入队列后在协程循环中调用\n为了不让resume唤醒太慢\n必须延迟10ms的时间\n这会导致网络事件的处理延迟\n所以tars的网络线程是纯异步的,不使用协程
}
class CoServer {
+void setConnectedHandler() \n连接建立以后创建一个新协程
}
class CoClient {
+void asyncConnect(F && func)
+void connect() 在SetConnectedHandler回调中进行resume\n调用AsyncConnect,随后当前协程suspend\n被唤醒后把得到的TcpConnection包成CoTcpConnection
}
class CoTcpConnection {
+void asyncRead(F && func)
+void read() 同AsyncConnect
}
}
note for EventLoop事件循环 "1 服务端线程整个由调度器托管<br>EventLoop是调度器的一个协程<br>2 异步回调可以全部套一层<br>来实现同步回调"
CoClient ..> CoTcpConnection
CoServer ..> EventLoop事件循环
CoServer ..> CoTcpConnection
CoroutineInfo协程
协程实例,主要功能是把寄存器的数据和当前栈整个记录在申请的内存中,随时可以将其恢复出来
记录和恢复两个功能,就完成不同的栈之间的切换,而如何切换是协程调度器的工作
这个切换有点像游戏中的“存档”功能, 就好比家长(协程调度器)安排几个孩子轮流玩同一台游戏机,每个孩子玩一会儿累了, 暂停游戏并“保存进度”(记录),让出游戏机, 家长再让下一个孩子“读取进度”(恢复)继续他自己之前玩到一半的游戏内容, 就这样依次排队轮流玩,每个孩子随时都能从之前暂停的位置继续玩,不用重新开始游戏。
context库
我是基于boost的context库中的部分代码来实现的,CoroutineInfo是对context库的简单封装
fcontext_t实际上就是记录的协程数据
1 | // 定义协程执行栈 |
而操作fcontext_t
的make_fcontext和jump_fcontext都是汇编实现
make_fcontext:https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/tc_make_x86_64_sysv_elf_gas.S
jump_fcontext:https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/tc_jump_x86_64_sysv_elf_gas.S
而他们的函数签名是这样的
1 | // 创建一个新的协程上下文函数(make function context) |
使用make_fcontext可以新开辟一个协程栈空间,运行自己期望的函数,并返回一个fcontext_t
指针
随后使用jump_fcontext就可以跳转到创建的协程栈空间去了
显然jump_fcontext就是对应了上文所述的恢复功能,而记录是通过组合了make_fcontext和jump_fcontext实现的
1 | class CoroutineInfo : public IntrusiveListNode<CoroutineInfo> { |
当CoroutineInfo协程实例执行了registerFunc
以后,其中的ctx_
就记录了期望运行lambda的协程栈,而ctxFrom_
则记录了调用方的lambda的协程栈
随后就可以从主协程恢复到目标协程(switchTo
),或者恢复回调用方主协程(swithBack
)
1 | // 从当前协程切换至目标协程(to) |
CoroutineScheduler协程调度器
用法
首先通过currentCoroScheduler()基于线程私有变量的单例,获取到协程调度器,随后就可以调用协程调度器的各种方法了
这个协程调度器线程私有变量,是在下面初始化的start函数中创建新线程以后进行赋值的
注意,这个方法在非协程调度器的线程中调用会返回空指针
1 | class CoroutineScheduler { |
初始化
协程调度器在初始化的时候就需要创建好的全部协程实例在free队列中去,每个实例用于随时调用上文CoroutineInfo::registerFunc
注册进新的lambda
yield,suspend,resume是不同的逻辑,因此也是单独的队列,这些队列只需要初始化链表头即可
1 | class CoroutineScheduler::CoroutineSchedulerImp { |
yield
yield是最简单的协程切换的例子,用于挂起协程,让给其他协程执行,空闲时再转回自己
核心代码如下:
1 | // 调度器主循环函数,循环运行所有已准备好执行的协程 |
yield不止用于运行一些定时检查逻辑,还是创建协程的基础
addCoroutine
由于有可能是其他线程创建的协程实例,因此加锁,先放入yield队列,等待在run线程中正式的调用
1 | // 向调度器中添加一个新的协程任务 |
suspend
suspend相当于是yield的休眠的哪一部分,唤醒的部分在resume去做
1 | // 将指定的协程移动到挂起链表(suspend链表),表示该协程暂时不会被调度执行 |
resume
resume的唤醒比起yield的唤醒流程复杂一些,因为要考虑多线程安全问题
1 | // 协程调度器的核心方法,持续运行待唤醒(resume)的协程任务 |
sleep
sleep则是基于suspend和resume的封装
1 | void run() { |
协程套件
CoConditionVariable协程条件变量
条件变量就是当条件不满足的时候,在等待列表里面等待
其他协程发现条件满足了,就唤醒等待列表的某一项或者全部
因此它由两部分组成,CoroutineWaiter等待者类,和包含CoroutineWaiter队列的CoConditionVariable
1 | // 协程等待器结构体,用于实现协程的挂起与唤醒机制 |
CoroutineWaiter被唤醒有可能是超时时间到了,也有可能是被主动唤醒的,因此添加一个isCanceled标记。
超时时间到了,并不会删除队列中的这个元素,而是通过isCanceled标记的判断,在下次notfiy的过程中延迟删除
isCanceled标记也是通过wait时传入的锁来保证多线程安全的
CoMutex协程锁
在前文锁实现分析:从glibc到futex(二)中也总结了锁的概念
实际上就是一个标记位+一个队列,如果不考虑用户态和内核态的问题,可以不用futex的实现那么复杂
可以映射成一群人排队上一个厕所
- 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
- 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
- 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
- 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
- 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。
1 | class CoMutex { |
这个实现有一个反直觉的地方在于:unlock
的时候,并不是每次都会让isLock_
(锁状态)变为解锁状态,而是只有当没有人等待时,才真正地解锁。
用前面“上厕所”的例子类比一下就会很好理解:
- 如果门外没人排队,那么用完厕所的人出来后,门自然是打开的,所以此时
isLock_
变为解锁状态。 - 如果门外有人正在排队,当厕所里的人用完出来后,他并不打开门,而是直接交给排队的第一个人进去,因此门一直处于“有人”的状态,
isLock_
并没有被解锁。
也就是说,两个人在交接厕所的时候,全程都是门处于“有人”的状态,没有那一瞬间“没人”。
如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。
支持协程的通用套件
通用套件是指,不管在协程还是线程都可以使用的套件
前面的协程套件,核心是通过调用协程调度器的suspend,resume等等来完成的
这需要通过CoroutineScheduler::currentCoroScheduler()
拿到当前线程的协程调度器,在非协程调度器的线程中使用会直接返回空指针
因此正式的业务代码,需要使用通用套件,无需关心是当前线程中有没有运行协程调度器,才有更好的体验
ConditionVariable条件变量
这个是最容易也是最核心的的实现了,只需要同时拥有协程条件变量和线程条件变量
而通用条件变量是下面全部套件的基石
wait的时候判断当前线程是否存在协程调度器,存在的话就直接用协程wait,否则用线程wait
唤醒的时候,看协程条件变量有没有唤醒成功,失败的话再使用线程条件变量唤醒
1 | // 条件变量的封装类,同时支持普通线程和协程的等待和通知机制 |
Mutex锁
显然,条件变量的思路来实现锁是行不通的,没办法简单的搞两把锁,就能保证线程和协程都正确休眠
换一个角度去想,锁相当于保护条件为只有当前进程/协程可用的条件变量
- 条件变量的本质是等待某个条件满足,等待时挂起线程/协程,并在被通知后重新检查条件、进入下一步;
- 因此锁相当于是条件变量的一种特化情况——条件就是"当前没有其他线程/协程占用该互斥锁"。
因此可以直接基于通用套件的条件变量来实现通用锁,思路捋顺了,那么实现就是经典的条件变量case
1 | class Mutex { |
但是这里和CoMutex有一点不一样,不管有没有等待者,都会改变isLock的状态
回顾一下上面的类比CoMutex协程锁:
这个实现有一个反直觉的地方在于:
unlock
的时候,并不是每次都会让isLock_
(锁状态)变为解锁状态,而是只有当没有人等待时,才真正地解锁。用前面“上厕所”的例子类比一下就会很好理解:
- 如果门外没人排队,那么用完厕所的人出来后,门自然是打开的,所以此时
isLock_
变为解锁状态。- 如果门外有人正在排队,当厕所里的人用完出来后,他并不打开门,而是直接交给排队的第一个人进去,因此门一直处于“有人”的状态,
isLock_
并没有被解锁。也就是说,两个人在交接厕所的时候,全程都是门处于“有人”的状态,没有那一瞬间“没人”。
如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。
这里的实现,正是更理想的情况,这种写法可能不够高效,但是更好理解
PromiseFuture
C++标准库有线程版本的promise和future,这个组件对协程异步任务特别实用:
1 | void func(std::promise<int> * p) |
在C++标准库实现中,它就是条件变量的语法糖,那么在协程通用套件中也一样可以基于ConditionVariable来实现

SharedState(两者共享的Channel)
代码量看起来很多,实际上只是因为SharedState需要为void单独编写特化版本
1 | namespace internal { |
Promise(只写)
直接用SFINAE为void特化即可
1 | // Promise表示设置异步操作结果的一端,被执行异步操作逻辑的一方持有使用 |
Future(只读)
我稍微修改了wait的语义,让wait也具有取值语义,以方便使用
1 | // Future表示异步操作结果的访问端,可等待并获得异步操作的返回值 |
Async任务投递到协程调度器池
1 | // Async 类实现了一个轻量级的协程异步任务执行器池,封装了多个 CoroutineScheduler 用于并行调度和运行异步任务。 |
使用
1 | auto future = async::getInstance()->async([]() { |
总结
整个协程实现起来并不复杂,弄懂原理以后都是顺理成章的
协程实例,主要功能是把寄存器的数据和当前栈整个记录在申请的内存中,随时可以将其恢复出来
记录和恢复两个功能,就完成不同的栈之间的切换,而如何切换是协程调度器的工作
这个切换有点像游戏中的“存档”功能, 就好比家长(协程调度器)安排几个孩子轮流玩同一台游戏机,每个孩子玩一会儿累了, 暂停游戏并“保存进度”(记录),让出游戏机, 家长再让下一个孩子“读取进度”(恢复)继续他自己之前玩到一半的游戏内容, 就这样依次排队轮流玩,每个孩子随时都能从之前暂停的位置继续玩,不用重新开始游戏。
而协程套件实现过程其实还是CoMutex和Mutex最有意思,记住核心思想:一个标记位+一个队列
可以映射成一群人排队上一个厕所
- 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
- 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
- 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
- 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
- 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。
用条件变量去实现锁的时候,锁相当于保护条件为只有当前进程/协程可用的条件变量
- 条件变量的本质是等待某个条件满足,等待时挂起线程/协程,并在被通知后重新检查条件、进入下一步
- 因此锁相当于是条件变量的一种特化情况——条件就是"当前没有其他线程/协程占用该互斥锁"。
CoMutex和Mutex的实现细节有一些不一样,用等厕所来类比的话
Mutex实现(常规):如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。
CoMutex实现(简化):但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。