tars C++源码分析

由于公司使用的taf框架是和开源的tars框架一脉相承,虽然经过几年的改造,几乎已经面目全非了,但是主体结构上相差不大

因此从tars框架的最初版本的源码分析上就可以理解整个核心链路了

本篇会分析开源的tars1.0版本和tars3.0版本,并探索3.0版本的优化原因,最后对他们的性能做一些比较

计划中还有一篇公司taf框架的分析,很有趣的是它的发展方向和tars3.0不太一致,因此可以对其性能和tars性能也做一番比较,遗憾的是出于保密需要,无法将其post在我的博客上了

tars编译问题

tars的不同版本在我的ubuntu 20.04上都遇到了编译问题

tars1.0

在编译tars v1.0.0时遇到如下报错

1
2
3
4
5
Linking CXX executable tars2node
/usr/bin/ld: ../../util/lib/libtarsutil.a(tc_encoder.cpp.o): in function `tars::TC_Encoder::gbk2utf8(char*, int&, char const*, int)':
/root/tars/TarsCpp/util/src/tc_encoder.cpp:39: undefined reference to `libiconv_open'
/usr/bin/ld: /root/tars/TarsCpp/util/src/tc_encoder.cpp:75: undefined reference to `libiconv_close'
/usr/bin/ld: /root/tars/TarsCpp/util/src/tc_encoder.cpp:66: undefined reference to `libiconv'

但实际上我是安装了libiconv的,make --trace看下是什么问题,显示具体命令为

1
cd /root/tars/TarsCpp/build/tools/tars2node && /root/env/vim_download/cmake-3.16.8-Linux-x86_64/bin/cmake -E cmake_link_script CMakeFiles/tars2node.dir/link.txt --verbose=

确实在/root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt没有链接libiconv

1
2
~/tars/TarsCpp/build# cat  /root/tars/TarsCpp/build/tools/tars2node/CMakeFiles/tars2node.dir/link.txt
/usr/bin/c++ -std=c++11 -g -O2 -Wall -Wno-deprecated -g -O2 -Wall -rdynamic CMakeFiles/tars2node.dir/code_generator.cpp.o CMakeFiles/tars2node.dir/file_util.cpp.o CMakeFiles/tars2node.dir/gen_js.cpp.o CMakeFiles/tars2node.dir/gen_js_dts.cpp.o CMakeFiles/tars2node.dir/gen_proxy.cpp.o CMakeFiles/tars2node.dir/gen_proxy_dts.cpp.o CMakeFiles/tars2node.dir/gen_server.cpp.o CMakeFiles/tars2node.dir/gen_server_dts.cpp.o CMakeFiles/tars2node.dir/gen_server_imp.cpp.o CMakeFiles/tars2node.dir/idl_scan.cpp.o CMakeFiles/tars2node.dir/idl_util.cpp.o CMakeFiles/tars2node.dir/main.cpp.o -o tars2node ../../util/lib/libtarsutil.a ../lib/libtarsparse.a ../../util/lib/libtarsutil.a

在link.txt后追加-liconv后回到build目录继续编译可以顺利完成

1
mkdir build && cd build && cmake .. && sed -i "s/$/& -liconv/g" tools/tars2node/CMakeFiles/tars2node.dir/link.txt && make -j

tars3.0

tars v3.0.0通过APPLE开关来选择链接libiconv

另外,默认不再编译example,通过ONLY_LIB开关来选择编译example

1
rm -rf build && mkdir build && cd build && cmake .. -DONLY_LIB=OFF -DAPPLE=ON && bear make -j

tars1.0流程分析

tars基金会对tars的1.0版本有过一个很好的分析,珠玉在前,对tars1.0的流程分析没有这么详细,更多的是自我体悟的总结

服务端

首先是对主要组件做一个大致介绍

Application

tars的服务端入口类为Application,可以认为Application即为服务端,他主要是对tars的配置文件等进行读取,从读取的内容对TC_EpollServer进行操作从而运行服务端

TC_EpollServer

服务端的主体逻辑都在TC_EpollServer中,它由两个重要模块组成:网络模块,业务模块

网络模块

NetThread

代表了网络模块的单个线程,使用TC_Epoller作为网络事件驱动

并使用TC_Socket,TC_BufferPool,Connection和ConnectionList来处理收发包的网络操作

服务端的流程一般为bind()->accept()->read()->write()->close()

其中bind, accept的逻辑被抽到了BindAdapter(在BindAdapter中分析),其他网络逻辑都在NetThread中

配置文件的/tars/application/server中的”netthread”字段记录了网络线程的数量

业务模块

Handle

代表了业务模块的单个线程,业务线程以条件变量的方式阻塞在死循环中,接受由网络线程派发过来的请求包

然后解包选择不同的调用业务模块的相关代码(IDL相关RPC接口的实现)

HandleGroup

组合了业务模块的单个线程,用于和BindAdapter交互(在BindAdapter中分析)

网络模块和业务模块的桥梁:BindAdapter

作为网络模块和业务模块之间的桥梁

BindAdapter顾名思义:绑定适配器,代表了业务模块对外服务时绑定的tcp端口

由于绑定了tcp端口,因此它封装了bind,accept相关的代码,网络相关的代码从网络模块提取到这里来

/tars/application/server下面的每个xxxAdapter都是BindAdapter配置

再具体一些来说,以典型的配置文件的为例:

1
2
3
4
5
6
7
<Test.StressServer.StressObjAdapter>
endpoint = tcp -h 127.0.0.1 -p 9200 -t 10000
threads = 1
servant = Test.StressServer.StressServantObj
protocol = taf
handlegroup = AAdapter
</Test.StressServer.StressObjAdapter>
对接网络模块

网络模块的线程组中第一个线程的TC_Epoller会注册BindAdapter中的TC_Socket来监听accept事件(避免多线程监听同一个fd的惊群效应)

在网络模块完成了RPC请求包的收包以后,就会将请求包发给BindAdapter的无锁队列,唤醒业务线程将其取出处理

/tars/application/server/xxxAdapter下:

  • endpoint代表了监听的ip,端口
对接业务模块

/tars/application/server/xxxAdapter下:

  • servant字段指定了和在Server类初始化时,XXXServantImp类(继承IDL相关RPC接口的实现)对应的XXXServantObj

    1
    2
    3
    void StressServer::initialize() {
    addServant<StressServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".StressServantObj");
    }

    如果这里的StressServantObj在配置文件没有找到任何servant,StressServer会无法启动

    • Application初始化配置文件时,会通过ServantHelperManager将BindAdapter的ID(XXXObjAdapter)和“XXXServantObj”字符串绑定起来

      1
      2
      _adapter_servant["XXXObjAdapter"] = "XXXServantObj";
      _servant_adapter["XXXServantObj"] = "XXXObjAdapter";

      这里的映射关系也意味着Servant和BindAdapter是一对一的,否则通过servant找BindAdapter会失败

    • addServant时将业务逻辑的实现代码类(XXXServantImp)绑定到"XXXServantObj"字符串上,使用"XXXServantObj"字符串可以实例化XXXServantImp类

      1
      _servant_creator["XXXServantObj"] = new ServantCreation<XXXServantImp>();
  • handlegroup字段指定了绑定的业务线程组HandleGroup的ID

    HandleGroup和BindAdapter也是一对一,这样可以保证每个业务跑在自己的业务线程组里

    以下代码限制了不一样时会直接抛出异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void Application::main(int argc, char *argv[]) {
    ...
    for (size_t i = 0; i < adapters.size(); ++i) {
    string name = adapters[i]->getName();
    string groupName = adapters[i]->getHandleGroupName();
    if(name != groupName) {
    TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName);
    if (!ptr) {
    throw runtime_error("[TAF][adater `" + name + "` setHandle to group `" + groupName + "` fail!");
    }
    }
    }
    ...
    }
  • threads字段指定了HandleGroup下存在多少个ServantHandle

Handle初始化核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ServantHandle::initialize() {
for (adpit = adapters.begin(); adpit != adapters.end(); ++adpit)
//adpit->first = "XXXObjAdapter"
//servant = XXXServantImp
ServantPtr servant = ServantHelperManager::getInstance()->create(adpit->first);
//servant->getName() = "XXXServantObj"
_servants[servant->getName()] = servant;
}
}
ServantPtr ServantHelperManager::create(const string &sAdapter) {
//sAdapter = "XXXObjAdapter"
//s = "XXXServantObj"
string s = _adapter_servant[sAdapter];
if(_servant_creator.find(s) != _servant_creator.end())
//servant = XXXServantImp
servant = _servant_creator[s]->create(s);
return servant;
}
template<class T> struct ServantCreation {
ServantPtr create(const string &s) { T *p = new T; p->setName(s); return p; }
};
  • Handle初始化时,会遍历对应HandleGroup相关的BindAdapter的ID("XXXObjAdapter")

    通过_adapter_servant的映射关系,找到”XXXServantObj“

    通过_servant_creator的映射关系,找到new ServantCreation<XXXServantImp>()

    这个工厂类,可以实例化XXXServantImp返回servant,然后servant->setName(”XXXServantObj“),servant->getName()也就是"XXXServantObj"了

  • 从而在被网络线程唤醒后,读取无锁队列的请求包协议字段中的BindAdapter的ID("XXXServantObj")

    通过_servants的映射关系,找到对应实例化的XXXServantImp类来进行处理

这里逻辑有点复杂的原因,是虽然Servant,HandleGroup和BindAdapter都是一对一关系,只有和Handle是一对多的关系

但是代码里面可能有历史包袱的关系,所有代码都处理了多对多关系,很容易被迷惑

小结

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的服务端相关类图如下:

可以看到BindAdapter处于类图中的C位

BindAdapter作为网络模块和业务模块之间的桥梁,主要承担了单向通知的作用

这是因为网络模块要通知业务模块只能走条件变量的方式,而业务模块通知网络模块可以通过io事件

当网络模块收包逻辑处理完后,通知BindAdapter这个适配器,再由BindAdapter的方法去通知业务模块

业务模块对网络模块是不可见的,从而达到解耦

服务端流程

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端的流程图已经很好的进行了分析,因此不再细述,简单的将各部分流程图扒过来,最后总结一下总流程图

模块初始化流程

网络模块初始化

网络模块的初始化主要初始化各个网络线程,并且在第一个网络线程为BindAdapter监听端口

业务模块初始化

业务模块的初始化主要就是完成对接业务模块中所述部分,用于在业务模块:处理RPC请求时,能够根据请求包进行反射,由对应的XXXServantImp类来处理

最后启动Handle线程

微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端一文中有一段这样的描述

由于Handle是继承自TC_Thread的,在执行Handle::start()中,会执行虚函数Handle::run(),在Handle::run()中主要是执行两个函数,一个是ServantHandle::initialize(),另一个是Handle::handleImp():

1
2
3
4
5
6
void TC_EpollServer::Handle::run()
{
initialize();

handleImp();
}

但是实际上,run是一个虚函数,因此执行的应当是ServantHandle,在这里引入了还未稳定的协程逻辑,由于文章没有分析协程,因此文章大体上还是对的,只是有些瑕疵

1
2
3
4
5
6
7
8
9
10
11
12
13
void ServantHandle::run()
{
initialize();

if(!ServerConfig::OpenCoroutine)
{
handleImp();
}
else
{
//协程代码
}
}

请求的工作流程

网络模块
接受客户端链接
接收RPC请求
发送RPC响应
业务模块
处理RPC请求

小结

核心流程如图,可以看到在总流程中BindAdapter在中间,起到了网络模块和业务模块之间桥梁的作用

sequenceDiagram
participant NetThread
participant TC_Epoller
participant Connection
participant ServantHandle
participant BindAdapter
loop 网络线程
NetThread ->>+ TC_Epoller : 初始化:为BindAdapter的socket<br>添加 ET_LISTEN | EPOLLIN事件
TC_Epoller ->>- NetThread : return
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread: 触发ET_LISTEN事件
NetThread ->> NetThread : 创建 Connection<br>选取NetThread并加入NetThread的ConnectionList<br>为accepted的socket添加EPOLLIN | EPOLLOUT事件

NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()<br>创建待解析数据:deque<tagRecvData *>> vRecvData
NetThread ->>+ Connection : recv()
Connection ->> Connection : ::read()处理EAGAIN等<br>解析协议:parseProtocol(vRecvData)<br>放入解析数据:vRecvData.push_back(recv)
Connection ->>- NetThread : 返回解析数据大小: return recv.size()
NetThread ->>+ BindAdapter : insertRecvQueue(vRecvData)
BindAdapter ->> BindAdapter : 解析数据放入BindAdapter接收的数据队列:_rbuffer.push_back(vRecvData)<br>唤醒业务线程条件变量处理:_handleGroup->monitor.notify();
BindAdapter ->>- NetThread : return
NetThread ->>- NetThread : return
end
loop 业务线程
ServantHandle ->>+ BindAdapter : _handleGroup->monitor.timedWait()<br>waitForRecvQueue()
BindAdapter ->> BindAdapter : _rbuffer.pop_front()
BindAdapter ->>- ServantHandle : return stRecvData
alt 如果 stRecvData.isOverload
	ServantHandle ->> ServantHandle : handleOverload(stRecvData)
else 如果 stRecvData.isClosed
	ServantHandle ->> ServantHandle : handleClose(stRecvData)
else 如果 (now - stRecvData.recvTimeStamp) > adapter->getQueueTimeout()
	ServantHandle ->> ServantHandle : handleTimeout(stRecvData)
else 如果 一切正常
	ServantHandle ->>+ ServantHandle : handle()
	ServantHandle ->>+ TarsCurrent : 初始化请求上下文<br>current = new TarsCurrent(this)<br>current->initialize(stRecvData)
	TarsCurrent ->>- ServantHandle : return
	ServantHandle ->>+ ServantHandle : handleTarsProtocol()
	ServantHandle ->>+ XXXServantImp: vector<char> buffer<br>int ret = onDispatch(current, buffer)<br>执行对应接口实现
	XXXServantImp ->>- ServantHandle: return
	ServantHandle ->>+ TarsCurrent: current->sendResponse(ret, buffer)
	TarsCurrent ->> TarsCurrent: ResponsePacket response<br>response.sbuffer = buffer<br>response转换成string sSendBuffer<br>_pEpollServer->send(sSendBuffer)<br>NetThread* netThread = getNetThreadOfFd(fd)
	TarsCurrent ->>- ServantHandle: return
	ServantHandle ->>- ServantHandle : return
	ServantHandle ->>- ServantHandle : return
	ServantHandle ->>+ NetThread : netThread->send(sSendBuffer)
	NetThread ->>+ NetThread : tagSendData* send = new tagSendData()<br>send->buff = sSendBuffer<br>_sbuffer.push_back(send)
	NetThread ->> TC_Epoller : 通知ET_NOTIFY事件:<br>_epoller.mod(<br>_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT)
	NetThread ->>- NetThread : return
	NetThread ->>- ServantHandle : return
end
end
loop 网络线程
NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发ET_NOTIFY事件
NetThread ->>+ NetThread : processPipe()<br>创建待发送数据:deque<tagSendData *> deSendData<br>从发送队列读取发送数据:_sbuffer.swap(deSendData)<br>
NetThread ->>+ Connection: for iter : deSendData<br>(*iter)->buffer是业务线程中onDispatch处理完的buffer: send((*iter)->buffer)
Connection ->> Connection: ::send()处理EAGAIN等<br>没发完buffer的存到_sendbuffer:<br>for (slice : buffer) _sendbuffer.push_back(slice)
Connection ->>- NetThread : return
NetThread ->>- NetThread : return

NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLOUT事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : ::send()发送_sendbuffer中存起来的
Connection ->>- Connection : return
NetThread ->>- NetThread : return

NetThread ->>+ TC_Epoller : epoll_wait()
TC_Epoller ->>- NetThread : 触发EPOLLIN事件
NetThread ->>+ NetThread : processNet()
NetThread ->>+ Connection : recv()
Connection ->> Connection : 客户端关闭连接:::read() <= 0
Connection ->>- NetThread : return -1
NetThread ->>+ NetThread: 删除连接:delConnection(EM_CLIENT_CLOSE)
NetThread ->>- NetThread : return
NetThread ->>- NetThread : return
end

客户端

Communicator

Communicator代表了一个客户端实体,Communicator中维护了一组客户端网络线程等数据,因此创建和销毁的代价很高。一般通过Application::getCommunicator()获得一个全局唯一的客户端实体

如果非常重要的客户端,发送紧急控制命令不希望被其他客户端干扰,对这个客户端new一个Communicator也是可以的

网络模块

CommunicatorEpoll

网络线程的代码全部在CommunicatorEpoll中,CommunicatorEpoll的生命周期由Communicator管理,和Communicator是聚合关系

这样对同一个Communicator而言,不同的服务访问,都可以复用同一组网络线程

服务代理模块

ServantProxy

ServantProxy代表了对一个服务进行访问的客户端代理

JCE生成的XXXProxy会继承ServantProxy

一般通过Communicator的方法template<class T> void stringToProxy()获取到JCE生成的XXXProxy,就可以使用基类ServantProxy的各种接口了

ObjectProxy

ServantProxy管理了一组ObjectProxy,ObjectProxy代表了这个服务代理下的每个网络线程,依赖CommunicatorEpoll来进行操作

因此ServantProxy可以通过负载均衡策略选择不同的ObjectProxy也就是不同的网络线程

AdapterProxy

ObjectProxy管理了一组AdapterProxy,AdapterProxy代表了服务的每个不同实例(IP端口)

小结

网络模块和服务代理模块是并列关系,服务代理模块的3个模块从上往下,都是聚合的关系

看类图会更加清晰

引用自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的客户端相关类图如下:

客户端流程

一样,引用来自微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端的流程图

模块初始化流程

这个流程图有一个第5步略去了:如下

请求的工作流程

请求发起线程
客户端网络线程
发送RPC请求
接收RPC请求

小结

引用的几个流程图其实不太细致

  • 没有介绍主控逻辑(服务发现),服务发现对客户端来说是很重要的一环

  • 没有介绍超时逻辑

  • 非阻塞socket的流程会复杂很多

    connect,send,readv都是有可能多次才能完成的,在流程图中没有体现

sequenceDiagram
loop 请求发起线程
ServantProxy ->> ServantProxy : 调用JCE生成接口代码:prx->test()<br>tars_invoke(..., rsp)<br>初始化本次请求包:auto msg = new ReqMessage()<br>获取线程私有变量:auto pSptd = ServantProxyThreadData::getData()<br>选择网络线程:selectNetThreadInfo(pSptd, pObjectProxy, pReqQ)
ServantProxy ->> ServantProxy : 对线程变量_netSeq自增,轮询选择网络线程:<br>选择网络线程对应pObjectProxy:pObjProxy = *(_objectProxy + pSptd->_netSeq)<br>选择线程变量中,网络线程对应的队列:pReqQ = pSptd->_reqQueue[pSptd->_netSeq]<br>pSptd->_netSeq++<br>return
ServantProxy ->> ServantProxy : 网络线程对应的队列写入数据:pReq->push_back(msg)
CommunicatorEpoll ->>+ CommunicatorEpoll : 通知网络线程:pObjProxy->getCommunicatorEpoll()-><br>notify(pSptd->_reqQNo, pReqQ)
CommunicatorEpoll ->> CommunicatorEpoll : * 每个网络线程在ServantProxyThreadData::<br>getData()时会获得一个全局序列号_reqQNo:iSeq = pSptd->_reqQNo<br>* 根据序列号为caller线程在已有socket数组中创建通知socket:<br>_notify[iSeq].notify.createSocket()<br>* 通知信息中加上队列:_notify[iSeq].stFDInfo.p = pReqQ<br>* 带上FDInfo类型的stFDInfo通知信息通知网络线程<br>(未connect或者listen的socket会被唤醒EPOLLHUP事件):<br>_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN)
CommunicatorEpoll ->>- ServantProxy : return
ServantProxy ->> ServantProxy : 条件变量阻塞等待唤醒:msg->pMonitor->wait()
end
loop 网络线程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>caller线程事件:if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)<br>从通知信息获取网络线程队列:ReqInfoQueue * pInfoQueue=pFDInfo->p<br>获取到msg:pInfoQueue->pop_front(msg)
CommunicatorEpoll ->>+ ObjectProxy : 获取到ObjectProxy:<br>msg->pObjectProxy->invoke(msg)
ObjectProxy ->>+ EndpointManager : 选择AdapterProxy:<br>selectAdapterProxy(msg, pAdatapterProxy)
EndpointManager ->> EndpointManager : 多种选择策略,比如轮询可用的服务端连接_activeProxys:<br>getNextValidProxy()
EndpointManager ->>+ AdapterProxy : 第一次连接时,检查是否可用:checkActive()
AdapterProxy ->>+ TcpTransceiver : reconnect()
TcpTransceiver ->> TcpTransceiver : 初始化socket操作:createSocket()<br>尝试连接:::connect()
TcpTransceiver ->>+ CommunicatorEpoll : 为创建的socket添加EPOLLIN|EPOLLOUT事件:<br>_adapterProxy->getObjProxy()-><br>getCommunicatorEpoll()-><br>addFd(EPOLLIN|EPOLLOUT)
CommunicatorEpoll ->>- TcpTransceiver : return
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- EndpointManager : return
EndpointManager ->>- ObjectProxy : return pAdatapterProxy
ObjectProxy ->>+ AdapterProxy : pAdapterProxy->invoke(msg)
AdapterProxy ->> TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 第一次请求时还未连接成功,直接return<br>后续请求时已经连接成功,先尝试::send直到EAGAIN
AdapterProxy ->> AdapterProxy : 没写完的,msg写入超时队列:_timeoutQueue->push(msg)
AdapterProxy ->>- ObjectProxy : return
ObjectProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleOutputImp
CommunicatorEpoll ->>+ TcpTransceiver : 如果是isConnecting状态,那么连接上了:pTransceiver->setConnected()<br>pTransceiver->doRequest()
TcpTransceiver ->>+ AdapterProxy : _adapterProxy->doInvoke()
AdapterProxy ->>+ TcpTransceiver : 发送数据:_trans->sendRequest
TcpTransceiver ->> TcpTransceiver : 尝试::send直到EAGAIN<br>如果发送完成,从超时队列中删除:_timeoutQueue->popSend
TcpTransceiver ->>- AdapterProxy : return
AdapterProxy ->>- TcpTransceiver : return
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒:evs = epoll_wait()<br>处理每一个唤醒事件:for (ev : evs) handle(ev)<br>从ev获取FDInfo类型的通知信息:pFDInfo = ev.data<br>非caller线程事件:if(FDInfo::ET_C_NOTIFY != pFDInfo->iType)<br>从通知信息获取:Transceiver *pTransceiver = pFDInfo->p<br>handleInputImp
CommunicatorEpoll ->>+ TcpTransceiver : pTransceiver->doResponse()
TcpTransceiver ->> TcpTransceiver : 准备_recvBuffer和栈上缓冲区用于readv收包:<br>vecs[0].iov_base = _recvBuffer.WriteAddr()<br>vecs[1].iov_base = stackBuffer
TcpTransceiver ->> TcpTransceiver : readv后全部收包到_recvBuffer:<br>readv(vecs, 2)<br>_recvBuffer.PushData(stackBuffer)
TcpTransceiver ->>+ AppProtocol : 对_recvBuffer增量解包:<br>_adapterProxy->getObjProxy()<br>->getProxyProtocol()<br>.responseFunc(_recvBuffer)
alt 如果完整收到包
	AppProtocol ->>- TcpTransceiver : return
	TcpTransceiver ->>- CommunicatorEpoll : return
	CommunicatorEpoll ->>+ AdapterProxy : pTransceiver->getAdapterProxy()->finishInvoke(*it)
	AdapterProxy ->> AdapterProxy : 超时队列删除msg:_timeoutQueue->erase(msg)<br>finishInvoke()<br>唤醒caller线程msg->pMonitor->notify()
	AdapterProxy ->>- CommunicatorEpoll : return
else 如果包太大或者有其他问题,抛出异常
	AppProtocol ->> TcpTransceiver : throw exception
	TcpTransceiver ->> TcpTransceiver : close()
	TcpTransceiver ->> CommunicatorEpoll : return
	CommunicatorEpoll ->> CommunicatorEpoll : 走超时流程唤醒caller线程
end
end
loop 请求发起线程
ServantProxy ->> ServantProxy : 被唤醒<br>rsp = msg->response
end

我阅读下来发现v1.0.0有个明显的缺点,所有的服务端第一次连接都需要走reconnect流程,也就是close一下再connect,这个挺不好的。后面的版本优化了这一块逻辑

序列号

客户端的线程私有变量有_netSeq_reqQNo两个序列号,初看代码时有点让人迷糊,简单区分一下

  • _netSeq

    这是用来自增,轮询选取网络线程的,同时也会用来选择向网络线程写入的队列ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]

    这个队列也是线程私有变量,在通知网络线程时主要就是把它发给网络线程,让网络线程从中pop出msg来发包

  • _reqQNo

    caller线程和客户端网络线程通信时,是通过epoll来唤醒的,因此每个caller线程需要自己独一无二的唤醒socket

    因此有一个序列号管理器,在caller发起时申请一个,不需要时又可以回收

    caller线程通信所用的socket取自需要通信网络线程下NotifyInfo _notify[MAX_CLIENT_NOTIFYEVENT_NUM]这个事先创建好的当前CommunicatorEpoll唯一的socket数组,使用_notify[_reqQNo]即可

主控流程
sequenceDiagram
loop 请求发起线程
Communicator ->>+ ServantProxy : stringToProxy("XXXObj", prx)
ServantProxy ->> ServantProxy : 略去一些流程
ServantProxy ->>+ ObjectProxy : pObjectProxy = new ObjectProxy(...)
ObjectProxy ->>+ EndpointManager : _endpointManger.reset(new EndpointManager(this, ...))<br>init()<br>setObjName()<br>如果是ip端口: sEndpoints从objName里面取<br>如果是XXXOBJ,从Communicator初始化_queryFPrx这个ServantProxy:<br>_queryFPrx = _communicator->stringToProxy<QueryFPrx>()
ObjectProxy ->>- ServantProxy : return
ServantProxy ->>- Communicator : return
end
loop 网络线程
Communicator ->> Communicator : 如上个流程图:略去一些流程
Communicator ->> EndpointManager : _endpointManger->selectAdapterProxy(msg, pAdapterProxy)
EndpointManager ->> EndpointManager : 刷新主控:refreshReg()<br>从主控的servantProxy拉可用的服务端activeEp:_queryFPrx->findObjectById()<br>doEndpoints(activeEp)
loop 如果请求主控失败
EndpointManager ->> ObjectProxy : if(!_valid) return
ObjectProxy ->> ObjectProxy :加入请求等待的超时队列:_reqTimeoutQueue.push(msg)<br>等待下一次请求时刷新主控,如果请求成功会重试请求
ObjectProxy ->> CommunicatorEpoll : return
end
loop 如果请求主控成功
EndpointManager ->> EndpointManager : notifyEndpoints(activeEp)<br>初始化AdapterProxy并加入_activeProxys<br>doNotify()
EndpointManager ->>+ ObjectProxy : _objectProxy->doInvoke()
ObjectProxy ->> ObjectProxy : 遍历超时队列:while(!_reqTimeoutQueue.empty()) _reqTimeoutQueue.pop()<br>_endpointManger->selectAdapterProxy(msg)<br>pAdapterProxy->invoke(msg)
ObjectProxy ->>- ObjectProxy : return
end
end
处理超时流程

在客户端中有两个超时队列

  • AdapterProxy的_timeoutQueue

    这是发送请求等待回包的超时,维护对每个服务端的

  • ObjectProxy的_reqTimeoutQueue

    这是对服务发现等待请求列表的超时,这是维护对主控的

sequenceDiagram
loop 超时流程
CommunicatorEpoll ->> CommunicatorEpoll : run()<br>网络线程被唤醒<br>doTimeout()
CommunicatorEpoll ->>+ AdapterProxy : 对全部AdapterProxy:doTimeout()
AdapterProxy ->> AdapterProxy : 取出超时msg:while(_timeoutQueue->timeout(msg))<br>finishInvoke(msg)<br>唤醒caller线程msg->pMonitor->notify()
AdapterProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>+ ObjectProxy : _objectProxyFactory->getObjectProxy(i)->doTimeout()
ObjectProxy ->> ObjectProxy : 遍历_reqTimeoutQueue取出msg,执行doInvokeException(msg)<br>标识请求异常:msg->eStatus = ReqMessage::REQ_EXC<br>唤醒超时的caller线程msg->pMonitor->notify()
ObjectProxy ->>- CommunicatorEpoll : return
end

tars3.0流程分析

在客户端和服务端模块都完全不变的情况下,tars3.0在框架层面的流程有一些变化

最大的变化是网络线程(包括服务端NetThread和客户端的)不再跑epoll为主的事件循环流程,而是跑协程调度器流程,并且在每跑一定数量的协程后进行epoll_wait,分析如下

协程调度器流程

tars3.0最大的变化就是协程调度器,因此对其流程进行分析

sequenceDiagram
TC_EpollServer ->> TC_EpollServer : waitForShutdown()<br>startHandle()<br>启动线程运行Handle::handleLoopCoroutine()
TC_EpollServer ->>+ TC_CoroutineScheduler : 向协程投递服务端处理协程:<br>_scheduler->go(&Handle::handleCoroutine)
TC_CoroutineScheduler ->>- TC_EpollServer : return
TC_EpollServer ->>+ TC_CoroutineScheduler : 执行主协程,直到服务端退出:<br>_scheduler->run()
loop 主协程循环
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程<br>执行_epoller->done(1000)
TC_Epoller ->> TC_Epoller : 触发定时事件<br>::epoll_wait()<br>触发EPOLLIN或EPOLLOUT等事件<br>例如TC_EpollServer::acceptCallback()等
TC_Epoller ->> TC_Epoller : 触发空闲事件<br>例如NET_THREAD_MERGE_HANDLES_THREAD模式下的<br>Handle::handleOnceThread<br>例如NET_THREAD_MERGE_HANDLES_CO模式下的<br>Handle::handleOnceCoroutine
TC_Epoller ->>- TC_CoroutineScheduler : return
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeup()<br>唤醒被put()重新激活的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbytimeout()<br>唤醒sleep的协程,放进_acvive
TC_CoroutineScheduler ->> TC_CoroutineScheduler : wakeupbyself()<br>唤醒暂时yield,随时可以唤醒的协程,放进_avail
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行最多100个_acvive协程
TC_CoroutineScheduler ->> TC_CoroutineScheduler : 执行全部的_avail协程
end

可以看到,流程图中协程有执行active的,也有avail的。那么avail和active区别是什么?

  • avail的逻辑

    • go创建的协程放在avail

    • 调用yield(true)的协程放在_needActiveCoroId里面

      然后主协程定时调用wakeupbyself()唤醒_needActiveCoroId的协程,放进avail

  • active的逻辑

    • 调用yield(false)的协程不会自动唤醒

    • 需要调用put()把协程放到_activeCoroQueue里面

      然后主协程定时调用wakeup()唤醒_activeCoroQueue的协程,放进active

因此avail的协程是yield(true)以后不需要唤醒也可以跑,始终可用(avail),而active的协程调用了yield(false)需要put触发来激活(active)

另外一方面,每一轮主协程逻辑,被激活的协程一次只能跑100个,但是始终可用的协程比较重要,必须要执行

举例来说

  • 在HANDLES_CO的1,3模式下,TC_EpollServer::Handle::handleOnceCoroutine作为第一个被go()投递的协程(协程ID为1),需要定时yield(true),来切换到go跑的其他协程上去,它是可以自动从inactive变成avail的

    sequenceDiagram
    loop handleCoroutine协程
    TC_EpollServer ->> TC_EpollServer : 从网络收包协程取得数据:_dataBuffer->pop(data)
    TC_EpollServer ->>+ TC_CoroutineScheduler : 对每个pop的data<br>向协程投递单次业务逻辑处理协程:_scheduler->go(&Handle::handle)
    TC_CoroutineScheduler ->>- TC_EpollServer : return
    TC_EpollServer ->> TC_CoroutineScheduler : 挂起等待下一次执行:scheduler->yield()
    end
  • 在HANDLES_CO的1,3模式下,ServantHandle::handle在go()第一次投递时,是在avail里面,是可以自动从inactive变成avail的。

    sequenceDiagram
    loop 服务端handle协程
    ServantHandle ->> ServantHandle : handleTarsProtocol()
    ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
    XXXServantImp ->>- ServantHandle: return
    ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse()
    ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
    TC_EpollServer ->>- ServantHandle : return
    end
  • 在HANDLES_CO的1,3模式下,运行ServantHandle::handle的业务逻辑中,如果有客户端rpc发包结束后这个caller协程在yield(false)后是inactive的,一定需要网络协程处理完以后,通过put唤醒了这个协程以后,才能继续工作

    将上面的服务端流程图中加入一次rpc调用如下

    sequenceDiagram
    loop 服务端handle协程,客户端caller协程
    ServantHandle ->> ServantHandle : handleTarsProtocol()
    ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
    XXXServantImp ->>+ ServantProxy : tars_invoke()
    ServantProxy ->> ServantProxy : 发包
    ServantProxy ->> ServantProxy : 挂起等待被put唤醒:msg->sched->yield(false)
    ServantProxy ->>- XXXServantImp : return
    XXXServantImp ->>- ServantHandle: return
    end
    loop 客户端网络协程
    TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
    TC_Epoller ->>+ CommunicatorEpoll : 客户端发包流程<br>handleOutputImp()<br>后续流程和tars1.0一致,省略
    CommunicatorEpoll ->>- TC_Epoller : return
    TC_Epoller ->>- TC_CoroutineScheduler : return
    TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
    TC_Epoller ->>+ CommunicatorEpoll : 客户端收包流程<br>handleInputImp()<br>唤醒caller协程<br>msg->sched->put(msg->iCoroId)
    CommunicatorEpoll ->>- TC_Epoller : return
    TC_Epoller ->>- TC_CoroutineScheduler : return
    end
    loop 服务端handle协程
    ServantHandle ->> ServantHandle : 服务端回包流程:_servantHandle->sendResponse()
    ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
    TC_EpollServer ->>- ServantHandle : return
    end

在加入协程调度器后,逻辑侧改动其实不大,只是把线程部分协程化了,原本线程间的通信,替换成了协程的yield(true)和put()来完成原本的条件变量wait(),notify()部分

引入协程调度器后,服务端和客户端可以共用同一个线程了(在下面NET多种模式会详细解释),共用一个epoll,因此触发函数统一使用TC_Epoller::EpollInfo::registerCallback进行注册

  • 服务端

    • 为bindAdapter监听的端口的EPOLLIN注册TC_EpollServer::acceptCallback函数

    • 为accept到的socket的EPOLLIN和EPOLLOUT

      分别注册TC_EpollServer::Connection::handleInputImp和TC_EpollServer::Connection::handleOutputImp

  • 客户端

    • 为客户端发起连接的socket的EPOLLIN和EPOLLOUT

      分别注册CommunicatorEpoll::handleInputImp和CommunicatorEpoll::handleOutputImp

    • 在caller线程不存在协程调度器的情况下

      给客户端网络线程注册NetThread::processPipe来等待caller线程唤醒

NET多种模式

在tars1.0中,服务端分为网络线程和业务线程,客户端分为caller线程和客户端网络线程

在tars3.0中,通过改变/tars/application/server/opencoroutine的值,有4种模式可以选择

  • NET_THREAD_QUEUE_HANDLES_THREAD = 0

    默认模式,和tars1.0一致

  • NET_THREAD_QUEUE_HANDLES_CO = 1

  • NET_THREAD_MERGE_HANDLES_THREAD = 2

  • NET_THREAD_MERGE_HANDLES_CO = 3

可以看出,4种模式是以NET_THREAD_XXX开头的2种模式和HANDLES_XXX结尾的2种模式的组合

NET_THREAD_QUEUE和NET_THREAD_MERGE的区别

  • NET_THREAD_QUEUE开头的0,1模式
    • 和tars1.0一致
    • 独立网络线程 + 独立业务线程
    • 网络线程负责收发包,通过队列唤醒handle线程中处理
  • NET_THREAD_MERGE开头的2,3模式
    • 合并网络线程 + 业务线程
    • 网络线程负责收发包,也负责原本在业务线程中处理的逻辑,这种模式下延时最小
    • PS:
      • 线程个数以处理线程配置为准, 网络线程配置无效
      • 如果是UDP, 则网络线程竞争接收包

NET_THREAD_QUEUE和tars1.0一致,不再分析

对NET_THREAD_MERGE如何合并网络和业务线程的进行分析

sequenceDiagram
loop 服务端主线程
TC_EpollServer ->> TC_EpollServer : waitForShutdown()
TC_EpollServer ->> TC_EpollServer : initHandle()<br>根据配置业务线程数量,来启动相应数量的网络线程<br>为网络线程设定业务线程的Handle::initialize()回调<br>如果是HANDLES_CO,那么给epoller的idle回调加入Handle::handleOnceCoroutine<br>如果是HANDLES_THREAD,那么给epoller的idle回调加入Handle::handleOnceThread
TC_EpollServer ->> TC_EpollServer : startHandle()<br>对每个业务线程getNetThread()->start()
TC_EpollServer ->> TC_EpollServer : 等待服务端主线程退出
end
loop 服务端网络线程
NetThread ->> NetThread : 启动协程调度器流程:_scheduler->run()
loop 服务端网络模块
NetThread ->> NetThread : epoll_wait触发EPOLLIN事件<br>::read()处理EAGAIN等<br>解包完毕
NetThread ->> BindAdapter : insertRecvQueue(recv)
BindAdapter ->> BindAdapter : push数据到队列中, 同时通过epoll唤醒某个等待处理线程<br>_rbuffer.push_back(recv)<br>_epoller->notify()
end
loop 服务端业务模块
ServantHandle ->> ServantHandle : 从队列取数据<br>_rbuffer.pop_front()
ServantHandle ->> ServantHandle : 调用XXXImp的对应接口处理<br>handleTarsProtocol()<br>onDispatch()
ServantHandle ->> NetThread : 选择对应的网络线程发包<br>adapter->getNetThread()->send(data)
NetThread ->> NetThread : 如果网络线程和业务线程没有合并(NET_THREAD_QUEUE)<br>流程和tars1.0相同,入队列再通知网络线程去发包<br>_sbuffer.push_back(data)
NetThread ->> NetThread : 如果网络线程和业务线程合并了(NET_THREAD_MERGE)<br>直接发包
end
end

HANDLES_THREAD和HANDLES_CO的区别

  • HANDLES_THREAD结尾的0,2模式

    • 和tars1.0一致
    • 服务端的业务线程中不启用协程处理
  • HANDLES_CO结尾的1,3模式,服务端的业务线程中启用协程处理,每个包都启动一个新协程

    • 业务线程启动的客户端,在协程模式下,会共用当前线程的协程

      因此在这两种模式下,在发起请求后会与服务端的业务线程共用同一个网络线程

    • 对3模式而言,由于服务端的网络线程和业务线程也合并了,这意味着整个服务端只有一个线程来处理,这种模式下的延时是4个模式中最小的

在协程调度器中简单的介绍了HANDLES_CO下服务端网络线程进行一次rpc调用的情况,下面是详细的流程介绍

sequenceDiagram
loop handle协程
ServantHandle ->> ServantHandle : handleTarsProtocol()
ServantHandle ->>+ XXXServantImp: onDispatch()<br>执行对应接口实现
XXXServantImp ->>+ ServantProxy : tars_invoke()
ServantProxy ->>+ ServantProxy : 虽然接口名还叫选择网络线程,实际上同一网络线程:<br>selectNetThreadInfo()
ServantProxy ->>+ CommunicatorEpoll: 直接发包,不进行通知:<br>msg->pObjectProxy->getCommunicatorEpoll()<br>->handle(pSptd->_reqQNo)
CommunicatorEpoll ->>+ ObjectProxy : msg->pObjectProxy->invoke(msg)<br>后续流程和tars1.0一致,省略
ObjectProxy ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>- ServantProxy : return
ServantProxy ->> ServantProxy : 挂起等待被put唤醒:msg->sched->yield(false)
ServantProxy ->>- XXXServantImp : return
XXXServantImp ->>- ServantHandle: return
end
loop 网络协程
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
TC_Epoller ->>+ CommunicatorEpoll : handleOutputImp()<br>后续流程和tars1.0一致,省略
CommunicatorEpoll ->>- TC_Epoller : return
TC_Epoller ->>- TC_CoroutineScheduler : return
TC_CoroutineScheduler ->>+ TC_Epoller : 如果没有待执行的协程,<br>执行_epoller->done(1000)<br>触发EPOLLIN或EPOLLOUT等事件
TC_Epoller ->>+ CommunicatorEpoll : handleInputImp()
CommunicatorEpoll ->>+ TC_TCPTransceiver : _trans->doResponse()
TC_TCPTransceiver ->> TC_TCPTransceiver : 对EGAIN等非阻塞的处理和tars1.0一致<br>循环读取数据解析协议<br>
TC_TCPTransceiver ->>+ AdapterProxy : 调用_onParserCallback回调<br>_onParserCallback回调在initializeClient中进行注册<br>实际是AdapterProxy::onParserCallback
AdapterProxy ->> AdapterProxy : 唤醒caller协程<br>finishInvoke(rsp)<br>finishInvoke_parallel()<br>finishInvoke(msg)<br>msg->sched->put(msg->iCoroId)
AdapterProxy ->>- TC_TCPTransceiver : return
TC_TCPTransceiver ->>- CommunicatorEpoll : return
CommunicatorEpoll ->>- TC_Epoller : return
TC_Epoller ->>- TC_CoroutineScheduler : return
end
loop handle协程
ServantHandle ->> ServantHandle : 回包流程:_servantHandle->sendResponse()
ServantHandle ->>+ TC_EpollServer : 发送回包:_epollServer->send()
TC_EpollServer ->>- ServantHandle : return
end

性能测试

在根目录的examples/StressDemo下就有性能测试的代码

StressImp的服务端代码如下

1
2
3
4
5
tars::Int32 StressImp::testStr(const std::string& in, std::string &out, tars::TarsCurrentPtr current)
{
out = in;
return 0;
}

现在对tars模式NET_THREAD_QUEUE_HANDLES_THREAD,NET_THREAD_QUEUE_HANDLES_CO,NET_THREAD_MERGE_HANDLES_CO和公司框架一起做测试

服务端都是相同网络线程,客户端相同模式下使用5个线程并行发起请求

服务端不限cpu

首先进行服务端不限cpu的测试

pthread id一行会分别打印线程id和消耗时间

  • NET_THREAD_QUEUE_HANDLES_THREAD
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config.conf
OpenCoroutine(opencoroutine) 0
servant TestApp.BServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9100 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.BServer.StressObj@tcp -h 127.0.0.1 -p 9100" 4
init tp succ
times:1500000
pthread id: 140093070972672 | 120493
succ:1500000
  • NET_THREAD_QUEUE_HANDLES_CO
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro.conf
OpenCoroutine(opencoroutine) 1
servant TestApp.AServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9000 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.AServer.StressObj@tcp -h 127.0.0.1 -p 9000" 4
init tp succ
times:1500000
pthread id: 140629329286912 | 116722
succ:1500000
  • NET_THREAD_MERGE_HANDLES_CO
1
2
3
4
5
6
7
8
9
10
root@121:~/tars/v3.0.0# ./build/bin/TarsStressServer --config=./config_coro3.conf
OpenCoroutine(opencoroutine) 3
servant TestApp.DServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9300 -t 10000

root@121:~/tars/v3.0.0# ./build/bin/TarsStressClient 5 1500000 "TestApp.DServer.StressObj@tcp -h 127.0.0.1 -p 9300" 4
init tp succ
times:1500000
pthread id: 140555908507392 | 97816
succ:1500000
  • 公司框架
1
2
3
4
5
6
7
8
9
root@121:~/taf_test/server# ./Server --config=config.conf
servant TestApp.CServer.StressObj
endpoint tcp -h 127.0.0.1 -p 9200 -t 10000

root@121:~/taf_test/client# ./Client 5 1500000 "TestApp.CServer.StressObj@tcp -h 127.0.0.1 -p 9200" 4
init tp succ
times:1500000
pthread id: 140642864662272 | 131473
succ:1500000

服务端限制1核

  • NET_THREAD_QUEUE_HANDLES_THREAD

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139798232692480 | 198703
    succ:1500000
  • NET_THREAD_QUEUE_HANDLES_CO

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139790615820032 | 177378
    succ:1500000
  • NET_THREAD_MERGE_HANDLES_CO

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 140052203300608 | 104109
    succ:1500000
  • 公司框架

    1
    2
    3
    4
    init tp succ
    times:1500000
    pthread id: 139851859920640 | 228572
    succ:1500000

小结

服务端不限cpu(qps) 服务端限制1核(qps) 限制后性能降低
NET_THREAD_QUEUE_HANDLES_THREAD 12448 7548 39.36%
NET_THREAD_QUEUE_HANDLES_CO 12851 8456 34.19%
NET_THREAD_MERGE_HANDLES_CO 15334 14407 6.04%
公司框架 11409 6562 42.48%

稍微整理可知,NET_THREAD_MERGE_HANDLES_CO由于减少了线程切换,虽然网络协程到业务协程依然依赖队列,但是从业务协程处理完后,向网络协程回报不再依赖队列,减少了队列操作后,性能确实是最好的,在限制cpu后,性能降低也是最少的

NET_THREAD_QUEUE_HANDLES_CO相对NET_THREAD_QUEUE_HANDLES_THREAD,业务模块使用协程来处理以后,性能也得到了提升

公司框架相对而言就差了不少,特别是在模拟资源受限的情况下,一方面,qps只有NET_THREAD_MERGE_HANDLES_CO的45%,另外一方面性能损失也特别大,存在较大优化空间

读代码时的一些测试

tars的HANDLES_CO模式测试

看逻辑看的有点糊涂,主要希望测试出来客户端和服务端是否同一个线程

  • 测试方法:
    • 给客户端创建的线程加点日志打印线程id
    • 给服务端收包时加日志打印线程id
  • 结论:
    • 是同一个网络线程,且都为服务端业务线程
    • 客户端创建的线程不会被使用(经过其他测试,如果客户端不开启HANDLES_CO模式,就会进入客户端网络线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (msg->sched) {    
TLOGTARS("in sched handle: " << this << ", " << msg->request.sServantName << endl);
//协程中, 直接发包了
msg->pObjectProxy->getCommunicatorEpoll()->handle(pSptd->_reqQNo);
}else {
TLOGTARS("not in sched handle: " << this << ", " << msg->request.sServantName << endl);
msg->pObjectProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo);
}
...
if (!msg->sched) {
TLOGTARS("pMonitor wait: " << this << ", " << msg->request.sServantName << " before" << endl);
msg->pMonitor->wait();
TLOGTARS("pMonitor wait: " << this << ", " << msg->request.sServantName << " after" << endl);
if(!msg->pMonitor->bMonitorFin) {
TLOGERROR("[ServantProxy::invoke communicator terminate]" << endl);
return;
}
}else {
TLOGTARS("sched yield: " << this << ", " << msg->request.sServantName << " before" << endl);
msg->sched->yield(false);
TLOGTARS("sched yield: " << this << ", " << msg->request.sServantName << " after" << endl);
}
  • 没开启HANDLES_CO的客户端打印:

    1
    2
    3
    4
    5
    2022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::897]|not in sched handle: 0x563d60a46bb0, TestApp.BServer.BServantObj
    2022-10-27 23:43:32|140229141268224|[ObjectProxy.cpp::invoke::155]|[ObjectProxy::invoke, objname:TestApp.BServer.BServantObj, begin...]
    2022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::908]|pMonitor wait: 0x563d60a46bb0, TestApp.BServer.BServantObj before
    ...
    2022-10-27 23:43:32|140229114881792|[ServantProxy.cpp::invoke::912]|pMonitor wait: 0x563d60a46bb0, TestApp.BServer.BServantObj after

    140229114881792线程通过notify通知客户端网络线程工作,最终唤醒当前线程结束条件变量wait

  • 开启HANDLES_CO的客户端打印:

    1
    2
    3
    2022-10-27 23:43:32|140030806836992|[ServantProxy.cpp::invoke::890]|in sched handle: 0x7f5b6800ae50, TestApp.AServer.AServantObj
    2022-10-27 23:43:32|140030806836992|[ObjectProxy.cpp::invoke::155]|[ObjectProxy::invoke, objname:TestApp.AServer.AServantObj, begin...]
    ...

    一直是相同的线程打印日志,说明没发生切换

触发connect测试

看代码的时候对什么时候触发connect有点不太确定,打印堆栈看看

在EndpointManager对AdapterProxy进行checkActive时,进行::connect,并且加入epoll,如果EINPROGRESS那么就等待epoll唤醒

1
2
3
4
5
6
7
8
9
10
0x000055b279417b1a <tars::TC_Transceiver::doConnect(int, sockaddr const*, unsigned int)+0x13a>
0x000055b279419bdb <tars::TC_Transceiver::connect()+0x72b>
0x000055b2793877c5 <tars::AdapterProxy::checkActive(bool)+0x515>
0x000055b2793a1685 <tars::EndpointManager::getNextValidProxy()+0x85>
0x000055b2793a72e4 <tars::EndpointManager::selectAdapterProxy(tars::ReqMessage*, tars::AdapterProxy*&)+0xb4>
0x000055b2793428b3 <tars::ObjectProxy::invoke(tars::ReqMessage*)+0x253>
0x000055b279333c03 <tars::CommunicatorEpoll::handleNotify(std::shared_ptr<tars::TC_Epoller::EpollInfo> const&)+0x83>
0x000055b27936e757 <tars::ServantProxy::invoke(tars::ReqMessage*, bool)+0xb07>
0x000055b27936fe74 <tars::ServantProxy::servant_invoke(tars::ReqMessage*, bool)+0x84>
0x000055b279371283 <tars::ServantProxy::tars_invoke(...)

随后在handleLoopCoroutine中,epoll被唤醒,从而设置为已连接状态

1
2
3
4
5
6
7
8
9
10
11
0x000055b279343330 <tars::ObjectProxy::prepareConnection(tars::AdapterProxy*)+0x1a0>
0x000055b279387eec <tars::AdapterProxy::onConnectCallback(tars::TC_Transceiver*)+0xdc>
0x000055b279416e46 <tars::TC_Transceiver::onSetConnected()+0xe6>
0x000055b2794165a5 <tars::TC_Transceiver::setConnected()+0x75>
0x000055b2794169c0 <tars::TC_Transceiver::checkConnect()+0x360>
0x000055b279416b4b <tars::TC_Transceiver::doRequest()+0x2b>
0x000055b2793336ed <tars::CommunicatorEpoll::handleOutputImp(std::shared_ptr<tars::TC_Epoller::EpollInfo> const&)+0x5d>
0x000055b2793e2ef9 <tars::TC_Epoller::EpollInfo::fireEvent(unsigned int)+0x119>
0x000055b2793e307a <tars::TC_Epoller::done(unsigned long)+0xaa>
0x000055b2793c5e24 <tars::TC_CoroutineScheduler::run()+0x164>
0x000055b2793d0908 <tars::TC_EpollServer::Handle::handleLoopCoroutine()+0x188>