在C++中实现协程

我在整理tars和taf的协程逻辑的时候,也实现了一个纯净版的协程框架以便于理解

是基于我6年前编写的网络框架sheep_cpp修改的,虽然当年在魅族也是生产环境的网络框架,但是以我目前的经验来看,还是有很多不足的地方

仅作为演示协程实现的代码应该还是绰绰有余的,协程和协程套件的单元测试全部正常运行,用例在https://github.com/tedcy/sheep_cpp/blob/master/test/coroutine/coroutinue_test.cpp

网络框架

网络框架部分最大的问题还是单线程模型了,成熟的框架都是网络线程和工作线程分离的

客户端部分的话,服务发现主要靠域名解析,这个倒还不算太大的问题

类图

classDiagram
direction TB
class EventLoop {
    +void wait()
    +void stop()
    +void addPoller(uint64_t type, shared_ptr~Poller~ poller)
    +weak_ptr<Poller> getPoller(uint64_t pollerType)\n在Connector和Acceptor中用来给TcpConnection创建Event的时候指定Poller
    -map&lt;uint64_t, shared_ptr&lt;Poller&gt;&gt; pollers_;
}
class Poller {
    <<interface>>
    +vector&lt;weak_ptr&lt;Event&gt;&gt; poll(string&)
    +void addOrUpdateEvent(shared_ptr<Event> event)
    +void removeEvent(Event *event)
}
class Epoller {
    +vector&lt;weak_ptr&lt;Event&gt;&gt; poll(string&)
    +void addOrUpdateEvent(shared_ptr~Event~ event)
    +void removeEvent(Event *event)
}
class Event {
    +Event(weak_ptr~Poller~ poller, int64_t fd)
    +void setReadEvent(function&lt;void#40;#41;&gt;)
    +void setWriteEvent(function&lt;void#40;#41;&gt;)
    +void enableReadNotify()
    +void enableWriteNotify()
    +void disableReadNotify()
    +void disableWriteNotify()
    -weak_ptr~Poller~ poller_;
}
Epoller ..|> Poller
EventLoop ..> Poller
Epoller ..> Event
Event ..> Poller

class Socket {
    +void bind(const string &addr, int port) Acceptor使用
    +void listen() Acceptor使用
    +void accept() Acceptor使用
    +void connect(const string &addr, int port) Connector使用
    +int read(char* buf, int len) TcpConnection使用
    +int write(char* buf, int len) TcpConnection使用
    +void setNoblock()
}
class Buffer {
    +void push(const char *buf, uint64_t len)
    +uint64_t popHead(char *buf, uint64_t len)
    -void write(const char *buf, uint64_t len) TcpConnection使用
    -uint64_t Read(char *buf, uint64_t len) TcpConnection使用
}

class TcpConnection {
    +TcpConnection(unique_ptr~Socket~ &&socket, shared_ptr~Event~ &event)
    +void asyncRead(uint64_t expectSize, function&lt;void#40;const string &errMsg#41;&gt; func)
    +void asyncReadAny(function&lt;void#40;const string &errMsg#41;&gt;)
    +void asyncWrite(function&lt;void#40;const string &errMsg#41;&gt;)
    +void finish(const string &errMsg)
    -void initAccepted(string &errMsg)
    -void initConnected(string &errMsg)
    -void setFinishHandler(function&lt;void#40;const string &errMsg,\nshared_ptr&lt;TcpConnection&gt;&#41;&gt;)
    -Buffer readBuffer_;
    -Buffer writeBuffer_;
    -unique_ptr~Socket~ socket_;
    -shared_ptr~Event~ event_;
}

class Acceptor {
    +Acceptor(EventLoop &loop, const string &addr, int port)
    +void listen(string &errMsg)
    +void setNewConnectionHandler(function&lt;void#40;int fd#41;&gt;)
    -unique_ptr~Socket~ socket_;
    -shared_ptr~Event~ event_;
}

class Connector {
    Connector(EventLoop &loop, const string &addr, int port)
    +void connect(string &errMsg)
    +void setNewConnectionHandler(function&lt;void#40;unique_ptr&lt;Socket&gt;&,\nshared_ptr&lt;Event&gt;&#41;&gt;)
    +void setConnectFailedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
    -unique_ptr~Socket~ socket_;
    -shared_ptr~Event~ event_;
}

Socket --* TcpConnection
Buffer --o TcpConnection
Event --o TcpConnection

Socket --* Acceptor
Event --o Acceptor

Socket --* Connector
Event --o Connector

class Server {
    +Server(EventLoop &loop, const string &addr, int port)
    +void serve(string &errMsg)
    +void setConnectedHandler(function&lt;void#40;const string &errMsg,shared_ptr&lt;TcpConnection&gt;&#41;&gt;)\n连接成功以后,在connected回调的参数来获取连接读取数据
    +void setDisconnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
    -Acceptor acceptor_;
    set~shared_ptr~TcpConnection~~ connections_;
}
TcpConnection --o Server
Acceptor --* Server
Client --> EventLoop

class Client {
    +Client(EventLoop &loop, const string &addr, int port)
    +void asyncConnect(string &errMsg)
    +void setConnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
    +void setDisconnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
    +shared_ptr&lt;TcpConnection&gt;& getTcpConnection()\n连接成功以后,在connected回调中调用来获取连接读取数据
    -Connector connector_;
    -shared_ptr~TcpConnection~ connection_;
}
Connector --* Client
TcpConnection --o Client
Server --> EventLoop

服务端流程图

sequenceDiagram
participant main
participant EventLoop
participant Server
participant Acceptor
participant Socket
participant Event
participant Poller
participant Epoller
main ->> Server : 注册服务端连接事件<br>setConnectedHandler()<br>main::onConnect()赋值给Server::connectedHandler_
main ->> Server : 注册服务端断开连接事件<br>setDisconnectedHandler()<br>main::onDisConnect()赋值给Server::disconnectedHandler_
main ->>+ Server : serve()
Server ->>+ Acceptor : Server::newConnectionHandler()<br>注册进新连接事件:<br>acceptor_-><br>setNewConnectionHandler()<br>acceptor_->listen()
Acceptor ->>+ Socket : socket_->bind()<br>socket_->setNoBlock()<br>socket_->setNoDelay()
Socket ->>- Acceptor : return
Acceptor ->>+ Epoller : event_ = make_shared<Event><br>(Epoller::getInstance())
Epoller ->>- Acceptor : return
Acceptor ->>+ Event : Acceptor::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Event ->>+ Poller : poller_->addOrUpdateEvent()
Poller ->>+ Epoller : addOrUpdateEvent()
Epoller ->>- Poller : return
Poller ->>- Event : return
Event ->>- Acceptor : return
Acceptor ->>- Server : return
Server ->>- main : return
EventLoop ->>+ EventLoop : wait()
loop while(true)
EventLoop ->>+ EventLoop : runOnce()
EventLoop ->>+ Epoller : auto events = poll()
Epoller ->> Epoller : ::epoll_wait(epollfd_, &(*pollEvents_.begin()), maxSize_)
Epoller ->>- EventLoop : return
loop for event in events 
alt 服务端监听描述符
EventLoop ->>+ Event : do()
Event ->>+ Acceptor : readHandler()
Acceptor ->>+ Socket : auto fd = socket_->accept();
Socket ->>+ Socket : ::accept()
Socket ->>- Socket : return
Socket ->>- Acceptor : return
Acceptor ->>+ Server : newConnectionHandler(fd)
Server ->>+ Connection : auto connection = make_shared<TcpConnection>(fd)<br>Server::disconnectedHandler_注册进断开链接事件<br>connection->setFinishHandler():<br>connection->initAccepted()
Connection ->>+ Socket : socket_->setNoBlock()<br>socket_->setNoDelay()<br>TcpConnection::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Socket ->>- Connection : return
Connection ->>- Server : return
Server ->>+ Server : connectedHandler_()
Server ->>+ main : onConnect()
main ->>+ main : 注册可读事件connection.asyncRead<br>(expectSize, main::onConnectionRead)
main ->>- main : return
main ->>- Server : return
Server ->>- Acceptor : return
Acceptor ->>- Event : return
Event ->>+ EventLoop : return
end
EventLoop ->>+ Event : do()
alt 服务端被建立链接描述符可读
Event ->>+ Connection : readHandler()
Connection ->>+ Connection : char buf[1024]<br>socket_->read(buf)<br>append to readBuffer_
alt readedSize_ >= expectSize_
Connection ->>+ main : onConnectionRead()
main ->>+ main : 读取Connection的readBuffer_<br>写入读取Connection的writeBuffer_
main ->>+ Connection : 注册写完事件connection.asyncWrite<br>(main::finishWrite)
Connection ->>+ Connection : TcpConnection::writeHandler()<br>注册进写事件:<br>event_->setWriteEvent()<br>event_->enableWriteNotify()
Connection ->>- Connection : return
Connection ->>- main : return
main ->>- main : return 
main ->>- Connection : return
end
Connection ->>- Connection : return 
Connection ->>- Event : return
end
alt 服务端被建立链接描述符可写
Event ->>+ Connection : writeHandler()
Connection ->>+ Connection : socket_->Write()
Connection ->>- Connection : return
alt 全部写完
Connection ->>+ main : main::finishWrite()
main ->> Connection : return
end 
Connection ->> Event : return
end 
Event ->>+ EventLoop : return
end
EventLoop ->>- EventLoop : return
end
EventLoop ->>- EventLoop : return

客户端流程图

协程框架

协程实现在这里https://github.com/tedcy/sheep_cpp/tree/master/src/coroutine

协程套件在https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/coroutine_mutex.h

由于网络线程和工作线程没有隔离,所以协程框架的循环run()需要内置EventLoop,定时的进行调用,这个不是很合适

1
2
3
4
5
6
7
8
9
10
11
12
void run() {
int64_t loopLastRun = UnixTimeMilliSecond();
while(isRunning_) {
//为了不让resume唤醒太慢,必须延迟loop_的时间
//这会导致网络事件的处理延迟,所以tars的网络线程是纯异步的,不使用协程
if (loop_ && UnixTimeMilliSecond() > loopLastRun + 10) {
loop_->runOnce();
loopLastRun = UnixTimeMilliSecond();
}
...省略其他协程逻辑
}
}

如果不延迟loop_->runOnce(),那么唤醒会变得很慢

如果延迟,那么网络事件处理会变慢,对accept处理延迟很容易造成系统的连接队列爆掉

类图

classDiagram
direction LR
namespace 协程框架 {
class CoroutineInfo协程 {
    +void registerFunc()
    +void switchTo(CoroutineInfo *to) 切换到目标协程
    +void switchBack() 切换回主协程
}
class CoroutineScheduler协程调度器 {
    +void start() 用于启动这个协程调度器在独立线程里
    +void addCoroutine(F && func) 用于其他线程给这个线程添加协程任务
    +void yield() 用于挂起协程,让给其他协程执行,空闲时再转回自己
    +void suspend() 用于挂起协程,必须通过resume唤醒
    +void resume(CoroutineInfo*) 用于唤醒suspend挂起的协程
    +void sleep() 用于定时休眠后唤醒,可以用suspend和resume封装
}
class SharedState["SharedState&lt;T&gt;"] {
    +void setValue(T && t)
    +void setException(std::exception_ptr p)
    +void wait(int ms)
}
class Promise["Promise&lt;T&gt;"] {
    +Future&lt;T&gt; getFuture()
    +void setValue(T && t)
    +void setException(std::exception_ptr p)
    -shared_ptr~SharedState~T~~ state : \n和Future共享同一个SharedState
}
class Future["Future&lt;T&gt;"] {
    +T wait(int ms = -1)
    -shared_ptr~SharedState~T~~ state : \n和Promise共享同一个SharedState
}
class ASyncer {
    +Future&lt;T&gt async(F && func) 异步执行func投递给CoroutineScheduler池,\n并获取一个Future,func是一个T类型返回值的lambda\nasync会对这个func封装一层,用于返回值或异常赋值给promise
}
class CoMutex {
    +void lock()
    +void unlock()
}
class Mutex {
    +void lock() 锁是条件变量的特化情况:\n条件就是"当前没有其他线程/协程占用该互斥锁"
    +void unlock()
}
class ConditionVariable {
    +void wait(unique_lock<mutex>& lock, int ms = -1) 在线程或协程内,使用各自版本的cv
    +void notify_one() 优先唤醒协程的,如果找不到,再去唤醒一个线程的
    +void notify_all()
    -CoConditionVariable coCv_;
    -condition_variable cv_;
}
class CoConditionVariable {
    +void wait(int ms = -1)
    +void notify_one()
    +void notify_all()
}
}
CoroutineInfo协程 "N" --* CoroutineScheduler协程调度器
SharedState ..> ConditionVariable
Mutex ..> ConditionVariable
ConditionVariable ..> CoConditionVariable
Promise ..> SharedState
Future ..> SharedState
ASyncer ..> Future
CoMutex ..> CoroutineScheduler协程调度器
CoConditionVariable ..> CoroutineScheduler协程调度器
ASyncer ..> CoroutineScheduler协程调度器
namespace 网络框架 {
class EventLoop事件循环 {
    -void runOnce() CoroutineScheduler协程调度器\n每隔10ms调用一次\nresume是为了多线程安全是异步入队列后在协程循环中调用\n为了不让resume唤醒太慢\n必须延迟10ms的时间\n这会导致网络事件的处理延迟\n所以tars的网络线程是纯异步的,不使用协程
}
class CoServer {
    +void setConnectedHandler() \n连接建立以后创建一个新协程
}
class CoClient {
    +void asyncConnect(F && func)
    +void connect() 在SetConnectedHandler回调中进行resume\n调用AsyncConnect,随后当前协程suspend\n被唤醒后把得到的TcpConnection包成CoTcpConnection
}
class CoTcpConnection {
    +void asyncRead(F && func)
    +void read() 同AsyncConnect
}
}
note for EventLoop事件循环 "1 服务端线程整个由调度器托管<br>EventLoop是调度器的一个协程<br>2 异步回调可以全部套一层<br>来实现同步回调"
CoClient ..> CoTcpConnection
CoServer ..> EventLoop事件循环
CoServer ..> CoTcpConnection

CoroutineInfo协程

协程实例,主要功能是把寄存器的数据和当前栈整个记录在申请的内存中,随时可以将其恢复出来

记录恢复两个功能,就完成不同的栈之间的切换,而如何切换是协程调度器的工作

这个切换有点像游戏中的“存档”功能, 就好比家长(协程调度器)安排几个孩子轮流玩同一台游戏机,每个孩子玩一会儿累了, 暂停游戏并“保存进度”(记录),让出游戏机, 家长再让下一个孩子“读取进度”(恢复)继续他自己之前玩到一半的游戏内容, 就这样依次排队轮流玩,每个孩子随时都能从之前暂停的位置继续玩,不用重新开始游戏。

context库

我是基于boost的context库中的部分代码来实现的,CoroutineInfo是对context库的简单封装

fcontext_t实际上就是记录的协程数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 定义协程执行栈
struct fc_stack_t
{
void* sp; // 栈指针地址(通常指向栈底)
std::size_t size; // 栈空间大小

// 构造函数初始化为空栈
fc_stack_t()
: sp(0)
, size(0)
{}
};

// 定义浮点寄存器状态(仅保存部分浮点寄存器)
struct fp_t
{
uint32_t fc_freg[2]; // 浮点寄存器保存区域 (大小可能不同架构而变化)

// 构造函数初始化
fp_t()
: fc_freg()
{}
};

// 协程上下文结构(fcontext_t)
struct fcontext_t
{
uint64_t fc_greg[8]; // 通用寄存器(general registers)保存区:
// fc_greg[0]: RBX
// fc_greg[1]: RBP (基址指针, FramePointer)
// fc_greg[2]: R12
// fc_greg[3]: R13
// fc_greg[4]: R14
// fc_greg[5]: R15
// fc_greg[6]: RIP (指令地址寄存器, Instruction Pointer)
// fc_greg[7]: RSP (栈顶寄存器, Stack Pointer)

fc_stack_t fc_stack; // 分配给当前协程的执行栈空间

fp_t fc_fp; // 浮点寄存器状态保存区域

// 构造函数初始化寄存器与栈空间
fcontext_t()
: fc_greg()
, fc_stack()
, fc_fp()
{}
};

而操作fcontext_tmake_fcontextjump_fcontext都是汇编实现

make_fcontext:https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/tc_make_x86_64_sysv_elf_gas.S

jump_fcontext:https://github.com/tedcy/sheep_cpp/blob/master/src/coroutine/tc_jump_x86_64_sysv_elf_gas.S

而他们的函数签名是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建一个新的协程上下文函数(make function context)
// 参数说明:
// sp: 栈空间的地址(通常指向栈空间的顶端)
// size: 栈的大小,单位字节
// fn: 上下文启动后执行的协程函数指针,函数类型为 void fn(intptr_t)
// 返回值: 返回指向新创建的上下文结构的指针(fcontext_t指针)
extern "C" fcontext_t * make_fcontext(void * sp, std::size_t size, void (* fn)(intptr_t));

// 上下文跳转函数,从当前上下文切换至目标上下文。
// 参数说明:
// ofc: 用于保存当前上下文(old context)的指针
// nfc: 目标上下文(new context)的指针
// vp: 在切换到目标上下文时传递的整型参数
// preserve_fpu: 是否需要保存浮点寄存器 (默认true,防止浮点计算数据丢失)
// 返回值: 从其他上下文再切换回此上下文时,返回传递回来的intptr_t值
extern "C" intptr_t jump_fcontext(fcontext_t * ofc, fcontext_t const* nfc, intptr_t vp, bool preserve_fpu = true);

使用make_fcontext可以新开辟一个协程栈空间,运行自己期望的函数,并返回一个fcontext_t指针

随后使用jump_fcontext就可以跳转到创建的协程栈空间去了

显然jump_fcontext就是对应了上文所述的恢复功能,而记录是通过组合了make_fcontextjump_fcontext实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class CoroutineInfo : public IntrusiveListNode<CoroutineInfo> {
public:
static void initContext(intptr_t v);
void registerFunc(const std::function<void()>& func,
const std::function<void()>& endFunc);
private:
std::function<void()> func_; //用户自定义协程主函数
std::function<void()> endFunc_; //协程结束后的回调
fcontext_t* ctx_; //协程自身运行时的上下文指针(记录自身上下文)
fcontext_t ctxFrom_; //用于存储调用协程方的上下文 (记录调用方上下文,用于yield和resume的跳转)
};

// 协程开始执行时的入口函数 initContext
// 参数 v 为传进来的协程信息指针 intptr_t(this)
void CoroutineInfo::initContext(intptr_t v) {
// 汇编标记当前指令寄存器(rip)为未定义(特定于编译器优化、调试符号)
asm(".cfi_undefined rip");

CoroutineInfo *info = (CoroutineInfo*)v; // 转换回协程信息指针

// 首先跳回刚才启动协程的原始上下文,完成协程初始化流程
jump_fcontext(info->ctx_, &info->ctxFrom_, 0, false);

// 执行用户注册的协程函数
try {
info->func_();
} catch (...) {
LOG(ERROR) << "exception" << endl; // 捕获所有异常并记录日志
}

// 协程运行结束,调用用户指定的endFunc_回调函数(通常用于协程资源回收或协程复用)
info->endFunc_();

// 协程函数执行完毕,再次跳转回协程调用方原始上下文,协程生命周期结束
jump_fcontext(info->ctx_, &info->ctxFrom_, 0, false);

assert(false); // 永远不应该运行到这里,如果运行到这里说明协程出现了致命错误
}

// 注册用户定义的协程函数以及协程结束时调用的回调函数
void CoroutineInfo::registerFunc(const std::function<void()> &func,
const std::function<void()> &endFunc) {
func_ = func; // 用户协程函数
endFunc_ = endFunc; // 协程结束后的回调函数,用于通知协程调度器或进行资源回收

auto stack = default_stack(); // 创建默认栈空间供协程运行使用

stack.zero(); // 清零协程栈内存,避免脏数据

// 创建新的协程上下文 context(ctx_),用于存储协程运行状态信息
// 协程开始运行时首次执行的函数为initContext
ctx_ = make_fcontext(stack.sp, stack.size, initContext);

// 立即启动协程上下文 ctx_,并将当前 this 指针作为参数传递给协程入口函数initContext
jump_fcontext(&ctxFrom_, ctx_, intptr_t(this), false);
}

当CoroutineInfo协程实例执行了registerFunc以后,其中的ctx_记录了期望运行lambda的协程栈,而ctxFrom_记录了调用方的lambda的协程栈

随后就可以从主协程恢复到目标协程(switchTo),或者恢复回调用方主协程(swithBack)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 从当前协程切换至目标协程(to)
void CoroutineInfo::switchTo(CoroutineInfo *to) {
CoroutineScheduler::currentCoro() = to; // 更新当前正在运行的协程为目标协程(to)

// 从当前上下文跳转到目标协程(to)的上下文,保存当前上下文到to->ctxFrom_
jump_fcontext(&to->ctxFrom_, to->ctx_, intptr_t(to), false);
}

// 从当前协程切换回调用方协程(即之前调用switchTo的协程)
void CoroutineInfo::switchBack() {
auto cur = CoroutineScheduler::currentCoro(); // 获取当前协程信息

// 当前协程上下文切换回调用方协程,上下文保存至cur->ctxFrom_,恢复调用方协程上下文cur->ctx_
jump_fcontext(cur->ctx_, &cur->ctxFrom_, 0, false);
}

CoroutineScheduler协程调度器

用法

首先通过currentCoroScheduler()基于线程私有变量的单例,获取到协程调度器,随后就可以调用协程调度器的各种方法了

这个协程调度器线程私有变量,是在下面初始化的start函数中创建新线程以后进行赋值的

注意,这个方法在非协程调度器的线程中调用会返回空指针

1
2
3
4
5
6
7
8
9
10
11
12
13
class CoroutineScheduler {
public:
static CoroutineScheduler*& currentCoroScheduler() {
static thread_local CoroutineScheduler *currentCoroScheduler;
return currentCoroScheduler;
}
void start(); // 用于启动这个协程调度器在独立线程里
bool addCoroutine(const std::function<void()>& func); // 用于其他线程给这个线程添加协程任务
void yield(); // 用于挂起协程,让给其他协程执行,空闲时再转回自己
void suspend(); // 用于挂起协程,必须通过resume唤醒
void resume(CoroutineInfo*); // 用于唤醒suspend挂起的协程
void sleep(); // 用于定时休眠后唤醒,实际上是suspend的封装,所以也可以用resume提前唤醒
};

初始化

协程调度器在初始化的时候就需要创建好的全部协程实例在free队列中去,每个实例用于随时调用上文CoroutineInfo::registerFunc注册进新的lambda

yield,suspend,resume是不同的逻辑,因此也是单独的队列,这些队列只需要初始化链表头即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CoroutineScheduler::CoroutineSchedulerImp {
...省略其他成员
CoroutineInfo *coros_;
CoroutineInfo freeListHead_;
CoroutineInfo suspendListHead_;
CoroutineInfo resumeListHead_;
CoroutineInfo yieldListHead_;
}

CoroutineSchedulerImp(int coroNum = 10240) : coros_(new CoroutineInfo[coroNum]){
IntrusiveListInit(&freeListHead_);
for (int i = 0;i < coroNum;i++) {
IntrusiveListInsert(&freeListHead_, &coros_[i]);
}

IntrusiveListInit(&suspendListHead_);
IntrusiveListInit(&resumeListHead_);
IntrusiveListInit(&yieldListHead_);
}

void CoroutineScheduler::start() {
std::thread t([this](){
CoroutineScheduler::currentCoroScheduler() = this;
run(); //运行协程调度器的调度循环
});
t_ = std::move(t);
}

yield

yield是最简单的协程切换的例子,用于挂起协程,让给其他协程执行,空闲时再转回自己

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 调度器主循环函数,循环运行所有已准备好执行的协程
void run() {
while(isRunning_) {
...
// 一次性提取出所有处于待运行状态(yield状态)的协程
auto yieldList = IntrusiveListGetAllNodes(&yieldListHead_);

...

// 轮询每个待运行的协程,依次切换执行
for (auto &coro : yieldList) {
CoroutineInfo::switchTo(coro); // 从调度器上下文切换到待执行协程上下文
// 协程运行后可能主动调用yield返回,这里的switchTo调用会阻塞直到协程让出执行权
}
}
}

// 将指定的协程移动到yield链表(待执行协程链表)
void moveToYield(CoroutineInfo *info) {
IntrusiveListRemove(info); // 先从当前链表中移出
IntrusiveListInsert(&yieldListHead_, info); // 插入待运行链表(yieldListHead_),调度器会随后恢复它
}

// 当前运行的协程主动让出CPU执行权,回到调度器上下文
void yield() {
auto cur = CoroutineScheduler::currentCoro(); // 获取当前协程的CoroutineInfo结构体

moveToYield(cur); // 将自身重新挂入yield链表,告知调度器稍后再切换回来继续执行

CoroutineInfo::switchBack(); // 保存当前协程上下文并切换回调度器主上下文,协程会停留在此调用处直到再被调度恢复
}

yield不止用于运行一些定时检查逻辑,还是创建协程的基础

addCoroutine

由于有可能是其他线程创建的协程实例,因此加锁,先放入yield队列,等待在run线程中正式的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 向调度器中添加一个新的协程任务
bool addCoroutine(const std::function<void()>& func) {
unique_lock<mutex> lock(mtx_); // 加锁,确保线程安全

// 尝试从空闲的协程列表(freeList)中获取一个可用的协程实例
auto info = IntrusiveListFront(&freeListHead_);
if (!info) { // 如果没有可用协程实例,返回false (代表添加失败)
return false;
}

// 为获取到的空闲协程实例注册实际要执行的任务,并指定协程结束后的回调处理:
// 当协程运行完毕,将当前协程实例从yield链表移到free链表以供复用
info->registerFunc(func, [this, info](){
IntrusiveListRemove(info); // 将结束的协程实例从yield链表移除
IntrusiveListInsert(&freeListHead_, info); // 放回到空闲链表(freeList),供下次使用
});

moveToYield(info); // 把注册好任务的协程实例插入yield链表(待执行队列中)
return true; // 返回true表示成功添加了一个新任务
}

suspend

suspend相当于是yield的休眠的哪一部分,唤醒的部分在resume去做

1
2
3
4
5
6
7
8
9
10
11
12
// 将指定的协程移动到挂起链表(suspend链表),表示该协程暂时不会被调度执行
void moveToSuspend(CoroutineInfo *info) {
IntrusiveListRemove(info); // 从原所在链表(如yield链表)中移除该协程
IntrusiveListInsert(&suspendListHead_, info); // 插入到suspend链表,协程将处于挂起状态,直到重新唤醒
}

// 当前正在运行的协程调用此函数将自身挂起(暂停执行)
void suspend() {
auto cur = CoroutineScheduler::currentCoro(); // 获取当前协程信息
moveToSuspend(cur); // 当前协程标记为挂起状态,加入挂起链表
CoroutineInfo::switchBack(); // 主动让出执行权回到协程调度器(或调用方协程),协程进入暂停,直至被恢复
}

resume

resume的唤醒比起yield的唤醒流程复杂一些,因为要考虑多线程安全问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 协程调度器的核心方法,持续运行待唤醒(resume)的协程任务
void run() {
while(isRunning_) {
...
{
unique_lock<mutex> lock(mtx_); // 加锁保证线程安全,处理待唤醒队列(resumeQueue_)

// 将resumeQueue_中的协程转移到resume链表,准备本次进行调度执行
while (!resumeQueue_.empty()) {
auto coro = resumeQueue_.front();
resumeQueue_.pop();
moveToResume(coro); // 将协程放入待恢复链表中,等待调度器逐个执行
}
}

// 取出当前所有位于resume链表中的协程,这些协程处于已被唤醒、等待恢复执行的状态
auto resumeList = IntrusiveListGetAllNodes(&resumeListHead_);
for (auto &coro : resumeList) {
CoroutineInfo::switchTo(coro); // 协程上下文切换,逐个恢复执行resume链表里的协程任务
// 当被恢复的协程再次主动调用yield或suspend时,控制权将返回调度器,继续下一轮调度
}
...
}
}

// 将指定协程移动到待恢复(resume)链表,由协程调度器在下轮调度时恢复运行
void moveToResume(CoroutineInfo *info) {
IntrusiveListRemove(info); // 从当前所在链表(如suspend链表)移除该协程
IntrusiveListInsert(&resumeListHead_, info); // 插入resume待执行链表,下次轮询时将恢复执行该协程
}

// 外部接口:其他线程或任务调用此方法,用以通知调度器将协程放入“唤醒队列”,准备随后恢复执行
void resume(CoroutineInfo *coro) {
unique_lock<mutex> lock(mtx_); // 加锁保证线程安全
resumeQueue_.push(coro); // 将指定协程加到恢复(resume)队列,每轮新调度周期中,队列中的协程会被转移到resume链表并得到执行
}

sleep

sleep则是基于suspend和resume的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void run() {
while(isRunning_) {
...

wakeUpSleep();

...
}
}

// 使当前协程休眠指定时间(ms),到期后会被重新调度继续执行
void sleep(int ms) {
sleepQueue_.push(make_pair(UnixTimeMilliSecond() + ms,
CoroutineScheduler::currentCoro())); // 将当前协程和唤醒时间点(当前时间+ms毫秒)放入睡眠队列
suspend(); // 将当前协程挂起,暂停执行并等待被唤醒
}

// 检查睡眠队列,唤醒所有睡眠期已到的协程(通常由调度器周期性调用)
void wakeUpSleep() {
vector<CoroutineInfo *> sleepList; // 存储本次应唤醒的协程列表
auto nowMs = UnixTimeMilliSecond(); // 获取当前时间(毫秒)

// 从sleepQueue_中取出所有睡眠到期的协程
while (!sleepQueue_.empty()) {
auto &top = sleepQueue_.top();
if (top.first > nowMs) { // 如果堆顶协程的唤醒时间点未到,则终止循环(堆由最近需要唤醒的协程排序)
break;
}
sleepList.push_back(top.second); // 睡眠到期,加入待唤醒列表
sleepQueue_.pop();
}

// 将到期的协程逐一唤醒,放入待执行resume队列中
for (auto &coro : sleepList) {
resume(coro); // 恢复协程执行,后续会被调度器处理
}
}

协程套件

CoConditionVariable协程条件变量

条件变量就是当条件不满足的时候,在等待列表里面等待

其他协程发现条件满足了,就唤醒等待列表的某一项或者全部

因此它由两部分组成,CoroutineWaiter等待者类,和包含CoroutineWaiter队列的CoConditionVariable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 协程等待器结构体,用于实现协程的挂起与唤醒机制
struct CoroutineWaiter {
CoroutineScheduler* sched_ = CoroutineScheduler::currentCoroScheduler(); // 当前协程所属的调度器
CoroutineInfo *info_ = CoroutineScheduler::currentCoro(); // 当前协程信息指针
bool isCanceled = false; // 标记协程等待状态是否已被取消或超时

// 唤醒等待的协程
void wakeup() {
if (!sched_) return; // 若协程调度器不存在,则无需唤醒
sched_->resume(info_); // 通知协程调度器恢复执行此协程
}

// 等待函数,使当前协程进入挂起状态直到被唤醒或超时
// 此处TODO:可以实现返回值bool类型isTimeout来指示是否超时导致的唤醒
void wait(std::unique_lock<std::mutex>& lock, int milliseconds) {
if (!sched_) {
throw std::runtime_error(
"CoroutineCondition::wait() must be called in coroutine"); // 未在协程中调用则抛出异常
}
if (milliseconds == -1) {
sched_->suspend(); // 无时间限制地挂起,直到被外部notify唤醒
} else {
sched_->sleep(milliseconds); // 有超时时间,挂起指定毫秒后自动唤醒
}
lock.lock(); // 唤醒后重新锁定互斥锁
isCanceled = true; // 延迟删除机制:标记协程等待状态已取消或超时
}
};

// 协程条件变量,支持协程上下文中的等待/通知机制
class CoConditionVariable {
public:
// 协程调用wait()挂起等待条件满足或者超时(milliseconds=-1表示无限等待)
void wait(std::unique_lock<std::mutex>& lock, int milliseconds = -1) {
CoroutineWaiter waiter; // 创建当前协程信息的等待器实例

waiters_.push(waiter); // 将当前协程等待器加入等待队列
lock.unlock(); // 解锁互斥锁,允许其他协程条件的变更与通知

waiter.wait(lock, milliseconds); // 挂起当前协程直到被notify或超时唤醒
}

// 通知(唤醒)等待队列的一个协程
int notify_one() {
return notify_nolock(false);
}

// 通知(唤醒)等待队列中所有的协程
int notify_all() {
return notify_nolock(true);
}

private:
// 内部实现,进行实际的协程唤醒处理(isAll为true则唤醒全部等待协程,否则唤醒一个)
int notify_nolock(bool isAll) {
int notifyCount = 0; // 已通知(唤醒)的协程计数
while (!waiters_.empty()) {
auto& front = waiters_.front();
if (front.isCanceled) { // 延迟删除机制:若当前等待协程已取消或超时
waiters_.pop(); // 直接移除,跳过处理
continue;
}
front.wakeup(); // 唤醒等待的协程
waiters_.pop(); // 移除等待队列头部协程
notifyCount++;
if (!isAll) { // 如果只需唤醒一个协程,则处理完毕退出
break;
}
}
return notifyCount; // 返回实际唤醒协程数量
}

std::queue<CoroutineWaiter> waiters_; // 存储当前等待协程的队列
};

CoroutineWaiter被唤醒有可能是超时时间到了,也有可能是被主动唤醒的,因此添加一个isCanceled标记。

超时时间到了,并不会删除队列中的这个元素,而是通过isCanceled标记的判断,在下次notfiy的过程中延迟删除

isCanceled标记也是通过wait时传入的锁来保证多线程安全的

CoMutex协程锁

在前文锁实现分析:从glibc到futex(二)中也总结了锁的概念

实际上就是一个标记位+一个队列,如果不考虑用户态和内核态的问题,可以不用futex的实现那么复杂

可以映射成一群人排队上一个厕所

  • 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
    • 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
    • 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
  • 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
    • 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class CoMutex {
std::mutex mtx_;
std::atomic<bool> isLock_ = {false};
std::queue<CoroutineWaiter> waiters_;
bool setLock(bool newV) {
bool expected = !newV;
return isLock_.compare_exchange_strong(expected, newV);
}
//这个和CoConditionVariable的notify_nolock实现一致
int notify_nolock(bool isAll) {
int notifyCount = 0;
while (!waiters_.empty()) {
auto& front = waiters_.front();
if (front.isCanceled) {
waiters_.pop();
continue;
}
front.wakeup();
waiters_.pop();
notifyCount++;
if (!isAll) {
break;
}
}
return notifyCount;
}
public:
void lock() {
if (!setLock(true)) {
CoroutineWaiter waiter;
std::unique_lock<std::mutex> lock(mtx_);
//再次尝试,也是为了把isLock_和waitings_放在同一个临界区内
if (setLock(true)) {
return;
}
waiters_.push(waiter);
lock.unlock();
waiter.wait(lock, -1);
}
}
void unlock() {
std::unique_lock<std::mutex> lock(mtx_);
//把isLock_和waitings_放在同一个临界区内
//如果唤醒成功,就传递isLock_的所有权到下一个协程
//否则,就解开isLock_所有权
if (notify_nolock(false) == 0) {
setLock(false);
}
}
};

这个实现有一个反直觉的地方在于:unlock的时候,并不是每次都会让isLock_(锁状态)变为解锁状态,而是只有当没有人等待时,才真正地解锁。

用前面“上厕所”的例子类比一下就会很好理解:

  • 如果门外没人排队,那么用完厕所的人出来后,门自然是打开的,所以此时isLock_变为解锁状态。
  • 如果门外有人正在排队,当厕所里的人用完出来后,他并不打开门,而是直接交给排队的第一个人进去,因此门一直处于“有人”的状态,isLock_并没有被解锁。

也就是说,两个人在交接厕所的时候,全程都是门处于“有人”的状态,没有那一瞬间“没人”。

如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。

支持协程的通用套件

通用套件是指,不管在协程还是线程都可以使用的套件

前面的协程套件,核心是通过调用协程调度器的suspend,resume等等来完成的

这需要通过CoroutineScheduler::currentCoroScheduler()拿到当前线程的协程调度器,在非协程调度器的线程中使用会直接返回空指针

因此正式的业务代码,需要使用通用套件,无需关心是当前线程中有没有运行协程调度器,才有更好的体验

ConditionVariable条件变量

这个是最容易也是最核心的的实现了,只需要同时拥有协程条件变量和线程条件变量

通用条件变量是下面全部套件的基石

wait的时候判断当前线程是否存在协程调度器,存在的话就直接用协程wait,否则用线程wait

唤醒的时候,看协程条件变量有没有唤醒成功,失败的话再使用线程条件变量唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 条件变量的封装类,同时支持普通线程和协程的等待和通知机制
class ConditionVariable {
CoConditionVariable coCv_; // 协程版本的条件变量
std::condition_variable cv_; // 标准线程版本的条件变量 (std::condition_variable)

public:
// 等待条件满足,或直到超时
// 如果在协程上下文中调用,优先使用协程的条件变量等待。
// 如果在普通线程环境下调用,使用标准线程条件变量等待。
void wait(std::unique_lock<std::mutex>& lock, int milliseconds = -1) {
if (CoroutineScheduler::currentCoroScheduler()) { // 检测当前是否处于协程上下文
coCv_.wait(lock, milliseconds); // 使用协程的条件变量等待,支持协程挂起和恢复的高效切换
return;
}

if (milliseconds != -1) { // 普通线程带超时的等待操作
cv_.wait_for(lock, std::chrono::milliseconds(milliseconds));
return;
}

cv_.wait(lock); // 普通线程无限期等待操作
}

// 唤醒一个等待中的任务:
// 优先尝试唤醒协程队列中的等待者,若协程队列中没有等待者,则尝试唤醒标准线程中的等待者
void notify_one() {
if (coCv_.notify_one() == 0) { // 若当前没有协程等待被唤醒
cv_.notify_one(); // 再唤醒一个普通线程的等待者
}
}

// 唤醒所有等待中的任务(协程和普通线程都会被唤醒)
void notify_all() {
coCv_.notify_all(); // 唤醒协程队列中所有等待者
cv_.notify_all(); // 唤醒标准线程队列中所有等待者
}
};

Mutex锁

显然,条件变量的思路来实现锁是行不通的,没办法简单的搞两把锁,就能保证线程和协程都正确休眠

换一个角度去想,锁相当于保护条件为只有当前进程/协程可用的条件变量

  • 条件变量的本质是等待某个条件满足,等待时挂起线程/协程,并在被通知后重新检查条件、进入下一步;
  • 因此锁相当于是条件变量的一种特化情况——条件就是"当前没有其他线程/协程占用该互斥锁"。

因此可以直接基于通用套件的条件变量来实现通用锁,思路捋顺了,那么实现就是经典的条件变量case

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Mutex {
std::mutex mtx_;
ConditionVariable cond_;
bool isLock = false;
int waitings_ = 0; //记录有多少等待者

public:
void lock() {
//TODO:在这里可以用cas进一步double check优化
std::unique_lock<std::mutex> lock(mtx_); // 加锁保护临界区
while (isLock) {
waitings_++;
cond_.wait(lock); // 等待 isLock 状态变化
waitings_--;
}
isLock = true;
}

void unlock() {
std::unique_lock<std::mutex> lock(mtx_); // 加锁保护临界区
isLock = false;
if (waitings_ > 0) {
cond_.notify_one(); // 当存在等待者时,再进行唤醒
}
}
};

但是这里和CoMutex有一点不一样,不管有没有等待者,都会改变isLock的状态

回顾一下上面的类比CoMutex协程锁

这个实现有一个反直觉的地方在于:unlock的时候,并不是每次都会让isLock_(锁状态)变为解锁状态,而是只有当没有人等待时,才真正地解锁。

用前面“上厕所”的例子类比一下就会很好理解:

  • 如果门外没人排队,那么用完厕所的人出来后,门自然是打开的,所以此时isLock_变为解锁状态。
  • 如果门外有人正在排队,当厕所里的人用完出来后,他并不打开门,而是直接交给排队的第一个人进去,因此门一直处于“有人”的状态,isLock_并没有被解锁。

也就是说,两个人在交接厕所的时候,全程都是门处于“有人”的状态,没有那一瞬间“没人”。

如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。

这里的实现,正是更理想的情况,这种写法可能不够高效,但是更好理解

PromiseFuture

C++标准库有线程版本的promise和future,这个组件对协程异步任务特别实用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void func(std::promise<int> * p)
{
int a = 10, b = 5;
int result = a + b;
std::cout << "From inside the Thread...." << std::endl; p->set_value(result);
}

int main()
{
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread th(func, &p);
std::cout << f.get() << std::endl;
th.join();
return 0;

}

在C++标准库实现中,它就是条件变量的语法糖,那么在协程通用套件中也一样可以基于ConditionVariable来实现

SharedState(两者共享的Channel)

代码量看起来很多,实际上只是因为SharedState需要为void单独编写特化版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
namespace internal {
// SharedStateBase为Promise/Future实现提供基础机制(线程安全+同步等待)
class SharedStateBase {
protected:
std::mutex mtx_; // 互斥锁,确保线程安全
ConditionVariable cond_; // 条件变量,支持线程和协程等待/通知
std::exception_ptr eptr_; // 用于异步操作中发生异常时的异常指针
bool isReady_ = false; // 标记异步结果是否准备好了

public:
// 如果异步操作抛出异常,则调用该函数设置异常并通知所有等待者
void setException(std::exception_ptr eptr) {
std::unique_lock<std::mutex> lock(mtx_);
eptr_ = eptr;
isReady_ = true;
cond_.notify_all(); // 唤醒所有等待中的线程或协程
}

// 等待异步操作完成(Value已设置或者发生异常后isReady变为true)
void wait() {
std::unique_lock<std::mutex> lock(mtx_);
while (!isReady_) {
cond_.wait(lock); // 无限等待,直到异步结果准备好
}
}
};

} // namespace internal

// SharedState是异步结果的存储容器,支持返回普通值类型T
template <typename T>
class SharedState : public internal::SharedStateBase {
T value_; // 存储异步操作的返回值

public:
// 设置异步操作的返回值
void setValue(T&& value) {
std::unique_lock<std::mutex> lock(mtx_);
value_ = value;
isReady_ = true; // 标记结果已准备好
cond_.notify_all(); // 通知所有等待者
}

// 获取异步操作的返回值。如果结果未准备好或存在异常,抛出异常。
T getValue() {
std::unique_lock<std::mutex> lock(mtx_);
if (!this->isReady_) {
throw std::runtime_error(
"CoroutineSharedState::getValue() must be called after wait()");
}
if (eptr_) { // 如果之前发生过异常
std::rethrow_exception(eptr_); // 抛出异常到调用者
}
return value_;
}
};

// void特化版本,因其无返回值,所以不持有value_
template <>
class SharedState<void> : public internal::SharedStateBase {
public:
// 对于void类型,仅标记异步操作完成,并通知等待者
void setValue() {
std::unique_lock<std::mutex> lock(mtx_);
isReady_ = true; // 标记完成
cond_.notify_all(); // 通知所有等待这个结果的线程或协程
}
};
Promise(只写)

直接用SFINAE为void特化即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Promise表示设置异步操作结果的一端,被执行异步操作逻辑的一方持有使用
template <typename T>
class Promise {
std::shared_ptr<SharedState<T>> state_ = std::make_shared<SharedState<T>>(); // 和Future共享的异步结果(SharedState)

public:
// 设置异步操作返回值(非void类型)
template <typename U = T,
typename std::enable_if<!std::is_void<U>::value>::type* = nullptr>
void setValue(T&& value) {
state_->setValue(std::move(value));
}

// void特化,设置异步操作已经完成即可
template <typename U = T,
typename std::enable_if<std::is_void<U>::value>::type* = nullptr>
void setValue() {
state_->setValue();
}

// 设置异步操作抛出的异常
void setException(std::exception_ptr eptr) {
state_->setException(eptr);
}

// 获取与当前Promise关联的Future,用于等待和获取结果 (Promise只写,Future只读)
Future<T> getFuture() {
return Future<T>(state_);
}
};
Future(只读)

我稍微修改了wait的语义,让wait也具有取值语义,以方便使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Future表示异步操作结果的访问端,可等待并获得异步操作的返回值
template <typename T>
class Future {
std::shared_ptr<SharedState<T>> state_; // 与Promise共享的异步结果(SharedState)

public:
explicit Future(const std::shared_ptr<SharedState<T>>& state)
: state_(state) {}

// 非void类型版本的wait,等待结果并返回结果值
template <typename U = T,
typename std::enable_if<!std::is_void<U>::value>::type* = nullptr>
T wait() {
state_->wait(); // 等待操作完成
return state_->getValue(); // 获取并返回结果值(可能抛出异常)
}

// void特化版本的wait,只等待操作完成即可(无返回值)
template <typename U = T,
typename std::enable_if<std::is_void<U>::value>::type* = nullptr>
void wait() {
state_->wait(); // 等待操作完成即可
}
};

Async任务投递到协程调度器池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// Async 类实现了一个轻量级的协程异步任务执行器池,封装了多个 CoroutineScheduler 用于并行调度和运行异步任务。
class Async {
std::vector<std::shared_ptr<CoroutineScheduler>> schedulers_; // 协程调度器池
int getIndex() { // 轮询选择协程调度器
static std::atomic<int64_t> index = {0};
return index++ % schedulers_.size(); // 加入任务时,依次使用不同的Scheduler,实现任务均衡
}
public:
// 获取或设置协程调度器线程池的大小(默认为8个线程)
static int& getThreadNum() {
static int threads_ = 8;
return threads_;
}

// 单例模式,提供全局访问的Async实例(默认使用8个线程的协程调度器)
Async& getInstance() {
static Async instance(getThreadNum());
return instance;
}

// 构造函数,初始化并启动指定数量的协程调度器
Async(int threads) {
for (int i = 0; i < threads; ++i) {
schedulers_.emplace_back(std::make_shared<CoroutineScheduler>());
schedulers_[i]->start(); // 启动协程调度器(后台线程循环执行待执行的协程任务)
}
}

// 析构函数,安全停止所有协程调度器
~Async() {
for (auto& scheduler : schedulers_) {
scheduler->stop(); // 停止协程调度器,结束线程循环
}
}

// async接口(适用于返回值非 void 的异步任务),将异步函数封装为协程任务,并返回Future供调用方等待结果
template <typename F,
typename T = decltype(std::declval<F>()()),
typename std::enable_if<!std::is_void<T>::value>::type* = nullptr>
Future<T> async(F&& f) {
auto promise = std::make_shared<Promise<T>>(); // 创建对应的promise
schedulers_[getIndex()]->addCoroutine([f = std::move(f), promise]() {
try {
auto result = f(); // 执行异步函数
promise->setValue(std::move(result)); // 执行成功则设置结果
} catch (...) {
promise->setException(std::current_exception()); // 执行失败则捕获并设置异常
}
});
return promise->getFuture(); // 返回Future给调用方用于等待获取结果
}

// async接口重载(适用于返回值为 void 的异步任务)
template <typename F,
typename T = decltype(std::declval<F>()()),
typename std::enable_if<std::is_void<T>::value>::type* = nullptr>
Future<T> async(F&& f) {
auto promise = std::make_shared<Promise<T>>();
schedulers_[getIndex()]->addCoroutine([f = std::move(f), promise]() {
try {
f(); // 执行函数
promise->setValue(); // 无返回值,仅标记完成状态
} catch (...) {
promise->setException(std::current_exception()); // 捕获并设置异常
}
});
return promise->getFuture(); // 返回Future给调用方
}
};
使用
1
2
3
4
5
auto future = async::getInstance()->async([]() {
CoroutineScheduler::currentCoroScheduler()->sleep(100);
return 1;
});
auto res = future.wait(); //等待100ms后,res为1

总结

整个协程实现起来并不复杂,弄懂原理以后都是顺理成章的

协程实例,主要功能是把寄存器的数据和当前栈整个记录在申请的内存中,随时可以将其恢复出来

记录恢复两个功能,就完成不同的栈之间的切换,而如何切换是协程调度器的工作

这个切换有点像游戏中的“存档”功能, 就好比家长(协程调度器)安排几个孩子轮流玩同一台游戏机,每个孩子玩一会儿累了, 暂停游戏并“保存进度”(记录),让出游戏机, 家长再让下一个孩子“读取进度”(恢复)继续他自己之前玩到一半的游戏内容, 就这样依次排队轮流玩,每个孩子随时都能从之前暂停的位置继续玩,不用重新开始游戏。

而协程套件实现过程其实还是CoMutex和Mutex最有意思,记住核心思想:一个标记位+一个队列

可以映射成一群人排队上一个厕所

  • 标记位相当于厕所门口的一个指示牌(如“有人”或“无人”)。
    • 当厕所没人时(标记位为0,门开着指示牌“有人”),走上前来的人直接进入(加锁成功)。
    • 当厕所里有人时(标记位为1,门关着指示牌“无人”),后续到达的人自动排队(进入等待队列,等待加锁)。
  • 队列就相当于门外排队的人。当里面的人使用完厕所从里面走出来(释放锁)时,需要通知排队中的下一个人可以进入厕所了:
    • 队列中第一个人收到通知后进入厕所,其他人保持继续等待状态。

用条件变量去实现锁的时候,锁相当于保护条件为只有当前进程/协程可用的条件变量

  • 条件变量的本质是等待某个条件满足,等待时挂起线程/协程,并在被通知后重新检查条件、进入下一步
  • 因此锁相当于是条件变量的一种特化情况——条件就是"当前没有其他线程/协程占用该互斥锁"。

CoMutex和Mutex的实现细节有一些不一样,用等厕所来类比的话

  • Mutex实现(常规):如果我们严格按常规习惯,理想情况应该是:厕所里的人出来后主动把门打开(解锁),外面排队的人进去后再把门关上重新上锁。

  • CoMutex实现(简化):但是实际的代码为了简化,没有让排队的人进去之后再去关门(执行加锁逻辑),而是在解锁的瞬间,直接把厕所使用权限交给等待的人进去使用。因此看起来“厕所门的状态没有变化”,显得特别反直觉,但从代码执行的效率来看,这种方式是更高效的。