tars日志库源码分析

tars的日志库的类图关系很绕,很容易忘,记录一下备用

源码分析基于tag v3.0.0

首先介绍一下会用到的主要代码路径和功能

1
2
3
TarsCpp/util/include/util/tc_logger.h        日志库头文件
TarsCpp/util/include/util/tc_logger.cpp 日志库cpp
TarsCpp/servant/servant/RemoteLogger.h 远程日志实现,本地日志的宏定义

taf的日志库从流程上,分初始化,滚动日志,按天日志三部分,暂时只分析了滚动日志相关逻辑

初始化

先看类图

classDiagram
direction TB
class LocalRollLogger {
	TC_Logger~RollWriteT, TC_RollBySize~ _logger;
	TC_LoggerThreadGroup _local;
}
LocalRollLogger --> TC_Logger
LocalRollLogger --> TC_LoggerThreadGroup
class TC_LoggerThreadGroup {
	set~TC_LoggerRollPtr, KeyComp~ logger_set;
}
class TC_Logger["TC_Logger<WriteT, template<class> class RollPolicy>"] {
	LoggerBuffer _buffer;
	std::ostream _stream;
}
class RollWrapperBase["RollWrapperBase<TC_RollBySize<RollWriteT>>"] {
	
}
class RollWrapperI["RollPolicy<WriteT>::RollWrapperI"] {
	+void init(const string &path) : 转发_roll->init(...)
}
class RollWrapperBase["RollWrapperBase<RollPolicy<WriteT>>"] {
	RollPolicy<WriteT>> _roll;
}
TC_Logger --|> RollWrapperI
RollWrapperI --|> RollWrapperBase
TC_Logger --> LoggerStream
class TC_RollBySize["TC_RollBySize<WriteT>"] {
	-WriteT _t;
	+void init(const string &path)
}
class TC_RollByTime["TC_RollByTime<WriteT>"] {
	-WriteT _t;
	+void init(const string &path)
}
RollWrapperBase --> TC_RollBySize
RollWrapperBase --> TC_RollByTime

namespace WriteT定义写操作 {
class RollWriteT {
	+void operator() 写本地
}
class TimeWriteT {
	-TC_Logger~RemoteTimeWriteT, TC_RollByTime~ _pRemoteTimeLogger
	+void operator() 写本地,并通过_pRemoteTimeLogger再异步写远程
}
class RemoteTimeWriteT {
	+void operator() 写远程
}
}
TimeWriteT --> RemoteTimeWriteT
TimeWriteT --> TC_Logger

其中TC_Logger的实现较为复杂,通过CRTP继承了好几次,但其实这几次CRTP的继承实际上只是为了组合,而不是为了抽象接口,所以其实有个简化版理解

classDiagram
direction TB
class TC_Logger["TC_Logger<WriteT, template<class> class RollPolicy>"] {
	-LoggerBuffer _buffer;
	-std::ostream _stream;
	-RollPolicy<WriteT>> _roll;
	+void init(const string &path) : 转发_roll->init(...)
}
class TC_RollBySize["TC_RollBySize<WriteT>"] {
	-WriteT _t;
	+void init(const string &path)
}
class TC_RollByTime["TC_RollByTime<WriteT>"] {
	-WriteT _t;
	+void init(const string &path)
}
TC_Logger --> TC_RollBySize
TC_Logger --> TC_RollByTime

初始化的使用方式位于RemoteLogger.h

1
2
3
TarsRollLogger::getInstance()->setLogInfo(ServerConfig::Application, ServerConfig::ServerName, ServerConfig::LogPath, ServerConfig::LogSize, ServerConfig::LogNum, _communicator, ServerConfig::Log);
//例如
//TarsRollLogger::getInstance()->setLogInfo("Hello", "HelloServer", ".", 52428800, 10, "tars.hellolog.LogObj");

看下大致看下流程(可以结合时序图看)

时序图

sequenceDiagram
LocalRollLogger ->>+ TC_LoggerThreadGroup : start()<br>new thread(run, this);
TC_LoggerThreadGroup ->>- LocalRollLogger : return
participant RollLogger as TC_Logger<RollWriteT,TC_RollBySize>
participant TC_LoggerRoll as TC_RollBySize<RollWriteT>
loop run
	TC_LoggerThreadGroup ->>+ TC_LoggerThreadGroup : flush()
	TC_LoggerThreadGroup ->>+ RollLogger : logger_set::iterator it = _logger.begin()<br>while (it != logger.end()) {<br>it->get()->flush()<br>}
	RollLogger ->>+ TC_LoggerRoll : TC_CasQueue qt<br>_buffer.swap(qt)<br>roll(qt)
	TC_LoggerRoll ->>+ RollWriteT : lock(*this)<br>_t(_of, buffer)
	RollWriteT ->>+ RollWriteT : 实际写文件
	RollWriteT ->>- TC_LoggerRoll : return
	TC_LoggerRoll ->>- RollLogger : return
	RollLogger ->>- TC_LoggerThreadGroup : return
end
LocalRollLogger ->>+ LocalRollLogger : sync
LocalRollLogger ->>+ RollLogger : _logger.setupThread(&_local)
RollLogger ->>+ TC_LoggerRoll : _roll->setupThread(ltg)
TC_LoggerRoll ->>+ TC_LoggerThreadGroup : _pThreadGroup->registerLogger(self);
TC_LoggerThreadGroup ->>+ TC_LoggerThreadGroup : _logger.insert(l);
TC_LoggerRoll ->>- RollLogger : return
RollLogger ->>- LocalRollLogger : return

首先在setLogInfo中,调用

1
_local.start(1);

TC_LoggerThreadGroup::start开启了新线程

1
2
3
4
5
6
7
void TC_LoggerThreadGroup::start(size_t iThreadNum)
{
if(_thread == NULL)
{
_thread = new std::thread(&TC_LoggerThreadGroup::run, this);
}
}

在新线程中,运行TC_LoggerThreadGroup::run

1
2
3
4
5
6
7
8
9
10
11
12
13
void TC_LoggerThreadGroup::run()
{
while (!_bTerminate)
{
//100ms
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait_for(lock, std::chrono::milliseconds(100));
}

flush();
}
}

run函数中死循环定时唤醒执行TC_LoggerThreadGroup::flush函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void TC_LoggerThreadGroup::flush()
{
logger_set logger;

{
std::lock_guard<std::mutex> lock(_mutex);
logger = _logger;
}

logger_set::iterator it = logger.begin();
while (it != logger.end())
{
try
{
it->get()->flush();
}
catch(exception &ex)
{
cerr << "[TC_LoggerThreadGroup::flush] log flush error:" << ex.what() << endl;
}
catch (...)
{
}
++it;
}
}

flush函数遍历logger_set,并对每一项调用TC_LoggerRoll::flush

1
2
3
4
5
6
7
8
9
10
11
void TC_LoggerRoll::flush()
{
TC_CasQueue<pair<size_t, string> >::queue_type qt;

_buffer.swap(qt);

if (!qt.empty())
{
roll(qt);
}
}

flush函数从_buffer里取出所有内容,调用TC_LoggerRoll::roll

对于TC_RollBySize<RollWriteT>来说,TC_RollBySize::roll是这样的

1
2
3
4
5
6
7
8
9
void TC_RollBySize::roll(const deque<pair<size_t, string> > &buffer) {
std::lock_guard<std::mutex> lock(*this);

//写入文件,调用RollWriteT的写入逻辑RollWriteT::operator()
_t(_of, buffer);

//轮转逻辑
//...
}

回到LocalRollLogger::setLogInfo

继续执行

1
sync(false)

在LocalRollLogger::sync中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void LocalRollLogger::sync(bool bSync)
{
if(bSync)
{
_logger.unSetupThread();

TC_ThreadRLock lock(_mutex);
for(auto e : _logger_ex)
{
e.second->unSetupThread();
}
}
else
{
_logger.setupThread(&_local);

TC_ThreadRLock lock(_mutex);
for(auto e : _logger_ex)
{
e.second->setupThread(&_local);
}
}
}

执行了TC_Logger<RollWriteT, TC_RollBySize>::setupThread,它继承自

TC_RollBySize<RollWriteT>::RollWrapperI::setupThread,而它又继承自

RollWrapperBase<TC_RollBySize<RollWriteT>>::setupThread

1
void setupThread(TC_LoggerThreadGroup *ltg) { _roll->setupThread(ltg); }

这里调用了TC_RollBySize<RollWriteT>::setupThread方法,它继承自TC_LoggerRoll::setupThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TC_LoggerRoll::setupThread(TC_LoggerThreadGroup *pThreadGroup)
{
assert(pThreadGroup != NULL);

unSetupThread();

std::lock_guard<std::mutex> lock(_mutex);

_pThreadGroup = pThreadGroup;

TC_LoggerRollPtr self = this;

_pThreadGroup->registerLogger(self);
}

这里调用了TC_LoggerThreadGroup::registerLogger

1
2
3
4
5
6
void TC_LoggerThreadGroup::registerLogger(TC_LoggerRollPtr &l)
{
std::lock_guard<std::mutex> lock(_mutex);

_logger.insert(l);
}

因此TC_LoggerThreadGroup的flush函数中可以获取注册到的TC_RollBySize<RollWriteT>实例

小结

初始化主要是在LocalRollLogger的sync开始的流程,将TC_RollBySize<RollWriteT>实例注册给TC_LoggerThreadGroup的flush函数用

然后在start开始的流程中,新起线程定时将TC_CasQueue的数据实际落磁盘

那么TC_CasQueue的数据来自哪里呢?

滚动日志

1
#define LOG (LocalRollLogger::getInstance()->logger())

滚动日志库的使用方式是LOG宏,位于RemoteLogger.h,因此从这里切入(同样可以结合时序图来看)

时序图

sequenceDiagram
participant RollLogger as TC_Logger<RollWriteT,TC_RollBySize>
RollLogger ->>+ LoggerStream : debug()
LoggerStream ->>- RollLogger : return
LoggerStream ->>+ LoggerBuffer : LoggerStream(TC_Logger._stream)<br>operator<<<br>TC_Logger._stream.flush()<br>_buffer.sync()
LoggerBuffer ->>+ TC_RollBySize : _roll->write()
TC_RollBySize ->>+ TC_CasQueue :_buffer.push_back()
TC_CasQueue ->>- TC_RollBySize : return
TC_RollBySize ->>- LoggerBuffer : return
LoggerBuffer ->>- LoggerStream : return

首先在TC_Logger中

1
LoggerStream debug() { return stream(DEBUG_LOG_LEVEL); }

生成了一个临时变量LoggerStream,当LoggerStream析构的时候会_stream->flush()

1
2
3
4
5
6
7
8
9
10
~LoggerStream()
{
if (_stream)
{
TC_LockT<TC_ThreadMutex> lock(_mutex);
_stream->clear();
(*_stream) << _buffer.str();
_stream->flush();
}
}

flush中会调用LoggerBuffer::sync方法(具体调用的代码这里见调用LoggerBuffer的sync的测试的解释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int LoggerBuffer::sync()
{
//有数据
if (pptr() > pbase())
{
std::streamsize len = pptr() - pbase();

if (_roll)
{
//具体的写逻辑
_roll->write(make_pair(TC_Thread::CURRENT_THREADID(), string(pbase(), len)));
}

//重新设置put缓冲区, pptr()重置到pbase()处
setp(pbase(), epptr());
}
return 0;
}

sync方法调用了TC_LoggerRoll::write

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void TC_LoggerRoll::write(const pair<std::size_t, string> &buffer)
{
size_t ThreadID = 0;
if (_bDyeingFlag)
{
TC_LockT<TC_SpinLock> lock(_mutexDyeing);

if (_mapThreadID.find(TC_Thread::CURRENT_THREADID()) != _mapThreadID.end())
{
ThreadID = TC_Thread::CURRENT_THREADID();
}
}

if (_pThreadGroup)
{
_buffer.push_back(make_pair(ThreadID, buffer.second));
}
else
{
//同步记录日志
deque<pair<size_t, string> > ds;
ds.push_back(make_pair(ThreadID, buffer.second));
roll(ds);
}
}

这里_buffer是一个无锁队列TC_CasQueue<pair<std::size_t, string>> _buffer

小结

滚动日志通过LoggerStream的析构,将数据写入TC_CasQueue

调用LoggerBuffer的sync的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <string>
using namespace std;

class LoggerBuffer : public std::basic_streambuf<char> {
public:
LoggerBuffer(int num){cout<<"LoggerBuffer init"<<endl;}
~LoggerBuffer(){cout<<"LoggerBuffer xigou"<<endl;}
virtual int sync(){cout<<"sync"<<endl;}
};
class TC_Logger {
public:
TC_Logger()
:_buffer(1)
, _stream(&_buffer)
{}
~TC_Logger(){cout<<"TC_Logger xigou"<<endl;}
void stream(int level)
{
ostream *ost = NULL;
ost = &_stream;
//调用flush后会调用LoggerBuffer中的sync方法
ost->flush();
}
LoggerBuffer _buffer;
std::ostream _stream;
};
int main() {
TC_Logger logger;
logger.stream(1);
cout<<"main fun end"<<endl;
}

总结

日志打印时写入一个TC_CasQueue无锁队列,然后由一个线程定时落磁盘

参考资料

Tars-C++ 揭秘篇:日志类源码解析

腾讯Tars 日志模块,可单独运行调试