tars C++源码分析
由于公司使用的taf框架是和开源的tars框架一脉相承,虽然经过几年的改造,几乎已经面目全非了,但是主体结构上相差不大
因此从tars框架的最初版本的源码分析上就可以理解整个核心链路了
本篇会分析开源的tars1.0版本和tars3.0版本,并探索3.0版本的优化原因,最后对他们的性能做一些比较
计划中还有一篇公司taf框架的分析,很有趣的是它的发展方向和tars3.0不太一致,因此可以对其性能和tars性能也做一番比较,遗憾的是出于保密需要,无法将其post在我的博客上了
tars编译问题
tars的不同版本在我的ubuntu 20.04上都遇到了编译问题
tars1.0
在编译tars v1.0.0时遇到如下报错
1 | Linking CXX executable tars2node |
但实际上我是安装了libiconv的,make --trace
看下是什么问题,显示具体命令为
1 | cd /root/tars/TarsCpp/build/tools/tars2node && /root/env/vim_download/cmake-3.16.8-Linux-x86_64/bin/cmake -E cmake_link_script CMakeFiles/tars2node.dir/link.txt --verbose= |
确实在/root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt
没有链接libiconv
1 | ~/tars/TarsCpp/build# cat /root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt |
在link.txt后追加-liconv
后回到build目录继续编译可以顺利完成
1 | mkdir build && cd build && cmake .. && sed -i "s/$/& -liconv/g" tools/tars2node/CMakeFiles/tars2node.dir/link.txt && make -j |
tars3.0
tars v3.0.0通过APPLE开关来选择链接libiconv
另外,默认不再编译example,通过ONLY_LIB开关来选择编译example
1 | rm -rf build && mkdir build && cd build && cmake .. -DONLY_LIB=OFF -DAPPLE=ON && bear make -j |
tars1.0流程分析
tars基金会对tars的1.0版本有过一个很好的分析,珠玉在前,对tars1.0的流程分析没有这么详细,更多的是自我体悟的总结
服务端
首先是对主要组件做一个大致介绍
Application
tars的服务端入口类为Application,可以认为Application即为服务端,他主要是对tars的配置文件等进行读取,从读取的内容对TC_EpollServer进行操作从而运行服务端
TC_EpollServer
服务端的主体逻辑都在TC_EpollServer中,它由两个重要模块组成:网络模块,业务模块
网络模块
NetThread
代表了网络模块的单个线程,使用TC_Epoller作为网络事件驱动
并使用TC_Socket,TC_BufferPool,Connection和ConnectionList来处理收发包的网络操作
服务端的流程一般为bind()->accept()->read()->write()->close()
其中bind, accept的逻辑被抽到了BindAdapter(在BindAdapter中分析),其他网络逻辑都在NetThread中
配置文件的/tars/application/server中的”netthread”字段记录了网络线程的数量
业务模块
Handle
代表了业务模块的单个线程,业务线程以条件变量的方式阻塞在死循环中,接受由网络线程派发过来的请求包
然后解包选择不同的调用业务模块的相关代码(IDL相关RPC接口的实现)
HandleGroup
组合了业务模块的单个线程,用于和BindAdapter交互(在BindAdapter中分析)
网络模块和业务模块的桥梁:BindAdapter
作为网络模块和业务模块之间的桥梁
BindAdapter顾名思义:绑定适配器,代表了业务模块对外服务时绑定的tcp端口
由于绑定了tcp端口,因此它封装了bind,accept相关的代码,网络相关的代码从网络模块提取到这里来
/tars/application/server下面的每个xxxAdapter都是BindAdapter配置
再具体一些来说,以典型的配置文件的为例:
1 | <Test.StressServer.StressObjAdapter> |
对接网络模块
网络模块的线程组中第一个线程的TC_Epoller会注册BindAdapter中的TC_Socket来监听accept事件(避免多线程监听同一个fd的惊群效应)
在网络模块完成了RPC请求包的收包以后,就会将请求包发给BindAdapter的无锁队列,唤醒业务线程将其取出处理
/tars/application/server/xxxAdapter
下:
endpoint
代表了监听的ip,端口
对接业务模块
/tars/application/server/xxxAdapter
下:
servant
字段指定了和在Server类初始化时,XXXServantImp类(继承IDL相关RPC接口的实现)对应的XXXServantObj1
2
3void StressServer::initialize() {
addServant<StressServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".StressServantObj");
}如果这里的StressServantObj在配置文件没有找到任何servant,StressServer会无法启动
Application初始化配置文件时,会通过ServantHelperManager将BindAdapter的ID(XXXObjAdapter)和“XXXServantObj”字符串绑定起来
1
2_adapter_servant["XXXObjAdapter"] = "XXXServantObj";
_servant_adapter["XXXServantObj"] = "XXXObjAdapter";这里的映射关系也意味着Servant和BindAdapter是一对一的,否则通过servant找BindAdapter会失败
addServant时将业务逻辑的实现代码类(XXXServantImp)绑定到"XXXServantObj"字符串上,使用"XXXServantObj"字符串可以实例化XXXServantImp类
1
_servant_creator["XXXServantObj"] = new ServantCreation<XXXServantImp>();
handlegroup
字段指定了绑定的业务线程组HandleGroup的IDHandleGroup和BindAdapter也是一对一,这样可以保证每个业务跑在自己的业务线程组里
以下代码限制了不一样时会直接抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14void Application::main(int argc, char *argv[]) {
...
for (size_t i = 0; i < adapters.size(); ++i) {
string name = adapters[i]->getName();
string groupName = adapters[i]->getHandleGroupName();
if(name != groupName) {
TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName);
if (!ptr) {
throw runtime_error("[TAF][adater `" + name + "` setHandle to group `" + groupName + "` fail!");
}
}
}
...
}threads
字段指定了HandleGroup下存在多少个ServantHandle
Handle初始化核心代码如下
1 | void ServantHandle::initialize() { |
Handle初始化时,会遍历对应HandleGroup相关的BindAdapter的ID("XXXObjAdapter")
通过
_adapter_servant
的映射关系,找到”XXXServantObj“通过
_servant_creator
的映射关系,找到new ServantCreation<XXXServantImp>()
这个工厂类,可以实例化XXXServantImp返回servant,然后servant->setName(”XXXServantObj“),servant->getName()也就是"XXXServantObj"了
从而在被网络线程唤醒后,读取无锁队列的请求包协议字段中的BindAdapter的ID("XXXServantObj")
通过
_servants
的映射关系,找到对应实例化的XXXServantImp类来进行处理
这里逻辑有点复杂的原因,是虽然Servant,HandleGroup和BindAdapter都是一对一关系,只有和Handle是一对多的关系
但是代码里面可能有历史包袱的关系,所有代码都处理了多对多关系,很容易被迷惑
小结
引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的服务端相关类图如下:
可以看到BindAdapter处于类图中的C位
BindAdapter作为网络模块和业务模块之间的桥梁,主要承担了单向通知的作用
这是因为网络模块要通知业务模块只能走条件变量的方式,而业务模块通知网络模块可以通过io事件
当网络模块收包逻辑处理完后,通知BindAdapter这个适配器,再由BindAdapter的方法去通知业务模块
业务模块对网络模块是不可见的,从而达到解耦
服务端流程
引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的流程图已经很好的进行了分析,因此不再细述,简单的将各部分流程图扒过来,最后总结一下总流程图
模块初始化流程
网络模块初始化
网络模块的初始化主要初始化各个网络线程,并且在第一个网络线程为BindAdapter监听端口
业务模块初始化
业务模块的初始化主要就是完成对接业务模块中所述部分,用于在业务模块:处理RPC请求时,能够根据请求包进行反射,由对应的XXXServantImp类来处理
最后启动Handle线程
在微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端一文中有一段这样的描述
由于Handle是继承自TC_Thread的,在执行Handle::start()中,会执行虚函数Handle::run(),在Handle::run()中主要是执行两个函数,一个是ServantHandle::initialize(),另一个是Handle::handleImp():
1
2
3
4
5
6 void TC_EpollServer::Handle::run()
{
initialize();
handleImp();
}
但是实际上,run是一个虚函数,因此执行的应当是ServantHandle,在这里引入了还未稳定的协程逻辑,由于文章没有分析协程,因此文章大体上还是对的,只是有些瑕疵
1 | void ServantHandle::run() |
请求的工作流程
网络模块
接受客户端链接
接收RPC请求
发送RPC响应
业务模块
处理RPC请求
小结
核心流程如图,可以看到在总流程中BindAdapter在中间,起到了网络模块和业务模块之间桥梁的作用
sequenceDiagram
participant NetThread
participant TC_Epoller
participant Connection
participant ServantHandle
participant BindAdapter
loop 网络线程
NetThread ->>+ TC_Epoller : 初始化:为BindAdapter的socket<br>添加 ET_LISTEN | EPOLLIN事件
TC_Epoller ->>- NetThread : return
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread: 触发ET_LISTEN事件
NetThread ->> NetThread : 创建 Connection<br>选取NetThread并加入NetThread的ConnectionList<br>为accepted的socket添加EPOLLIN | EPOLLOUT事件
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()<br>创建待解析数据:deque<tagRecvData *>> vRecvData
NetThread ->>+ Connection : recv()
Connection ->> Connection : ::read()处理EAGAIN等<br>解析协议:parseProtocol(vRecvData)<br>放入解析数据:vRecvData.push_back(recv)
Connection ->>- NetThread : 返回解析数据大小: return recv.size()
NetThread ->>+ BindAdapter : insertRecvQueue(vRecvData)
BindAdapter ->> BindAdapter : 解析数据放入BindAdapter接收的数据队列:_rbuffer.push_back(vRecvData)<br>唤醒业务线程条件变量处理:_handleGroup->monitor.notify();
BindAdapter ->>- NetThread : return
NetThread ->>- NetThread : return
end
loop 业务线程
ServantHandle ->>+ BindAdapter : _handleGroup->monitor.timedWait()<br>waitForRecvQueue()
BindAdapter ->> BindAdapter : _rbuffer.pop_front()
BindAdapter ->>- ServantHandle : return stRecvData
alt 如果 stRecvData.isOverload
ServantHandle ->> ServantHandle : handleOverload(stRecvData)
else 如果 stRecvData.isClosed
ServantHandle ->> ServantHandle : handleClose(stRecvData)
else 如果 (now - stRecvData.recvTimeStamp) > adapter->getQueueTimeout()
ServantHandle ->> ServantHandle : handleTimeout(stRecvData)
else 如果 一切正常
ServantHandle ->>+ ServantHandle : handle()
ServantHandle ->>+ TarsCurrent : 初始化请求上下文<br>current = new TarsCurrent(this)<br>current->initialize(stRecvData)
TarsCurrent ->>- ServantHandle : return
ServantHandle ->>+ ServantHandle : handleTarsProtocol()
ServantHandle ->>+ XXXServantImp: vector<char> buffer<br>int ret = onDispatch(current, buffer)<br>执行对应接口实现
XXXServantImp ->>- ServantHandle: return
ServantHandle ->>+ TarsCurrent: current->sendResponse(ret, buffer)
TarsCurrent ->> TarsCurrent: ResponsePacket response<br>response.sbuffer = buffer<br>response转换成string sSendBuffer<br>_pEpollServer->send(sSendBuffer)<br>NetThread* netThread = getNetThreadOfFd(fd)
TarsCurrent ->>- ServantHandle: return
ServantHandle ->>- ServantHandle : return
ServantHandle ->>- ServantHandle : return
ServantHandle ->>+ NetThread : netThread->send(sSendBuffer)
NetThread ->>+ NetThread : tagSendData* send = new tagSendData()<br>send->buff = sSendBuffer<br>_sbuffer.push_back(send)
NetThread ->> TC_Epoller : 通知ET_NOTIFY事件:<br>_epoller.mod(<br>_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT)
NetThread ->>- NetThread : return
NetThread ->>- ServantHandle : return
end
end
loop 网络线程
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发ET_NOTIFY事件
NetThread ->>+ NetThread : processPipe()<br>创建待发送数据:deque<tagSendData *> deSendData<br>从发送队列读取发送数据:_sbuffer.swap(deSendData)<br>
NetThread ->>+ Connection: for iter : deSendData<br>(*iter)->buffer是业务线程中onDispatch处理完的buffer: send((*iter)->buffer)
Connection ->> Connection: ::send()处理EAGAIN等<br>没发完buffer的存到_sendbuffer:<br>for (slice : buffer) _sendbuffer.push_back(slice)
Connection ->>- NetThread : return
NetThread ->>- NetThread : return
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLOUT事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : ::send()发送_sendbuffer中存起来的
Connection ->>- Connection : return
NetThread ->>- NetThread : return
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : recv()
Connection ->> Connection : 客户端关闭连接:::read() <= 0
Connection ->>- NetThread : return -1
NetThread ->>+ NetThread: 删除连接:delConnection(EM_CLIENT_CLOSE)
NetThread ->>- NetThread : return
NetThread ->>- NetThread : return
end
客户端
Communicator
Communicator代表了一个客户端实体,Communicator中维护了一组客户端网络线程等数据,因此创建和销毁的代价很高。一般通过Application::getCommunicator()
获得一个全局唯一的客户端实体
如果非常重要的客户端,发送紧急控制命令不希望被其他客户端干扰,对这个客户端new一个Communicator也是可以的
网络模块
CommunicatorEpoll
网络线程的代码全部在CommunicatorEpoll中,CommunicatorEpoll的生命周期由Communicator管理,和Communicator是聚合关系
这样对同一个Communicator而言,不同的服务访问,都可以复用同一组网络线程
服务代理模块
ServantProxy
ServantProxy代表了对一个服务进行访问的客户端代理
JCE生成的XXXProxy会继承ServantProxy
一般通过Communicator的方法template<class T> void stringToProxy()
获取到JCE生成的XXXProxy,就可以使用基类ServantProxy的各种接口了
ObjectProxy
ServantProxy管理了一组ObjectProxy,ObjectProxy代表了这个服务代理下的每个网络线程,依赖CommunicatorEpoll来进行操作
因此ServantProxy可以通过负载均衡策略选择不同的ObjectProxy也就是不同的网络线程
AdapterProxy
ObjectProxy管理了一组AdapterProxy,AdapterProxy代表了服务的每个不同实例(IP端口)
小结
网络模块和服务代理模块是并列关系,服务代理模块的3个模块从上往下,都是聚合的关系
看类图会更加清晰
引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的客户端相关类图如下:
客户端流程
一样,引用来自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的流程图
模块初始化流程
这个流程图有一个第5步略去了:如下
请求的工作流程
请求发起线程
客户端网络线程
发送RPC请求
接收RPC请求
小结
引用的几个流程图其实不太细致
没有介绍主控逻辑(服务发现),服务发现对客户端来说是很重要的一环
没有介绍超时逻辑
非阻塞socket的流程会复杂很多
connect,send,readv都是有可能多次才能完成的,在流程图中没有体现
sequenceDiagram
loop 请求发起线程
ServantProxy ->> ServantProxy : 调用JCE生成接口代码:prx->test()<br>tars_invoke(..., rsp)<br>初始化本次请求包:auto msg = new ReqMessage()<br>获取线程私有变量:auto pSptd = ServantProxyThreadData::getData()<br>选择网络线程:selectNetThreadInfo(pSptd, pObjectProxy, pReqQ)
ServantProxy ->> ServantProxy : 对线程变量_netSeq自增,轮询选择网络线程:<br>选择网络线程对应pObjectProxy:pObjProxy = *(_objectProxy + pSptd->_netSeq)<br>选择线程变量中,网络线程对应的队列:pReqQ = pSptd->_reqQueue[pSptd->_netSeq]<br>pSptd->_netSeq++<br>return
ServantProxy ->> ServantProxy : 网络线程对应的队列写入数据:pReq->push_back(msg)
CommunicatorEpoll ->>+ CommunicatorEpoll : 通知网络线程:pObjProxy->getCommunicatorEpoll()-><br>notify(pSptd->_reqQNo, pReqQ)
CommunicatorEpoll ->> CommunicatorEpoll : * 每个网络线程在ServantProxyThreadData::<br>getData()时会获得一个全局序列号_reqQNo:iSeq = pSptd->_reqQNo<br>* 根据序列号为caller线程在已有socket数组中创建通知socket:<br>_notify[iSeq].notify.createSocket()<br>* 通知信息中加上队列:_notify[iSeq].stFDInfo.p = pReqQ<br>* 带上FDInfo类型的stFDInfo通知信息通知网络线程<br>(未connect或者listen的socket会被唤醒EPOLLHUP事件):<br>_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN)
CommunicatorEpoll ->>- ServantProxy : return
ServantProxy ->> ServantProxy : 条件变量阻塞等待唤醒:msg->pMonitor->wait()
end
loop 网络线程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>caller线程事件:if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)<br>从通知信息获取网络线程队列:ReqInfoQueue * pInfoQueue=pFDInfo->p<br>获取到msg:pInfoQueue->pop_front(msg)
CommunicatorEpoll ->>+ ObjectProxy : 获取到ObjectProxy:<br>msg->pObjectProxy->invoke(msg)
ObjectProxy ->>+ EndpointManager : 选择AdapterProxy:<br>selectAdapterProxy(msg, pAdatapterProxy)
EndpointManager ->> EndpointManager : 多种选择策略,比如轮询可用的服务端连接_activeProxys:<br>getNextValidProxy()
EndpointManager ->>+ AdapterProxy : 第一次连接时,检查是否可用:checkActive()
AdapterProxy ->>+ TcpTransceiver : reconnect()
TcpTransceiver ->> TcpTransceiver : 初始化socket操作:createSocket()<br>尝试连接:::connect()
TcpTransceiver ->>+ CommunicatorEpoll : 为创建的socket添加EPOLLIN|EPOLLOUT事件:<br>_adapterProxy->getObjProxy()-><br>getCommunicatorEpoll()-><br>addFd(EPOLLIN|EPOLLOUT)
CommunicatorEpoll ->>- TcpTransceiver : return
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- EndpointManager : return
EndpointManager ->>- ObjectProxy : return pAdatapterProxy
ObjectProxy ->>+ AdapterProxy : pAdapterProxy->invoke(msg)
AdapterProxy ->> TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 第一次请求时还未连接成功,直接return<br>后续请求时已经连接成功,先尝试::send直到EAGAIN
AdapterProxy ->> AdapterProxy : 没写完的,msg写入超时队列:_timeoutQueue->push(msg)
AdapterProxy ->>- ObjectProxy : return
ObjectProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleOutputImp
CommunicatorEpoll ->>+ TcpTransceiver : 如果是isConnecting状态,那么连接上了:pTransceiver->setConnected()<br>pTransceiver->doRequest()
TcpTransceiver ->>+ AdapterProxy : _adapterProxy->doInvoke()
AdapterProxy ->>+ TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 尝试::send直到EAGAIN<br>如果发送完成,从超时队列中删除:_timeoutQueue->popSend
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- TcpTransceiver : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleInputImp
CommunicatorEpoll ->>+ TcpTransceiver : pTransceiver->doResponse()
TcpTransceiver ->> TcpTransceiver : 准备_recvBuffer和栈上缓冲区用于readv收包:<br>vecs[0].iov_base = _recvBuffer.WriteAddr()<br>vecs[1].iov_base = stackBuffer
TcpTransceiver ->> TcpTransceiver : readv后全部收包到_recvBuffer:<br>readv(vecs, 2)<br>_recvBuffer.PushData(stackBuffer)
TcpTransceiver ->>+ AppProtocol : 对_recvBuffer增量解包:<br>_adapterProxy->getObjProxy()<br>->getProxyProtocol()<br>.responseFunc(_recvBuffer)
alt 如果完整收到包
AppProtocol ->>- TcpTransceiver : return
TcpTransceiver ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>+ AdapterProxy : pTransceiver->getAdapterProxy()->finishInvoke(*it)
AdapterProxy ->> AdapterProxy : 超时队列删除msg:_timeoutQueue->erase(msg)<br>finishInvoke()<br>唤醒caller线程msg->pMonitor->notify()
AdapterProxy ->>- CommunicatorEpoll : return
else 如果包太大或者有其他问题,抛出异常
AppProtocol ->> TcpTransceiver : throw exception
TcpTransceiver ->> TcpTransceiver : close()
TcpTransceiver ->> CommunicatorEpoll : return
CommunicatorEpoll ->> CommunicatorEpoll : 走超时流程唤醒caller线程
end
end
loop 请求发起线程
ServantProxy ->> ServantProxy : 被唤醒<br>rsp = msg->response
end
我阅读下来发现v1.0.0有个明显的缺点,所有的服务端第一次连接都需要走reconnect流程,也就是close一下再connect,这个挺不好的。后面的版本优化了这一块逻辑
序列号
客户端的线程私有变量有_netSeq
和_reqQNo
两个序列号,初看代码时有点让人迷糊,简单区分一下
_netSeq
这是用来自增,轮询选取网络线程的,同时也会用来选择向网络线程写入的队列
ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]
这个队列也是线程私有变量,在通知网络线程时主要就是把它发给网络线程,让网络线程从中pop出msg来发包
_reqQNo
caller线程和客户端网络线程通信时,是通过epoll来唤醒的,因此每个caller线程需要自己独一无二的唤醒socket
因此有一个序列号管理器,在caller发起时申请一个,不需要时又可以回收
caller线程通信所用的socket取自需要通信网络线程下
NotifyInfo _notify[MAX_CLIENT_NOTIFYEVENT_NUM]
这个事先创建好的当前CommunicatorEpoll唯一的socket数组,使用_notify[_reqQNo]
即可
主控流程
sequenceDiagram
loop 请求发起线程
Communicator ->>+ ServantProxy : stringToProxy("XXXObj", prx)
ServantProxy ->> ServantProxy : 略去一些流程
ServantProxy ->>+ ObjectProxy : pObjectProxy = new ObjectProxy(...)
ObjectProxy ->>+ EndpointManager : _endpointManger.reset(new EndpointManager(this, ...))<br>init()<br>setObjName()<br>如果是ip端口: sEndpoints从objName里面取<br>如果是XXXOBJ,从Communicator初始化_queryFPrx这个ServantProxy:<br>_queryFPrx = _communicator->stringToProxy<QueryFPrx>()
ObjectProxy ->>- ServantProxy : return
ServantProxy ->>- Communicator : return
end
loop 网络线程
Communicator ->> Communicator : 如上个流程图:略去一些流程
Communicator ->> EndpointManager : _endpointManger->selectAdapterProxy(msg, pAdapterProxy)
EndpointManager ->> EndpointManager : 刷新主控:refreshReg()<br>从主控的servantProxy拉可用的服务端activeEp:_queryFPrx->findObjectById()<br>doEndpoints(activeEp)
loop 如果请求主控失败
EndpointManager ->> ObjectProxy : if(!_valid) return
ObjectProxy ->> ObjectProxy :加入请求等待的超时队列:_reqTimeoutQueue.push(msg)<br>等待下一次请求时刷新主控,如果请求成功会重试请求
ObjectProxy ->> CommunicatorEpoll : return
end
loop 如果请求主控成功
EndpointManager ->> EndpointManager : notifyEndpoints(activeEp)<br>初始化AdapterProxy并加入_activeProxys<br>doNotify()
EndpointManager ->>+ ObjectProxy : _objectProxy->doInvoke()
ObjectProxy ->> ObjectProxy : 遍历超时队列:while(!_reqTimeoutQueue.empty()) _reqTimeoutQueue.pop()<br>_endpointManger->selectAdapterProxy(msg)<br>pAdapterProxy->invoke(msg)
ObjectProxy ->>- ObjectProxy : return
end
end
处理超时流程
在客户端中有两个超时队列
AdapterProxy的
_timeoutQueue
这是发送请求等待回包的超时,维护对每个服务端的
ObjectProxy的
_reqTimeoutQueue
这是对服务发现等待请求列表的超时,这是维护对主控的
sequenceDiagram
loop 超时流程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒<br>doTimeout()
CommunicatorEpoll ->>+ AdapterProxy : 对全部AdapterProxy:doTimeout()
AdapterProxy ->> AdapterProxy : 取出超时msg:while(_timeoutQueue->timeout(msg))<br>finishInvoke(msg)<br>唤醒caller线程msg->pMonitor->notify()
AdapterProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>+ ObjectProxy : _objectProxyFactory->getObjectProxy(i)->doTimeout()
ObjectProxy ->> ObjectProxy : 遍历_reqTimeoutQueue取出msg,执行doInvokeException(msg)<br>标识请求异常:msg->eStatus = ReqMessage::REQ_EXC<br>唤醒超时的caller线程msg->pMonitor->notify()
ObjectProxy ->>- CommunicatorEpoll : return
end
tars3.0流程分析
在客户端和服务端模块都完全不变的情况下,tars3.0在框架层面的流程有一些变化
最大的变化是网络线程(包括服务端NetThread和客户端的)不再跑epoll为主的事件循环流程,而是跑协程调度器流程,并且在每跑一定数量的协程后进行epoll_wait,分析如下
协程调度器流程
tars3.0最大的变化就是协程调度器,因此对其流程进行分析
sequenceDiagram
TC_EpollServer ->> TC_EpollServer : waitForShutdown()<br>startHandle()<br>启动线程运行Handle::handleLoopCoroutine()
TC_EpollServer ->>+ TC_CoroutineScheduler : 向协程投递服务端处理协程:<br>_scheduler->go(&Handle::handleCoroutine)
TC_CoroutineScheduler ->>- TC_EpollServer : return
TC_EpollServer ->>+ TC_CoroutineScheduler : 执行主协程,直到服务端退出:<br>_scheduler->run()
loop 主协程循环
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程<br>执行_epoller->done(1000)
TC_Epoller ->> TC_Epoller : 触发定时事件<br>::epoll_wait()<br>触发EPOLLIN或EPOLLOUT等事件<br>例如TC_EpollServer::acceptCallback()等
TC_Epoller ->> TC_Epoller : 触发空闲事件<br>例如NET_THREAD_MERGE_HANDLES_THREAD模式下的<br>Handle::handleOnceThread<br>例如NET_THREAD_MERGE_HANDLES_CO模式下的<br>Handle::handleOnceCoroutine
TC_Epoller ->>- TC_CoroutineScheduler : return
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeup()<br>唤醒被put()重新激活的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbytimeout()<br>唤醒sleep的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbyself()<br>唤醒暂时yield,随时可以唤醒的协程,放进_avail
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行最多100个_acvive协程
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行全部的_avail协程
end
可以看到,流程图中协程有执行active的,也有avail的。那么avail和active区别是什么?
avail的逻辑
go创建的协程放在avail
调用yield(true)的协程放在_needActiveCoroId里面
然后主协程定时调用wakeupbyself()唤醒_needActiveCoroId的协程,放进avail
active的逻辑
调用yield(false)的协程不会自动唤醒
需要调用put()把协程放到_activeCoroQueue里面
然后主协程定时调用wakeup()唤醒_activeCoroQueue的协程,放进active
因此avail的协程是yield(true)以后不需要唤醒也可以跑,始终可用(avail),而active的协程调用了yield(false)需要put触发来激活(active)
另外一方面,每一轮主协程逻辑,被激活的协程一次只能跑100个,但是始终可用的协程比较重要,必须要执行
举例来说
在HANDLES_CO的1,3模式下,TC_EpollServer::Handle::handleOnceCoroutine作为第一个被go()投递的协程(协程ID为1),需要定时yield(true),来切换到go跑的其他协程上去,它是可以自动从inactive变成avail的
sequenceDiagram loop handleCoroutine协程 TC_EpollServer ->> TC_EpollServer : 从网络收包协程取得数据:_dataBuffer->pop(data) TC_EpollServer ->>+ TC_CoroutineScheduler : 对每个pop的data<br>向协程投递单次业务逻辑处理协程:_scheduler->go(&Handle::handle) TC_CoroutineScheduler ->>- TC_EpollServer : return TC_EpollServer ->> TC_CoroutineScheduler : 挂起等待下一次执行:scheduler->yield() end
在HANDLES_CO的1,3模式下,ServantHandle::handle在go()第一次投递时,是在avail里面,是可以自动从inactive变成avail的。
sequenceDiagram loop 服务端handle协程 ServantHandle ->> ServantHandle : handleTarsProtocol() ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现 XXXServantImp ->>- ServantHandle: return ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse() ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send() TC_EpollServer ->>- ServantHandle : return end
在HANDLES_CO的1,3模式下,运行ServantHandle::handle的业务逻辑中,如果有客户端rpc发包结束后这个caller协程在yield(false)后是inactive的,一定需要网络协程处理完以后,通过put唤醒了这个协程以后,才能继续工作
将上面的服务端流程图中加入一次rpc调用如下
sequenceDiagram loop 服务端handle协程,客户端caller协程 ServantHandle ->> ServantHandle : handleTarsProtocol() ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现 XXXServantImp ->>+ ServantProxy : tars_invoke() ServantProxy ->> ServantProxy : 发包 ServantProxy ->> ServantProxy : 挂起等待被put唤醒:msg->sched->yield(false) ServantProxy ->>- XXXServantImp : return XXXServantImp ->>- ServantHandle: return end loop 客户端网络协程 TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件 TC_Epoller ->>+ CommunicatorEpoll : 客户端发包流程<br>handleOutputImp()<br>后续流程和tars1.0一致,省略 CommunicatorEpoll ->>- TC_Epoller : return TC_Epoller ->>- TC_CoroutineScheduler : return TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件 TC_Epoller ->>+ CommunicatorEpoll : 客户端收包流程<br>handleInputImp()<br>唤醒caller协程<br>msg->sched->put(msg->iCoroId) CommunicatorEpoll ->>- TC_Epoller : return TC_Epoller ->>- TC_CoroutineScheduler : return end loop 服务端handle协程 ServantHandle ->> ServantHandle : 服务端回包流程:_servantHandle->sendResponse() ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send() TC_EpollServer ->>- ServantHandle : return end
在加入协程调度器后,逻辑侧改动其实不大,只是把线程部分协程化了,原本线程间的通信,替换成了协程的yield(true)和put()来完成原本的条件变量wait(),notify()部分
引入协程调度器后,服务端和客户端可以共用同一个线程了(在下面NET多种模式会详细解释),共用一个epoll,因此触发函数统一使用TC_Epoller::EpollInfo::registerCallback
进行注册
服务端
为bindAdapter监听的端口的EPOLLIN注册TC_EpollServer::acceptCallback函数
为accept到的socket的EPOLLIN和EPOLLOUT
分别注册TC_EpollServer::Connection::handleInputImp和TC_EpollServer::Connection::handleOutputImp
客户端
为客户端发起连接的socket的EPOLLIN和EPOLLOUT
分别注册CommunicatorEpoll::handleInputImp和CommunicatorEpoll::handleOutputImp
在caller线程不存在协程调度器的情况下
给客户端网络线程注册NetThread::processPipe来等待caller线程唤醒
NET多种模式
在tars1.0中,服务端分为网络线程和业务线程,客户端分为caller线程和客户端网络线程
在tars3.0中,通过改变/tars/application/server/opencoroutine的值,有4种模式可以选择
NET_THREAD_QUEUE_HANDLES_THREAD = 0
默认模式,和tars1.0一致
NET_THREAD_QUEUE_HANDLES_CO = 1
NET_THREAD_MERGE_HANDLES_THREAD = 2
NET_THREAD_MERGE_HANDLES_CO = 3
可以看出,4种模式是以NET_THREAD_XXX开头的2种模式和HANDLES_XXX结尾的2种模式的组合
NET_THREAD_QUEUE和NET_THREAD_MERGE的区别
- NET_THREAD_QUEUE开头的0,1模式
- 和tars1.0一致
- 独立网络线程 + 独立业务线程
- 网络线程负责收发包,通过队列唤醒handle线程中处理
- NET_THREAD_MERGE开头的2,3模式
- 合并网络线程 + 业务线程
- 网络线程负责收发包,也负责原本在业务线程中处理的逻辑,这种模式下延时最小
- PS:
- 线程个数以处理线程配置为准, 网络线程配置无效
- 如果是UDP, 则网络线程竞争接收包
NET_THREAD_QUEUE和tars1.0一致,不再分析
对NET_THREAD_MERGE如何合并网络和业务线程的进行分析
sequenceDiagram
loop 服务端主线程
TC_EpollServer ->> TC_EpollServer : waitForShutdown()
TC_EpollServer ->> TC_EpollServer : initHandle()<br>根据配置业务线程数量,来启动相应数量的网络线程<br>为网络线程设定业务线程的Handle::initialize()回调<br>如果是HANDLES_CO,那么给epoller的idle回调加入Handle::handleOnceCoroutine<br>如果是HANDLES_THREAD,那么给epoller的idle回调加入Handle::handleOnceThread
TC_EpollServer ->> TC_EpollServer : startHandle()<br>对每个业务线程getNetThread()->start()
TC_EpollServer ->> TC_EpollServer : 等待服务端主线程退出
end
loop 服务端网络线程
NetThread ->> NetThread : 启动协程调度器流程:_scheduler->run()
loop 服务端网络模块
NetThread ->> NetThread : epoll_wait触发EPOLLIN事件<br>::read()处理EAGAIN等<br>解包完毕
NetThread ->> BindAdapter : insertRecvQueue(recv)
BindAdapter ->> BindAdapter : push数据到队列中, 同时通过epoll唤醒某个等待处理线程<br>_rbuffer.push_back(recv)<br>_epoller->notify()
end
loop 服务端业务模块
ServantHandle ->> ServantHandle : 从队列取数据<br>_rbuffer.pop_front()
ServantHandle ->> ServantHandle : 调用XXXImp的对应接口处理<br>handleTarsProtocol()<br>onDispatch()
ServantHandle ->> NetThread : 选择对应的网络线程发包<br>adapter->getNetThread()->send(data)
NetThread ->> NetThread : 如果网络线程和业务线程没有合并(NET_THREAD_QUEUE)<br>流程和tars1.0相同,入队列再通知网络线程去发包<br>_sbuffer.push_back(data)
NetThread ->> NetThread : 如果网络线程和业务线程合并了(NET_THREAD_MERGE)<br>直接发包
end
end
HANDLES_THREAD和HANDLES_CO的区别
HANDLES_THREAD结尾的0,2模式
- 和tars1.0一致
- 服务端的业务线程中不启用协程处理
HANDLES_CO结尾的1,3模式,服务端的业务线程中启用协程处理,每个包都启动一个新协程
业务线程启动的客户端,在协程模式下,会共用当前线程的协程
因此在这两种模式下,在发起请求后会与服务端的业务线程共用同一个网络线程
对3模式而言,由于服务端的网络线程和业务线程也合并了,这意味着整个服务端只有一个线程来处理,这种模式下的延时是4个模式中最小的
在协程调度器中简单的介绍了HANDLES_CO下服务端网络线程进行一次rpc调用的情况,下面是详细的流程介绍
sequenceDiagram
loop handle协程
ServantHandle ->> ServantHandle : handleTarsProtocol()
ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
XXXServantImp ->>+ ServantProxy : tars_invoke()
ServantProxy ->>+ ServantProxy : 虽然接口名还叫选择网络线程,实际上同一网络线程:<br>selectNetThreadInfo()
ServantProxy ->>+ CommunicatorEpoll: 直接发包,不进行通知:<br>msg->pObjectProxy->getCommunicatorEpoll()<br>->handle(pSptd->_reqQNo)
CommunicatorEpoll ->>+ ObjectProxy : msg->pObjectProxy->invoke(msg)<br>后续流程和tars1.0一致,省略
ObjectProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>- ServantProxy : return
ServantProxy ->> ServantProxy : 挂起等待被put唤醒:msg->sched->yield(false)
ServantProxy ->>- XXXServantImp : return
XXXServantImp ->>- ServantHandle: return
end
loop 网络协程
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
TC_Epoller ->>+ CommunicatorEpoll : handleOutputImp()<br>后续流程和tars1.0一致,省略
CommunicatorEpoll ->>- TC_Epoller : return
TC_Epoller ->>- TC_CoroutineScheduler : return
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
TC_Epoller ->>+ CommunicatorEpoll : handleInputImp()
CommunicatorEpoll ->>+ TC_TCPTransceiver : _trans->doResponse()
TC_TCPTransceiver ->> TC_TCPTransceiver : 对EGAIN等非阻塞的处理和tars1.0一致<br>循环读取数据解析协议<br>
TC_TCPTransceiver ->>+ AdapterProxy : 调用_onParserCallback回调<br>_onParserCallback回调在initializeClient中进行注册<br>实际是AdapterProxy::onParserCallback
AdapterProxy ->> AdapterProxy : 唤醒caller协程<br>finishInvoke(rsp)<br>finishInvoke_parallel()<br>finishInvoke(msg)<br>msg->sched->put(msg->iCoroId)
AdapterProxy ->>- TC_TCPTransceiver : return
TC_TCPTransceiver ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>- TC_Epoller : return
TC_Epoller ->>- TC_CoroutineScheduler : return
end
loop handle协程
ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse()
ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
TC_EpollServer ->>- ServantHandle : return
end
性能测试
在根目录的examples/StressDemo下就有性能测试的代码
StressImp的服务端代码如下
1 | tars::Int32 StressImp::testStr(const std::string& in, std::string &out, tars::TarsCurrentPtr current) |
现在对tars模式NET_THREAD_QUEUE_HANDLES_THREAD,NET_THREAD_QUEUE_HANDLES_CO,NET_THREAD_MERGE_HANDLES_CO和公司框架一起做测试
服务端都是相同网络线程,客户端相同模式下使用5个线程并行发起请求
服务端不限cpu
首先进行服务端不限cpu的测试
pthread id一行会分别打印线程id和消耗时间
- NET_THREAD_QUEUE_HANDLES_THREAD
1 | root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config.conf |
- NET_THREAD_QUEUE_HANDLES_CO
1 | root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro.conf |
- NET_THREAD_MERGE_HANDLES_CO
1 | root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro3.conf |
- 公司框架
1 | root@121:~/taf_test/server# ./Server --config=config.conf |
服务端限制1核
NET_THREAD_QUEUE_HANDLES_THREAD
1
2
3
4init tp succ
times:1500000
pthread id: 139798232692480 | 198703
succ:1500000NET_THREAD_QUEUE_HANDLES_CO
1
2
3
4init tp succ
times:1500000
pthread id: 139790615820032 | 177378
succ:1500000NET_THREAD_MERGE_HANDLES_CO
1
2
3
4init tp succ
times:1500000
pthread id: 140052203300608 | 104109
succ:1500000公司框架
1
2
3
4init tp succ
times:1500000
pthread id: 139851859920640 | 228572
succ:1500000
小结
服务端不限cpu(qps) | 服务端限制1核(qps) | 限制后性能降低 | |
---|---|---|---|
NET_THREAD_QUEUE_HANDLES_THREAD | 12448 | 7548 | 39.36% |
NET_THREAD_QUEUE_HANDLES_CO | 12851 | 8456 | 34.19% |
NET_THREAD_MERGE_HANDLES_CO | 15334 | 14407 | 6.04% |
公司框架 | 11409 | 6562 | 42.48% |
稍微整理可知,NET_THREAD_MERGE_HANDLES_CO由于减少了线程切换,虽然网络协程到业务协程依然依赖队列,但是从业务协程处理完后,向网络协程回报不再依赖队列,减少了队列操作后,性能确实是最好的,在限制cpu后,性能降低也是最少的
NET_THREAD_QUEUE_HANDLES_CO相对NET_THREAD_QUEUE_HANDLES_THREAD,业务模块使用协程来处理以后,性能也得到了提升
公司框架相对而言就差了不少,特别是在模拟资源受限的情况下,一方面,qps只有NET_THREAD_MERGE_HANDLES_CO的45%,另外一方面性能损失也特别大,存在较大优化空间
读代码时的一些测试
tars的HANDLES_CO模式测试
看逻辑看的有点糊涂,主要希望测试出来客户端和服务端是否同一个线程
- 测试方法:
- 给客户端创建的线程加点日志打印线程id
- 给服务端收包时加日志打印线程id
- 结论:
- 是同一个网络线程,且都为服务端业务线程
- 客户端创建的线程不会被使用(经过其他测试,如果客户端不开启HANDLES_CO模式,就会进入客户端网络线程)
1 | if (msg->sched) { |
没开启HANDLES_CO的客户端打印:
1
2
3
4
52022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::897]|not in sched handle: 0x563d60a46bb0, TestApp.BServer.BServantObj
2022-10-27 23:43:32|140229141268224|[ObjectProxy.cpp::invoke::155]|[ObjectProxy::invoke, objname:TestApp.BServer.BServantObj, begin...]
2022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::908]|pMonitor wait: 0x563d60a46bb0, TestApp.BServer.BServantObj before
...
2022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::912]|pMonitor wait: 0x563d60a46bb0, TestApp.BServer.BServantObj after140229114881792线程通过notify通知客户端网络线程工作,最终唤醒当前线程结束条件变量wait
开启HANDLES_CO的客户端打印:
1
2
32022-10-27 23:43:32|140030806836992|[ServantProxy.cpp::invoke::890]|in sched handle: 0x7f5b6800ae50, TestApp.AServer.AServantObj
2022-10-27 23:43:32|140030806836992|[ObjectProxy.cpp::invoke::155]|[ObjectProxy::invoke, objname:TestApp.AServer.AServantObj, begin...]
...一直是相同的线程打印日志,说明没发生切换
触发connect测试
看代码的时候对什么时候触发connect有点不太确定,打印堆栈看看
在EndpointManager对AdapterProxy进行checkActive时,进行::connect,并且加入epoll,如果EINPROGRESS那么就等待epoll唤醒
1 | 0x000055b279417b1a <tars::TC_Transceiver::doConnect(int, sockaddr const*, unsigned int)+0x13a> |
随后在handleLoopCoroutine中,epoll被唤醒,从而设置为已连接状态
1 | 0x000055b279343330 <tars::ObjectProxy::prepareConnection(tars::AdapterProxy*)+0x1a0> |
-
2022-11-30
Here's something encrypted, password is required to continue reading.