tars C++源码分析

由于公司使用的taf框架是和开源的tars框架一脉相承,虽然经过几年的改造,几乎已经面目全非了,但是主体结构上相差不大

因此从tars框架的最初版本的源码分析上就可以理解整个核心链路了

本篇会分析开源的tars1.0版本和tars3.0版本,并探索3.0版本的优化原因,最后对他们的性能做一些比较

计划中还有一篇公司taf框架的分析,很有趣的是它的发展方向和tars3.0不太一致,因此可以对其性能和tars性能也做一番比较,遗憾的是出于保密需要,无法将其post在我的博客上了

tars编译问题

tars的不同版本在我的ubuntu 20.04上都遇到了编译问题

tars1.0

在编译tars v1.0.0时遇到如下报错

1
2
3
4
5
Linking CXX executable tars2node
/usr/bin/ld: ../../util/lib/libtarsutil.a(tc_encoder.cpp.o): in function `tars::TC_Encoder::gbk2utf8(char*, int&, char const*, int)':
/root/tars/TarsCpp/util/src/tc_encoder.cpp:39: undefined reference to `libiconv_open'
/usr/bin/ld: /root/tars/TarsCpp/util/src/tc_encoder.cpp:75: undefined reference to `libiconv_close'
/usr/bin/ld: /root/tars/TarsCpp/util/src/tc_encoder.cpp:66: undefined reference to `libiconv'

但实际上我是安装了libiconv的,make --trace看下是什么问题,显示具体命令为

1
cd /root/tars/TarsCpp/build/tools/tars2node && /root/env/vim_download/cmake-3.16.8-Linux-x86_64/bin/cmake -E cmake_link_script CMakeFiles/tars2node.dir/link.txt --verbose=

确实在/root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt没有链接libiconv

1
2
~/tars/TarsCpp/build# cat  /root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt
/usr/bin/c++ -std=c++11 -g -O2 -Wall -Wno-deprecated -g -O2 -Wall -rdynamic CMakeFiles/tars2node.dir/code_generator.cpp.o CMakeFiles/tars2node.dir/file_util.cpp.o CMakeFiles/tars2node.dir/gen_js.cpp.o CMakeFiles/tars2node.dir/gen_js_dts.cpp.o CMakeFiles/tars2node.dir/gen_proxy.cpp.o CMakeFiles/tars2node.dir/gen_proxy_dts.cpp.o CMakeFiles/tars2node.dir/gen_server.cpp.o CMakeFiles/tars2node.dir/gen_server_dts.cpp.o CMakeFiles/tars2node.dir/gen_server_imp.cpp.o CMakeFiles/tars2node.dir/idl_scan.cpp.o CMakeFiles/tars2node.dir/idl_util.cpp.o CMakeFiles/tars2node.dir/main.cpp.o -o tars2node ../../util/lib/libtarsutil.a ../lib/libtarsparse.a ../../util/lib/libtarsutil.a

在link.txt后追加-liconv后回到build目录继续编译可以顺利完成

1
mkdir build && cd build && cmake .. && sed -i "s/$/& -liconv/g" tools/tars2node/CMakeFiles/tars2node.dir/link.txt && make -j

tars3.0

tars v3.0.0通过APPLE开关来选择链接libiconv

另外,默认不再编译example,通过ONLY_LIB开关来选择编译example

1
rm -rf build && mkdir build && cd build && cmake .. -DONLY_LIB=OFF -DAPPLE=ON && bear make -j

tars1.0流程分析

tars基金会对tars的1.0版本有过一个很好的分析,珠玉在前,对tars1.0的流程分析没有这么详细,更多的是自我体悟的总结

服务端

首先是对主要组件做一个大致介绍

Application

tars的服务端入口类为Application,可以认为Application即为服务端,他主要是对tars的配置文件等进行读取,从读取的内容对TC_EpollServer进行操作从而运行服务端

TC_EpollServer

服务端的主体逻辑都在TC_EpollServer中,它由两个重要模块组成:网络模块,业务模块

网络模块

NetThread

代表了网络模块的单个线程,使用TC_Epoller作为网络事件驱动

并使用TC_Socket,TC_BufferPool,Connection和ConnectionList来处理收发包的网络操作

服务端的流程一般为bind()->accept()->read()->write()->close()

其中bind, accept的逻辑被抽到了BindAdapter(在BindAdapter中分析),其他网络逻辑都在NetThread中

配置文件的/tars/application/server中的”netthread”字段记录了网络线程的数量

业务模块

Handle

代表了业务模块的单个线程,业务线程以条件变量的方式阻塞在死循环中,接受由网络线程派发过来的请求包

然后解包选择不同的调用业务模块的相关代码(IDL相关RPC接口的实现)

HandleGroup

组合了业务模块的单个线程,用于和BindAdapter交互(在BindAdapter中分析)

网络模块和业务模块的桥梁:BindAdapter

作为网络模块和业务模块之间的桥梁

BindAdapter顾名思义:绑定适配器,代表了业务模块对外服务时绑定的tcp端口

由于绑定了tcp端口,因此它封装了bind,accept相关的代码,网络相关的代码从网络模块提取到这里来

/tars/application/server下面的每个xxxAdapter都是BindAdapter配置

再具体一些来说:

对接网络模块

网络模块的线程组中第一个线程的TC_Epoller会注册BindAdapter中的TC_Socket来监听accept事件(避免多线程监听同一个fd的惊群效应)

在网络模块完成了RPC请求包的收包以后,就会将请求包发给BindAdapter的无锁队列,唤醒业务线程将其取出处理

/tars/application/server/xxxAdapter下:

  • “endpoint”代表了监听的ip,端口
对接业务模块

/tars/application/server/xxxAdapter下:

  • “servant”字段指定了和XXXServantImp类(继承IDL相关RPC接口的实现)对应的"XXXServantObj"

    • Application初始化配置文件时,会通过ServantHelperManager将BindAdapter的ID(“XXXObjAdapter”)和"XXXServantObj"绑定起来

      1
      2
      _adapter_servant["XXXObjAdapter"] = "XXXServantObj";
      _servant_adapter["XXXServantObj"] = "XXXObjAdapter";
    • addServant时将业务逻辑的实现代码类(XXXServantImp)绑定到"XXXServantObj"的key上,使用"XXXServantObj"可以实例化XXXServantImp类

      1
      _servant_creator["XXXServantObj"] = new ServantCreation<XXXServantImp>();
  • ”handlegroup”字段指定了绑定的业务线程组HandleGroup的ID

    • HandleGroup的ID一般和BindAdapter的ID是一样的(“XXXObjAdapter”),这样可以保证每个业务跑在自己的业务线程组里

    • 但是也可以一个HandleGroup对应多个BindAdapter,所以需要建立映射关系

    • HandleGroup初始化时,会将其和BindAdapter的ID绑定起来

      1
      setHandleGroup<ServantHandle>(_handleGroupName, _iHandleNum, adapter);

Handle初始化核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ServantHandle::initialize() {
for (adpit = adapters.begin(); adpit != adapters.end(); ++adpit)
//adpit->first = "XXXObjAdapter"
//servant = XXXServantImp
ServantPtr servant = ServantHelperManager::getInstance()->create(adpit->first);
//servant->getName() = "XXXServantObj"
_servants[servant->getName()] = servant;
}
}
ServantPtr ServantHelperManager::create(const string &sAdapter) {
//sAdapter = "XXXObjAdapter"
//s = "XXXServantObj"
string s = _adapter_servant[sAdapter];
if(_servant_creator.find(s) != _servant_creator.end())
//servant = XXXServantImp
servant = _servant_creator[s]->create(s);
return servant;
}
template<class T> struct ServantCreation {
ServantPtr create(const string &s) { T *p = new T; p->setName(s); return p; }
};
  • Handle初始化时,会遍历对应HandleGroup相关的BindAdapter的ID(“XXXObjAdapter”)

    通过_adapter_servant的映射关系,找到”XXXServantObj“

    通过_servant_creator的映射关系,找到new ServantCreation<XXXServantImp>()

    这个工厂类,可以实例化XXXServantImp返回servant,然后servant->setName(”XXXServantObj“),servant->getName()也就是"XXXServantObj"了

  • 从而在被网络线程唤醒后,读取无锁队列的请求包协议字段中的BindAdapter的ID(“XXXServantObj”)

    通过_servants的映射关系,找到对应实例化的XXXServantImp类来进行处理

这里逻辑有点复杂的原因,是虽然“XXXObjAdapter”和"XXXServantObj"是一对一的关系,但是没有省略“XXXServantObj”

因此有的地方用“XXXObjAdapter”,有的地方用“XXXServantObj”,需要各种转换

如果全部用BindAdapter的ID,省略掉“XXXServantObj”,来完成全部的逻辑,会简单很多

小结

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的服务端相关类图如下:

可以看到BindAdapter处于类图中的C位

BindAdapter作为网络模块和业务模块之间的桥梁,主要承担了单向通知的作用

这是因为网络模块要通知业务模块只能走条件变量的方式,而业务模块通知网络模块可以通过io事件

当网络模块收包逻辑处理完后,通知BindAdapter这个适配器,再由BindAdapter的方法去通知业务模块

业务模块对网络模块是不可见的,从而达到解耦

服务端流程

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的流程图已经很好的进行了分析,因此不再细述,简单的将各部分流程图扒过来,最后总结一下总流程图

模块初始化流程

网络模块初始化

网络模块的初始化主要初始化各个网络线程,并且在第一个网络线程为BindAdapter监听端口

业务模块初始化

业务模块的初始化主要就是完成对接业务模块中所述部分,用于在业务模块:处理RPC请求时,能够根据请求包进行反射,由对应的XXXServantImp类来处理

最后启动Handle线程

微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端一文中有一段这样的描述

由于Handle是继承自TC_Thread的,在执行Handle::start()中,会执行虚函数Handle::run(),在Handle::run()中主要是执行两个函数,一个是ServantHandle::initialize(),另一个是Handle::handleImp():

1
2
3
4
5
6
void TC_EpollServer::Handle::run()
{
initialize();

handleImp();
}

但是实际上,run是一个虚函数,因此执行的应当是ServantHandle,在这里引入了还未稳定的协程逻辑,由于文章没有分析协程,因此文章大体上还是对的,只是有些瑕疵

1
2
3
4
5
6
7
8
9
10
11
12
13
void ServantHandle::run()
{
initialize();

if(!ServerConfig::OpenCoroutine)
{
handleImp();
}
else
{
//协程代码
}
}

请求的工作流程

网络模块
接受客户端链接
接收RPC请求
发送RPC响应
业务模块
处理RPC请求

小结

核心流程如图,可以看到在总流程中BindAdapter在中间,起到了网络模块和业务模块之间桥梁的作用

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
loop 网络线程
NetThread ->>+ TC_Epoller : 初始化:为BindAdapter的socket添加 ET_LISTEN | EPOLLIN事件
TC_Epoller ->>- NetThread : return
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread: 触发ET_LISTEN事件
NetThread ->>+ NetThread : 创建 Connection<br>选取NetThread并加入NetThread的ConnectionList<br>为accepted的socket添加EPOLLIN | EPOLLOUT事件
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()<br>创建待解析数据:deque<tagRecvData *>> vRecvData
NetThread ->>+ Connection : recv()
Connection ->>+ Connection : ::read()处理EAGAIN等<br>解析协议:parseProtocol(vRecvData)<br>放入解析数据:vRecvData.push_back(recv)
Connection ->>- NetThread : 返回解析数据大小: return recv.size()
NetThread ->>+ BindAdapter : insertRecvQueue(vRecvData)
BindAdapter ->>+ BindAdapter : 解析数据放入BindAdapter接收的数据队列:_rbuffer.push_back(vRecvData)<br>唤醒业务线程条件变量处理:_handleGroup->monitor.notify();
BindAdapter ->>- NetThread : return
end
loop 业务线程
ServantHandle ->>+ BindAdapter : _handleGroup->monitor.timedWait()<br>waitForRecvQueue()
BindAdapter ->>+ BindAdapter : _rbuffer.pop_front()
BindAdapter ->>- ServantHandle : return
ServantHandle ->>+ ServantHandle : handleTarsProtocol()
ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
XXXServantImp ->>- ServantHandle: return
ServantHandle ->>+ ServantHandle: _pEpollServer->send()<br>NetThread* netThread = getNetThreadOfFd(fd)
ServantHandle ->>+ NetThread : netThread->send()
NetThread ->>+ NetThread : tagSendData* send = new tagSendData()<br>_sbuffer.push_back(send)
NetThread ->>+ TC_Epoller : 通知ET_NOTIFY事件:_epoller.mod(_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT)
end
loop 网络线程
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发ET_NOTIFY事件
NetThread ->>+ NetThread : processPipe()<br>创建待发送数据:deque<tagSendData *> deSendData<br>从发送队列读取发送数据:_sbuffer.swap(deSendData)<br>
NetThread ->>+ Connection: send()
Connection ->>+ Connection: ::send()处理EAGAIN等<br>没发完的存起来:for (slice : buffer) _sendbuffer.push_back(slice)
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLOUT事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : ::send()发送_sendbuffer中存起来的
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : recv()
Connection ->>+ Connection : 客户端关闭连接:::read() <= 0
Connection ->>- NetThread : return -1
NetThread ->>+ NetThread: 删除连接:delConnection(EM_CLIENT_CLOSE)
end

客户端

Communicator

Communicator代表了一个客户端实体,Communicator中维护了一组客户端网络线程等数据,因此创建和销毁的代价很高。一般通过Application::getCommunicator()获得一个全局唯一的客户端实体

如果非常重要的客户端,发送紧急控制命令不希望被其他客户端干扰,对这个客户端new一个Communicator也是可以的

网络模块

CommunicatorEpoll

网络线程的代码全部在CommunicatorEpoll中,CommunicatorEpoll的生命周期由Communicator管理,和Communicator是聚合关系

这样对同一个Communicator而言,不同的服务访问,都可以复用同一组网络线程

服务代理模块

ServantProxy

ServantProxy代表了对一个服务进行访问的客户端代理

JCE生成的XXXProxy会继承ServantProxy

一般通过Communicator的方法template<class T> void stringToProxy()获取到JCE生成的XXXProxy,就可以使用基类ServantProxy的各种接口了

ObjectProxy

ServantProxy管理了一组ObjectProxy,ObjectProxy代表了这个服务代理下的每个网络线程,依赖CommunicatorEpoll来进行操作

因此ServantProxy可以通过负载均衡策略选择不同的ObjectProxy也就是不同的网络线程

AdapterProxy

ObjectProxy管理了一组AdapterProxy,AdapterProxy代表了服务的每个不同实例(IP端口)

小结

网络模块和服务代理模块是并列关系,服务代理模块的3个模块从上往下,都是聚合的关系

看类图会更加清晰

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的客户端相关类图如下:

客户端流程

一样,引用来自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的流程图

模块初始化流程

这个流程图有一个第5步略去了:如下

请求的工作流程

请求发起线程
客户端网络线程
发送RPC请求
接收RPC请求

小结

引用的几个流程图其实不太细致

  • 没有介绍主控逻辑(服务发现),服务发现对客户端来说是很重要的一环

  • 没有介绍超时逻辑

  • 非阻塞socket的流程会复杂很多

    connect,send,readv都是有可能多次才能完成的,在流程图中没有体现

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
loop 请求发起线程
ServantProxy ->> ServantProxy : 调用JCE生成接口代码:prx->test()<br>tars_invoke(..., rsp)<br>初始化本次请求包:auto msg = new ReqMessage()<br>获取线程私有变量:auto pSptd = ServantProxyThreadData::getData()<br>选择网络线程:selectNetThreadInfo(pSptd, pObjectProxy, pReqQ)
ServantProxy ->> ServantProxy : 对线程变量_netSeq自增,轮询选择网络线程:<br>选择网络线程对应pObjectProxy:pObjProxy = *(_objectProxy + pSptd->_netSeq)<br>选择线程变量中,网络线程对应的队列:pReqQ = pSptd->_reqQueue[pSptd->_netSeq]<br>pSptd->_netSeq++<br>return
ServantProxy ->> ServantProxy : 网络线程对应的队列写入数据:pReq->push_back(msg)
CommunicatorEpoll ->>+ CommunicatorEpoll : 通知网络线程:pObjProxy->getCommunicatorEpoll()-><br>notify(pSptd->_reqQNo, pReqQ)
CommunicatorEpoll ->> CommunicatorEpoll : * 每个网络线程在ServantProxyThreadData::<br>getData()时会获得一个全局序列号_reqQNo:iSeq = pSptd->_reqQNo<br>* 根据序列号为caller线程在已有socket数组中创建通知socket:<br>_notify[iSeq].notify.createSocket()<br>* 通知信息中加上队列:_notify[iSeq].stFDInfo.p = pReqQ<br>* 带上FDInfo类型的stFDInfo通知信息通知网络线程<br>(未connect或者listen的socket会被唤醒EPOLLHUP事件):<br>_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN)
CommunicatorEpoll ->>- ServantProxy : return
ServantProxy ->> ServantProxy : 条件变量阻塞等待唤醒:msg->pMonitor->wait()
end
loop 网络线程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>caller线程事件:if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)<br>从通知信息获取网络线程队列:ReqInfoQueue * pInfoQueue=pFDInfo->p<br>获取到msg:pInfoQueue->pop_front(msg)
CommunicatorEpoll ->>+ ObjectProxy : 获取到ObjectProxy:<br>msg->pObjectProxy->invoke(msg)
ObjectProxy ->>+ EndpointManager : 选择AdapterProxy:<br>selectAdapterProxy(msg, pAdatapterProxy)
EndpointManager ->> EndpointManager : 多种选择策略,比如轮询可用的服务端连接_activeProxys:<br>getNextValidProxy()
EndpointManager ->>+ AdapterProxy : 第一次连接时,检查是否可用:checkActive()
AdapterProxy ->>+ TcpTransceiver : reconnect()
TcpTransceiver ->> TcpTransceiver : 初始化socket操作:createSocket()<br>尝试连接:::connect()
TcpTransceiver ->>+ CommunicatorEpoll : 为创建的socket添加EPOLLIN|EPOLLOUT事件:<br>_adapterProxy->getObjProxy()-><br>getCommunicatorEpoll()-><br>addFd(EPOLLIN|EPOLLOUT)
CommunicatorEpoll ->>- TcpTransceiver : return
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- EndpointManager : return
EndpointManager ->>- ObjectProxy : return pAdatapterProxy
ObjectProxy ->>+ AdapterProxy : pAdapterProxy->invoke(msg)
AdapterProxy ->> TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 第一次请求时还未连接成功,直接return<br>后续请求时已经连接成功,先尝试::send直到EAGAIN
AdapterProxy ->> AdapterProxy : 没写完的,msg写入超时队列:_timeoutQueue->push(msg)
AdapterProxy ->>- ObjectProxy : return
ObjectProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleOutputImp
CommunicatorEpoll ->>+ TcpTransceiver : 如果是isConnecting状态,那么连接上了:pTransceiver->setConnected()<br>pTransceiver->doRequest()
TcpTransceiver ->>+ AdapterProxy : _adapterProxy->doInvoke()
AdapterProxy ->>+ TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 尝试::send直到EAGAIN<br>如果发送完成,从超时队列中删除:_timeoutQueue->popSend
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- TcpTransceiver : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleInputImp
CommunicatorEpoll ->>+ TcpTransceiver : pTransceiver->doResponse()
TcpTransceiver ->> TcpTransceiver : readv直到完整收包
TcpTransceiver ->>- CommunicatorEpoll : 已经完整收包:return > 0
CommunicatorEpoll ->>+ AdapterProxy : pTransceiver->getAdapterProxy()->finishInvoke(*it)
AdapterProxy ->> AdapterProxy : 超时队列删除msg:_timeoutQueue->erase(msg)<br>finishInvoke()<br>唤醒caller线程msg->pMonitor->notify()
AdapterProxy ->>- CommunicatorEpoll : return
end
loop 请求发起线程
ServantProxy ->> ServantProxy : 被唤醒<br>rsp = msg->response
end

我阅读下来发现v1.0.0有个明显的缺点,所有的服务端第一次连接都需要走reconnect流程,也就是close一下再connect,这个挺不好的。后面的版本优化了这一块逻辑

序列号

客户端的线程私有变量有_netSeq_reqQNo两个序列号,初看代码时有点让人迷糊,简单区分一下

  • _netSeq

    这是用来自增,轮询选取网络线程的,同时也会用来选择向网络线程写入的队列ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]

    这个队列也是线程私有变量,在通知网络线程时主要就是把它发给网络线程,让网络线程从中pop出msg来发包

  • _reqQNo

    caller线程和客户端网络线程通信时,是通过epoll来唤醒的,因此每个caller线程需要自己独一无二的唤醒socket

    因此有一个序列号管理器,在caller发起时申请一个,不需要时又可以回收

    caller线程通信所用的socket取自需要通信网络线程下NotifyInfo _notify[MAX_CLIENT_NOTIFYEVENT_NUM]这个事先创建好的当前CommunicatorEpoll唯一的socket数组,使用_notify[_reqQNo]即可

主控流程
sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
loop 请求发起线程
Communicator ->>+ ServantProxy : stringToProxy("XXXObj", prx)
ServantProxy ->>+ ObjectProxy : 略
ObjectProxy ->>+ EndpointManager : _endpointManger.reset(new EndpointManager(this, ...))<br>init()<br>setObjName()<br>如果是ip端口: sEndpoints从objName里面取<br>如果是XXXOBJ,从Communicator初始化_queryFPrx这个ServantProxy:<br>_queryFPrx = _communicator->stringToProxy<QueryFPrx>()
ObjectProxy ->>- ServantProxy : return
ServantProxy ->>- Communicator : return
end
loop 网络线程
Communicator ->> EndpointManager : 如上个流程图:略
EndpointManager ->> EndpointManager : 刷新主控:refreshReg()<br>从主控的servantProxy拉可用的服务端activeEp:_queryFPrx->findObjectById()<br>doEndpoints(activeEp)
loop 如果请求主控失败
EndpointManager ->> ObjectProxy : if(!_valid) return
ObjectProxy ->> ObjectProxy :加入请求等待的超时队列:_reqTimeoutQueue.push(msg)<br>等待下一次请求时刷新主控,如果请求成功会重试请求
ObjectProxy ->> CommunicatorEpoll : return
end
loop 如果请求主控成功
EndpointManager ->> EndpointManager : notifyEndpoints(activeEp)<br>初始化AdapterProxy并加入_activeProxys<br>doNotify()
EndpointManager ->>+ ObjectProxy : _objectProxy->doInvoke()
ObjectProxy ->> ObjectProxy : 遍历超时队列:while(!_reqTimeoutQueue.empty()) _reqTimeoutQueue.pop()<br>_endpointManger->selectAdapterProxy(msg)<br>pAdapterProxy->invoke(msg)
ObjectProxy ->>- ObjectProxy : return
end
end
处理超时流程

在客户端中有两个超时队列

  • AdapterProxy的_timeoutQueue

    这是发送请求等待回包的超时,维护对每个服务端的

  • ObjectProxy的_reqTimeoutQueue

    这是对服务发现等待请求列表的超时,这是维护对主控的

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
loop 超时流程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒<br>doTimeout()
CommunicatorEpoll ->>+ AdapterProxy : 对全部AdapterProxy:doTimeout()
AdapterProxy ->> AdapterProxy : 取出超时msg:while(_timeoutQueue->timeout(msg))<br>finishInvoke(msg)<br>唤醒caller线程msg->pMonitor->notify()
AdapterProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>+ ObjectProxy : _objectProxyFactory->getObjectProxy(i)->doTimeout()
ObjectProxy ->> ObjectProxy : 遍历_reqTimeoutQueue取出msg,执行doInvokeException(msg)<br>标识请求异常:msg->eStatus = ReqMessage::REQ_EXC<br>唤醒超时的caller线程msg->pMonitor->notify()
ObjectProxy ->>- CommunicatorEpoll : return
end

tars3.0流程分析

在客户端和服务端模块都完全不变的情况下,tars3.0在框架层面的流程有一些变化

在tars1.0中,服务端分为网络线程和业务线程,客户端分为caller线程和客户端网络线程

在tars3.0中,通过改变/tars/application/server/opencoroutine的值,有4种模式可以选择

  • NET_THREAD_QUEUE_HANDLES_THREAD = 0

    默认模式,和tars1.0一致

  • NET_THREAD_QUEUE_HANDLES_CO = 1

  • NET_THREAD_MERGE_HANDLES_THREAD = 2

  • NET_THREAD_MERGE_HANDLES_CO = 3

这4种模式可以做如下的归类:

  • NET_THREAD_QUEUE开头的0,1模式

    • 独立网络线程 + 独立业务线程
    • 网络线程负责收发包,通过队列唤醒handle线程中处理
  • NET_THREAD_MERGE开头的2,3模式

    • 合并网络线程 + 业务线程
    • 网络线程负责收发包,也负责原本在业务线程中处理的逻辑,这种模式下延时最小
    • PS:
      • 线程个数以处理线程配置为准, 网络线程配置无效
      • 如果是UDP, 则网络线程竞争接收包
  • HANDLES_THREAD结尾的0,2模式,服务端的业务线程中不启用协程处理,业务线程的流程和tars1.0一致

  • HANDLES_CO结尾的1,3模式,服务端的业务线程中启用协程处理,每个包都启动一个新协程

    • 业务线程启动的客户端,在协程模式下,会共用当前线程的协程

      因此在这两种模式下,在发起请求后会与服务端的业务线程共用同一个网络线程

    • 对3模式而言,由于服务端的网络线程和业务线程也合并了,这意味着整个服务端只有一个线程来处理,这种模式下的延时是4个模式中最小的

协程调度器流程

tars3.0最大的变化就是协程调度器,因此对其流程进行分析

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
TC_EpollServer ->> TC_EpollServer : waitForShutdown()<br>startHandle()<br>启动线程运行Handle::handleLoopCoroutine()
TC_EpollServer ->>+ TC_CoroutineScheduler : 向协程投递服务端处理协程:<br>_scheduler->go(&Handle::handleCoroutine)
TC_CoroutineScheduler ->>- TC_EpollServer : return
TC_EpollServer ->>+ TC_CoroutineScheduler : 执行主协程,直到服务端退出:<br>_scheduler->run()
loop 主协程循环
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程<br>执行_epoller->done(1000)
TC_Epoller ->> TC_Epoller : 触发定时事件<br>::epoll_wait()<br>触发EPOLLIN或EPOLLOUT等事件<br>例如TC_EpollServer::acceptCallback()等
TC_Epoller ->> TC_Epoller : 触发空闲事件<br>例如NET_THREAD_MERGE_HANDLES_THREAD模式下的<br>Handle::handleOnceThread<br>例如NET_THREAD_MERGE_HANDLES_CO模式下的<br>Handle::handleOnceCoroutine
TC_Epoller ->>- TC_CoroutineScheduler : return
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeup()<br>唤醒被put()重新激活的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbytimeout()<br>唤醒sleep的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbyself()<br>唤醒暂时yield,随时可以唤醒的协程,放进_avail
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行最多100个_acvive协程
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行全部的_avail协程
end

可以看到,流程图中协程有执行active的,也有avail的。那么avail和active区别是什么?

  • avail的逻辑

    • go创建的协程放在avail

    • 调用yield(true)的协程放在_needActiveCoroId里面

      然后主协程定时调用wakeupbyself()唤醒_needActiveCoroId的协程,放进avail

  • active的逻辑

    • 调用yield(false)的协程不会自动唤醒

    • 需要调用put()把协程放到_activeCoroQueue里面

      然后主协程定时调用wakeup()唤醒_activeCoroQueue的协程,放进active

因此avail的协程是yield(true)以后不需要唤醒也可以跑,始终可用(avail),而active的协程调用了yield(false)需要put触发来激活(active)

另外一方面,每一轮主协程逻辑,被激活的协程一次只能跑100个,但是始终可用的协程比较重要,必须要执行

举例来说

  • 在HANDLES_CO的1,3模式下,TC_EpollServer::Handle::handleOnceCoroutine作为第一个被go()投递的协程(协程ID为1),需要定时yield(true),来切换到go跑的其他协程上去,它是可以自动从inactive变成avail的

    sequenceDiagram
    %%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
    loop handleCoroutine协程
    TC_EpollServer ->> TC_EpollServer : 从网络收包协程取得数据:_dataBuffer->pop(data)
    TC_EpollServer ->>+ TC_CoroutineScheduler : 对每个pop的data<br>向协程投递单次业务逻辑处理协程:_scheduler->go(&Handle::handle)
    TC_CoroutineScheduler ->>- TC_EpollServer : return
    TC_EpollServer ->> TC_CoroutineScheduler : 挂起等待下一次执行:scheduler->yield()
    end
  • 在HANDLES_CO的1,3模式下,ServantHandle::handle在go()第一次投递时,是在avail里面,是可以自动从inactive变成avail的。

    sequenceDiagram
    %%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
    loop handle协程
    ServantHandle ->> ServantHandle : handleTarsProtocol()
    ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
    XXXServantImp ->>- ServantHandle: return
    ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse()
    ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
    TC_EpollServer ->>- ServantHandle : return
    end
  • 在HANDLES_CO的1,3模式下,运行ServantHandle::handle的业务逻辑中,如果有客户端rpc发包结束后这个caller协程在yield(false)后是inactive的,一定需要网络协程处理完以后,通过put唤醒了这个协程以后,才能继续工作

    将上面的客户端流程图中加入一次rpc调用如下

    sequenceDiagram
    %%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
    loop handle协程
    ServantHandle ->> ServantHandle : handleTarsProtocol()
    ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
    XXXServantImp ->>+ ServantProxy : tars_invoke()
    ServantProxy ->>+ ServantProxy : 虽然接口名还叫选择网络线程,实际上同一网络线程:<br>selectNetThreadInfo()
    ServantProxy ->>+ CommunicatorEpoll: 直接发包,不进行通知:<br>msg->pObjectProxy->getCommunicatorEpoll()<br>->handle(pSptd->_reqQNo)
    CommunicatorEpoll ->>+ ObjectProxy : msg->pObjectProxy->invoke(msg)<br>后续流程和tars1.0一致,省略
    ObjectProxy ->>- CommunicatorEpoll : return
    CommunicatorEpoll ->>- ServantProxy : return
    ServantProxy ->> ServantProxy : 挂起等待被put唤醒:msg->sched->yield(false)
    ServantProxy ->>- XXXServantImp : return
    XXXServantImp ->>- ServantHandle: return
    end
    loop 网络协程
    TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
    TC_Epoller ->>+ CommunicatorEpoll : handleOutputImp()<br>后续流程和tars1.0一致,省略
    CommunicatorEpoll ->>- TC_Epoller : return
    TC_Epoller ->>- TC_CoroutineScheduler : return
    TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
    TC_Epoller ->>+ CommunicatorEpoll : handleInputImp()
    CommunicatorEpoll ->>+ TC_TCPTransceiver : _trans->doResponse()
    TC_TCPTransceiver ->> TC_TCPTransceiver : 对EGAIN等非阻塞的处理和tars1.0一致<br>循环读取数据解析协议<br>
    TC_TCPTransceiver ->>+ AdapterProxy : 调用_onParserCallback回调<br>_onParserCallback回调在initializeClient中进行注册<br>实际是AdapterProxy::onParserCallback
    AdapterProxy ->> AdapterProxy : 唤醒caller协程<br>finishInvoke(rsp)<br>finishInvoke_parallel()<br>finishInvoke(msg)<br>msg->sched->put(msg->iCoroId)
    AdapterProxy ->>- TC_TCPTransceiver : return
    TC_TCPTransceiver ->>- CommunicatorEpoll : return
    CommunicatorEpoll ->>- TC_Epoller : return
    TC_Epoller ->>- TC_CoroutineScheduler : return
    end
    loop handle协程
    ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse()
    ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
    TC_EpollServer ->>- ServantHandle : return
    end

在加入协程调度器后,逻辑侧改动其实不大,只是把线程部分协程化了,协程化改动总结如下

  • 对服务端而言,网络线程变成了网络协程

    • 业务线程在NET_THREAD_MERGE的2,3模式运行在主协程中
    • 业务线程在HANDLES_CO的1,3模式下,包处理变成了业务协程
  • 对客户端而言

    • 如果caller线程存在协程调度器,那么网络线程变成了网络协程,中间使用协程的yield(true)和put()来完成原本的条件变量wait(),notify()部分
  • 由于服务端和客户端可能是同一个线程了,共用一个epoll,因此触发函数统一使用TC_Epoller::EpollInfo::registerCallback进行注册

    • 服务端

      • 为bindAdapter监听的端口的EPOLLIN注册TC_EpollServer::acceptCallback函数

      • 为accept到的socket的EPOLLIN和EPOLLOUT

        分别注册TC_EpollServer::Connection::handleInputImp和TC_EpollServer::Connection::handleOutputImp

    • 客户端

      • 为客户端发起连接的socket的EPOLLIN和EPOLLOUT

        分别注册CommunicatorEpoll::handleInputImp和CommunicatorEpoll::handleOutputImp

      • 在caller线程不存在协程调度器的情况下

        给客户端网络线程注册NetThread::processPipe来等待caller线程唤醒

NET_THREAD_MERGE模式

刚才的流程分析主要是基于HANDLES_CO的,在这里简单介绍一下NET_THREAD_MERGE模式是如何合并网络线程和业务线程的

sequenceDiagram
%%{init: { 'sequence': {'useMaxWidth':false, 'showSequenceNumbers':true} } }%%
loop 服务端主线程
TC_EpollServer ->> TC_EpollServer : waitForShutdown()
TC_EpollServer ->> TC_EpollServer : initHandle()<br>根据配置业务线程数量,来启动相应数量的网络线程<br>为网络线程设定业务线程的Handle::initialize()回调<br>如果是HANDLES_CO,那么给epoller的idle回调加入Handle::handleOnceCoroutine<br>如果是HANDLES_THREAD,那么给epoller的idle回调加入Handle::handleOnceThread
TC_EpollServer ->> TC_EpollServer : startHandle()<br>对每个业务线程,getNetThread()->start()
TC_EpollServer ->> TC_EpollServer : 等待服务端主线程退出
end
loop 服务端网络线程
NetThread ->> NetThread : 启动协程调度器流程:_scheduler->run()
end

性能测试

在根目录的examples/StressDemo下就有性能测试的代码

StressImp的服务端代码如下

1
2
3
4
5
tars::Int32 StressImp::testStr(const std::string& in, std::string &out, tars::TarsCurrentPtr current)
{
out = in;
return 0;
}

现在对tars模式NET_THREAD_QUEUE_HANDLES_THREAD,NET_THREAD_QUEUE_HANDLES_CO,NET_THREAD_MERGE_HANDLES_CO和公司框架一起做测试

服务端都是相同网络线程,客户端相同模式下使用5个线程并行发起请求

服务端不限cpu

首先进行服务端不限cpu的测试

pthread id一行会分别打印线程id和消耗时间

  • NET_THREAD_QUEUE_HANDLES_THREAD
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config.conf
OpenCoroutine(opencoroutine) 0
servant TestApp.BServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9100 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.BServer.StressObj@tcp -h 127.0.0.1 -p 9100" 4
init tp succ
times:1500000
pthread id: 140093070972672 | 120493
succ:1500000
  • NET_THREAD_QUEUE_HANDLES_CO
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro.conf
OpenCoroutine(opencoroutine) 1
servant TestApp.AServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9000 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.AServer.StressObj@tcp -h 127.0.0.1 -p 9000" 4
init tp succ
times:1500000
pthread id: 140629329286912 | 116722
succ:1500000
  • NET_THREAD_MERGE_HANDLES_CO
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro3.conf
OpenCoroutine(opencoroutine) 3
servant TestApp.DServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9300 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.DServer.StressObj@tcp -h 127.0.0.1 -p 9300" 4
init tp succ
times:1500000
pthread id: 140555908507392 | 97816
succ:1500000
  • 公司框架
1
2
3
4
5
6
7
8
9
root@121:~/taf_test/server# ./Server --config=config.conf
servant TestApp.CServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9200 -t 10000

root@121:~/taf_test/client# ./Client 5 1500000 "TestApp.CServer.StressObj@tcp -h 127.0.0.1 -p 9200" 4
init tp succ
times:1500000
pthread id: 140642864662272 | 131473
succ:1500000

服务端限制1核

  • NET_THREAD_QUEUE_HANDLES_THREAD

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139798232692480 | 198703
    succ:1500000
  • NET_THREAD_QUEUE_HANDLES_CO

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139790615820032 | 177378
    succ:1500000
  • NET_THREAD_MERGE_HANDLES_CO

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 140052203300608 | 104109
    succ:1500000
  • 公司框架

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139851859920640 | 228572
    succ:1500000

小结

服务端不限cpu(qps) 服务端限制1核(qps) 限制后性能降低
NET_THREAD_QUEUE_HANDLES_THREAD 12448 7548 39.36%
NET_THREAD_QUEUE_HANDLES_CO 12851 8456 34.19%
NET_THREAD_MERGE_HANDLES_CO 15334 14407 6.04%
公司框架 11409 6562 42.48%

稍微整理可知,NET_THREAD_MERGE_HANDLES_CO由于减少了线程切换,因此性能确实是最好的,在限制cpu后,性能降低也是最少的

NET_THREAD_QUEUE_HANDLES_CO相对NET_THREAD_QUEUE_HANDLES_THREAD,业务模块使用协程来处理以后,性能也得到了提升

公司框架相对而言就差了不少,特别是在模拟资源受限的情况下,一方面,qps只有NET_THREAD_MERGE_HANDLES_CO的45%,另外一方面性能损失也特别大,存在较大优化空间

额外小知识

在分析tars源码的时候,发现在通知网络线程的时候时候是EPOLLIN,有时候是EPOLLOUT,然后不需要读写。这样居然能唤醒网络线程?

这就超出我知识盲区了,我在自己的rpc框架中写过类似代码,但是是把管道加入了epoll中,对其读写来唤醒的

根据tc_epoller.h写了一下测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <sys/types.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <iostream>

#include "tc_epoller.h"

using namespace std;

int main() {
tars::TC_Epoller ep;
ep.create(1000);

auto sock = socket(AF_INET, SOCK_STREAM, 0);

std::thread t([&]() {
int num = ep.wait(-1);
for (int i = 0; i < num; ++i) {
const epoll_event& ev = ep.get(i);
cout << ev.data.u64 << endl;
cout << ev.events << endl;
}
});

sleep(1);

ep.add(sock, 100, EPOLLIN);

t.join();
}

编译运行

1
2
3
4
root:~/tars/epoll_test# g++ -std=c++11 -g -Wall main.cpp tc_epoller.cpp -o out.exe -lpthread
root:~/tars/epoll_test# ./out.exe
100
16

确实被唤醒了,读到了写入的数据100,唤醒的事件是16,也就是EPOLLHUP。

查了一下资料,即使对于阻塞模式下也确实如此Why am I getting the EPOLLHUP event on a brand new socket

See my other comment with the pastebin code. An uninitialized (i.e. before connect / listen) socket always seems to cause EPOLLHUP (at least when in blocking mode)

嗯,又学到了一手