框架的一致性哈希重构

当前的一致性哈希存在四个bug,分别进行分析

以这个版本https://git.huya.com/server_arch/taf/-/blob/924950284557f183bd025ed758dc2e878ae36938/src/libservant/EndpointManager.cpp#L2448为例

我新增了部分日志,总体流程的关键代码在getConHashProxyForNormal

他的输入是hashCode(也就是prx->taf_consistent_hash(hashCode)传入的),输出是本次负载均衡选出的节点指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
AdapterProxyPtr EndpointManager::getConHashProxyForNormal(int64_t hashCode)
{
//_vLastConHashProxys是上一次更新一致性哈希时记下的节点
//checkConHashChange中如果当前节点和上一次的有变化,那么返回true
if(checkConHashChange(false, _vLastConHashProxys)) {

//根据当前节点,把数据写入到_consistentHash
updateConHashProxyWeighted(true, _vLastConHashProxys, _consistentHash);
}
LOG_INFO << "[TAF][EndpointManager::getConHashProxyForNormal _sObjName:"
<< _sObjName
<< "|_consistentHash.size():" << _consistentHash.size() << endl;

if(_consistentHash.size() > 0) {
//根据_consistentHash数据一致性哈希选节点
...

//没选出来,返回空
return nullptr;
}
//_consistentHash是空的,降级到普通hash

return getHashProxyForNormal(hashCode);
}

子流程updateConHashProxyWeighted的更新逻辑https://git.huya.com/server_arch/taf/-/blob/924950284557f183bd025ed758dc2e878ae36938/src/libservant/EndpointManager.cpp#L2292如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterProxyPtr> &vLastConHashProxys, TC_ConsistentHashNew &conHash)
{
//上面传入true,因此进入这个流程
if(bStatic) {
vLastConHashProxys = _vRegProxys; //记录下当前节点到_vLastConHashProxys,下次checkConHashChange中可以比较是否因为变化需要更新
conHash.clear(); //这里需要清空_consistentHash
}

for(size_t i = 0; i < _vRegProxys.size(); ++i)
{
//上面传入true,因此这个后端节点取从主控返回的权重
//不过true,false都无所谓了,这里算了一坨iWeight,然后最后没用上,明显的第一个bug
int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
if(iWeight > 0)
{
LOG_INFO << "[TAF]EndpointManager::updateConHashProxyWeighted bStatic:" << bStatic
<< "|_sObjName:" << _sObjName
<< "|endpoint:" << _vRegProxys[i]->endpoint().desc()
<< "|weight:" << _vRegProxys[i]->getWeight() << endl;
iWeight = iWeight / 4;
if(iWeight <= 0)
{
iWeight = 1;
}

//这个后端节点拿主控返回的权重,加入一致性哈希环
//权重是100,就会为这个后端节点加入100个虚拟节点
conHash.addNode(_vRegProxys[i]->endpoint().desc(), i, _vRegProxys[i]->getWeight());
}
}

//一致性哈希环排序
conHash.sortNode();
}

第一个bug(计算变量没用到)

这一段代码最明显的bug在于iWeight变量根本没用上,计算了半天,最后使用的从主控拉取的权重信息

第二个bug(一致性哈希的生成key不对)

这一段代码不那么容易注意到的bug,在于一致性哈希新增节点,使用的是_vRegProxys[i]->endpoint().desc(),也就是ip+端口的信息

对于不同的servant而言,由于使用了不同的ip端口,因此会把请求导向不同的节点,对于依赖一致性哈希的有状态服务而言,这一步可能会大量穿透到兜底逻辑

第三个bug(最严重,一致性哈希完全没有用,一直在用普通哈希!)

比较隐蔽的bug也是最严重的bug在运行以后才能发现,日志如下:

1
2
3
4
5
6
2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2206 [TAF]EndpointManager::updateConHashProxyWeighted bStatic:1|_sObjName:HUYASZ.TafStressS
erver.StressObj|endpoint:tcp -h 10.132.153.193 -p 2345 -t 60000|weight:-1
2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2372 [TAF][EndpointManager::getConHashProxyForNormal update _sObjName:HUYASZ.TafStressServe
r.StressObj|timecost(ms):0
2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2375 [TAF][EndpointManager::getConHashProxyForNormal _sObjName:HUYASZ.TafStressServer.Stres
sObj|_consistentHash.size():0

由于虎牙这边根本没有使用后端权重,10.132.153.193这个节点的权重是-1,因此哈希环没有任何节点,_consistentHash.size():0

所以直接降级到了普通哈希!

实验

实验验证一下实际影响,以下代码生成1-1000的hashkey请求,记录后端关系,然后下一次请求前删掉一个节点,再比较和上一次请求有多少发生了变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
taf::Int32 StressImp::testStr(const std::string& in, std::string &out, taf::JceCurrentPtr current)
{
// 生成1-1000的hashkey请求,记录前后端关系,然后下一次请求前删掉一个节点,再比较和上一次请求有多少发生了变化
auto prx = Application::getCommunicator()->stringToProxy<Test::StressPrx>(
"HUYASZ.TafStressServer.StressObj");
static map<int, string> _last;
static std::hash<string> hasher;
map<int, string> cur;

(void)prx->getActiveCount();
sleep(1); // 等待节点信息更新
FDLOG("test") << "当前可用节点数量:" << prx->getActiveCount() << endl;

for (int i = 1; i <= 1000; ++i) {
auto prx =
Application::getCommunicator()->stringToProxy<Test::StressPrx>(
"HUYASZ.TafStressServer.StressObj");

// 使用taf_consistent_hash选择节点
// 不直接使用下标作为hash_code,否则会因为一致性哈希的特性选择同一个节点
int hashed_value = static_cast<int>(hasher(to_string(i)));
prx->taf_consistent_hash(hashed_value)->testStr(in, out);

// 获取实际调用的节点信息
cur[i] = prx->taf_invoke_endpoint().getHost();
//FDLOG("test") << "请求" << i << "|hashed_value:" << hashed_value
// << "|映射到节点:" << cur[i] << endl;
}
if (!_last.empty()) {
int changedCount = 0;

for (int i = 1; i <= 1000; ++i) {
if (cur[i] != _last[i]) {
++changedCount;
}
}

double changeRatio = changedCount / 1000.0 * 100;
FDLOG("test") << "节点变化后,映射关系发生变化的请求数量:"
<< changedCount << std::endl;
FDLOG("test") << "变化比例:" << changeRatio << "%" << std::endl;
}
_last = cur;
return 0;
int num = atoi(in.c_str());
if (num < 0) {
num = -num;
for (int i = 0; i < num; i++) {
string out;
prx->taf_hash(-1)->testStr("", out);
}
return num;
}
for (int i = 0; i < num; i++) {
AwaitRun(=) {
string out;
prx->taf_hash(-1)->testStr("", out);
};
}
return num;
}

第一次调用后端6个节点,第二次调用摘除一个节点,输出日志如下

1
2
3
4
2025-06-06 10:55:22.151|当前可用节点数量:6
2025-06-06 11:00:29.785|当前可用节点数量:5
2025-06-06 11:00:30.315|节点变化后,映射关系发生变化的请求数量:831
2025-06-06 11:00:30.315|变化比例:83.1%

与之相对的,在修复后的https://git.huya.com/server_arch/taf/-/commit/337d561745655a92b002cf8a1382912c42b0e4cd,输出日志如下

1
2
3
4
2025-06-09 09:31:29.766|当前可用节点数量:6
2025-06-09 09:35:11.646|当前可用节点数量:5
2025-06-09 09:35:12.287|节点变化后,映射关系发生变化的请求数量:177
2025-06-09 09:35:12.287|变化比例:17.7%

第四个bug(比较严重,迷之降级普通哈希了!)

第四个bug,我最初怎么看都觉得怎么不太对劲,但是由于taf-java也有这个功能,因此应该是腾讯以前的大佬在讨论后特意设计的

通过一系列本地的算法实验,我终于能确定这个算法一定有问题

源码分析

还是修复前三个bug的https://git.huya.com/server_arch/taf/-/commit/337d561745655a92b002cf8a1382912c42b0e4cd这个commit

在总体流程的关键代码,getConHashProxyForNormal

他的输入是hashCode(也就是prx->taf_consistent_hash(hashCode)传入的),输出是本次负载均衡选出的节点指针

上文已经分析过主流程了,现在细化一下根据一致性哈希选节点的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
AdapterProxyPtr EndpointManager::getConHashProxyForNormal(int64_t hashCode)
{
//_vLastConHashProxys是上一次更新一致性哈希时记下的节点
//checkConHashChange中如果当前节点和上一次的有变化,那么返回true
if(checkConHashChange(false, _vLastConHashProxys)) {

//根据当前节点,把数据写入到_consistentHash
updateConHashProxyWeighted(true, _vLastConHashProxys, _consistentHash);
}
LOG_INFO << "[TAF][EndpointManager::getConHashProxyForNormal _sObjName:"
<< _sObjName
<< "|_consistentHash.size():" << _consistentHash.size() << endl;

if(_consistentHash.size() > 0) {
//根据_consistentHash数据一致性哈希选节点

unsigned int iIndex = 0;

// 通过一致性hash取到对应的节点
_consistentHash.getIndex(hashCode, iIndex);

//被hash到的节点在主控是active的才走在流程
if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive() == AdapterProxy::ActiveStatus::connected)
{
return _vRegProxys[iIndex];
} else
{
//在active节点中再次hash
vector<AdapterProxyPtr> thisHash = _vActiveProxys;
vector<AdapterProxyPtr> conn;
size_t hash = 0;
AdapterProxyPtr firstConnectingProxy = nullptr;

do
{
hash = ((int64_t)hashCode) % thisHash.size();

AdapterProxy::ActiveStatus status = thisHash[hash]->checkActive();
if (status == AdapterProxy::ActiveStatus::connected)
{
return thisHash[hash];
}
...
}
while(!thisHash.empty());
...

//没选出来,返回空
return nullptr;
}
//_consistentHash是空的,降级到普通hash

return getHashProxyForNormal(hashCode);
}

这里先对全部节点(不管是不是active)进行一致性哈希,然后发现有节点不是active的,或者已经断开连接,就在active节点中降级到了普通哈希(还不是复用代码的,直接从普通哈希那边复制粘贴过来的)

乍一看好像没问题,只会让非active或故障的节点重映射

算法实验

在初步的算法实验,对比完整的一致性哈希,和一致性哈希降级到普通哈希

单节点故障

我模拟了单个节点挂掉的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
TEST_F(KetamaConsistentHashTest, MissTestConsistentHash) {
TC_ConsistentHashNew ketamaHashOld(E_TC_CONHASH_KETAMAHASH);
TC_ConsistentHashNew ketamaHashNew(E_TC_CONHASH_KETAMAHASH);

for (int node = 0; node < 6; ++node) {
ketamaHashOld.addNode("node" + to_string(node), node, 100);
}
ketamaHashOld.sortNode();

for (int node = 0; node < 5; ++node) {
ketamaHashNew.addNode("node" + to_string(node), node, 100);
}
ketamaHashNew.sortNode();

int totalKeys = 100000;
int missCount = 0;

std::hash<string> hasher;

vector<int> oldNodeHits(6, 0);
vector<int> newNodeHits(5, 0);

for (int key = 1; key <= totalKeys; ++key) {
string keyStr = to_string(key);
unsigned int oldIndex, newIndex;

int64_t hashCode = hasher(keyStr);

ketamaHashOld.getIndex(hashCode, oldIndex);
ketamaHashNew.getIndex(hashCode, newIndex);

oldNodeHits[oldIndex]++;
newNodeHits[newIndex]++;

if (oldIndex != newIndex) {
missCount++;
}
}

cout << "[一致性哈希测试(Ketama)]" << endl;
cout << "请求总数: " << totalKeys << endl;
cout << "节点数变化: 6 -> 5" << endl;
cout << "重新映射的请求数: " << missCount << endl;
cout << "重新映射比例: " << (missCount * 100.0 / totalKeys) << "%" << endl;

cout << "\n旧节点负载统计:" << endl;
for (int node = 0; node < 6; ++node) {
cout << "节点" << (node + 1) << ": " << oldNodeHits[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(oldNodeHits) << endl;

cout << "\n新节点负载统计:" << endl;
for (int node = 0; node < 5; ++node) {
cout << "节点" << (node + 1) << ": " << newNodeHits[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(newNodeHits) << endl;
}
TEST_F(KetamaConsistentHashTest, MissTestConsistentHashWithFallback) {
TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);

for (int node = 0; node < 6; ++node) {
ketamaHash.addNode("node" + to_string(node), node, 100);
}
ketamaHash.sortNode();

int totalKeys = 100000;
int missCount = 0;

std::hash<string> hasher;

vector<int> oldNodeHits(6, 0);
vector<int> newNodeHits(5, 0);

for (int key = 1; key <= totalKeys; ++key) {
string keyStr = to_string(key);
unsigned int oldIndex;

int64_t hashCode = hasher(keyStr);

ketamaHash.getIndex(hashCode, oldIndex);

oldNodeHits[oldIndex]++;

auto newIndex = oldIndex;

if (newIndex == 5) {
newIndex = myHash(hashCode, 5);
}

newNodeHits[newIndex]++;

if (oldIndex != newIndex) {
missCount++;
}
}

cout << "[一致性哈希测试(Ketama) - 降级普通哈希]" << endl;
cout << "请求总数: " << totalKeys << endl;
cout << "节点数变化: 6 -> 5 (命中节点6降级普通哈希)" << endl;
cout << "重新映射的请求数: " << missCount << endl;
cout << "重新映射比例: " << (missCount * 100.0 / totalKeys) << "%" << endl;

cout << "\n旧节点负载统计:" << endl;
for (int node = 0; node < 6; ++node) {
cout << "节点" << (node + 1) << ": " << oldNodeHits[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(oldNodeHits) << endl;

cout << "\n降级后的节点负载统计:" << endl;
for (int node = 0; node < 5; ++node) {
cout << "节点" << (node + 1) << ": " << newNodeHits[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(newNodeHits) << endl;
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
[ RUN      ] KetamaConsistentHashTest.MissTestConsistentHash
[一致性哈希测试(Ketama)]
请求总数: 100000
节点数变化: 6 -> 5
重新映射的请求数: 17279
重新映射比例: 17.279%

旧节点负载统计:
节点1: 17561次
节点2: 15998次
节点3: 15372次
节点4: 17015次
节点5: 16775次
节点6: 17279次
标准差: 756.211

新节点负载统计:
节点1: 20513次
节点2: 20100次
节点3: 18794次
节点4: 20455次
节点5: 20138次
标准差: 625.088
[ OK ] KetamaConsistentHashTest.MissTestConsistentHash (29 ms)
[ RUN ] KetamaConsistentHashTest.MissTestConsistentHashWithFallback
[一致性哈希测试(Ketama) - 降级普通哈希]
请求总数: 100000
节点数变化: 6 -> 5 (命中节点6降级普通哈希)
重新映射的请求数: 17279
重新映射比例: 17.279%

旧节点负载统计:
节点1: 17561次
节点2: 15998次
节点3: 15372次
节点4: 17015次
节点5: 16775次
节点6: 17279次
标准差: 756.211

降级后的节点负载统计:
节点1: 20998次
节点2: 19337次
节点3: 18895次
节点4: 20505次
节点5: 20265次
标准差: 772.25
[ OK ] KetamaConsistentHashTest.MissTestConsistentHashWithFallback (20 ms)

可以发现一致性哈希的降级完全没有影响重映射比例,只是负载上,标准差稍微变大了一点点而已

两次单节点故障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
TEST_F(KetamaConsistentHashTest, MissTestConsistentHashTwice) {
TC_ConsistentHashNew ketamaHash6(E_TC_CONHASH_KETAMAHASH);
TC_ConsistentHashNew ketamaHash5(E_TC_CONHASH_KETAMAHASH);
TC_ConsistentHashNew ketamaHash4(E_TC_CONHASH_KETAMAHASH);

// 初始6个节点
for (int node = 0; node < 6; ++node) {
ketamaHash6.addNode("node" + to_string(node), node, 100);
}
ketamaHash6.sortNode();

// 第一次摘除节点后剩余5个节点(摘除节点5)
for (int node = 0; node < 5; ++node) {
ketamaHash5.addNode("node" + to_string(node), node, 100);
}
ketamaHash5.sortNode();

// 第二次摘除节点后剩余4个节点(再摘除节点4)
for (int node = 0; node < 4; ++node) {
ketamaHash4.addNode("node" + to_string(node), node, 100);
}
ketamaHash4.sortNode();

int totalKeys = 100000;

std::hash<string> hasher;

vector<int> nodeHits6(6, 0); // 初始6节点
vector<int> nodeHits5(5, 0); // 第一次摘除后5节点
vector<int> nodeHits4(4, 0); // 第二次摘除后4节点

int missCountFirst = 0;
int missCountSecond = 0;

for (int key = 1; key <= totalKeys; ++key) {
string keyStr = to_string(key);
unsigned int index6, index5, index4;

int64_t hashCode = hasher(keyStr);

ketamaHash6.getIndex(hashCode, index6);
ketamaHash5.getIndex(hashCode, index5);
ketamaHash4.getIndex(hashCode, index4);

nodeHits6[index6]++;
nodeHits5[index5]++;
nodeHits4[index4]++;

if (index6 != index5) {
missCountFirst++;
}

if (index5 != index4) {
missCountSecond++;
}
}

cout << "[一致性哈希测试(Ketama) - 两次摘除节点]" << endl;
cout << "请求总数: " << totalKeys << endl;

cout << "\n初始节点负载统计:" << endl;
for (int node = 0; node < 6; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits6[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits6) << endl;

cout << "\n第一次摘除节点(6->5):" << endl;
cout << "重新映射的请求数: " << missCountFirst << endl;
cout << "重新映射比例: " << (missCountFirst * 100.0 / totalKeys) << "%" << endl;
cout << "节点负载统计:" << endl;
for (int node = 0; node < 5; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits5[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits5) << endl;

cout << "\n第二次摘除节点(5->4):" << endl;
cout << "重新映射的请求数: " << missCountSecond << endl;
cout << "重新映射比例: " << (missCountSecond * 100.0 / totalKeys) << "%" << endl;
cout << "节点负载统计:" << endl;
for (int node = 0; node < 4; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits4[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits4) << endl;
}

TEST_F(KetamaConsistentHashTest, MissTestConsistentHashWithFallbackTwice) {
TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);

// 初始6个节点
for (int node = 0; node < 6; ++node) {
ketamaHash.addNode("node" + to_string(node), node, 100);
}
ketamaHash.sortNode();

int totalKeys = 100000;

std::hash<string> hasher;

vector<int> nodeHits6(6, 0); // 初始6节点
vector<int> nodeHits5(5, 0); // 第一次摘除后5节点
vector<int> nodeHits4(4, 0); // 第二次摘除后4节点

int missCountFirst = 0;
int missCountSecond = 0;

for (int key = 1; key <= totalKeys; ++key) {
string keyStr = to_string(key);
unsigned int index6;

int64_t hashCode = hasher(keyStr);

ketamaHash.getIndex(hashCode, index6);
nodeHits6[index6]++;

// 第一次摘除节点5
unsigned int index5 = index6;
if (index5 >= 5) {
index5 = myHash(hashCode, 5);
}
nodeHits5[index5]++;
if (index6 != index5) {
missCountFirst++;
}

// 第二次摘除节点4(在第一次摘除基础上)
unsigned int index4 = index6;
if (index4 >= 4) {
index4 = myHash(hashCode, 4);
}
nodeHits4[index4]++;
if (index5 != index4) {
missCountSecond++;
}
}

cout << "[一致性哈希测试(Ketama) - 降级普通哈希 - 两次摘除节点]" << endl;
cout << "请求总数: " << totalKeys << endl;

cout << "\n初始节点负载统计:" << endl;
for (int node = 0; node < 6; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits6[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits6) << endl;

cout << "\n第一次摘除节点(6->5):" << endl;
cout << "重新映射的请求数: " << missCountFirst << endl;
cout << "重新映射比例: " << (missCountFirst * 100.0 / totalKeys) << "%" << endl;
cout << "节点负载统计:" << endl;
for (int node = 0; node < 5; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits5[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits5) << endl;

cout << "\n第二次摘除节点(5->4):" << endl;
cout << "重新映射的请求数: " << missCountSecond << endl;
cout << "重新映射比例: " << (missCountSecond * 100.0 / totalKeys) << "%" << endl;
cout << "节点负载统计:" << endl;
for (int node = 0; node < 4; ++node) {
cout << "节点" << (node + 1) << ": " << nodeHits4[node] << "次" << endl;
}
cout << "标准差: " << calculateStdDev(nodeHits4) << endl;
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
[ RUN      ] KetamaConsistentHashTest.MissTestConsistentHashTwice
[一致性哈希测试(Ketama) - 两次摘除节点]
请求总数: 100000

初始节点负载统计:
节点1: 17561次
节点2: 15998次
节点3: 15372次
节点4: 17015次
节点5: 16775次
节点6: 17279次
标准差: 756.211

第一次摘除节点(6->5):
重新映射的请求数: 17279
重新映射比例: 17.279%
节点负载统计:
节点1: 20513次
节点2: 20100次
节点3: 18794次
节点4: 20455次
节点5: 20138次
标准差: 625.088

第二次摘除节点(5->4):
重新映射的请求数: 20138
重新映射比例: 20.138%
节点负载统计:
节点1: 24808次
节点2: 26125次
节点3: 23111次
节点4: 25956次
标准差: 1202.57
[ OK ] KetamaConsistentHashTest.MissTestConsistentHashTwice (39 ms)
[ RUN ] KetamaConsistentHashTest.MissTestConsistentHashWithFallbackTwice
[一致性哈希测试(Ketama) - 降级普通哈希 - 两次摘除节点]
请求总数: 100000

初始节点负载统计:
节点1: 17561次
节点2: 15998次
节点3: 15372次
节点4: 17015次
节点5: 16775次
节点6: 17279次
标准差: 756.211

第一次摘除节点(6->5):
重新映射的请求数: 17279
重新映射比例: 17.279%
节点负载统计:
节点1: 20998次
节点2: 19337次
节点3: 18895次
节点4: 20505次
节点5: 20265次
标准差: 772.25

第二次摘除节点(5->4):
重新映射的请求数: 30583
重新映射比例: 30.583%
节点负载统计:
节点1: 26244次
节点2: 24406次
节点3: 23991次
节点4: 25359次
标准差: 872.831
[ OK ] KetamaConsistentHashTest.MissTestConsistentHashWithFallbackTwice (21 ms)

可以发现,降级到普通哈希的方案,发生了30%的重映射,高于完整一致性哈希方案的20%

理论分析

假设每个物理节点两个虚拟节点,一共存在6个物理节点

初始情况

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [虚拟节点1(物理节点4)]
/ \
[虚拟节点12(物理节点1)] [虚拟节点2(物理节点6)]
| |
[虚拟节点11(物理节点6)] [虚拟节点3(物理节点2)]
| |
[虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)]
| |
[虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)]
| |
[虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)]
\ /
[虚拟节点7(物理节点1)]

物理节点6故障

两种方案都是物理节点6所属的虚拟节点2和虚拟节点11发生了重映射

纯一致性哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [虚拟节点1(物理节点4)]
/ \
[虚拟节点12(物理节点1)] [虚拟节点2(按顺时针指向下一个物理节点2)]
| |
[虚拟节点11(按顺时针指向下一个物理节点1)] [虚拟节点3(物理节点2)]
| |
[虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)]
| |
[虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)]
| |
[虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)]
\ /
[虚拟节点7(物理节点1)]

一致性哈希降级到普通哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [虚拟节点1(物理节点4)]
/ \
[虚拟节点12(物理节点1)] [虚拟节点2(降级到普通哈希hashCode%5)]
| |
[虚拟节点11(降级到普通哈希hashCode%5)] [虚拟节点3(物理节点2)]
| |
[虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)]
| |
[虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)]
| |
[虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)]
\ /
[虚拟节点7(物理节点1)]

一致性哈希在某个节点不可用后第一次降级到普通哈希,发生重映射的哈希环区域是一致性哈希决定的,因此两种方案的重映射比例完全一致

物理节点5又故障

纯一致性哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [虚拟节点1(物理节点4)]
/ \
[虚拟节点12(物理节点1)] [虚拟节点2(按顺时针指向下一个物理节点2)]
| |
[虚拟节点11(按顺时针指向下一个物理节点1)] [虚拟节点3(物理节点2)]
| |
[虚拟节点10(按顺时针指向下一个物理节点1)] [虚拟节点4(物理节点2)]
| |
[虚拟节点9(物理节点3)] [虚拟节点5(按顺时针指向下一个物理节点3)]
| |
[虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)]
\ /
[虚拟节点7(物理节点1)]

一致性哈希降级到普通哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
                    [虚拟节点1(物理节点4)]
/ \
[虚拟节点12(物理节点1)] [虚拟节点2(降级到普通哈希hashCode%4)]
| |
[虚拟节点11(降级到普通哈希hashCode%4)] [虚拟节点3(物理节点2)]
| |
[虚拟节点10(降级到普通哈希hashCode%4)] [虚拟节点4(物理节点2)]
| |
[虚拟节点9(物理节点3)] [虚拟节点5(降级到普通哈希hashCode%4)]
| |
[虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)]
\ /
[虚拟节点7(物理节点1)]
  • 纯一致性哈希方案,只有虚拟节点10和5发生了重映射

  • 降级普通哈希的方案,虚拟节点10和5也发生了重映射

    但是之前虚拟节点2和11,由于普通哈希的取模基数变化,也有很大几率发生重映射!

总结

  • 一致性哈希的核心思想是将请求和节点映射到同一个哈希环上,节点增删时只有被摘除节点负责的区域(即顺时针到下一个节点之间的区域)会发生重映射,其他区域不受影响,因此重映射比例较低。
  • 普通哈希的映射方式是直接对节点数取模,节点增删时几乎所有请求的映射都会发生变化,重映射比例较高。
  • 如果在一致性哈希方案中引入普通哈希作为降级策略,会导致两种哈希策略混合使用,产生不良后果:
    • 第一次单节点故障时,降级到普通哈希的方案与纯一致性哈希方案表现一致,重映射比例相同。
    • 但当再次发生节点增删时,普通哈希的取模基数发生变化,之前已经降级到普通哈希的区域会再次发生重映射,导致额外的请求重新映射,显著增加重映射比例。

因此,在一致性哈希方案中混入普通哈希作为降级策略,会破坏一致性哈希的稳定性和负载均衡性,导致更多的请求重映射

最终成了不伦不类的负载均衡算法!

实机实验

设计一个实机实验,来验证理论分析的正确性

还是第三个bug中涉及的实验代码,这一次摘除一个节点以后,再摘除一个节点,观察变化比例

旧版本输出如下:

1
2
3
4
5
6
7
2025-06-09 09:31:29.766|当前可用节点数量:6
2025-06-09 09:35:11.646|当前可用节点数量:5
2025-06-09 09:35:12.287|节点变化后,映射关系发生变化的请求数量:177
2025-06-09 09:35:12.287|变化比例:17.7%
2025-06-09 09:36:21.154|当前可用节点数量:4
2025-06-09 09:36:21.763|节点变化后,映射关系发生变化的请求数量:296
2025-06-09 09:36:21.763|变化比例:29.6%

修复版本https://git.huya.com/server_arch/taf/-/blob/faef46e02fe76672167600b6447f12927eef232f/src/libservant/EndpointManager.cpp#L2367输出如下:

1
2
3
4
5
6
7
2025-06-13 00:02:36.661|当前可用节点数量:6
2025-06-13 00:04:28.003|当前可用节点数量:5
2025-06-13 00:04:28.608|节点变化后,映射关系发生变化的请求数量:168
2025-06-13 00:04:28.608|变化比例:16.8%
2025-06-13 00:06:47.904|当前可用节点数量:4
2025-06-13 00:06:48.460|节点变化后,映射关系发生变化的请求数量:211
2025-06-13 00:06:48.460|变化比例:21.1%

修复思路

最直接的思路是一次性扫描所有节点,知道哪些可用,哪些不可用,就可以绕过因为节点不可用而需要降级的问题了

但是扫描节点的可用性,本质上是要向下游服务端建立连接

那么在稀疏请求的情况下,为了一个请求,向全部下游服务端建立连接,这显然不合理

那么尝试在每次选中的节点不可用的时候,再构造一次一致性哈希怎么样呢?

这里有两种思路:

  • 直接重建一个新的

    这个可能计算花费有点大

    1
    2
    3
    4
    5
    6
    7
    do {
    auto node = conhash.getNode();
    if (node->status != OK) {
    actives.erase(node);
    conhash = new ConHash(actives); //时间开销巨大
    }
    }while(!actives.empty())

    并且对于100个节点被删除掉50个节点的case,需要循环里面判断50次节点,这个计算花费就更大了

  • 增量删除不可用节点的虚拟节点,并且在下一次计算的时候还得加回去(因为下一次计算的时候这一次不可用的节点,可能已经可用了)

    当前一致性哈希是基于数组的二分查找实现的,要支持随时可以删除插入和二分查找,只能使用链状结构的红黑树了

    这个性能和复杂程度可能有点高,暂时不对其进行改造

    1
    2
    3
    4
    5
    6
    7
    conhash.recovery(); //删掉的加回去,如果是第49次,要加回去49*100个虚拟节点
    do {
    auto node = conhash.getNode();
    if (node->status != OK) {
    conhash.remove(node); //有问题的删掉,删除100个虚拟节点
    }
    }while(!actives.empty())

    并且对于100个节点被删除掉50个节点的case,加回去的这个计算花费也小不到哪里去

所以只能引入缓存了,单元测试看下有没有缓存差多少性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
TEST_F(KetamaConsistentHashTest, PerformanceTest) {
const int totalLoops = 1000; // 总共循环次数
const int nodeCount = 600; // 每次生成的节点数
const int virtualNodes = 25; // 每个节点的权重

std::srand(std::time(nullptr));

auto start = std::chrono::high_resolution_clock::now();

for (int loop = 0; loop < totalLoops; ++loop) {
TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);

// 每次循环生成新的哈希环
for (int node = 0; node < nodeCount; ++node) {
ketamaHash.addNode("node" + to_string(node), node, virtualNodes);
}
ketamaHash.sortNode();

// 只调用一次getIndex,使用rand()作为hashCode
unsigned int index;
ketamaHash.getIndex(rand(), index);
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;

double qps = totalLoops / elapsed.count();

cout << "[Ketama一致性哈希性能测试]" << endl;
cout << "总循环次数: " << totalLoops << endl;
cout << "每次节点数: " << nodeCount << endl;
cout << "每个节点虚拟节点数: " << virtualNodes << endl;
cout << "总耗时: " << elapsed.count() << " 秒" << endl;
cout << "QPS: " << qps << endl;
}

TEST_F(KetamaConsistentHashTest, PerformanceTestWithCache) {
TC_LRU<map, pair<string, vector<string>>, shared_ptr<TC_ConsistentHashNew>>
lru;
lru.updateLimit(1000);
const int totalLoops = 1000000; // 总共循环次数
const int nodeCount = 600; // 每次生成的节点数
const int virtualNodes = 25; // 每个节点的权重

std::srand(std::time(nullptr));

auto ketamaHash = make_shared<TC_ConsistentHashNew>(E_TC_CONHASH_KETAMAHASH);
vector<string> nodes;
for (int node = 0; node < nodeCount; ++node) {
nodes.push_back("node" + to_string(node));
ketamaHash->addNode("node" + to_string(node), node, virtualNodes);
}
ketamaHash->sortNode();
pair<string, vector<string>> key{"ketama", nodes};
lru.put(key, ketamaHash);

auto start = std::chrono::high_resolution_clock::now();

for (int loop = 0; loop < totalLoops; ++loop) {
auto kv = lru.get(key);
assert(kv.second);
unsigned int index;
ketamaHash->getIndex(rand(), index);
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;

double qps = totalLoops / elapsed.count();

cout << "[Ketama一致性哈希LRU性能测试]" << endl;
cout << "总循环次数: " << totalLoops << endl;
cout << "每次节点数: " << nodeCount << endl;
cout << "每个节点虚拟节点数: " << virtualNodes << endl;
cout << "总耗时: " << elapsed.count() << " 秒" << endl;
cout << "QPS: " << qps << endl;
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[ RUN      ] KetamaConsistentHashTest.PerformanceTest
[Ketama一致性哈希性能测试]
总循环次数: 1000
每次节点数: 600
每个节点虚拟节点数: 25
总耗时: 13.1539 秒
QPS: 76.0232
[ OK ] KetamaConsistentHashTest.PerformanceTest (13153 ms)
[ RUN ] KetamaConsistentHashTest.PerformanceTestWithCache
[Ketama一致性哈希LRU性能测试]
总循环次数: 1000000
每次节点数: 600
每个节点虚拟节点数: 25
总耗时: 13.3273 秒
QPS: 75033.9
[ OK ] KetamaConsistentHashTest.PerformanceTestWithCache (13341 ms)

单线程7万5qps,看着还行,因此在一致性普通哈希彻底使用lru缓存代替原有的缓存系统,我删除了原有的缓存系统,完全基于LRU来缓存

一个关键的实现细节

一致性哈希修复服务端断开连接后,同时并发到这个节点的请求只有第一个会成功,后续会屏蔽节点的问题

taf框架专门为对端节点关闭连接(不管是正常四次挥手,还是发reset包又或者是read超时)的情况进行了优化

当负载均衡算法在检查这种对端节点的时候,只会有一个请求在重新建连的同时选中这个节点,其他的请求会暂时屏蔽这个节点

对于普通哈希,又或者是轮询的负载均衡算法而言,这会减少大量请求阻塞在建连过程中的情况,选别的节点又不是不能用

但是对于一致性哈希而言,如果这个节点没有黑历史(以前没有连接失败过),并且很快就能建连完毕,那么等一下这个节点也无妨

一致性哈希可以对稳定性和可用性折中,因此我实现了如下的判断

1
2
3
4
5
6
7
8
9
10
11
// blocking状态的可能是正在连接的节点
// 如果这个节点以前没出过问题,可能是稀疏情况下被服务端关闭过链接,那么假定可用
// 因为一致性哈希的稳定性要求比起其他算法更高,允许牺牲一定可用性换取稳定性
if (adapterProxy->trans()->isConnecting() &&
!adapterProxy->isConnExc() &&
!adapterProxy->isConnTimeoutBefore()) {
XFDLOG("taf_conhash")
<< "node is connecting|_sObjName:" << _sObjName
<< "|nodeName:" << nodeName << endl;
return adapterProxy;
}

这里牺牲的可用性,是指在极端的网络隔离的情况下(客户端到对端网络坏了,对端到主控网络通畅)

会在故障发生的15分37秒(默认的linux tcp read超时)左右,fd从connected转为connecting状态时,第一次连接到超时的这1.5秒内,所有请求都卡住到1.5连接超时,再找其他节点

具体体现在被调图表上,就是在故障大突次的15分37秒左右,会多出现一个小小的耗时突次

PS:这里前15分钟的最大耗时3秒,是由于tcp read还未超时,所以会每30秒放一个请求过去试试导致的,这里会一直卡在tcp上直到请求本身的默认3秒超时

15分钟后,虽然也会放一个请求过去试试,但是tcp的connect是1.5秒超时,connect失败以后,就立刻会选择其他节点重试,因此最大耗时下降到1.5秒

总结

在全部bug修复以后

实际压测途中,6个节点摘除1个节点,重映射的比例会从旧版本的83.1%下降到新版本的17.7%