当前的一致性哈希存在四个bug,分别进行分析
以这个版本https://git.huya.com/server_arch/taf/-/blob/924950284557f183bd025ed758dc2e878ae36938/src/libservant/EndpointManager.cpp#L2448为例
我新增了部分日志,总体流程的关键代码在getConHashProxyForNormal
他的输入是hashCode(也就是prx->taf_consistent_hash(hashCode)
传入的),输出是本次负载均衡选出的节点指针
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| AdapterProxyPtr EndpointManager::getConHashProxyForNormal(int64_t hashCode) { if(checkConHashChange(false, _vLastConHashProxys)) {
updateConHashProxyWeighted(true, _vLastConHashProxys, _consistentHash); } LOG_INFO << "[TAF][EndpointManager::getConHashProxyForNormal _sObjName:" << _sObjName << "|_consistentHash.size():" << _consistentHash.size() << endl;
if(_consistentHash.size() > 0) { ...
return nullptr; }
return getHashProxyForNormal(hashCode); }
|
子流程updateConHashProxyWeighted的更新逻辑https://git.huya.com/server_arch/taf/-/blob/924950284557f183bd025ed758dc2e878ae36938/src/libservant/EndpointManager.cpp#L2292如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterProxyPtr> &vLastConHashProxys, TC_ConsistentHashNew &conHash) { if(bStatic) { vLastConHashProxys = _vRegProxys; conHash.clear(); }
for(size_t i = 0; i < _vRegProxys.size(); ++i) { int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100); if(iWeight > 0) { LOG_INFO << "[TAF]EndpointManager::updateConHashProxyWeighted bStatic:" << bStatic << "|_sObjName:" << _sObjName << "|endpoint:" << _vRegProxys[i]->endpoint().desc() << "|weight:" << _vRegProxys[i]->getWeight() << endl; iWeight = iWeight / 4; if(iWeight <= 0) { iWeight = 1; } conHash.addNode(_vRegProxys[i]->endpoint().desc(), i, _vRegProxys[i]->getWeight()); } }
conHash.sortNode(); }
|
第一个bug(计算变量没用到)
这一段代码最明显的bug在于iWeight变量根本没用上,计算了半天,最后使用的从主控拉取的权重信息
第二个bug(一致性哈希的生成key不对)
这一段代码不那么容易注意到的bug,在于一致性哈希新增节点,使用的是_vRegProxys[i]->endpoint().desc(),也就是ip+端口的信息
对于不同的servant而言,由于使用了不同的ip端口,因此会把请求导向不同的节点,对于依赖一致性哈希的有状态服务而言,这一步可能会大量穿透到兜底逻辑
第三个bug(最严重,一致性哈希完全没有用,一直在用普通哈希!)
比较隐蔽的bug也是最严重的bug在运行以后才能发现,日志如下:
1 2 3 4 5 6
| 2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2206 [TAF]EndpointManager::updateConHashProxyWeighted bStatic:1|_sObjName:HUYASZ.TafStressS erver.StressObj|endpoint:tcp -h 10.132.153.193 -p 2345 -t 60000|weight:-1 2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2372 [TAF][EndpointManager::getConHashProxyForNormal update _sObjName:HUYASZ.TafStressServe r.StressObj|timecost(ms):0 2025-06-06 00:09:48.842|538363|INFO|/root/taf/src/libservant/EndpointManager.cpp:2375 [TAF][EndpointManager::getConHashProxyForNormal _sObjName:HUYASZ.TafStressServer.Stres sObj|_consistentHash.size():0
|
由于虎牙这边根本没有使用后端权重,10.132.153.193这个节点的权重是-1,因此哈希环没有任何节点,_consistentHash.size():0
所以直接降级到了普通哈希!
实验
实验验证一下实际影响,以下代码生成1-1000的hashkey请求,记录后端关系,然后下一次请求前删掉一个节点,再比较和上一次请求有多少发生了变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| taf::Int32 StressImp::testStr(const std::string& in, std::string &out, taf::JceCurrentPtr current) { auto prx = Application::getCommunicator()->stringToProxy<Test::StressPrx>( "HUYASZ.TafStressServer.StressObj"); static map<int, string> _last; static std::hash<string> hasher; map<int, string> cur;
(void)prx->getActiveCount(); sleep(1); FDLOG("test") << "当前可用节点数量:" << prx->getActiveCount() << endl;
for (int i = 1; i <= 1000; ++i) { auto prx = Application::getCommunicator()->stringToProxy<Test::StressPrx>( "HUYASZ.TafStressServer.StressObj");
int hashed_value = static_cast<int>(hasher(to_string(i))); prx->taf_consistent_hash(hashed_value)->testStr(in, out);
cur[i] = prx->taf_invoke_endpoint().getHost(); } if (!_last.empty()) { int changedCount = 0;
for (int i = 1; i <= 1000; ++i) { if (cur[i] != _last[i]) { ++changedCount; } }
double changeRatio = changedCount / 1000.0 * 100; FDLOG("test") << "节点变化后,映射关系发生变化的请求数量:" << changedCount << std::endl; FDLOG("test") << "变化比例:" << changeRatio << "%" << std::endl; } _last = cur; return 0; int num = atoi(in.c_str()); if (num < 0) { num = -num; for (int i = 0; i < num; i++) { string out; prx->taf_hash(-1)->testStr("", out); } return num; } for (int i = 0; i < num; i++) { AwaitRun(=) { string out; prx->taf_hash(-1)->testStr("", out); }; } return num; }
|
第一次调用后端6个节点,第二次调用摘除一个节点,输出日志如下
1 2 3 4
| 2025-06-06 10:55:22.151|当前可用节点数量:6 2025-06-06 11:00:29.785|当前可用节点数量:5 2025-06-06 11:00:30.315|节点变化后,映射关系发生变化的请求数量:831 2025-06-06 11:00:30.315|变化比例:83.1%
|
与之相对的,在修复后的https://git.huya.com/server_arch/taf/-/commit/337d561745655a92b002cf8a1382912c42b0e4cd,输出日志如下
1 2 3 4
| 2025-06-09 09:31:29.766|当前可用节点数量:6 2025-06-09 09:35:11.646|当前可用节点数量:5 2025-06-09 09:35:12.287|节点变化后,映射关系发生变化的请求数量:177 2025-06-09 09:35:12.287|变化比例:17.7%
|
第四个bug(比较严重,迷之降级普通哈希了!)
第四个bug,我最初怎么看都觉得怎么不太对劲,但是由于taf-java也有这个功能,因此应该是腾讯以前的大佬在讨论后特意设计的
通过一系列本地的算法实验,我终于能确定这个算法一定有问题
源码分析
还是修复前三个bug的https://git.huya.com/server_arch/taf/-/commit/337d561745655a92b002cf8a1382912c42b0e4cd这个commit
在总体流程的关键代码,getConHashProxyForNormal
他的输入是hashCode(也就是prx->taf_consistent_hash(hashCode)
传入的),输出是本次负载均衡选出的节点指针
上文已经分析过主流程了,现在细化一下根据一致性哈希选节点的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| AdapterProxyPtr EndpointManager::getConHashProxyForNormal(int64_t hashCode) { if(checkConHashChange(false, _vLastConHashProxys)) {
updateConHashProxyWeighted(true, _vLastConHashProxys, _consistentHash); } LOG_INFO << "[TAF][EndpointManager::getConHashProxyForNormal _sObjName:" << _sObjName << "|_consistentHash.size():" << _consistentHash.size() << endl;
if(_consistentHash.size() > 0) { unsigned int iIndex = 0;
_consistentHash.getIndex(hashCode, iIndex);
if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive() == AdapterProxy::ActiveStatus::connected) { return _vRegProxys[iIndex]; } else { vector<AdapterProxyPtr> thisHash = _vActiveProxys; vector<AdapterProxyPtr> conn; size_t hash = 0; AdapterProxyPtr firstConnectingProxy = nullptr;
do { hash = ((int64_t)hashCode) % thisHash.size(); AdapterProxy::ActiveStatus status = thisHash[hash]->checkActive(); if (status == AdapterProxy::ActiveStatus::connected) { return thisHash[hash]; } ... } while(!thisHash.empty()); ...
return nullptr; }
return getHashProxyForNormal(hashCode); }
|
这里先对全部节点(不管是不是active)进行一致性哈希,然后发现有节点不是active的,或者已经断开连接,就在active节点中降级到了普通哈希(还不是复用代码的,直接从普通哈希那边复制粘贴过来的)
乍一看好像没问题,只会让非active或故障的节点重映射
算法实验
在初步的算法实验,对比完整的一致性哈希,和一致性哈希降级到普通哈希
单节点故障
我模拟了单个节点挂掉的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| TEST_F(KetamaConsistentHashTest, MissTestConsistentHash) { TC_ConsistentHashNew ketamaHashOld(E_TC_CONHASH_KETAMAHASH); TC_ConsistentHashNew ketamaHashNew(E_TC_CONHASH_KETAMAHASH);
for (int node = 0; node < 6; ++node) { ketamaHashOld.addNode("node" + to_string(node), node, 100); } ketamaHashOld.sortNode();
for (int node = 0; node < 5; ++node) { ketamaHashNew.addNode("node" + to_string(node), node, 100); } ketamaHashNew.sortNode();
int totalKeys = 100000; int missCount = 0;
std::hash<string> hasher;
vector<int> oldNodeHits(6, 0); vector<int> newNodeHits(5, 0);
for (int key = 1; key <= totalKeys; ++key) { string keyStr = to_string(key); unsigned int oldIndex, newIndex;
int64_t hashCode = hasher(keyStr);
ketamaHashOld.getIndex(hashCode, oldIndex); ketamaHashNew.getIndex(hashCode, newIndex);
oldNodeHits[oldIndex]++; newNodeHits[newIndex]++;
if (oldIndex != newIndex) { missCount++; } }
cout << "[一致性哈希测试(Ketama)]" << endl; cout << "请求总数: " << totalKeys << endl; cout << "节点数变化: 6 -> 5" << endl; cout << "重新映射的请求数: " << missCount << endl; cout << "重新映射比例: " << (missCount * 100.0 / totalKeys) << "%" << endl;
cout << "\n旧节点负载统计:" << endl; for (int node = 0; node < 6; ++node) { cout << "节点" << (node + 1) << ": " << oldNodeHits[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(oldNodeHits) << endl;
cout << "\n新节点负载统计:" << endl; for (int node = 0; node < 5; ++node) { cout << "节点" << (node + 1) << ": " << newNodeHits[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(newNodeHits) << endl; } TEST_F(KetamaConsistentHashTest, MissTestConsistentHashWithFallback) { TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);
for (int node = 0; node < 6; ++node) { ketamaHash.addNode("node" + to_string(node), node, 100); } ketamaHash.sortNode();
int totalKeys = 100000; int missCount = 0;
std::hash<string> hasher;
vector<int> oldNodeHits(6, 0); vector<int> newNodeHits(5, 0);
for (int key = 1; key <= totalKeys; ++key) { string keyStr = to_string(key); unsigned int oldIndex;
int64_t hashCode = hasher(keyStr);
ketamaHash.getIndex(hashCode, oldIndex);
oldNodeHits[oldIndex]++; auto newIndex = oldIndex;
if (newIndex == 5) { newIndex = myHash(hashCode, 5); }
newNodeHits[newIndex]++;
if (oldIndex != newIndex) { missCount++; } }
cout << "[一致性哈希测试(Ketama) - 降级普通哈希]" << endl; cout << "请求总数: " << totalKeys << endl; cout << "节点数变化: 6 -> 5 (命中节点6降级普通哈希)" << endl; cout << "重新映射的请求数: " << missCount << endl; cout << "重新映射比例: " << (missCount * 100.0 / totalKeys) << "%" << endl;
cout << "\n旧节点负载统计:" << endl; for (int node = 0; node < 6; ++node) { cout << "节点" << (node + 1) << ": " << oldNodeHits[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(oldNodeHits) << endl;
cout << "\n降级后的节点负载统计:" << endl; for (int node = 0; node < 5; ++node) { cout << "节点" << (node + 1) << ": " << newNodeHits[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(newNodeHits) << endl; }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| [ RUN ] KetamaConsistentHashTest.MissTestConsistentHash [一致性哈希测试(Ketama)] 请求总数: 100000 节点数变化: 6 -> 5 重新映射的请求数: 17279 重新映射比例: 17.279%
旧节点负载统计: 节点1: 17561次 节点2: 15998次 节点3: 15372次 节点4: 17015次 节点5: 16775次 节点6: 17279次 标准差: 756.211
新节点负载统计: 节点1: 20513次 节点2: 20100次 节点3: 18794次 节点4: 20455次 节点5: 20138次 标准差: 625.088 [ OK ] KetamaConsistentHashTest.MissTestConsistentHash (29 ms) [ RUN ] KetamaConsistentHashTest.MissTestConsistentHashWithFallback [一致性哈希测试(Ketama) - 降级普通哈希] 请求总数: 100000 节点数变化: 6 -> 5 (命中节点6降级普通哈希) 重新映射的请求数: 17279 重新映射比例: 17.279%
旧节点负载统计: 节点1: 17561次 节点2: 15998次 节点3: 15372次 节点4: 17015次 节点5: 16775次 节点6: 17279次 标准差: 756.211
降级后的节点负载统计: 节点1: 20998次 节点2: 19337次 节点3: 18895次 节点4: 20505次 节点5: 20265次 标准差: 772.25 [ OK ] KetamaConsistentHashTest.MissTestConsistentHashWithFallback (20 ms)
|
可以发现一致性哈希的降级完全没有影响重映射比例,只是负载上,标准差稍微变大了一点点而已
两次单节点故障
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
| TEST_F(KetamaConsistentHashTest, MissTestConsistentHashTwice) { TC_ConsistentHashNew ketamaHash6(E_TC_CONHASH_KETAMAHASH); TC_ConsistentHashNew ketamaHash5(E_TC_CONHASH_KETAMAHASH); TC_ConsistentHashNew ketamaHash4(E_TC_CONHASH_KETAMAHASH);
for (int node = 0; node < 6; ++node) { ketamaHash6.addNode("node" + to_string(node), node, 100); } ketamaHash6.sortNode();
for (int node = 0; node < 5; ++node) { ketamaHash5.addNode("node" + to_string(node), node, 100); } ketamaHash5.sortNode();
for (int node = 0; node < 4; ++node) { ketamaHash4.addNode("node" + to_string(node), node, 100); } ketamaHash4.sortNode();
int totalKeys = 100000;
std::hash<string> hasher;
vector<int> nodeHits6(6, 0); vector<int> nodeHits5(5, 0); vector<int> nodeHits4(4, 0);
int missCountFirst = 0; int missCountSecond = 0;
for (int key = 1; key <= totalKeys; ++key) { string keyStr = to_string(key); unsigned int index6, index5, index4;
int64_t hashCode = hasher(keyStr);
ketamaHash6.getIndex(hashCode, index6); ketamaHash5.getIndex(hashCode, index5); ketamaHash4.getIndex(hashCode, index4);
nodeHits6[index6]++; nodeHits5[index5]++; nodeHits4[index4]++;
if (index6 != index5) { missCountFirst++; }
if (index5 != index4) { missCountSecond++; } }
cout << "[一致性哈希测试(Ketama) - 两次摘除节点]" << endl; cout << "请求总数: " << totalKeys << endl;
cout << "\n初始节点负载统计:" << endl; for (int node = 0; node < 6; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits6[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits6) << endl;
cout << "\n第一次摘除节点(6->5):" << endl; cout << "重新映射的请求数: " << missCountFirst << endl; cout << "重新映射比例: " << (missCountFirst * 100.0 / totalKeys) << "%" << endl; cout << "节点负载统计:" << endl; for (int node = 0; node < 5; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits5[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits5) << endl;
cout << "\n第二次摘除节点(5->4):" << endl; cout << "重新映射的请求数: " << missCountSecond << endl; cout << "重新映射比例: " << (missCountSecond * 100.0 / totalKeys) << "%" << endl; cout << "节点负载统计:" << endl; for (int node = 0; node < 4; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits4[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits4) << endl; }
TEST_F(KetamaConsistentHashTest, MissTestConsistentHashWithFallbackTwice) { TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);
for (int node = 0; node < 6; ++node) { ketamaHash.addNode("node" + to_string(node), node, 100); } ketamaHash.sortNode();
int totalKeys = 100000;
std::hash<string> hasher;
vector<int> nodeHits6(6, 0); vector<int> nodeHits5(5, 0); vector<int> nodeHits4(4, 0);
int missCountFirst = 0; int missCountSecond = 0;
for (int key = 1; key <= totalKeys; ++key) { string keyStr = to_string(key); unsigned int index6;
int64_t hashCode = hasher(keyStr);
ketamaHash.getIndex(hashCode, index6); nodeHits6[index6]++;
unsigned int index5 = index6; if (index5 >= 5) { index5 = myHash(hashCode, 5); } nodeHits5[index5]++; if (index6 != index5) { missCountFirst++; }
unsigned int index4 = index6; if (index4 >= 4) { index4 = myHash(hashCode, 4); } nodeHits4[index4]++; if (index5 != index4) { missCountSecond++; } }
cout << "[一致性哈希测试(Ketama) - 降级普通哈希 - 两次摘除节点]" << endl; cout << "请求总数: " << totalKeys << endl;
cout << "\n初始节点负载统计:" << endl; for (int node = 0; node < 6; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits6[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits6) << endl;
cout << "\n第一次摘除节点(6->5):" << endl; cout << "重新映射的请求数: " << missCountFirst << endl; cout << "重新映射比例: " << (missCountFirst * 100.0 / totalKeys) << "%" << endl; cout << "节点负载统计:" << endl; for (int node = 0; node < 5; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits5[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits5) << endl;
cout << "\n第二次摘除节点(5->4):" << endl; cout << "重新映射的请求数: " << missCountSecond << endl; cout << "重新映射比例: " << (missCountSecond * 100.0 / totalKeys) << "%" << endl; cout << "节点负载统计:" << endl; for (int node = 0; node < 4; ++node) { cout << "节点" << (node + 1) << ": " << nodeHits4[node] << "次" << endl; } cout << "标准差: " << calculateStdDev(nodeHits4) << endl; }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| [ RUN ] KetamaConsistentHashTest.MissTestConsistentHashTwice [一致性哈希测试(Ketama) - 两次摘除节点] 请求总数: 100000
初始节点负载统计: 节点1: 17561次 节点2: 15998次 节点3: 15372次 节点4: 17015次 节点5: 16775次 节点6: 17279次 标准差: 756.211
第一次摘除节点(6->5): 重新映射的请求数: 17279 重新映射比例: 17.279% 节点负载统计: 节点1: 20513次 节点2: 20100次 节点3: 18794次 节点4: 20455次 节点5: 20138次 标准差: 625.088
第二次摘除节点(5->4): 重新映射的请求数: 20138 重新映射比例: 20.138% 节点负载统计: 节点1: 24808次 节点2: 26125次 节点3: 23111次 节点4: 25956次 标准差: 1202.57 [ OK ] KetamaConsistentHashTest.MissTestConsistentHashTwice (39 ms) [ RUN ] KetamaConsistentHashTest.MissTestConsistentHashWithFallbackTwice [一致性哈希测试(Ketama) - 降级普通哈希 - 两次摘除节点] 请求总数: 100000
初始节点负载统计: 节点1: 17561次 节点2: 15998次 节点3: 15372次 节点4: 17015次 节点5: 16775次 节点6: 17279次 标准差: 756.211
第一次摘除节点(6->5): 重新映射的请求数: 17279 重新映射比例: 17.279% 节点负载统计: 节点1: 20998次 节点2: 19337次 节点3: 18895次 节点4: 20505次 节点5: 20265次 标准差: 772.25
第二次摘除节点(5->4): 重新映射的请求数: 30583 重新映射比例: 30.583% 节点负载统计: 节点1: 26244次 节点2: 24406次 节点3: 23991次 节点4: 25359次 标准差: 872.831 [ OK ] KetamaConsistentHashTest.MissTestConsistentHashWithFallbackTwice (21 ms)
|
可以发现,降级到普通哈希的方案,发生了30%的重映射,高于完整一致性哈希方案的20%
理论分析
假设每个物理节点两个虚拟节点,一共存在6个物理节点
初始情况
1 2 3 4 5 6 7 8 9 10 11 12 13
| [虚拟节点1(物理节点4)] / \ [虚拟节点12(物理节点1)] [虚拟节点2(物理节点6)] | | [虚拟节点11(物理节点6)] [虚拟节点3(物理节点2)] | | [虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)] | | [虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)] | | [虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)] \ / [虚拟节点7(物理节点1)]
|
物理节点6故障
两种方案都是物理节点6所属的虚拟节点2和虚拟节点11发生了重映射
纯一致性哈希:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [虚拟节点1(物理节点4)] / \ [虚拟节点12(物理节点1)] [虚拟节点2(按顺时针指向下一个物理节点2)] | | [虚拟节点11(按顺时针指向下一个物理节点1)] [虚拟节点3(物理节点2)] | | [虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)] | | [虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)] | | [虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)] \ / [虚拟节点7(物理节点1)]
|
一致性哈希降级到普通哈希:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [虚拟节点1(物理节点4)] / \ [虚拟节点12(物理节点1)] [虚拟节点2(降级到普通哈希hashCode%5)] | | [虚拟节点11(降级到普通哈希hashCode%5)] [虚拟节点3(物理节点2)] | | [虚拟节点10(物理节点5)] [虚拟节点4(物理节点2)] | | [虚拟节点9(物理节点3)] [虚拟节点5(物理节点5)] | | [虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)] \ / [虚拟节点7(物理节点1)]
|
一致性哈希在某个节点不可用后第一次降级到普通哈希,发生重映射的哈希环区域是一致性哈希决定的,因此两种方案的重映射比例完全一致
物理节点5又故障
纯一致性哈希:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [虚拟节点1(物理节点4)] / \ [虚拟节点12(物理节点1)] [虚拟节点2(按顺时针指向下一个物理节点2)] | | [虚拟节点11(按顺时针指向下一个物理节点1)] [虚拟节点3(物理节点2)] | | [虚拟节点10(按顺时针指向下一个物理节点1)] [虚拟节点4(物理节点2)] | | [虚拟节点9(物理节点3)] [虚拟节点5(按顺时针指向下一个物理节点3)] | | [虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)] \ / [虚拟节点7(物理节点1)]
|
一致性哈希降级到普通哈希:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [虚拟节点1(物理节点4)] / \ [虚拟节点12(物理节点1)] [虚拟节点2(降级到普通哈希hashCode%4)] | | [虚拟节点11(降级到普通哈希hashCode%4)] [虚拟节点3(物理节点2)] | | [虚拟节点10(降级到普通哈希hashCode%4)] [虚拟节点4(物理节点2)] | | [虚拟节点9(物理节点3)] [虚拟节点5(降级到普通哈希hashCode%4)] | | [虚拟节点8(物理节点4)] [虚拟节点6(物理节点3)] \ / [虚拟节点7(物理节点1)]
|
纯一致性哈希方案,只有虚拟节点10和5发生了重映射
降级普通哈希的方案,虚拟节点10和5也发生了重映射
但是之前虚拟节点2和11,由于普通哈希的取模基数变化,也有很大几率发生重映射!
总结
- 一致性哈希的核心思想是将请求和节点映射到同一个哈希环上,节点增删时只有被摘除节点负责的区域(即顺时针到下一个节点之间的区域)会发生重映射,其他区域不受影响,因此重映射比例较低。
- 普通哈希的映射方式是直接对节点数取模,节点增删时几乎所有请求的映射都会发生变化,重映射比例较高。
- 如果在一致性哈希方案中引入普通哈希作为降级策略,会导致两种哈希策略混合使用,产生不良后果:
- 第一次单节点故障时,降级到普通哈希的方案与纯一致性哈希方案表现一致,重映射比例相同。
- 但当再次发生节点增删时,普通哈希的取模基数发生变化,之前已经降级到普通哈希的区域会再次发生重映射,导致额外的请求重新映射,显著增加重映射比例。
因此,在一致性哈希方案中混入普通哈希作为降级策略,会破坏一致性哈希的稳定性和负载均衡性,导致更多的请求重映射
最终成了不伦不类的负载均衡算法!
实机实验
设计一个实机实验,来验证理论分析的正确性
还是第三个bug中涉及的实验代码,这一次摘除一个节点以后,再摘除一个节点,观察变化比例
旧版本输出如下:
1 2 3 4 5 6 7
| 2025-06-09 09:31:29.766|当前可用节点数量:6 2025-06-09 09:35:11.646|当前可用节点数量:5 2025-06-09 09:35:12.287|节点变化后,映射关系发生变化的请求数量:177 2025-06-09 09:35:12.287|变化比例:17.7% 2025-06-09 09:36:21.154|当前可用节点数量:4 2025-06-09 09:36:21.763|节点变化后,映射关系发生变化的请求数量:296 2025-06-09 09:36:21.763|变化比例:29.6%
|
修复版本https://git.huya.com/server_arch/taf/-/blob/faef46e02fe76672167600b6447f12927eef232f/src/libservant/EndpointManager.cpp#L2367输出如下:
1 2 3 4 5 6 7
| 2025-06-13 00:02:36.661|当前可用节点数量:6 2025-06-13 00:04:28.003|当前可用节点数量:5 2025-06-13 00:04:28.608|节点变化后,映射关系发生变化的请求数量:168 2025-06-13 00:04:28.608|变化比例:16.8% 2025-06-13 00:06:47.904|当前可用节点数量:4 2025-06-13 00:06:48.460|节点变化后,映射关系发生变化的请求数量:211 2025-06-13 00:06:48.460|变化比例:21.1%
|
修复思路
最直接的思路是一次性扫描所有节点,知道哪些可用,哪些不可用,就可以绕过因为节点不可用而需要降级的问题了
但是扫描节点的可用性,本质上是要向下游服务端建立连接
那么在稀疏请求的情况下,为了一个请求,向全部下游服务端建立连接,这显然不合理
那么尝试在每次选中的节点不可用的时候,再构造一次一致性哈希怎么样呢?
这里有两种思路:
直接重建一个新的
这个可能计算花费有点大
1 2 3 4 5 6 7
| do { auto node = conhash.getNode(); if (node->status != OK) { actives.erase(node); conhash = new ConHash(actives); } }while(!actives.empty())
|
并且对于100个节点被删除掉50个节点的case,需要循环里面判断50次节点,这个计算花费就更大了
增量删除不可用节点的虚拟节点,并且在下一次计算的时候还得加回去(因为下一次计算的时候这一次不可用的节点,可能已经可用了)
当前一致性哈希是基于数组的二分查找实现的,要支持随时可以删除插入和二分查找,只能使用链状结构的红黑树了
这个性能和复杂程度可能有点高,暂时不对其进行改造
1 2 3 4 5 6 7
| conhash.recovery(); do { auto node = conhash.getNode(); if (node->status != OK) { conhash.remove(node); } }while(!actives.empty())
|
并且对于100个节点被删除掉50个节点的case,加回去的这个计算花费也小不到哪里去
所以只能引入缓存了,单元测试看下有没有缓存差多少性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| TEST_F(KetamaConsistentHashTest, PerformanceTest) { const int totalLoops = 1000; const int nodeCount = 600; const int virtualNodes = 25;
std::srand(std::time(nullptr));
auto start = std::chrono::high_resolution_clock::now();
for (int loop = 0; loop < totalLoops; ++loop) { TC_ConsistentHashNew ketamaHash(E_TC_CONHASH_KETAMAHASH);
for (int node = 0; node < nodeCount; ++node) { ketamaHash.addNode("node" + to_string(node), node, virtualNodes); } ketamaHash.sortNode();
unsigned int index; ketamaHash.getIndex(rand(), index); }
auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = end - start;
double qps = totalLoops / elapsed.count();
cout << "[Ketama一致性哈希性能测试]" << endl; cout << "总循环次数: " << totalLoops << endl; cout << "每次节点数: " << nodeCount << endl; cout << "每个节点虚拟节点数: " << virtualNodes << endl; cout << "总耗时: " << elapsed.count() << " 秒" << endl; cout << "QPS: " << qps << endl; }
TEST_F(KetamaConsistentHashTest, PerformanceTestWithCache) { TC_LRU<map, pair<string, vector<string>>, shared_ptr<TC_ConsistentHashNew>> lru; lru.updateLimit(1000); const int totalLoops = 1000000; const int nodeCount = 600; const int virtualNodes = 25;
std::srand(std::time(nullptr));
auto ketamaHash = make_shared<TC_ConsistentHashNew>(E_TC_CONHASH_KETAMAHASH); vector<string> nodes; for (int node = 0; node < nodeCount; ++node) { nodes.push_back("node" + to_string(node)); ketamaHash->addNode("node" + to_string(node), node, virtualNodes); } ketamaHash->sortNode(); pair<string, vector<string>> key{"ketama", nodes}; lru.put(key, ketamaHash);
auto start = std::chrono::high_resolution_clock::now();
for (int loop = 0; loop < totalLoops; ++loop) { auto kv = lru.get(key); assert(kv.second); unsigned int index; ketamaHash->getIndex(rand(), index); }
auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = end - start;
double qps = totalLoops / elapsed.count();
cout << "[Ketama一致性哈希LRU性能测试]" << endl; cout << "总循环次数: " << totalLoops << endl; cout << "每次节点数: " << nodeCount << endl; cout << "每个节点虚拟节点数: " << virtualNodes << endl; cout << "总耗时: " << elapsed.count() << " 秒" << endl; cout << "QPS: " << qps << endl; }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| [ RUN ] KetamaConsistentHashTest.PerformanceTest [Ketama一致性哈希性能测试] 总循环次数: 1000 每次节点数: 600 每个节点虚拟节点数: 25 总耗时: 13.1539 秒 QPS: 76.0232 [ OK ] KetamaConsistentHashTest.PerformanceTest (13153 ms) [ RUN ] KetamaConsistentHashTest.PerformanceTestWithCache [Ketama一致性哈希LRU性能测试] 总循环次数: 1000000 每次节点数: 600 每个节点虚拟节点数: 25 总耗时: 13.3273 秒 QPS: 75033.9 [ OK ] KetamaConsistentHashTest.PerformanceTestWithCache (13341 ms)
|
单线程7万5qps,看着还行,因此在一致性普通哈希彻底使用lru缓存代替原有的缓存系统,我删除了原有的缓存系统,完全基于LRU来缓存
一个关键的实现细节
一致性哈希修复服务端断开连接后,同时并发到这个节点的请求只有第一个会成功,后续会屏蔽节点的问题
taf框架专门为对端节点关闭连接(不管是正常四次挥手,还是发reset包又或者是read超时)的情况进行了优化
当负载均衡算法在检查这种对端节点的时候,只会有一个请求在重新建连的同时选中这个节点,其他的请求会暂时屏蔽这个节点
对于普通哈希,又或者是轮询的负载均衡算法而言,这会减少大量请求阻塞在建连过程中的情况,选别的节点又不是不能用
但是对于一致性哈希而言,如果这个节点没有黑历史(以前没有连接失败过),并且很快就能建连完毕,那么等一下这个节点也无妨
一致性哈希可以对稳定性和可用性折中,因此我实现了如下的判断
1 2 3 4 5 6 7 8 9 10 11
|
if (adapterProxy->trans()->isConnecting() && !adapterProxy->isConnExc() && !adapterProxy->isConnTimeoutBefore()) { XFDLOG("taf_conhash") << "node is connecting|_sObjName:" << _sObjName << "|nodeName:" << nodeName << endl; return adapterProxy; }
|
这里牺牲的可用性,是指在极端的网络隔离的情况下(客户端到对端网络坏了,对端到主控网络通畅)
会在故障发生的15分37秒(默认的linux tcp read超时)左右,fd从connected转为connecting状态时,第一次连接到超时的这1.5秒内,所有请求都卡住到1.5连接超时,再找其他节点
具体体现在被调图表上,就是在故障大突次的15分37秒左右,会多出现一个小小的耗时突次
PS:这里前15分钟的最大耗时3秒,是由于tcp read还未超时,所以会每30秒放一个请求过去试试导致的,这里会一直卡在tcp上直到请求本身的默认3秒超时
15分钟后,虽然也会放一个请求过去试试,但是tcp的connect是1.5秒超时,connect失败以后,就立刻会选择其他节点重试,因此最大耗时下降到1.5秒
总结
在全部bug修复以后
实际压测途中,6个节点摘除1个节点,重映射的比例会从旧版本的83.1%下降到新版本的17.7%