Batch-Popping a Heap for Timer-Like Logic

I recently ran into a requirement with timer-like logic.

It can be abstracted into two interfaces, write and rangeRead:

  • write
    • Extremely high frequency, called from many threads
    • Keys arrive roughly in ascending order (not strictly increasing)
    • Mostly free of duplicates
  • rangeRead
    • Low frequency, always called from one fixed thread
    • Given a key, read out and delete every value from the minimum up to that key in one batch
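To pin down these semantics before optimizing, here is a minimal mutex-plus-map sketch of the two calls (TimerStore and its member names are illustrative, not the eventual design):

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Reference semantics only: write() inserts a (key, value) pair; rangeRead(bound)
// removes and returns every pair whose key is <= bound, smallest key first.
class TimerStore {
public:
    void write(int64_t key, std::string value) {
        std::lock_guard<std::mutex> lk(mu_);
        m_[key].insert(std::move(value));
    }
    std::vector<std::pair<int64_t, std::string>> rangeRead(int64_t bound) {
        std::lock_guard<std::mutex> lk(mu_);
        std::vector<std::pair<int64_t, std::string>> out;
        auto it = m_.begin();
        while (it != m_.end() && it->first <= bound) {
            for (auto& s : it->second) out.emplace_back(it->first, s);
            it = m_.erase(it);  // drain as we go: rangeRead both reads and deletes
        }
        return out;
    }
private:
    std::mutex mu_;
    std::map<int64_t, std::set<std::string>> m_;
};
```

This fixes the contract; the rest of the post is about making it fast.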

Initial screening of approaches

Threading

This is a write-heavy, read-light scenario, so the double-buffer trick has no chance to shine.

A plain reader-writer lock will have to do.

Data structures

Assume rangeRead retrieves k values.

  • Red-black tree

    The red-black tree, being the all-purpose data structure, is always the first thing that comes to mind.

    Roughly a map<int64_t, set<string>>; since keys are mostly unique, the set contributes little.

    • Write: \(O(\log n)\)

    • Read: \(O(k\log n)\)

      • Locating the range with upper_bound: \(O(\log n)\)
      • Deleting the minimum each time: \(O(k\log n)\)
  • Timing wheel

    The heavy artillery, but it needs an extra timer thread

    and quite a lot of extra code, so let's first see whether a simpler data structure can make do.

  • Min-heap

    • Write

      The data arrives roughly in ascending order, so a push usually sifts up only a few levels before stopping, close to \(O(1)\) in practice.

    • Read

      • Reading the data: \(O(k)\)
      • Deleting the minimum each time: \(O(k\log n)\)

Given the threading analysis, this is a scenario of high-frequency, highly concurrent writes and low-frequency, non-concurrent reads, so somewhat slower reads are acceptable.

The min-heap looks like a good fit, so let's write a demo to confirm.

Initial confirmation

```cpp
int allCount = 1000000;
void testIntOrderedPerformace() {
    cout << "test int64 ordered performance start" << endl;
    vector<int64_t> vs;
    for (int i = 0; i < allCount; i++) {
        vs.push_back(rand() % INT_MAX);
    }
    sort(vs.begin(), vs.end());
    {
        set<int64_t> s;
        Timer t;
        for (auto &v : vs) {
            s.insert(v);
        }
    }
    {
        // the comparator must match the element type (int64_t, not int)
        priority_queue<int64_t, vector<int64_t>, std::greater<int64_t>> q;
        Timer t;
        for (auto &v : vs) {
            q.push(v);
        }
    }
    cout << "test int64 ordered performance end" << endl;
}
```

Output:

```
test int64 ordered performance start
set: Cost 1211 ms
priority_queue: Cost 128 ms
test int64 ordered performance end
```

Pretty good. The priority_queue here was not pre-grown with reserve, so it pays some copying overhead on reallocation; in practice this should amount to a 10 to 20x performance win.

Writing the code

```cpp
template <typename T, typename getValueFuncT,
          typename CmpT = std::less<decltype(declval<getValueFuncT>()(declval<T>()))>>
class RangePopHeap {
    using KeyT = decltype(declval<getValueFuncT>()(declval<T>()));
public:
    RangePopHeap(const getValueFuncT& func) : cmp_(CmpT()), getValueFunc_(func) {
    }
    RangePopHeap(const getValueFuncT& func, int size) : cmp_(CmpT()), getValueFunc_(func) {
        vs_.reserve(size);
    }
    // note: the comparator parameter must be a CmpT (a std::function of a
    // different signature would not initialize cmp_)
    RangePopHeap(const getValueFuncT& func, const CmpT& cmp, int size) :
            cmp_(cmp), getValueFunc_(func) {
        vs_.reserve(size);
    }
    void push(const T& v) {
        insert(v);
    }
    // with CmpT = std::greater this is a min-heap: pop values until value > key
    vector<T> rangePop(const KeyT& key) {
        vector<T> results;
        results.reserve(vs_.capacity());
        while (!empty()) {
            auto &v = top();
            if (cmp_(getValueFunc_(v), key)) {
                break;
            }
            results.emplace_back(move(v));
            pop();
        }
        return results;
    }
    T& top() {
        return vs_[0];
    }
    void pop() {
        swap(vs_[0], vs_[vs_.size() - 1]);
        vs_.pop_back();
        heapify(0);
    }
    bool empty() {
        return vs_.empty();
    }
    void makeHeap(const vector<T> &vs) {
        vs_ = vs;
        for (int64_t i = vs.size() / 2; i >= 0; i--) {
            heapify(i);
        }
    }
private:
    void heapify(int64_t index) {
        int64_t leftIndex = left(index);
        int64_t rightIndex = right(index);
        int64_t largestIndex = index;
        if (leftIndex < int64_t(vs_.size()) && cmp_(getValue(index), getValue(leftIndex))) {
            largestIndex = leftIndex;
        }
        if (rightIndex < int64_t(vs_.size()) && cmp_(getValue(largestIndex), getValue(rightIndex))) {
            largestIndex = rightIndex;
        }
        if (largestIndex != index) {
            swap(vs_[index], vs_[largestIndex]);
            heapify(largestIndex);
        }
    }
    void insert(const T &v) {
        vs_.push_back(v);
        int64_t index = vs_.size() - 1;
        while (index > 0 && cmp_(getValue(parent(index)), getValue(index))) {
            swap(vs_[parent(index)], vs_[index]);
            index = parent(index);
        }
    }
    int64_t left(int64_t index) {
        return 2 * index + 1;
    }
    int64_t right(int64_t index) {
        return 2 * index + 2;
    }
    int64_t parent(int64_t index) {
        return (index - 1) / 2;
    }
    KeyT getValue(int64_t index) {  // return KeyT, not a hard-coded int64_t
        return getValueFunc_(vs_[index]);
    }
    CmpT cmp_;
    getValueFuncT getValueFunc_;
    vector<T> vs_;
};
```

The heap is one of the easier data structures to write by hand.
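As a sanity check on the interface, the same range-pop semantics can be sketched on top of std::priority_queue (rangePopQueue is a made-up name; unlike RangePopHeap, priority_queue cannot pre-reserve its storage through push alone or expose its array for the later tricks):

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using pairT = std::pair<int64_t, std::string>;

// Pop every entry whose key is <= bound from a min-heap of (key, value) pairs.
// std::greater<pairT> compares pairs lexicographically, so the pair with the
// smallest key sits on top of the queue.
std::vector<pairT> rangePopQueue(
        std::priority_queue<pairT, std::vector<pairT>, std::greater<pairT>>& q,
        int64_t bound) {
    std::vector<pairT> out;
    while (!q.empty() && q.top().first <= bound) {
        out.push_back(q.top());
        q.pop();
    }
    return out;
}
```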

Testing writes

```cpp
using pairT = pair<int64_t, string>;  // defined elsewhere in the full code
int64_t now = time(nullptr);          // likewise; a second-resolution timestamp base
int allCount = 1000000;
int nestCount = 10;
void testTimePairPerformance() {
    cout << "test pairT performance start" << endl;
    vector<pair<int64_t, string>> vs;
    for (int i = 0; i < allCount / nestCount; i++) {
        int64_t v = now + i;
        for (int j = 0; j < nestCount; j++) {
            string s = to_string(i * nestCount + j);
            vs.push_back({v - j, s});
        }
    }
    {
        map<int64_t, set<string>> m;
        Timer t;
        for (auto &v : vs) {
            m[v.first].insert(v.second);
        }
    }
    {
        priority_queue<pairT, vector<pairT>, std::greater<pairT>> q;
        Timer t;
        for (auto &v : vs) {
            q.push(v);
        }
    }
    {
        auto f = [](const pairT &v) {
            return v.first;
        };
        RangePopHeap<pairT, decltype(f), std::greater<int64_t>> q(f, allCount);
        Timer t;
        for (auto &v : vs) {
            q.push(v);
        }
    }
    cout << "test pairT performance end" << endl;
}
```

Output:

```
test pairT performance start
map: Cost 1289 ms
priority_queue: Cost 308 ms
RangePopHeap: Cost 131 ms
test pairT performance end
```

With reserve, the hand-written heap slightly beats priority_queue and is far ahead of map, as expected.

Testing reads

```cpp
// pairT, now, allCount, nestCount as defined above;
// LOGV is a logging macro from the full code that prints |name=value|
void testTimePairRangePopPerformance(int keyPercent = 2) {
    cout << "test pairT rangePop performance start" << LOGV(keyPercent) << endl;
    vector<pair<int64_t, string>> vs;
    for (int i = 0; i < allCount / nestCount; i++) {
        int64_t v = now + i;
        for (int j = 0; j < nestCount; j++) {
            string s = to_string(i * nestCount + j);
            vs.push_back({v - j, s});
        }
    }

    int64_t key = vs[vs.size() / keyPercent].first;  // int64_t, to avoid truncation
    {
        map<int64_t, set<string>> m;
        for (auto &v : vs) {
            m[v.first].insert(v.second);
        }
        Timer t;
        vector<pairT> result;
        result.reserve(allCount);
        for (auto iter = m.begin(); iter != m.end();) {
            if (iter->first > key) {
                break;
            }
            for (auto &v : iter->second) {
                result.emplace_back(iter->first, v);  // set elements are const: a move would copy anyway
            }
            iter = m.erase(iter);
        }
    }
    {
        auto f = [](const pairT &v) {
            return v.first;
        };
        RangePopHeap<pairT, decltype(f), std::greater<int64_t>> q(f, allCount);
        for (auto &v : vs) {
            q.push(v);
        }
        {
            Timer t;
            auto vs = q.rangePop(key);
        }
    }
    cout << "test pairT rangePop performance end" << endl;
}
```

Output:

```
test pairT rangePop performance start|keyPercent=2|
map: Cost 111 ms
rangePopHeap: Cost 1033 ms
test pairT rangePop performance end
```

Ouch. The heap and the red-black tree have similar asymptotic complexity, yet the heap's reads are 10x slower; everything gained on writes is burned right here.

A likely reason: popping the minimum is the heap's worst case every single time (the last leaf is swapped into the root and has to sift all the way back down), so heap deletion performs somewhat worse than the red-black tree's.

Optimization

Clearly, the heap's read advantage only exists when a small amount of data is popped and deleted: peeking at the minimum is \(O(1)\).

But the read size here is unbounded; it may be very large, possibly the entire heap, in which case the heap costs \(O(n\log n)\).

At that point it is better to traverse the array in \(O(n)\) and rebuild the heap in \(O(n)\), which is \(O(n)\) overall.
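This traverse-and-rebuild step maps directly onto the standard library: std::partition separates out the popped values in \(O(n)\) and std::make_heap rebuilds in \(O(n)\). A minimal sketch on a plain int64_t min-heap (illustrative, not the post's code):

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Remove every value <= bound from the heap array in O(n), returning them,
// then rebuild the min-heap over what remains, also in O(n).
std::vector<int64_t> rangePopByRebuild(std::vector<int64_t>& heap, int64_t bound) {
    // Keep values > bound at the front; values to pop end up at the back.
    auto mid = std::partition(heap.begin(), heap.end(),
                              [bound](int64_t v) { return v > bound; });
    std::vector<int64_t> popped(mid, heap.end());
    heap.erase(mid, heap.end());
    // Restore the heap property (std::greater makes it a min-heap).
    std::make_heap(heap.begin(), heap.end(), std::greater<int64_t>());
    return popped;
}
```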

So I tried a plain traversal:

```
test pairT rangePop performance start|keyPercent=2|
Cost 99 ms
test pairT rangePop performance end
```

It works very well, even better than the red-black tree.

This reminded me of the optimization inside std::sort: when quicksort hits a bad case and its recursion depth exceeds a limit, it falls back to the more stable heapsort (introsort).

Likewise here: once the read size exceeds a certain threshold, switch to the traversal algorithm.

Traversal is \(O(n)\); the heap is \(O(k\log n)\).

Therefore, once \(k > \dfrac{n}{\log n}\), it pays to switch from the heap algorithm to the traversal algorithm.
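That threshold can be written out directly; popLimit below is an illustrative helper (the class code that follows inlines the same expression):

```cpp
#include <cmath>
#include <cstdint>

// Pop-count threshold: beyond n / log2(n) pops, one O(n) traversal plus an
// O(n) heap rebuild beats O(k log n) worth of individual pops.
int64_t popLimit(int64_t n) {
    if (n <= 2) {
        return n;  // log2(n) is 0 or tiny here; just allow popping everything
    }
    return static_cast<int64_t>(n / std::log2(static_cast<double>(n)));
}
```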

```cpp
vector<T> rangePop(const KeyT& key) {
    bool ok = false;
    // guard tiny heaps: log2(n) is zero or negative for n <= 1
    int64_t countLimit = vs_.size() <= 2 ? int64_t(vs_.size())
                                         : int64_t(vs_.size() / std::log2(vs_.size()));
    auto results = rangePopLittleData(key, countLimit, ok);
    if (!ok) {
        rangePopLargeData(key, results);
    }
    return results;
}
void rangePopLargeData(const KeyT& key, vector<T>& results) {
    vector<T> newVs;
    newVs.reserve(vs_.capacity());
    for (auto &v : vs_) {
        if (cmp_(getValueFunc_(v), key)) {
            newVs.emplace_back(move(v));
            continue;
        }
        results.emplace_back(move(v));
    }
    makeHeap(newVs);
}
// min-heap: pop values in order until value > key, or until countLimit pops
vector<T> rangePopLittleData(const KeyT& key, int64_t countLimit, bool &ok) {
    ok = false;
    vector<T> results;
    results.reserve(vs_.capacity());
    while (!empty()) {
        auto &v = top();
        if (!countLimit--) {
            break;
        }
        if (cmp_(getValueFunc_(v), key)) {
            ok = true;
            break;
        }
        results.emplace_back(move(v));
        pop();
    }
    return results;
}
```

Retesting

```cpp
void testTimePairRangePopPerformance(int keyPercent = 2) {
    cout << "test pairT rangePop performance start" << LOGV(keyPercent) << endl;
    vector<pair<int64_t, string>> vs;
    for (int i = 0; i < allCount / nestCount; i++) {
        int64_t v = now + i;
        for (int j = 0; j < nestCount; j++) {
            string s = to_string(i * nestCount + j);
            vs.push_back({v - j, s});
        }
    }

    int64_t key = vs[vs.size() / keyPercent].first;
    {
        map<int64_t, set<string>> m;
        for (auto &v : vs) {
            m[v.first].insert(v.second);
        }
        Timer t;
        vector<pairT> result;
        result.reserve(allCount);
        for (auto iter = m.begin(); iter != m.end();) {
            if (iter->first > key) {
                break;
            }
            for (auto &v : iter->second) {
                result.emplace_back(iter->first, v);
            }
            iter = m.erase(iter);
        }
    }
    {
        auto f = [](const pairT &v) {
            return v.first;
        };
        RangePopHeap<pairT, decltype(f), std::greater<int64_t>> q(f, allCount);
        for (auto &v : vs) {
            q.push(v);
        }
        auto q1 = q;
        auto q2 = q;
        {
            Timer t;
            vector<pairT> vs;
            vs.reserve(allCount * nestCount);
            q.rangePopLargeData(key, vs);
        }
        // checkHeap (defined in the full code) verifies the heap invariant still holds
        checkHeap<decltype(q), decltype(f), std::greater<int64_t>>(q, f);
        {
            Timer t;
            int64_t countLimit = INT_MAX;
            bool ok = false;
            auto result = q1.rangePopLittleData(key, countLimit, ok);
        }
        checkHeap<decltype(q), decltype(f), std::greater<int64_t>>(q1, f);
        {
            Timer t;
            auto result = q2.rangePop(key);
        }
        checkHeap<decltype(q), decltype(f), std::greater<int64_t>>(q2, f);
    }
    cout << "test pairT rangePop performance end" << endl;
}
int main() {
    for (int i = 1; i < 30; i++) {
        testTimePairRangePopPerformance(i);
    }
}
```

Output:

The first item is the red-black tree,

the second is pure traversal,

the third is the pure heap,

and the fourth is the new algorithm.

```
test pairT rangePop performance start|keyPercent=1|
map: Cost 228 ms
rangePopLargeData: Cost 86 ms
rangePopLittleData: Cost 1935 ms
rangePop: Cost 201 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=2|
map: Cost 111 ms
rangePopLargeData: Cost 98 ms
rangePopLittleData: Cost 1027 ms
rangePop: Cost 232 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=3|
map: Cost 72 ms
rangePopLargeData: Cost 102 ms
rangePopLittleData: Cost 713 ms
rangePop: Cost 241 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=4|
map: Cost 55 ms
rangePopLargeData: Cost 104 ms
rangePopLittleData: Cost 543 ms
rangePop: Cost 239 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=5|
map: Cost 44 ms
rangePopLargeData: Cost 106 ms
rangePopLittleData: Cost 439 ms
rangePop: Cost 242 ms
test pairT rangePop performance end
...
test pairT rangePop performance start|keyPercent=20|
map: Cost 11 ms
rangePopLargeData: Cost 110 ms
rangePopLittleData: Cost 118 ms
rangePop: Cost 119 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=21|
map: Cost 11 ms
rangePopLargeData: Cost 110 ms
rangePopLittleData: Cost 113 ms
rangePop: Cost 113 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=22|
map: Cost 10 ms
rangePopLargeData: Cost 109 ms
rangePopLittleData: Cost 108 ms
rangePop: Cost 108 ms
test pairT rangePop performance end
test pairT rangePop performance start|keyPercent=23|
map: Cost 10 ms
rangePopLargeData: Cost 109 ms
rangePopLittleData: Cost 103 ms
rangePop: Cost 103 ms
test pairT rangePop performance end
```

As the results show, the new algorithm stays stable when the read size is large,

and when the read size is small it still gets the heap algorithm's speedup.

The observed crossover between rangePopLargeData (traversal) and rangePopLittleData (heap), at about \(\dfrac{1}{22}\) of the data, is close to the predicted \(\dfrac{k}{n} = \dfrac{1}{\log_2 n}\), which is roughly \(\dfrac{1}{23}\) for \(n = 10^7\).

Summary

Full code

References