在C++中实现协程

我在整理tars和taf的协程逻辑的时候,也实现了一个纯净版的协程框架以便于理解

网络框架

类图

sheep_cpp

待办:评估把回调参数中的TcpConnection改为以智能指针传递是否合理

classDiagram
direction TB
class EventLoop {
	+void wait()
    +void stop()
    +void addPoller(uint64_t type, shared_ptr~Poller~ poller)
    +weak_ptr~Poller~ getPoller(uint64_t pollerType)\n在Connector和Acceptor中用来给TcpConnection创建Event的时候指定Poller
    -map&lt;uint64_t, shared_ptr&lt;Poller&gt;&gt; pollers_;
}
class Poller {
	<<interface>>
	+vector&lt;weak_ptr&lt;Event&gt;&gt; poll(string&)
	+void addOrUpdateEvent(shared_ptr~Event~ event)
	+void removeEvent(Event *event)
}
class Epoller {
	+vector&lt;weak_ptr&lt;Event&gt;&gt; poll(string&)
	+void addOrUpdateEvent(shared_ptr~Event~ event)
	+void removeEvent(Event *event)
}
class Event {
	+Event(weak_ptr~Poller~ poller, int64_t fd)
	+void setReadEvent(function&lt;void#40;#41;&gt;)
	+void setWriteEvent(function&lt;void#40;#41;&gt;)
	+void enableReadNotify()
	+void enableWriteNotify()
	+void disableReadNotify()
	+void disableWriteNotify()
	-weak_ptr~Poller~ poller_;
}
Epoller ..|> Poller
EventLoop ..> Poller
Epoller ..> Event
Event ..> Poller

class Socket {
	+void bind(const string &addr, int port) Acceptor使用
	+void listen() Acceptor使用
	+void accept() Acceptor使用
	+void connect(const string &addr, int port) Connector使用
	+int read(char* buf, int len) TcpConnection使用
    +int write(char* buf, int len) TcpConnection使用
    +void setNoBlock()
}
class Buffer {
	+void push(const char *buf, uint64_t len)
	+uint64_t popHead(char *buf, uint64_t len)
	-void write(const char *buf, uint64_t len) TcpConnection使用
	-uint64_t read(char *buf, uint64_t len) TcpConnection使用
}

class TcpConnection {
	+TcpConnection(unique_ptr~Socket~ &&socket, shared_ptr~Event~ &event)
	+void asyncRead(uint64_t expectSize, function&lt;void#40;const string &errMsg#41;&gt; func)
    +void asyncReadAny(function&lt;void#40;const string &errMsg#41;&gt;)
    +void asyncWrite(function&lt;void#40;const string &errMsg#41;&gt;)
    +void finish(const string &errMsg)
    -void initAccepted(string &errMsg)
    -void initConnected(string &errMsg)
    -void setFinishHandler(function&lt;void#40;const string &errMsg,\nshared_ptr&lt;TcpConnection&gt;&#41;&gt;)
    -Buffer readBuffer_;
    -Buffer writeBuffer_;
	-unique_ptr~Socket~ socket_;
	-shared_ptr~Event~ event_;
}

class Acceptor {
	+Acceptor(EventLoop &loop, const string &addr, int port)
	+void listen(string &errMsg)
	+void setNewConnectionHandler(function&lt;void#40;int fd#41;&gt;)
	-unique_ptr~Socket~ socket_;
	-shared_ptr~Event~ event_;
}

class Connector {
	+Connector(EventLoop &loop, const string &addr, int port)
	+void connect(string &errMsg)
	+void setNewConnectionHandler(function&lt;void#40;unique_ptr&lt;Socket&gt;&,\nshared_ptr&lt;Event&gt;&#41;&gt;)
	+void setConnectFailedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
	-unique_ptr~Socket~ socket_;
	-shared_ptr~Event~ event_;
}

Socket --* TcpConnection
Buffer --o TcpConnection
Event --o TcpConnection

Socket --* Acceptor
Event --o Acceptor

Socket --* Connector
Event --o Connector

class Server {
	+Server(EventLoop &loop, const string &addr, int port)
	+void serve(string &errMsg)
	+void setConnectedHandler(function&lt;void#40;const string &errMsg,TcpConnection&#41;&gt;)\n连接成功以后,在connected回调的参数来获取连接读取数据
	+void setDisconnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
	-Acceptor acceptor_;
	-set~shared_ptr~TcpConnection~~ connections_;
}
TcpConnection --o Server
Acceptor --* Server
Client --> EventLoop

class Client {
	+Client(EventLoop &loop, const string &addr, int port)
	+void asyncConnect(string &errMsg)
	+void setConnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
	+void setDisconnectedHandler(function&lt;void#40;const string &errMsg#41;&gt;)
	+TcpConnection& getTcpConnection()\n连接成功以后,在connected回调中调用来获取连接读取数据
	-Connector connector_;
	-shared_ptr~TcpConnection~ connection_;
}
Connector --* Client
TcpConnection --o Client
Server --> EventLoop

服务端流程图

sequenceDiagram
participant main
participant EventLoop
participant Server
participant Acceptor
participant Socket
participant Event
participant Poller
participant Epoller
main ->> Server : 注册服务端连接事件<br>setConnectedHandler()<br>main::onConnect()赋值给Server::connectedHandler_
main ->> Server : 注册服务端断开连接事件<br>setDisconnectedHandler()<br>main::onDisConnect()赋值给Server::disconnectedHandler_
main ->>+ Server : serve()
Server ->>+ Acceptor : Server::newConnectionHandler()<br>注册进新连接事件:<br>acceptor_-><br>setNewConnectionHandler()<br>acceptor_->listen()
Acceptor ->>+ Socket : socket_->bind()<br>socket_->setNoBlock()<br>socket_->setNoDelay()
Socket ->>- Acceptor : return
Acceptor ->>+ Epoller : event_ = make_shared<Event><br>(Epoller::getInstance())
Epoller ->>- Acceptor : return
Acceptor ->>+ Event : Acceptor::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Event ->>+ Poller : poller_->addOrUpdateEvent()
Poller ->>+ Epoller : addOrUpdateEvent()
Epoller ->>- Poller : return
Poller ->>- Event : return
Event ->>- Acceptor : return
Acceptor ->>- Server : return
Server ->>- main : return
EventLoop ->>+ EventLoop : wait()
loop while(true)
EventLoop ->>+ EventLoop : runOnce()
EventLoop ->>+ Epoller : auto events = poll()
Epoller ->> Epoller : ::epoll_wait(epollfd_, &(*pollEvents_.begin()), maxSize_)
Epoller ->>- EventLoop : return
loop for event in events 
alt 服务端监听描述符
EventLoop ->>+ Event : do()
Event ->>+ Acceptor : readHandler()
Acceptor ->>+ Socket : auto fd = socket_->accept();
Socket ->>+ Socket : ::accept()
Socket ->>- Socket : return
Socket ->>- Acceptor : return
Acceptor ->>+ Server : newConnectionHandler(fd)
Server ->>+ Connection : auto connection = make_shared<TcpConnection>(fd)<br>Server::disconnectedHandler_注册进断开连接事件<br>connection->setFinishHandler():<br>connection->initAccepted()
Connection ->>+ Socket : socket_->setNoBlock()<br>socket_->setNoDelay()<br>TcpConnection::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Socket ->>- Connection : return
Connection ->>- Server : return
Server ->>+ Server : connectedHandler_()
Server ->>+ main : onConnect()
main ->>+ main : 注册可读事件connection.asyncRead<br>(expectSize, main::onConnectionRead)
main ->>- main : return
main ->>- Server : return
Server ->>- Acceptor : return
Acceptor ->>- Event : return
Event ->>- EventLoop : return
end
EventLoop ->>+ Event : do()
alt 服务端已建立连接的描述符可读
Event ->>+ Connection : readHandler()
Connection ->>+ Connection : char buf[1024]<br>socket_->read(buf)<br>append to readBuffer_
alt readedSize_ >= expectSize_
Connection ->>+ main : onConnectionRead()
main ->>+ main : 读取Connection的readBuffer_<br>写入Connection的writeBuffer_
main ->>+ Connection : 注册写完事件connection.asyncWrite<br>(main::finishWrite)
Connection ->>+ Connection : TcpConnection::writeHandler()<br>注册进写事件:<br>event_->setWriteEvent()<br>event_->enableWriteNotify()
Connection ->>- Connection : return
Connection ->>- main : return
main ->>- main : return 
main ->>- Connection : return
end
Connection ->>- Connection : return 
Connection ->>- Event : return
end
alt 服务端已建立连接的描述符可写
Event ->>+ Connection : writeHandler()
Connection ->>+ Connection : socket_->write()
Connection ->>- Connection : return
alt 全部写完
Connection ->>+ main : main::finishWrite()
main ->>- Connection : return
end 
Connection ->>- Event : return
end 
Event ->>- EventLoop : return
end
EventLoop ->>- EventLoop : return
end
EventLoop ->>- EventLoop : return

客户端流程图

(原文此处的客户端流程图代码在提取时丢失,仅残留代码块的行号 1–87,图的内容待补充。)



## 协程框架

协程实现在这里[sheep_cpp/src/coroutine/](https://github.com/tedcy/sheep_cpp/tree/e2a97cdda05d4c252cb517051c10a75a7af4fb66/src/coroutine)

### 类图

```mermaid
classDiagram
direction TB
namespace 协程框架 {
class CoroutineInfo协程 {
+void registerFunc()
+void switchTo(CoroutineInfo *to) 切换到目标协程
+void switchBack() 切换回主协程
}
class CoroutineScheduler协程调度器 {
+void start() 用于启动这个协程调度器在独立线程里
+void addCoroutine(F && func) 用于其他线程给这个线程添加协程任务
+void yield() 用于挂起协程,让给其他协程执行,空闲时再转回自己
+void suspend() 用于挂起协程,必须通过resume唤醒
+void resume(CoroutineInfo*) 用于唤醒suspend挂起的协程
+void sleep() 用于定时休眠后唤醒,可以用suspend和resume封装
}
class CoSharedState["CoSharedState&lt;T&gt;"] {
+void setValue(T && t)
+void setException(std::exception_ptr p)
+void wait(int ms) 在线程或协程内,使用各自版本的cv + lock
}
class CoPromise["CoPromise&lt;T&gt;"] {
+CoFuture&lt;T&gt; getFuture()
+void setValue(T && t)
+void setException(std::exception_ptr p)
-shared_ptr~CoSharedState~T~~ state : \n和CoFuture共享同一个CoSharedState
}
class CoFuture["CoFuture&lt;T&gt;"] {
+T wait(int ms = -1)
-shared_ptr~CoSharedState~T~~ state : \n和CoPromise共享同一个CoSharedState
}
class CoASyncer {
+CoFuture&lt;T&gt; async(F && func) 异步执行func,并获取一个CoFuture
}
class CoMutex {
+void lock()
+void unlock()
}
class CoConditionValue {
+void wait(int ms = -1)
+void notify_one()
+void notify_all()
}
}
CoroutineInfo协程 "N" --* CoroutineScheduler协程调度器
CoSharedState ..> CoMutex
CoSharedState ..> CoConditionValue
CoPromise ..> CoSharedState
CoFuture ..> CoSharedState
CoASyncer ..> CoFuture
CoMutex ..> CoroutineScheduler协程调度器
CoConditionValue ..> CoroutineScheduler协程调度器
CoASyncer ..> CoroutineScheduler协程调度器
namespace 网络框架 {
class EventLoop事件循环 {
-void run() 在一轮poller后调用yield
}
class Server {
+void setConnectedHandler() \n连接建立以后创建一个新协程
}
class Client {
+void asyncConnect(F && func)
+void connect(): \n创建一个CoPromise,\n在asyncConnect的回调func里面对其setValue(),\n外面的CoFuture去wait()
}
class TcpConnection {
+void asyncRead(F && func)
+void read() 同connect(),用CoPromise封装asyncRead
}
}
note for EventLoop事件循环 "1 服务端线程整个由调度器托管<br>EventLoop是调度器的一个协程<br>2 异步回调可以全部套一层<br>来实现同步回调"
EventLoop事件循环 ..> CoroutineScheduler协程调度器
Client ..> CoPromise
Client ..> TcpConnection
Server ..> EventLoop事件循环
Server ..> TcpConnection
TcpConnection ..> CoPromise