本周为了Taf框架引入了限流器算法,用于Trace上报时进行限流
早在写go的时候就使用过著名的golang.org/x/time/rate限流器,这是一个令牌桶算法,它允许在保证平均rate的情况下,有一些突发流量
还用过uber开源的github.com/uber-go/ratelimit 限流器,这是一个漏桶算法,它能严格的控制每个请求的最小访问间隔,并允许配置一个最大松弛量(maxSlack)用于最大间隔误差
简单介绍下两种算法的大概实现和区别,随后分别深入两种算法的实现
令牌桶算法和漏桶算法主要区别在于突发流量
例如在一个爬虫系统中
对第三方打码系统的接口,由于只在乎以天为级别的预算,有一些突发流量也无所谓,因此使用golang.org/x的rate限流器 
对被爬取平台的接口,由于平台会有秒级的反爬虫封ip策略,不能有任何的突发流量,因此需要使用github.com/uber-go的ratelimit限流器 
 
令牌桶算法 
最简单实现 
看描述就很容易写出来:
一个后台定时线程,定时增加令牌计数,直到令牌上限 
消费接口减少令牌,并返回减少的令牌数 
 
进阶实现 
这个后台线程是可以省略的,只需要每次调用消费接口的时候,根据上次消费时间计算当前应该有多少令牌即可
例如jaegerTracing自带的RateLimiter.h 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 template  <typename  ClockType = std::chrono::steady_clock>class  RateLimiter {  public :     using  Clock = ClockType;     RateLimiter (double  creditsPerSecond, double  maxBalance)         : _creditsPerSecond(creditsPerSecond)         , _maxBalance(maxBalance)         , _balance(_maxBalance)         , _lastTick(Clock::now ())     {     }     bool  checkCredit (double  itemCost)       {        std::lock_guard<std::mutex> lock (_mutex)  ;         const  auto  currentTime = Clock::now ();         const  auto  elapsedTime =             std::chrono::duration <double >(currentTime - _lastTick);         _lastTick = currentTime;         _balance += elapsedTime.count () * _creditsPerSecond;         if  (_balance > _maxBalance) {             _balance = _maxBalance;         }         if  (_balance >= itemCost) {             _balance -= itemCost;             return  true ;         }         return  false ;     }   private :     double  _creditsPerSecond;     double  _maxBalance;     double  _balance;     typename  Clock::time_point _lastTick;     std::mutex _mutex; }; 
这个算法的缺点在于锁,作为一个高qps的限流器算法,有锁版本放在taf的util库下给业务使用一定会被喷的
最终实现 
我的初步想法是使用thread_local变量来将一个令牌桶的消费拆分成多个令牌桶的消费,并且允许偷取别的线程的令牌桶,这个实现显然也太过复杂了
幸好在我付诸行动之前,看到facebook已经有一个比较成熟的实现TokenBucket.h ,通过git记录来看这个祖传算法比我工龄还长
核心代码在TokenBucket.h#L134 
这个线程安全算法的核心在于只有zeroTime_这一个变量,因此只需要CAS来保证这个变量的线程安全即可
剩余令牌数量是通过(now - zeroTime_) / rate来得到的,zeroTime_几乎不具有任何的计时意义(除非令牌消耗完,此时zeroTime_为当前时间)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 template  <typename  Callback>double  consume (     double  rate,     double  burstSize,     double  nowInSeconds,     const  Callback& callback)      assert (rate > 0 );     assert (burstSize > 0 );     double  zeroTimeOld;     double  zeroTimeNew;     double  consumed;     do  {         zeroTimeOld = zeroTime ();         double  tokens = std::min ((nowInSeconds - zeroTimeOld) * rate, burstSize);         consumed = callback (tokens);         double  tokensNew = tokens - consumed;         if  (consumed == 0.0 ) {             return  consumed;         }         zeroTimeNew = nowInSeconds - tokensNew / rate;     } while  (FOLLY_UNLIKELY (         !compare_exchange_weak_relaxed (zeroTime_, zeroTimeOld, zeroTimeNew)));     return  consumed; } 
但是这个算法在我的场景下也是有缺陷的
fix:时间精度问题 
找到这份代码的单元测试TokenBucketTest.cpp 
1 2 3 TokenBucket tokenBucket (rate, burstSize, 0 )  ;改成 TokenBucket tokenBucket (rate, 1 , 0 )  ;
如果希望把令牌桶当做漏桶使用,也就是至少每隔1/rate时间才允许获取到一个令牌,不允许突发流量
就需要把令牌桶的上限改为1,单元测试就不再能够通过了
这是因为当令牌消耗完以后(此时zeroTime_为当前时间),由于时间的精度是秒级的,只有过去整整1秒,nowInSeconds - zeroTimeOld才不为0
当rate大于1时,此时(nowInSeconds - zeroTimeOld) * rate会获取到不止一个token,由于和burstSize取min,因此最多只能拿到1个令牌
超过1的rate没有任何意义,会强制限流到rate = 1
fix:double的溢出问题 
当我把时间精度从秒改成微秒以后,单元测试依然不能通过
这是因为存储的double类型直接溢出了
double类型一共64位,第一位是符号位(S),随后的11位是指数位(E),剩下52位尾数位(F):
[s][eeeeeeeeeee][fffffffffffffffffffffffffffffffffffffffffffffffffff]
一个double类型的数值表示为:\((-1)^{符号位} * (1 + 尾数部分) * 2^{指数部分-1023}\) 
因此一个double类型最多可以精确表达\(2^{53}\) 的整数,比尾数部分的52多1,是尾数部分假定的隐含位
这个值差不多是\(9 * 10^{15}\) ,而一个微秒级的时间戳例如1705509033000000是\(10^{16}\) ,存储到double类型刚好会影响到精度
fix:第一次初始化的溢出问题 
即使改成uint64_t存储也并不是高枕无忧,最多也只能存下大约\(10^{19}\) 而已
当第一次初始化时,zeroTimeOld为0,微秒级时间戳下(now - 0) * rate,rate最多只能到1000左右,再多就会导致本次计算溢出
所以需要单独判断如果zeroTimeOld,那么直接double tokens = burstSize
成品代码 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 class  TokenBucket {public :    TokenBucket (double  tokenAddRatePerSec, uint32_t  bucketMaxSize = 1 )         : tokenAddRatePerUs_ (tokenAddRatePerSec / usPerSec),           bucketMaxSize_ (bucketMaxSize) {}     TokenBucket (const  TokenBucket&) = delete ;          uint32_t  consume (uint32_t  num = 1 )           return  consumeBase (num, [](uint32_t  need, uint32_t  curTokens) {             return  need > curTokens ? 0  : need;         });     }          uint32_t  consumeOrDrain (uint32_t  num = 1 )           return  consumeBase (num, [](uint32_t  need, uint32_t  curTokens) {             return  std::min (need, curTokens);         });     } private :    template  <typename  consumeCb>     uint32_t  consumeBase (uint32_t  num, const  consumeCb &cb)           uint64_t  oldTokenTime, newTokenTime;         do  {             oldTokenTime = tokenTime_.load (std::memory_order_relaxed);             auto  now = getTime ();             if  (now <= oldTokenTime) {                 return  0 ;             }             if  (oldTokenTime == 0 ) {                 newTokenTime = now;                 num = 0 ;             } else  {                 uint32_t  curTokens = std::min (                     uint64_t (tokenAddRatePerUs_ * (now - oldTokenTime)),                     bucketMaxSize_);                 if  (curTokens == 0 ) {                     return  0 ;                 }                 num = cb (num, curTokens);                 auto  newTokens = curTokens - num;                 newTokenTime = now - newTokens / tokenAddRatePerUs_;             }         } while  (!_unlikely_(tokenTime_.compare_exchange_weak (             oldTokenTime, newTokenTime, std::memory_order_relaxed)));         return  num;     }     inline  uint64_t  getTime ()           return  std::chrono::duration_cast <std::chrono::microseconds>(                    std::chrono::high_resolution_clock::now ().time_since_epoch ())             .count ();     }     std::atomic<uint64_t > tokenTime_ = {0 };     double  tokenAddRatePerUs_;     uint64_t  bucketMaxSize_ = 1 ;     static  constexpr  uint64_t  usPerSec = 1000000 ; }; 
相对Folly的功能上还是有些弱,没有借令牌和还令牌的接口,但是对我和业务同学来说足够用了
漏桶算法 
在漏桶算法的实现上踩坑了,一开始实现了一个支持设置bucketMaxSize的漏桶
当漏掉多少值时,才允许消费走多少值,实现完我才发现,如果一段时间没有消费,那么可以突发消费掉最大bucketMaxSize的“令牌”
这和令牌桶完全没有区别!
我查了一下资料,发现早有前人总结了这个问题
漏桶可以当作计量器或者队列
计量器 
Comparison_with_the_token_bucket_algorithm 
As can be seen, these two descriptions are essentially mirror images of one another: one adds something to the bucket on a regular basis and takes something away for conforming packets down to a limit of zero; the other takes away regularly and adds for conforming packets up to a limit of the bucket's capacity. So, is an implementation that adds tokens for a conforming packet and removes them at a fixed rate an implementation of the leaky bucket or of the token bucket? Similarly, which algorithm is used in an implementation that removes water for a conforming packet and adds water at a fixed rate? In fact both are effectively the same, i.e. implementations of both the leaky bucket and token bucket, as these are the same basic algorithm described differently. This explains why, given equivalent parameters, the two algorithms will see exactly the same packets as conforming or nonconforming. The differences in the properties and performance of implementations of the leaky and token bucket algorithms thus result entirely from the differences in the implementations, i.e. they do not stem from differences in the underlying algorithms.
如你所见,这两种描述实质上是彼此的镜像:一个算法定期向桶中添加某物,并为符合要求的数据包减少该物体,直到零为止;另一个算法则定期减少某物,并为符合要求的数据包增加该物体,直到达到桶的容量上限。那么,一个为符合要求的数据包添加令牌并以固定速度移除令牌的实现是漏桶还是令牌桶的实现呢?同样,一个为符合要求的数据包移除水分并以固定速度添加水分的实现使用的是哪种算法呢?实际上,这两者本质上是相同的,即都是漏桶和令牌桶的应用,只是描述的方式不同。这就解释了为什么,在给定等效参数的情况下,这两种算法会准确的将同样的数据包视为符合要求或不符合要求。因此,漏桶和令牌桶算法的各种实现在属性和性能上的差异完全源于实现方式的不同,而不是基础算法的差异。
 
在给定等效参数的情况下,这两种算法会准确的将同样的数据包视为符合要求或不符合要求 
因此当把令牌桶的桶上限设为1时,就能达到漏桶的限流目的:严格的控制每个请求的最小访问间隔
队列 
github.com/uber-go的ratelimit限流器对每次的请求Wait一段时间,在不需要入队列的情况下,也达到了一样的效果
fix:最大超时时间 
ratelimit限流器不允许设置最大超时时间,当大量请求到来时,应该允许设置最大超时时间来提前失败
fix:精确等待 
在等待时间小于1ms时,即使是空闲时,usleep和协程唤醒都会不太精确
在等待10us时,实际等待了100us,然后后续的9次consume都不再需要等待,从而平均等待时间为10us
在设置严格等待后,会保证等待间隔的误差不超过1%
但是这也会导致更多的上下文切换,因此默认关闭
成品代码 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 extern  void  taf_coro_sleep (int  ms) class  LeakBucket  {public :    explicit  LeakBucket (double  leakRatePerSecond)          : leakRatePerSecond_(leakRatePerSecond) { }         void  setWaitStrictMode (bool  isStrict)           waitStrictMode_ = isStrict;     }     bool  consume (int64_t  timeoutMs = -1 )           double  oldTokenTime, newTokenTime, needSleepTime;         do  {             oldTokenTime = tokenTime_.load (std::memory_order_relaxed);             double  now = getTime ();             if  (tokenTime_ == 0 ) {                 newTokenTime = now;                 needSleepTime = 0 ;             } else  {                 newTokenTime = oldTokenTime + usPerSec / leakRatePerSecond_;                 if  (now >= newTokenTime) {                     needSleepTime = 0 ;                 } else  {                     needSleepTime = newTokenTime - now;                     if  (timeoutMs != -1  && needSleepTime > timeoutMs) {                         return  false ;                     }                 }             }         } while  (!_unlikely_(tokenTime_.compare_exchange_weak (             oldTokenTime, newTokenTime, std::memory_order_relaxed)));         if  (needSleepTime == 0 ) {             return  true ;         }         this ->sleep (needSleepTime);         return  true ;     } private :    inline  void  sleep (double  needSleepTime)           if  (!waitStrictMode_ || needSleepTime > 1000 ) {             taf_coro_sleep (needSleepTime / 1000 );             return ;         }         int64_t  startTime = getTime ();         do  {             taf_coro_sleep (0 );         } while  (getTime () - startTime < needSleepTime);     }     inline  uint64_t  getTime ()           return  std::chrono::duration_cast <std::chrono::microseconds>(                    std::chrono::high_resolution_clock::now ().time_since_epoch ())             .count ();     }     static  constexpr  uint64_t  usPerSec = 1000000 ;     std::atomic<double > tokenTime_ = {0 };     double  leakRatePerSecond_ = 0 ;     atomic<bool > waitStrictMode_ = {false }; };