限流器:令牌桶和漏桶

本周为了Taf框架引入了限流器算法,用于Trace上报时进行限流

早在写go的时候就使用过著名的golang.org/x/time/rate限流器,这是一个令牌桶算法,它允许在保证平均rate的情况下,有一些突发流量

还用过uber开源的github.com/uber-go/ratelimit限流器,这是一个漏桶算法,它能严格的控制每个请求的最小访问间隔,并允许配置一个最大松弛量(maxSlack)用于最大间隔误差

简单介绍下两种算法的大概实现和区别,随后分别深入两种算法的实现

  • 令牌桶算法

    由一个令牌桶和生成令牌的间隔时间组成。一开始,令牌桶被填满,然后以固定的速率生成新的令牌,直到桶满为止。当请求进入系统时,需要从桶中删除一个令牌。如果桶是空的(没有令牌可以删除),请求则会被拒绝或等待。

  • 漏桶算法

    模拟了一个漏水的桶。进入系统的数据被放入桶中,然后以固定的速率流出。如果桶已满,新到的数据则被丢弃或等待。由于输出的数据流是恒定的,因此可以用于控制数据的整体速率。

令牌桶算法和漏桶算法主要区别在于突发流量

例如在一个爬虫系统中

  • 对第三方打码系统的接口,由于只在乎以天为级别的预算,有一些突发流量也无所谓,因此使用golang.org/x的rate限流器
  • 对被爬取平台的接口,由于平台会有秒级的反爬虫封ip策略,不能有任何的突发流量,因此需要使用github.com/uber-go的ratelimit限流器

令牌桶算法

最简单实现

看描述就很容易写出来:

  • 一个后台定时线程,定时增加令牌计数,直到令牌上限
  • 消费接口减少令牌,并返回减少的令牌数

进阶实现

这个后台线程是可以省略的,只需要每次调用消费接口的时候,根据上次消费时间计算当前应该有多少令牌即可

例如jaegerTracing自带的RateLimiter.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template <typename ClockType = std::chrono::steady_clock>
class RateLimiter {
public:
using Clock = ClockType;

RateLimiter(double creditsPerSecond, double maxBalance)
: _creditsPerSecond(creditsPerSecond)
, _maxBalance(maxBalance)
, _balance(_maxBalance)
, _lastTick(Clock::now())
{
}

bool checkCredit(double itemCost)
{
std::lock_guard<std::mutex> lock(_mutex);
const auto currentTime = Clock::now();
const auto elapsedTime =
std::chrono::duration<double>(currentTime - _lastTick);
_lastTick = currentTime;

_balance += elapsedTime.count() * _creditsPerSecond;
if (_balance > _maxBalance) {
_balance = _maxBalance;
}

if (_balance >= itemCost) {
_balance -= itemCost;
return true;
}

return false;
}

private:
double _creditsPerSecond;
double _maxBalance;
double _balance;
typename Clock::time_point _lastTick;
std::mutex _mutex;
};

这个算法的缺点在于锁,作为一个高qps的限流器算法,有锁版本放在taf的util库下给业务使用一定会被喷的

最终实现

我的初步想法是使用thread_local变量来将一个令牌桶的消费拆分成多个令牌桶的消费,并且允许偷取别的线程的令牌桶,这个实现显然也太过复杂了

幸好在我付诸行动之前,看到facebook已经有一个比较成熟的实现TokenBucket.h,通过git记录来看这个祖传算法比我工龄还长

核心代码在TokenBucket.h#L134

这个线程安全算法的核心在于只有zeroTime_这一个变量,因此只需要CAS来保证这个变量的线程安全即可

剩余令牌数量是通过(now - zeroTime_) / rate来得到的,zeroTime_几乎不具有任何的计时意义(除非令牌消耗完,此时zeroTime_为当前时间)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <typename Callback>
double consume(
double rate,
double burstSize,
double nowInSeconds,
const Callback& callback) {
assert(rate > 0);
assert(burstSize > 0);

double zeroTimeOld;
double zeroTimeNew;
double consumed;
do {
zeroTimeOld = zeroTime();
double tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
consumed = callback(tokens);
double tokensNew = tokens - consumed;
if (consumed == 0.0) {
return consumed;
}

zeroTimeNew = nowInSeconds - tokensNew / rate;
} while (FOLLY_UNLIKELY(
!compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));

return consumed;
}

但是这个算法在我的场景下也是有缺陷的

fix:时间精度问题

找到这份代码的单元测试TokenBucketTest.cpp

1
2
3
TokenBucket tokenBucket(rate, burstSize, 0);
改成
TokenBucket tokenBucket(rate, 1, 0);

如果希望把令牌桶当做漏桶使用,也就是至少每隔1/rate时间才允许获取到一个令牌,不允许突发流量

就需要把令牌桶的上限改为1,单元测试就不再能够通过了

这是因为当令牌消耗完以后(此时zeroTime_为当前时间),由于时间的精度是秒级的,只有过去整整1秒,nowInSeconds - zeroTimeOld才不为0

当rate大于1时,此时(nowInSeconds - zeroTimeOld) * rate会获取到不止一个token,由于和burstSize取min,因此最多只能拿到1个令牌

超过1的rate没有任何意义,会强制限流到rate = 1

fix:double的溢出问题

当我把时间精度从秒改成微秒以后,单元测试依然不能通过

这是因为存储的double类型直接溢出了

double类型一共64位,第一位是符号位(S),随后的11位是指数位(E),剩下52位尾数位(F):

[s][eeeeeeeeeee][fffffffffffffffffffffffffffffffffffffffffffffffffff]

一个double类型的数值表示为:\((-1)^{符号位} * (1 + 尾数部分) * 2^{指数部分-1023}\)

因此一个double类型最多可以精确表达\(2^{53}\)的整数,比尾数部分的52多1,是尾数部分假定的隐含位

这个值差不多是\(9 * 10^{15}\),而一个微秒级的时间戳例如1705509033000000\(10^{16}\),存储到double类型刚好会影响到精度

fix:第一次初始化的溢出问题

即使改成uint64_t存储也并不是高枕无忧,最多也只能存下大约\(10^{19}\)而已

当第一次初始化时,zeroTimeOld为0,微秒级时间戳下(now - 0) * rate,rate最多只能到1000左右,再多就会导致本次计算溢出

所以需要单独判断如果zeroTimeOld,那么直接double tokens = burstSize

成品代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 无锁令牌桶
// 支持最大100w的qps,再大会限在100w上(再大的限制也没有必要使用令牌桶了)
class TokenBucket{
public:
TokenBucket(double tokenAddRatePerSec, uint32_t bucketMaxSize = 1)
: tokenAddRatePerUs_(tokenAddRatePerSec / usPerSec),
bucketMaxSize_(bucketMaxSize) {}

TokenBucket(const TokenBucket&) = delete;

//consume在消费不到指定num时返回0
uint32_t consume(uint32_t num = 1) {
return consumeBase(num, [](uint32_t need, uint32_t curTokens) {
return need > curTokens ? 0 : need;
});
}

//consumeOrDrain会有多少剩余令牌就拿取多少
uint32_t consumeOrDrain(uint32_t num = 1) {
return consumeBase(num, [](uint32_t need, uint32_t curTokens) {
return std::min(need, curTokens);
});
}

private:
template <typename consumeCb>
uint32_t consumeBase(uint32_t num, const consumeCb &cb) {
uint64_t oldTokenTime, newTokenTime;
do {
oldTokenTime = tokenTime_.load(std::memory_order_relaxed);
auto now = getTime();
if (now <= oldTokenTime) {
return 0;
}
if (oldTokenTime == 0) {
newTokenTime = now;
num = 0;
} else {
uint32_t curTokens = std::min(
uint64_t(tokenAddRatePerUs_ * (now - oldTokenTime)),
bucketMaxSize_);
if (curTokens == 0) {
return 0;
}
num = cb(num, curTokens);
auto newTokens = curTokens - num;
newTokenTime = now - newTokens / tokenAddRatePerUs_;
}
} while (!_unlikely_(tokenTime_.compare_exchange_weak(
oldTokenTime, newTokenTime, std::memory_order_relaxed)));
return num;
}
inline uint64_t getTime() {
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();
}
std::atomic<uint64_t> tokenTime_ = {0};
double tokenAddRatePerUs_;
uint64_t bucketMaxSize_ = 1;
static constexpr uint64_t usPerSec = 1000000;
};

相对Folly的功能上还是有些弱,没有借令牌和还令牌的接口,但是对我和业务同学来说足够用了

漏桶算法

在漏桶算法的实现上踩坑了,一开始实现了一个支持设置bucketMaxSize的漏桶

当漏掉多少值时,才允许消费走多少值,实现完我才发现,如果一段时间没有消费,那么可以突发消费掉最大bucketMaxSize的“令牌”

这和令牌桶完全没有区别!

我查了一下资料,发现早有前人总结了这个问题

漏桶可以当作计量器或者队列

计量器

Comparison_with_the_token_bucket_algorithm

As can be seen, these two descriptions are essentially mirror images of one another: one adds something to the bucket on a regular basis and takes something away for conforming packets down to a limit of zero; the other takes away regularly and adds for conforming packets up to a limit of the bucket's capacity. So, is an implementation that adds tokens for a conforming packet and removes them at a fixed rate an implementation of the leaky bucket or of the token bucket? Similarly, which algorithm is used in an implementation that removes water for a conforming packet and adds water at a fixed rate? In fact both are effectively the same, i.e. implementations of both the leaky bucket and token bucket, as these are the same basic algorithm described differently. This explains why, given equivalent parameters, the two algorithms will see exactly the same packets as conforming or nonconforming. The differences in the properties and performance of implementations of the leaky and token bucket algorithms thus result entirely from the differences in the implementations, i.e. they do not stem from differences in the underlying algorithms.

如你所见,这两种描述实质上是彼此的镜像:一个算法定期向桶中添加某物,并为符合要求的数据包减少该物体,直到零为止;另一个算法则定期减少某物,并为符合要求的数据包增加该物体,直到达到桶的容量上限。那么,一个为符合要求的数据包添加令牌并以固定速度移除令牌的实现是漏桶还是令牌桶的实现呢?同样,一个为符合要求的数据包移除水分并以固定速度添加水分的实现使用的是哪种算法呢?实际上,这两者本质上是相同的,即都是漏桶和令牌桶的应用,只是描述的方式不同。这就解释了为什么,在给定等效参数的情况下,这两种算法会准确的将同样的数据包视为符合要求或不符合要求。因此,漏桶和令牌桶算法的各种实现在属性和性能上的差异完全源于实现方式的不同,而不是基础算法的差异。

在给定等效参数的情况下,这两种算法会准确的将同样的数据包视为符合要求或不符合要求

因此当把令牌桶的桶上限设为1时,就能达到漏桶的限流目的:严格的控制每个请求的最小访问间隔

队列

github.com/uber-go的ratelimit限流器对每次的请求Wait一段时间,在不需要入队列的情况下,也达到了一样的效果

fix:最大超时时间

ratelimit限流器不允许设置最大超时时间,当大量请求到来时,应该允许设置最大超时时间来提前失败

fix:精确等待

在等待时间小于1ms时,即使是空闲时,usleep和协程唤醒都会不太精确

在等待10us时,实际等待了100us,然后后续的9次consume都不再需要等待,从而平均等待时间为10us

在设置严格等待后,会保证等待间隔的误差不超过1%

但是这也会导致更多的上下文切换,因此默认关闭

成品代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
extern void taf_coro_sleep(int ms); //根据当前线程有没有协程调度来,来进行协程或线程休眠接口

class LeakBucket {
public:
explicit LeakBucket(double leakRatePerSecond)
: leakRatePerSecond_(leakRatePerSecond) {}

// 设置是否严格等待,默认false
void setWaitStrictMode(bool isStrict) {
waitStrictMode_ = isStrict;
}

bool consume(int64_t timeoutMs = -1) {
double oldTokenTime, newTokenTime, needSleepTime;
do {
oldTokenTime = tokenTime_.load(std::memory_order_relaxed);
double now = getTime();
if (tokenTime_ == 0) {
newTokenTime = now;
needSleepTime = 0;
} else {
newTokenTime = oldTokenTime + usPerSec / leakRatePerSecond_;
if (now >= newTokenTime) {
needSleepTime = 0;
} else {
needSleepTime = newTokenTime - now;
if (timeoutMs != -1 && needSleepTime > timeoutMs) {
return false;
}
}
}
} while (!_unlikely_(tokenTime_.compare_exchange_weak(
oldTokenTime, newTokenTime, std::memory_order_relaxed)));

if (needSleepTime == 0) {
return true;
}
this->sleep(needSleepTime);
return true;
}

private:
inline void sleep(double needSleepTime) {
if (!waitStrictMode_ || needSleepTime > 1000) {
taf_coro_sleep(needSleepTime / 1000);
return;
}
int64_t startTime = getTime();
do {
taf_coro_sleep(0);
} while (getTime() - startTime < needSleepTime);
}
inline uint64_t getTime() {
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();
}
static constexpr uint64_t usPerSec = 1000000;
std::atomic<double> tokenTime_ = {0};
double leakRatePerSecond_ = 0;
atomic<bool> waitStrictMode_ = {false};
};