最近遇到一个类定时器逻辑需求
可以抽象为write和rangeRead两个接口
write
超高频,多线程调用
基本按从小到大写入(不严格自增)
基本不重复
rangeRead
低频,固定一个线程调用
传入一个key,从最小值到key的所有值批量读取出来并删除
初步筛选方案
多线程方面
这是一个写多读少的场景,那么双buffer的技巧没有机会用上
只能加个读写锁了
数据结构方面
假设rangeRead读取k个值
根据多线程方面初步定位,高频高并发读,低频无并发写的场景,读取性能差一点也没事
那么看起来最小堆很适合这个场景,写一个demo确认一下
初步确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 int allCount = 1000000 ;void testIntOrderedPerformace () { cout << "test int64 ordered performance start" << endl; vector<int64_t > vs; for (int i = 0 ;i < allCount;i++) { vs.push_back (rand () % INT_MAX); } sort (vs.begin (), vs.end ()); { set<int64_t > s; Timer t; for (auto &v : vs) { s.insert (v); } } { priority_queue<int64_t , vector<int64_t >, std::greater<int >> q; Timer t; for (auto &v : vs) { q.push (v); } } cout << "test int64 ordered performance end" << endl; }
输出:
1 2 3 4 test int64 ordered performance start set: Cost 1211 ms priority_queue: Cost 128 ms test int64 ordered performance end
确实挺不错,这里的优先级队列没有用reserve提前扩容,有一些复制损耗,实际的话应该有10到20倍性能提升了
编写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 template <typename T, typename getValueFuncT, typename CmpT = std::less<decltype (declval <getValueFuncT>()(declval <T>()))>>class RangePopHeap { using KeyT = decltype (declval <getValueFuncT>()(declval <T>())); public : RangePopHeap (const getValueFuncT& func) : cmp_ (CmpT ()), getValueFunc_ (func) { } RangePopHeap (const getValueFuncT& func, int size) : cmp_ (CmpT ()), getValueFunc_ (func) { vs_.reserve (size); } RangePopHeap (const getValueFuncT& func, const std::function<bool (const T&)> &cmp, int size) : cmp_ (cmp), getValueFunc_ (func) { vs_.reserve (size); } void push (const T& v) { insert (v); } vector<T> rangePop (const KeyT& key) { vector<T> results; results.reserve (vs_.capacity ()); while (!empty ()) { auto &v = top (); if (cmp_ (getValueFunc_ (v), key)) { break ; } results.emplace_back (move (v)); pop (); } return results; } T& top () { return vs_[0 ]; } void pop () { swap (vs_[0 ], vs_[vs_.size () - 1 ]); vs_.pop_back (); heapify (0 ); } bool empty () { return vs_.empty (); } void makeHeap (const vector<T> &vs) { vs_ = vs; for (int64_t i = vs.size () / 2 ;i >= 0 ;i--) { heapify (i); } } private : void heapify (int64_t index) { int64_t leftIndex = left (index); int64_t rightIndex = right (index); int64_t largestIndex = index; if (leftIndex < int64_t (vs_.size ()) && cmp_ (getValue (index), getValue (leftIndex))) { largestIndex = leftIndex; } if (rightIndex < int64_t (vs_.size ()) && cmp_ (getValue (largestIndex), getValue (rightIndex))) { largestIndex = rightIndex; } if (largestIndex != index) { swap (vs_[index], vs_[largestIndex]); heapify (largestIndex); } } void insert (const T &v) { vs_.push_back (v); int64_t index = vs_.size () - 1 ; while (index > 0 && cmp_ (getValue (parent (index)), getValue (index))) { swap (vs_[parent (index)], vs_[index]); index = parent (index); } } int64_t left (int64_t index) { return 2 * index + 1 ; } int64_t right (int64_t index) { return 2 * index + 2 ; } int64_t parent (int64_t index) { return (index - 1 ) / 2 ; } int64_t getValue (int64_t index) { return getValueFunc_ (vs_[index]); } CmpT cmp_; getValueFuncT getValueFunc_; vector<T> vs_; };
堆算是比较好写的数据结构
测试写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 int allCount = 1000000 ;int nestCount = 10 ;void testTimePairPerformance () { cout << "test pairT performance start" << endl; vector<pair<int64_t , string>> vs; for (int i = 0 ;i < allCount / nestCount;i++) { int64_t v = now + i; for (int j = 0 ;j < nestCount;j++) { string s = to_string (i * nestCount + j); vs.push_back ({v - j, s}); } } { map<int64_t , set<string>> m; Timer t; for (auto &v : vs) { m[v.first].insert (v.second); } } { priority_queue<pairT, vector<pairT>, std::greater<pairT>> q; Timer t; for (auto &v : vs) { q.push (v); } } { auto f = [](const pairT &v) { return v.first; }; RangePopHeap<pairT, decltype (f), std::greater<int64_t >> q (f, allCount); Timer t; for (auto &v : vs) { q.push (v); } } cout << "test pairT performance end" << endl; }
输出:
1 2 3 4 5 test pairT performance start map: Cost 1289 ms priority_queue: Cost 308 ms RangePopHeap: Cost 131 ms test pairT performance end
经过reserve的自己实现heap比priority_queue性能略好,远好于map,符合预期
测试读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void testTimePairRangePopPerformance (int keyPercent = 2 ) { cout << "test pairT rangePop performance start" << LOGV (keyPercent) << endl; vector<pair<int64_t , string>> vs; for (int i = 0 ;i < allCount / nestCount;i++) { int64_t v = now + i; for (int j = 0 ;j < nestCount;j++) { string s = to_string (i * nestCount + j); vs.push_back ({v - j, s}); } } int key = vs[vs.size () / keyPercent].first; { map<int64_t , set<string>> m; for (auto &v : vs) { m[v.first].insert (v.second); } Timer t; vector<pairT> result; result.reserve (allCount); for (auto iter = m.begin (); iter != m.end ();) { if (iter->first > key) { break ; } for (auto &v : iter->second) { result.emplace_back (iter->first, move (v)); } iter = m.erase (iter); } } { auto f = [](const pairT &v) { return v.first; }; RangePopHeap<pairT, decltype (f), std::greater<int >> q (f, allCount); for (auto &v : vs) { q.push (v); } { Timer t; auto vs = q.rangePop (key); } } cout << "test pairT rangePop performance end" << endl; }
输出:
1 2 3 4 test pairT rangePop performance start|keyPercent=2| map: Cost 111 ms rangePopHeap: Cost 1033 ms test pairT rangePop performance end
直接裂开。。。堆和红黑树相似的时间复杂度,读取性能差了10倍,写入赚来的性能都被这里消耗了
可能是因为每次pop最小值都是堆的最坏时间复杂度,堆的删除性能要比红黑树差一些
优化
显然,堆的读取优势只有在于取少量数据并删除的时候,取少量数据是\(O(1)\) 的
但是读取的数据规模是不确定的,有可能非常多甚至全部取走,此时堆的时间复杂度为\(O(n\log n)\)
那么还不如直接遍历\(O(n)\) 以后重建堆\(O(n)\) ,一共是\(O(n)\)
于是我又试了一下遍历
1 2 3 test pairT rangePop performance start|keyPercent=2| Cost 99 ms test pairT rangePop performance end
确实很不错,比红黑树还好
于是我想到了std::sort中的优化,当快排因为命中bad case导致递归层数超过预期值时,会转为更稳定的堆排序
那么这里当读取的数据量超过一定值时,就转为遍历算法
遍历\(O(n)\) ,堆\(O(k\log n)\)
因此当\(k > \dfrac{n}{\log n}\) 时,就可以从堆算法转为遍历算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 vector<T> rangePop (const KeyT& key) { bool ok = false ; auto results = rangePopLittleData (key, vs_.size () / std::log2 (vs_.size ()), ok); if (!ok) { rangePopLargeData (key, results); } return results; } void rangePopLargeData (const KeyT& key, vector<T>& results) { vector<T> newVs; newVs.reserve (vs_.capacity ()); for (auto &v : vs_) { if (cmp_ (getValueFunc_ (v), key)) { newVs.emplace_back (move (v)); continue ; } results.emplace_back (move (v)); } makeHeap (newVs); } vector<T> rangePopLittleData (const KeyT& key, int64_t countLimit, bool &ok) { ok = false ; vector<T> results; results.reserve (vs_.capacity ()); while (!empty ()) { auto &v = top (); if (!countLimit--) { break ; } if (cmp_ (getValueFunc_ (v), key)) { ok = true ; break ; } results.emplace_back (move (v)); pop (); } return results; }
重新测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 void testTimePairRangePopPerformance (int keyPercent = 2 ) { cout << "test pairT rangePop performance start" << LOGV (keyPercent) << endl; vector<pair<int64_t , string>> vs; for (int i = 0 ;i < allCount / nestCount;i++) { int64_t v = now + i; for (int j = 0 ;j < nestCount;j++) { string s = to_string (i * nestCount + j); vs.push_back ({v - j, s}); } } int key = vs[vs.size () / keyPercent].first; { map<int64_t , set<string>> m; for (auto &v : vs) { m[v.first].insert (v.second); } Timer t; vector<pairT> result; result.reserve (allCount); for (auto iter = m.begin (); iter != m.end ();) { if (iter->first > key) { break ; } for (auto &v : iter->second) { result.emplace_back (iter->first, move (v)); } iter = m.erase (iter); } } { auto f = [](const pairT &v) { return v.first; }; RangePopHeap<pairT, decltype (f), std::greater<int >> q (f, allCount); for (auto &v : vs) { q.push (v); } auto q1 = q; auto q2 = q; { Timer t; vector<pairT> vs; vs.reserve (allCount * nestCount); q.rangePopLargeData (key, vs); } checkHeap<decltype (q), decltype (f), std::greater<int >>(q, f); { Timer t; int64_t countLimit = INT_MAX; bool ok = false ; auto result = q1.rangePopLittleData (key, countLimit, ok); } checkHeap<decltype (q), decltype (f), std::greater<int >>(q1, f); { Timer t; auto result = q2.rangePop (key); } checkHeap<decltype (q), decltype (f), std::greater<int >>(q2, f); } cout << "test pairT rangePop performance end" << endl; } int main () { for (int i = 1 ;i < 30 ;i++) { testTimePairRangePopPerformance (i); } }
输出:
第一项为红黑树
第二项为纯遍历
第三项为纯堆
第三项为新算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 test pairT rangePop performance start|keyPercent=1| map: Cost 228 ms rangePopLargeData: Cost 86 ms rangePopLittleData: Cost 1935 ms rangePop: Cost 201 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=2| map: Cost 111 ms rangePopLargeData: Cost 98 ms rangePopLittleData: Cost 1027 ms rangePop: Cost 232 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=3| map: Cost 72 ms rangePopLargeData: Cost 102 ms rangePopLittleData: Cost 713 ms rangePop: Cost 241 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=4| map: Cost 55 ms rangePopLargeData: Cost 104 ms rangePopLittleData: Cost 543 ms rangePop: Cost 239 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=5| map: Cost 44 ms rangePopLargeData: Cost 106 ms rangePopLittleData: Cost 439 ms rangePop: Cost 242 ms test pairT rangePop performance end ... test pairT rangePop performance start|keyPercent=20| map: Cost 11 ms rangePopLargeData: Cost 110 ms rangePopLittleData: Cost 118 ms rangePop: Cost 119 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=21| map: Cost 11 ms rangePopLargeData: Cost 110 ms rangePopLittleData: Cost 113 ms rangePop: Cost 113 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=22| map: Cost 10 ms rangePopLargeData: Cost 109 ms rangePopLittleData: Cost 108 ms rangePop: Cost 108 ms test pairT rangePop performance end test pairT rangePop performance start|keyPercent=23| map: Cost 10 ms rangePopLargeData: Cost 109 ms rangePopLittleData: Cost 103 ms rangePop: Cost 103 ms test pairT rangePop performance end
可以看到,新算法在读取数据规模较大时比较稳定
然后在读取数据规模较小时又能使用堆算法提升性能
其中的rangePopLargeData(遍历)和rangePopLittleData(堆算法)的分界点\(\dfrac{1}{22}\) 和预估的\(\dfrac{10000000}{\log_2 10000000} = \dfrac{1}{23}\) 分界点相差不大
总结
全部代码
参考资料