在C++中实现协程
我在整理tars和taf的协程逻辑的时候,也实现了一个纯净版的协程框架以便于理解
网络框架
类图
看下把tcpconnection改成回调中智能指针是否合理
classDiagram
direction TB
class EventLoop {
+void wait()
+void stop()
+void addPoller(uint64_t type, shared_ptr~Poller~ poller)
+weak_ptr<Poller> getPoller(uint64_t pollerType)\n在Connector和Acceptor中用来给TcpConnection创建Event的时候指定Poller
-map<uint64_t, shared_ptr<Poller>> pollers_;
}
class Poller {
<<interface>>
+vector<weak_ptr<Event>> poll(string&)
+void addOrUpdateEvent(shared_ptr<Event> event)
+void removeEvent(Event *event)
}
class Epoller {
+vector<weak_ptr<Event>> poll(string&)
+void addOrUpdateEvent(shared_ptr~Event~ event)
+void removeEvent(Event *event)
}
class Event {
+Event(weak_ptr~Poller~ poller, int64_t fd)
+void setReadEvent(function<void#40;#41;>)
+void setWriteEvent(function<void#40;#41;>)
+void enableReadNotify()
+void enableWriteNotify()
+void disableReadNotify()
+void disableWriteNotify()
-weak_ptr~Poller~ poller_;
}
Epoller ..|> Poller
EventLoop ..> Poller
Epoller ..> Event
Event ..> Poller
class Socket {
+void bind(const string &addr, int port) Acceptor使用
+void listen() Acceptor使用
+void accept() Acceptor使用
+void connect(const string &addr, int port) Connector使用
+int read(char* buf, int len) TcpConnection使用
+int write(char* buf, int len) TcpConnection使用
+void setNoblock()
}
class Buffer {
+void push(const char *buf, uint64_t len)
+uint64_t popHead(char *buf, uint64_t len)
-void write(const char *buf, uint64_t len) TcpConnection使用
-uint64_t Read(char *buf, uint64_t len) TcpConnection使用
}
class TcpConnection {
+TcpConnection(unique_ptr~Socket~ &&socket, shared_ptr~Event~ &event)
+void asyncRead(uint64_t expectSize, function<void#40;const string &errMsg#41;> func)
+void asyncReadAny(function<void#40;const string &errMsg#41;>)
+void asyncWrite(function<void#40;const string &errMsg#41;>)
+void finish(const string &errMsg)
-void initAccepted(string &errMsg)
-void initConnected(string &errMsg)
-void setFinishHandler(function<void#40;const string &errMsg,\nshared_ptr<TcpConnection>)>)
-Buffer readBuffer_;
-Buffer writeBuffer_;
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
class Acceptor {
+Acceptor(EventLoop &loop, const string &addr, int port)
+void listen(string &errMsg)
+void setNewConnectionHandler(function<void#40;int fd#41;>)
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
class Connector {
Connector(EventLoop &loop, const string &addr, int port)
+void connect(string &errMsg)
+void setNewConnectionHandler(function<void#40;unique_ptr<Socket>&,\nshared_ptr<Event>)>)
+void setConnectFailedHandler(function<void#40;const string &errMsg#41;>)
-unique_ptr~Socket~ socket_;
-shared_ptr~Event~ event_;
}
Socket --* TcpConnection
Buffer --o TcpConnection
Event --o TcpConnection
Socket --* Acceptor
Event --o Acceptor
Socket --* Connector
Event --o Connector
class Server {
+Server(EventLoop &loop, const string &addr, int port)
+void serve(string &errMsg)
+void setConnectedHandler(function<void#40;const string &errMsg,TcpConnection)>)\n连接成功以后,在connected回调的参数来获取连接读取数据
+void setDisconnectedHandler(function<void#40;const string &errMsg#41;>)
-Acceptor acceptor_;
set~shared_ptr~TcpConnection~~ connections_;
}
TcpConnection --o Server
Acceptor --* Server
Client --> EventLoop
class Client {
+Client(EventLoop &loop, const string &addr, int port)
+void asyncConnect(string &errMsg)
+void setConnectedHandler(function<void#40;const string &errMsg#41;>)
+void setDisconnectedHandler(function<void#40;const string &errMsg#41;>)
+TcpConnection& getTcpConnection()\n连接成功以后,在connected回调中调用来获取连接读取数据
-Connector connector_;
-shared_ptr~TcpConnection~ connection_;
}
Connector --* Client
TcpConnection --o Client
Server --> EventLoop
服务端流程图
sequenceDiagram
participant main
participant EventLoop
participant Server
participant Acceptor
participant Socket
participant Event
participant Poller
participant Epoller
main ->> Server : 注册服务端连接事件<br>setConnectedHandler()<br>main::onConnect()赋值给Server::connectedHandler_
main ->> Server : 注册服务端断开连接事件<br>setDisconnectedHandler()<br>main::onDisConnect()赋值给Server::disconnectedHandler_
main ->>+ Server : serve()
Server ->>+ Acceptor : Server::newConnectionHandler()<br>注册进新连接事件:<br>acceptor_-><br>setNewConnectionHandler()<br>acceptor_->listen()
Acceptor ->>+ Socket : socket_->bind()<br>socket_->setNoBlock()<br>socket_->setNoDelay()
Socket ->>- Acceptor : return
Acceptor ->>+ Epoller : event_ = make_shared<Event><br>(Epoller::getInstance())
Epoller ->>- Acceptor : return
Acceptor ->>+ Event : Acceptor::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Event ->>+ Poller : poller_->addOrUpdateEvent()
Poller ->>+ Epoller : addOrUpdateEvent()
Epoller ->>- Poller : return
Poller ->>- Event : return
Event ->>- Acceptor : return
Acceptor ->>- Server : return
Server ->>- main : return
EventLoop ->>+ EventLoop : wait()
loop while(true)
EventLoop ->>+ EventLoop : runOnce()
EventLoop ->>+ Epoller : auto events = poll()
Epoller ->> Epoller : ::epoll_wait(epollfd_, &(*pollEvents_.begin()), maxSize_)
Epoller ->>- EventLoop : return
loop for event in events
alt 服务端监听描述符
EventLoop ->>+ Event : do()
Event ->>+ Acceptor : readHandler()
Acceptor ->>+ Socket : auto fd = socket_->accept();
Socket ->>+ Socket : ::accept()
Socket ->>- Socket : return
Socket ->>- Acceptor : return
Acceptor ->>+ Server : newConnectionHandler(fd)
Server ->>+ Connection : auto connection = make_shared<TcpConnection>(fd)<br>Server::disconnectedHandler_注册进断开链接事件<br>connection->setFinishHandler():<br>connection->initAccepted()
Connection ->>+ Socket : socket_->setNoBlock()<br>socket_->setNoDelay()<br>TcpConnection::readHandler()<br>注册进读事件:<br>event_->setReadEvent()<br>event_->enableReadNotify()
Socket ->>- Connection : return
Connection ->>- Server : return
Server ->>+ Server : connectedHandler_()
Server ->>+ main : onConnect()
main ->>+ main : 注册可读事件connection.asyncRead<br>(expectSize, main::onConnectionRead)
main ->>- main : return
main ->>- Server : return
Server ->>- Acceptor : return
Acceptor ->>- Event : return
Event ->>+ EventLoop : return
end
EventLoop ->>+ Event : do()
alt 服务端被建立链接描述符可读
Event ->>+ Connection : readHandler()
Connection ->>+ Connection : char buf[1024]<br>socket_->read(buf)<br>append to readBuffer_
alt readedSize_ >= expectSize_
Connection ->>+ main : onConnectionRead()
main ->>+ main : 读取Connection的readBuffer_<br>写入读取Connection的writeBuffer_
main ->>+ Connection : 注册写完事件connection.asyncWrite<br>(main::finishWrite)
Connection ->>+ Connection : TcpConnection::writeHandler()<br>注册进写事件:<br>event_->setWriteEvent()<br>event_->enableWriteNotify()
Connection ->>- Connection : return
Connection ->>- main : return
main ->>- main : return
main ->>- Connection : return
end
Connection ->>- Connection : return
Connection ->>- Event : return
end
alt 服务端被建立链接描述符可写
Event ->>+ Connection : writeHandler()
Connection ->>+ Connection : socket_->Write()
Connection ->>- Connection : return
alt 全部写完
Connection ->>+ main : main::finishWrite()
main ->> Connection : return
end
Connection ->> Event : return
end
Event ->>+ EventLoop : return
end
EventLoop ->>- EventLoop : return
end
EventLoop ->>- EventLoop : return
客户端流程图
1 | ``` |
相关文章
-
2025-01-12
截止25年1月12日,打算使用dlmopen来装载serverless平台第三方动态库的计划,在挣扎了2周后正式宣布破产
总的来说dlmopen虽然已经有几十年历史了,但是在当前还是非常不成熟的,有很多细节问题没有解决
想用上dlmopen,需要深入理解glibc的实现原理,和patches的可能bug斗智斗勇,这个投入产出比很低
背景
serverless平台的设计原理属于机密,略过不提