Source Code Analysis of the tars Logging Library

Original content; please credit the source when reposting.

Posted by Weakyon Blog on December 12, 2021

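The class relationships in the tars logging library are convoluted and easy to forget, so these notes record them for later reference.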

The analysis is based on tag v3.0.0.

First, the main source paths involved and what each one contains:

TarsCpp/util/include/util/tc_logger.h        logging library header
TarsCpp/util/src/tc_logger.cpp               logging library implementation
TarsCpp/servant/servant/RemoteLogger.h       remote logging implementation, plus the macros for local logging

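Functionally, the tars logging library splits into three parts: initialization, rolling logs, and daily logs. So far only the rolling-log logic has been analyzed.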

1 Initialization

Start with the class diagram:

classDiagram
class LocalRollLogger {
	TC_Logger<RollWriteT, TC_RollBySize> _logger
	TC_LoggerThreadGroup _local
}
LocalRollLogger --> TC_Logger~RollWriteT, TC_RollBySize~
LocalRollLogger --> TC_LoggerThreadGroup
class TC_LoggerThreadGroup {
	set<TC_LoggerRollPtr, KeyComp> logger_set
}
class TC_Logger~RollWriteT, TC_RollBySize~ {
LoggerBuffer _buffer
std::ostream _stream
}
TC_Logger~RollWriteT, TC_RollBySize~ --|> RollWrapperI~RollWriteT, TC_RollBySize~
RollWrapperI~RollWriteT, TC_RollBySize~ --|> RollWrapperBase~TC_RollBySize<RollWriteT>~
TC_Logger~RollWriteT, TC_RollBySize~ --> LoggerStream

The initialization interface is located in RemoteLogger.h and is used like this:

TarsRollLogger::getInstance()->setLogInfo(ServerConfig::Application, ServerConfig::ServerName, ServerConfig::LogPath, ServerConfig::LogSize, ServerConfig::LogNum, _communicator, ServerConfig::Log);
// for example
//TarsRollLogger::getInstance()->setLogInfo("Hello", "HelloServer", ".", 52428800, 10, "tars.hellolog.LogObj");

Following the flow, setLogInfo first calls

_local.start(1);

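TC_LoggerThreadGroup::start spawns a new thread. In this excerpt iThreadNum is not used: a single thread is created, and only if none exists yet.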

void TC_LoggerThreadGroup::start(size_t iThreadNum)
{
  if(_thread == NULL)
  {   
      _thread = new std::thread(&TC_LoggerThreadGroup::run, this);
  }   
}

The new thread runs TC_LoggerThreadGroup::run:

void TC_LoggerThreadGroup::run()
{
  while (!_bTerminate)
  {
      //100ms
      {
          std::unique_lock<std::mutex> lock(_mutex);
          _cond.wait_for(lock, std::chrono::milliseconds(100));
      }

      flush();
  }
}

run loops until _bTerminate is set, waking up every 100 ms (or earlier if _cond is notified) to call TC_LoggerThreadGroup::flush:

void TC_LoggerThreadGroup::flush()
{   
  logger_set logger;
  
  {   
      std::lock_guard<std::mutex> lock(_mutex);
      logger = _logger;
  }
  
  logger_set::iterator it = logger.begin();
  while (it != logger.end())
  {   
      try
      {   
          it->get()->flush();
      }
      catch(exception &ex)
      {   
          cerr << "[TC_LoggerThreadGroup::flush] log flush error:" << ex.what() << endl;
      }
      catch (...)
      {
      }
      ++it;
  }
}
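The run/flush pair above can be sketched outside of tars as follows. This is a minimal sketch, not the tars implementation: PeriodicFlusher, registerTask and the use of std::function callbacks are hypothetical names chosen for illustration, and unlike the excerpt it uses a predicate with wait_for so that stopping does not have to wait for the next tick.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for TC_LoggerThreadGroup: one worker thread wakes up
// periodically, snapshots the registered tasks under the lock, and runs them
// with the lock released.
class PeriodicFlusher {
public:
    explicit PeriodicFlusher(std::chrono::milliseconds interval)
        : _interval(interval) {}

    ~PeriodicFlusher() { stop(); }

    void registerTask(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(_mutex);
        _tasks.push_back(std::move(task));
    }

    // Starts the worker once; later calls are no-ops.
    void start() {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_thread.joinable()) {
            _thread = std::thread(&PeriodicFlusher::run, this);
        }
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _terminate = true;
        }
        _cond.notify_all();
        if (_thread.joinable()) _thread.join();
        flush();  // drain whatever arrived after the last tick
    }

    void flush() {
        std::vector<std::function<void()>> snapshot;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            snapshot = _tasks;  // copy under the lock, run outside it
        }
        for (auto &task : snapshot) {
            try {
                task();
            } catch (...) {
                // one failing task must not prevent the others from running
            }
        }
    }

private:
    void run() {
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(_mutex);
                // wait_for returns true as soon as stop() sets the flag
                if (_cond.wait_for(lock, _interval, [this] { return _terminate; })) {
                    return;
                }
            }
            flush();
        }
    }

    std::chrono::milliseconds _interval;
    std::mutex _mutex;
    std::condition_variable _cond;
    std::vector<std::function<void()>> _tasks;
    bool _terminate = false;
    std::thread _thread;
};
```

Copying the task list before iterating keeps the lock hold time short, so registering a new logger never has to wait for a slow disk write.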

flush copies the logger set under the lock, then iterates over the copy and calls TC_LoggerRoll::flush on each entry:

void TC_LoggerRoll::flush()
{
  TC_CasQueue<pair<size_t, string> >::queue_type qt;

  _buffer.swap(qt);

  if (!qt.empty())
  {
      roll(qt);
  }
}
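The swap-then-process idiom in TC_LoggerRoll::flush can be illustrated with a small stand-alone sketch. SwapQueue below is a hypothetical, mutex-based stand-in for TC_CasQueue (whose internals are not shown in this post), and drain is an illustrative helper rather than a tars function.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical mutex-based stand-in for TC_CasQueue.
class SwapQueue {
public:
    using queue_type = std::deque<std::pair<std::size_t, std::string>>;

    void push_back(std::pair<std::size_t, std::string> item) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push_back(std::move(item));
    }

    // Exchanges the internal queue with q. The lock is held only for the
    // duration of a constant-time container swap.
    void swap(queue_type &q) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.swap(q);
    }

private:
    std::mutex _mutex;
    queue_type _queue;
};

// Takes everything queued so far and hands it to sink as one batch.
// Returns the number of entries handed over.
template <typename Sink>
std::size_t drain(SwapQueue &buffer, Sink sink) {
    SwapQueue::queue_type batch;
    buffer.swap(batch);  // producers can keep pushing while the batch is written
    if (!batch.empty()) {
        sink(batch);
    }
    return batch.size();
}
```

Because the consumer walks away with the whole batch, the slow part (writing to disk) happens without blocking the threads that are producing log lines.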

TC_LoggerRoll::flush swaps everything out of _buffer and, if anything was queued, passes it to TC_LoggerRoll::roll.

For TC_RollBySize<RollWriteT>, TC_RollBySize::roll looks like this:

void TC_RollBySize::roll(const deque<pair<size_t, string> > &buffer) {
	std::lock_guard<std::mutex> lock(*this);
	
	// write to the file through RollWriteT's write logic, RollWriteT::operator()
	_t(_of, buffer);
	
	// rotation logic
	//...
}

Back in LocalRollLogger::setLogInfo, execution continues with

sync(false)

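In LocalRollLogger::sync, passing false attaches every logger to the _local thread group (asynchronous logging), while passing true detaches them (synchronous logging):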

void LocalRollLogger::sync(bool bSync)
{
  if(bSync)
  {
      _logger.unSetupThread();
  
      TC_ThreadRLock lock(_mutex);
      for(auto e : _logger_ex)
      {
          e.second->unSetupThread();
      }
  }   
  else
  {   
      _logger.setupThread(&_local);

      TC_ThreadRLock lock(_mutex);
      for(auto e : _logger_ex)
      {
          e.second->setupThread(&_local);
      }
  }
}

This calls TC_Logger<RollWriteT, TC_RollBySize>::setupThread, which is inherited from RollWrapperI<RollWriteT, TC_RollBySize>, which in turn inherits it from RollWrapperBase<TC_RollBySize<RollWriteT>>:

void setupThread(TC_LoggerThreadGroup *ltg) { _roll->setupThread(ltg); }

This delegates to TC_RollBySize<RollWriteT>::setupThread, which is inherited from TC_LoggerRoll::setupThread:

void TC_LoggerRoll::setupThread(TC_LoggerThreadGroup *pThreadGroup)
{   
  assert(pThreadGroup != NULL);
  
  unSetupThread();
  
  std::lock_guard<std::mutex> lock(_mutex);
  
  _pThreadGroup = pThreadGroup;
  
  TC_LoggerRollPtr self = this;
  
  _pThreadGroup->registerLogger(self);
}

This in turn calls TC_LoggerThreadGroup::registerLogger:

void TC_LoggerThreadGroup::registerLogger(TC_LoggerRollPtr &l)
{
  std::lock_guard<std::mutex> lock(_mutex);

  _logger.insert(l);
}

This is how TC_LoggerThreadGroup::flush gets hold of the registered TC_RollBySize<RollWriteT> instances.
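The registration handshake can be reduced to the following sketch. ThreadGroup and Roll are hypothetical simplifications: they use raw pointers where tars uses TC_LoggerRollPtr, and the unRegisterLogger and async members are assumptions made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <set>

class Roll;

// Hypothetical stand-in for TC_LoggerThreadGroup: just the registry part.
class ThreadGroup {
public:
    void registerLogger(Roll *roll) {
        std::lock_guard<std::mutex> lock(_mutex);
        _loggers.insert(roll);
    }

    void unRegisterLogger(Roll *roll) {
        std::lock_guard<std::mutex> lock(_mutex);
        _loggers.erase(roll);
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _loggers.size();
    }

private:
    mutable std::mutex _mutex;
    std::set<Roll *> _loggers;
};

// Hypothetical stand-in for TC_LoggerRoll: attaches to at most one group.
class Roll {
public:
    ~Roll() { unSetupThread(); }

    void setupThread(ThreadGroup *group) {
        assert(group != nullptr);
        unSetupThread();  // detach from any previous group first
        std::lock_guard<std::mutex> lock(_mutex);
        _group = group;
        _group->registerLogger(this);
    }

    void unSetupThread() {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_group != nullptr) {
            _group->unRegisterLogger(this);
            _group = nullptr;
        }
    }

    // True while attached to a group, i.e. while logging is asynchronous.
    bool async() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _group != nullptr;
    }

private:
    mutable std::mutex _mutex;
    ThreadGroup *_group = nullptr;
};
```

Calling unSetupThread at the top of setupThread makes re-registration idempotent, which mirrors the order of calls in TC_LoggerRoll::setupThread.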

1.1 Sequence diagram

sequenceDiagram
LocalRollLogger ->>+ TC_LoggerThreadGroup : start()<br>new thread(run, this);
TC_LoggerThreadGroup ->>- LocalRollLogger : return
participant RollLogger as TC_Logger<RollWriteT,TC_RollBySize>
participant TC_LoggerRoll as TC_RollBySize<RollWriteT>
loop run
	TC_LoggerThreadGroup ->>+ TC_LoggerThreadGroup : flush()
	TC_LoggerThreadGroup ->>+ RollLogger : logger_set::iterator it = _logger.begin()<br>while (it != logger.end()) {<br>it->get()->flush()<br>}
	RollLogger ->>+ TC_LoggerRoll : TC_CasQueue qt<br>_buffer.swap(qt)<br>roll(qt)
	TC_LoggerRoll ->>+ RollWriteT : lock(*this)<br>_t(_of, buffer)
	RollWriteT ->>+ RollWriteT : actual file write
	RollWriteT ->>- TC_LoggerRoll : return
	TC_LoggerRoll ->>- RollLogger : return
	RollLogger ->>- TC_LoggerThreadGroup : return
end
LocalRollLogger ->>+ LocalRollLogger : sync
LocalRollLogger ->>+ RollLogger : _logger.setupThread(&_local)
RollLogger ->>+ TC_LoggerRoll : _roll->setupThread(ltg)
TC_LoggerRoll ->>+ TC_LoggerThreadGroup : _pThreadGroup->registerLogger(self);
TC_LoggerThreadGroup ->>+ TC_LoggerThreadGroup : _logger.insert(l);
TC_LoggerRoll ->>- RollLogger : return
RollLogger ->>- LocalRollLogger : return

2 Rolling logs

The rolling log is used through the LOG macro in RemoteLogger.h, so that is the starting point:

#define LOG (LocalRollLogger::getInstance()->logger())

First, in TC_Logger:

LoggerStream debug() { return stream(DEBUG_LOG_LEVEL); }

This returns a temporary LoggerStream. When the LoggerStream is destroyed, it writes its buffered text to _stream and calls _stream->flush():

~LoggerStream()
{    
  if (_stream)
  {    
      TC_LockT<TC_ThreadMutex> lock(_mutex);
      _stream->clear();
      (*_stream) << _buffer.str();
      _stream->flush();
  }    
}
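The temporary-object trick can be reproduced in isolation. The sketch below is an assumption-laden miniature, not the tars class: LineStream is a hypothetical name, and treating a null sink as "this log level is disabled" is an assumption for illustration.

```cpp
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical miniature of LoggerStream: text accumulates in a private
// buffer and reaches the shared sink in one piece when the temporary dies.
class LineStream {
public:
    LineStream(std::ostream *sink, std::mutex &mutex)
        : _sink(sink), _mutex(mutex) {}

    // Movable so it can be returned by value; the moved-from object
    // gives up the sink and therefore writes nothing on destruction.
    LineStream(LineStream &&other)
        : _buffer(std::move(other._buffer)),
          _sink(other._sink),
          _mutex(other._mutex) {
        other._sink = nullptr;
    }

    ~LineStream() {
        if (_sink) {
            std::lock_guard<std::mutex> lock(_mutex);
            (*_sink) << _buffer.str();
            _sink->flush();
        }
    }

    template <typename T>
    LineStream &operator<<(const T &value) {
        if (_sink) {
            _buffer << value;  // formatting happens without holding the lock
        }
        return *this;
    }

private:
    std::ostringstream _buffer;
    std::ostream *_sink;  // nullptr: drop everything (assumed "level disabled")
    std::mutex &_mutex;
};
```

A statement such as LineStream(&out, m) << "a=" << 1 << "\n"; formats the whole line privately and takes the lock only once, at the end of the full expression, so lines from different threads do not interleave.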

flush ends up calling LoggerBuffer::sync (section 2.2 explains why):

int LoggerBuffer::sync()
{
  // there is pending data
  if (pptr() > pbase())
  {   
      std::streamsize len = pptr() - pbase();

      if (_roll)
      {   
          // the actual write
          _roll->write(make_pair(TC_Thread::CURRENT_THREADID(), string(pbase(), len)));
      }   

      // reset the put area: pptr() goes back to pbase()
      setp(pbase(), epptr());
  }   
  return 0;
}

sync calls TC_LoggerRoll::write:

void TC_LoggerRoll::write(const pair<std::size_t, string> &buffer)
{
  size_t ThreadID = 0;
  if (_bDyeingFlag)
  {
      TC_LockT<TC_SpinLock> lock(_mutexDyeing);

      if (_mapThreadID.find(TC_Thread::CURRENT_THREADID()) != _mapThreadID.end())
      {
          ThreadID = TC_Thread::CURRENT_THREADID();
      }
  }

  if (_pThreadGroup)
  {
      _buffer.push_back(make_pair(ThreadID, buffer.second));
  }
  else
  {
      // log synchronously
      deque<pair<size_t, string> > ds;
      ds.push_back(make_pair(ThreadID, buffer.second));
      roll(ds);
  }
}

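Here _buffer is a lock-free queue, declared as TC_CasQueue<pair<std::size_t, string> > _buffer. When a thread group is attached, the entry is only queued; otherwise roll is called directly on the calling thread.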

2.1 Sequence diagram

sequenceDiagram
participant RollLogger as TC_Logger<RollWriteT,TC_RollBySize>
RollLogger ->>+ LoggerStream : debug()
LoggerStream ->>- RollLogger : return
LoggerStream ->>+ LoggerBuffer : LoggerStream(TC_Logger._stream)<br>operator<<<br>TC_Logger._stream.flush()<br>_buffer.sync()
LoggerBuffer ->>+ TC_RollBySize : _roll->write()
TC_RollBySize ->>+ TC_CasQueue :_buffer.push_back()
TC_CasQueue ->>- TC_RollBySize : return
TC_RollBySize ->>- LoggerBuffer : return
LoggerBuffer ->>- LoggerStream : return

2.2 How LoggerBuffer::sync gets called

#include <iostream>
#include <streambuf>
#include <string>
using namespace std;

class LoggerBuffer : public std::basic_streambuf<char> {
public:
    LoggerBuffer(int /*num*/) { cout << "LoggerBuffer init" << endl; }
    ~LoggerBuffer() { cout << "LoggerBuffer destructor" << endl; }
    // sync() must return a value: 0 on success, -1 on failure
    virtual int sync() { cout << "sync" << endl; return 0; }
};
class TC_Logger {
public:
    TC_Logger()
    :_buffer(1)
    , _stream(&_buffer)
    {}
    ~TC_Logger() { cout << "TC_Logger destructor" << endl; }
    void stream(int /*level*/)
    {
        ostream *ost = NULL;
        ost = &_stream;
        // flush() calls rdbuf()->pubsync(), which dispatches to LoggerBuffer::sync()
        ost->flush();
    }
    LoggerBuffer    _buffer;
    std::ostream    _stream;
};
int main() {
    TC_Logger logger;
    logger.stream(1);
    cout << "main fun end" << endl;
}
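The demo above shows that flush reaches sync, but its buffer never stores anything. A slightly fuller sketch, assuming it is acceptable to derive from std::stringbuf instead of managing the put area by hand as LoggerBuffer does, shows the text itself being forwarded on flush. ForwardingBuffer and its callback are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical buffer: std::stringbuf collects the characters, and sync()
// hands the collected text to a callback, then empties the buffer.
class ForwardingBuffer : public std::stringbuf {
public:
    explicit ForwardingBuffer(std::function<void(const std::string &)> sink)
        : _sink(std::move(sink)) {
        assert(_sink);  // a callable sink is required
    }

protected:
    // Called through pubsync() whenever the owning ostream is flushed.
    int sync() override {
        const std::string pending = str();
        if (!pending.empty()) {
            _sink(pending);
            str("");  // reset so the same text is not forwarded twice
        }
        return 0;
    }

private:
    std::function<void(const std::string &)> _sink;
};
```

With std::ostream os(&buf), writing os << "hello " << 42 only fills the buffer; the callback fires on os.flush() or on std::endl, which is the same hook LoggerBuffer::sync uses to pass text on to _roll->write.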

3 Summary

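Each log statement pushes its text onto a TC_CasQueue lock-free queue; a single background thread then wakes up periodically (every 100 ms) and writes the queued entries to disk.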

4 References

Tars-C++ Revealed: source analysis of the logging classes (original title: Tars-C++ 揭秘篇:日志类源码解析)

Tencent Tars logging module, runnable and debuggable on its own (original title: 腾讯Tars 日志模块,可单独运行调试)
