1 Bubble Sort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package org.liuyehcf.sort.bubblesort;import static org.liuyehcf.sort.SortUtils.*;public class BubbleSort { public static void sort (int [] nums) { for (int i = 0 ; i < nums.length; i++) { for (int j = nums.length - 1 ; j > i; j--) { compareAndExchange(nums, j - 1 , j); } } } private static void compareAndExchange (int [] nums, int left, int right) { if (nums[left] > nums[right]) { exchange(nums, left, right); } } }
2 Insertion Sort
2.1 经典版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class InsertSort { public static void sort (int [] nums) { for (int i = 1 ; i < nums.length; i++) { int pivot = nums[i]; int j = i - 1 ; while (j >= 0 && nums[j] > pivot) { nums[j + 1 ] = nums[j]; j--; } nums[j + 1 ] = pivot; } } }
2.2 二分插入排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class BinaryInsertSort { public static void sort (int [] nums) { for (int i = 1 ; i < nums.length; i++) { int pivot = nums[i]; int left = 0 , right = i - 1 ; if (nums[right] <= pivot) { continue ; } else if (nums[0 ] > pivot) { System.arraycopy(nums, 0 , nums, 1 , i); nums[0 ] = pivot; continue ; } while (left < right) { int mid = left + (right - left >> 1 ); if (nums[mid] <= pivot) { left = mid + 1 ; } else { right = mid; } } System.arraycopy(nums, left, nums, left + 1 , i - left); nums[left] = pivot; } } }
3 Merge Sort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class MergeSort { public static void sort (int [] nums) { sort(nums, 0 , nums.length - 1 ); } private static void sort (int [] nums, int left, int right) { if (left < right) { int mid = left + (right - left >> 1 ); sort(nums, left, mid); sort(nums, mid + 1 , right); merge(nums, left, mid, mid + 1 , right); } } private static void merge (int [] nums, int left1, int right1, int left2, int right2) { int [] tmp = new int [right1 - left1 + 1 ]; System.arraycopy(nums, left1, tmp, 0 , tmp.length); int i1 = 0 , i2 = left2, i = left1; while (i1 <= tmp.length - 1 && i2 <= right2) { if (tmp[i1] <= nums[i2]) { nums[i++] = tmp[i1++]; } else { nums[i++] = nums[i2++]; } } if (i1 <= tmp.length - 1 ) { System.arraycopy(tmp, i1, nums, i, tmp.length - i1); } else if (i2 <= right2) { System.arraycopy(nums, i2, nums, i, right2 - i2 + 1 ); } } }
归并排序还有一个优化版本TimSort,关于TimSort的实现可以参考JDK Arrays.sort的源码 SourceAnalysis-ComparableTimSort
3.1 Parallel Merge
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <exception>
#include <iostream>
#include <random>
#include <thread>
#include <tuple>
#include <typeinfo>
#include <utility>
#include <vector>

// Merges two sorted vectors with the Merge Path scheme. The output positions
// [0, |A| + |B|) are cut into p contiguous, non-overlapping segments. The start of
// each segment in A and B is found by a binary search along the corresponding
// cross diagonal of the merge matrix. Each segment is then filled by an ordinary
// sequential merge. The segments are processed one after another here; because they
// are disjoint and independent, they could be handed to p workers without data races.
class ParallelMerger {
public:
    // A, B: sorted in ascending order.
    // S: output buffer; only indices below S.size() are written, so size it to A.size() + B.size().
    // p: requested number of segments; values below 1 are treated as 1.
    // Equal elements are taken from A first, so the merge is stable.
    void merge(const std::vector<int32_t>& A, const std::vector<int32_t>& B, std::vector<int32_t>& S, int32_t p) {
        const int64_t total = static_cast<int64_t>(A.size()) + static_cast<int64_t>(B.size());
        // Guard against p <= 0 (division by zero); more segments than outputs would be empty anyway.
        const int64_t requested = std::max<int64_t>(p, 1);
        const int64_t parts = std::min<int64_t>(requested, std::max<int64_t>(total, 1));
        for (int64_t i = 0; i < parts; ++i) {
            // Neighbouring segments use the same formula, so segment i ends exactly where
            // segment i + 1 starts: no gaps and no overlapping writes.
            const int64_t s_start = i * total / parts;
            const int64_t s_end = (i + 1) * total / parts;
            const int64_t a_start = diagonal_intersection(A, B, s_start);
            const int64_t b_start = s_start - a_start;
            do_merge_along_path(A, a_start, B, b_start, S, s_start, s_end - s_start);
        }
    }

private:
    // Returns how many elements of A are among the first `diag` outputs of the stable
    // merge of A and B; the remaining diag - result outputs come from B.
    // Requires 0 <= diag <= A.size() + B.size().
    static int64_t diagonal_intersection(const std::vector<int32_t>& A, const std::vector<int32_t>& B, int64_t diag) {
        const int64_t a_size = static_cast<int64_t>(A.size());
        const int64_t b_size = static_cast<int64_t>(B.size());
        // At least diag - |B| and at most min(diag, |A|) of the first diag outputs come from A.
        int64_t low = std::max<int64_t>(0, diag - b_size);
        int64_t high = std::min(diag, a_size);
        while (low < high) {
            const int64_t mid = low + (high - low) / 2;
            // If A[mid] <= B[diag - mid - 1], A[mid] is emitted before that B element
            // (ties favour A), so more than `mid` elements of A precede the diagonal.
            if (A[mid] <= B[diag - mid - 1]) {
                low = mid + 1;
            } else {
                high = mid;
            }
        }
        return low;
    }

    // Writes `length` consecutive merge outputs into S starting at s_start, reading A from
    // a_start and B from b_start, and stops early at the end of S. The caller guarantees
    // that A and B together still hold at least `length` unread elements.
    static void do_merge_along_path(const std::vector<int32_t>& A, int64_t a_start,
                                    const std::vector<int32_t>& B, int64_t b_start,
                                    std::vector<int32_t>& S, int64_t s_start, int64_t length) {
        const int64_t a_size = static_cast<int64_t>(A.size());
        const int64_t b_size = static_cast<int64_t>(B.size());
        const int64_t s_end = std::min(s_start + length, static_cast<int64_t>(S.size()));
        int64_t i = a_start;
        int64_t j = b_start;
        for (int64_t k = s_start; k < s_end; ++k) {
            // Take from A when B is exhausted, or when A's head is not larger than B's.
            if (j >= b_size || (i < a_size && A[i] <= B[j])) {
                S[k] = A[i++];
            } else {
                S[k] = B[j++];
            }
        }
    }
};

// Prints v as a comma-separated line.
void print(const std::vector<int32_t>& v) {
    for (size_t i = 0; i < v.size(); i++) {
        if (i != 0) {
            std::cout << ", ";
        }
        std::cout << v[i];
    }
    std::cout << std::endl;
}

// Randomised self-test: merges random sorted inputs with p = 1..128 segments and
// compares the result with std::sort of the concatenation.
int main() {
    constexpr int32_t max = 100;
    std::default_random_engine e;
    std::uniform_int_distribution<int32_t> u(1, max);
    for (int times = 0; times < max; ++times) {
        int32_t size_A = u(e);
        int32_t size_B = u(e);
        std::vector<int32_t> A;
        std::vector<int32_t> B;
        std::vector<int32_t> expected;
        for (int32_t i = 0; i < size_A; ++i) {
            int32_t item = u(e);
            A.push_back(item);
            expected.push_back(item);
        }
        for (int32_t i = 0; i < size_B; ++i) {
            int32_t item = u(e);
            B.push_back(item);
            expected.push_back(item);
        }
        std::sort(A.begin(), A.end());
        std::sort(B.begin(), B.end());
        std::sort(expected.begin(), expected.end());
        std::cout << "A's size=" << A.size() << ", B's size=" << B.size() << std::endl;
        for (int p = 1; p <= 128; p++) {
            std::cout << "times=" << times << ", p=" << p << std::endl;
            std::vector<int32_t> S;
            S.assign(A.size() + B.size(), -1);
            ParallelMerger merger;
            merger.merge(A, B, S, p);
            if (S != expected) {
                std::cerr << "not ok" << std::endl;
                print(A);
                print(B);
                print(S);
                print(expected);
                return 1;
            }
        }
    }
    return 0;
}
4 Heap Sort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class HeapSort { public static void sort (int [] nums) { buildMaxHeap(nums); for (int len = nums.length; len >= 2 ; len--) { exchange(nums, 0 , len - 1 ); maxHeapFix(nums, len - 1 , 0 ); } } private static void maxHeapFix (int [] nums, int heapSize, int i) { int left = i * 2 + 1 , right = i * 2 + 2 ; if (left >= heapSize) { left = -1 ; } if (right >= heapSize) { right = -1 ; } int max = i; if (left != -1 && nums[left] > nums[i]) { max = left; } if (right != -1 && nums[right] > nums[max]) { max = right; } if (max != i) { exchange(nums, i, max); maxHeapFix(nums, heapSize, max); } } private static void buildMaxHeap (int [] nums) { for (int i = nums.length >> 1 ; i >= 0 ; i--) { maxHeapFix(nums, nums.length, i); } } }
4.1 TopK
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <list>
#include <queue>
#include <random>
#include <vector>

// Returns the k largest values of nums in descending order (duplicates included).
// When k >= nums.size() this is all of nums sorted descending; when k == 0 it is empty.
// Keeps a min-heap holding the best k candidates seen so far: O(n log k) time, O(k) space.
std::vector<int32_t> top_k(const std::vector<int32_t>& nums, size_t k) {
    if (k == 0) {
        // With k == 0 the loop below would call top() on an empty heap (undefined behaviour).
        return {};
    }
    std::priority_queue<int32_t, std::vector<int32_t>, std::greater<int32_t>> min_queue;
    for (const int32_t num : nums) {
        if (min_queue.size() < k) {
            min_queue.push(num);
        } else if (num > min_queue.top()) {
            // num beats the smallest current candidate, so it replaces it.
            min_queue.pop();
            min_queue.push(num);
        }
    }
    // The heap yields candidates smallest-first; fill the result from the back.
    std::vector<int32_t> res(min_queue.size());
    for (auto slot = res.rbegin(); slot != res.rend(); ++slot) {
        *slot = min_queue.top();
        min_queue.pop();
    }
    return res;
}

// Randomised self-test against a full descending sort.
int main() {
    // Regression check: k == 0 must return an empty result instead of touching an empty heap.
    if (!top_k({3, 1, 2}, 0).empty()) {
        std::cerr << "wrong result" << std::endl;
        return 1;
    }
    size_t test_times = 100;
    std::default_random_engine e;
    std::uniform_int_distribution<size_t> u_k(1, 20);
    std::uniform_int_distribution<size_t> u_size(1, 50);
    std::uniform_int_distribution<int32_t> u_num(1, 10000);
    for (size_t i = 0; i < test_times; ++i) {
        const size_t k = u_k(e);
        const size_t size = u_size(e);
        std::cout << "test_time=" << i << ", k=" << k << ", size=" << size << std::endl;
        std::vector<int32_t> nums;
        for (size_t j = 0; j < size; ++j) {
            nums.push_back(u_num(e));
        }
        std::vector<int32_t> res = top_k(nums, k);
        std::sort(nums.begin(), nums.end(), std::greater<int32_t>());
        if (k < size) {
            nums = std::vector<int32_t>(nums.begin(), nums.begin() + k);
        }
        // Compare length as well as elements, so a short result is also caught.
        if (res != nums) {
            std::cerr << "wrong result" << std::endl;
            return 1;
        }
    }
    return 0;
}
5 Quick Sort
5.1 2-way Quick Sort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class QuickSort { public static void sortRecursive (int [] nums) { sortRecursive(nums, 0 , nums.length - 1 ); } public static void sort (int [] nums) { sortStack(nums, 0 , nums.length - 1 ); } private static void sortRecursive (int [] nums, int lo, int hi) { if (lo < hi) { int mid = partition(nums, lo, hi); sortRecursive(nums, lo, mid - 1 ); sortRecursive(nums, mid + 1 , hi); } } private static void sortStack (int [] nums, int lo, int hi) { LinkedList<int []> stack = new LinkedList <int []>(); if (lo < hi) { stack.push(new int []{lo, hi}); } while (!stack.isEmpty()) { int [] peek = stack.pop(); int peekLo = peek[0 ], peekHi = peek[1 ]; int mid = partition(nums, peekLo, peekHi); if (peekLo < mid - 1 ) { stack.push(new int []{peekLo, mid - 1 }); } if (mid + 1 < peekHi) { stack.push(new int []{mid + 1 , peekHi}); } } } private static int partition (int [] nums, int lo, int hi) { int i = lo - 1 ; int pivot = nums[hi]; for (int j = lo; j < hi; j++) { if (nums[j] < pivot) { exchange(nums, ++i, j); } } exchange(nums, ++i, hi); return i; } }
5.2 3-way Quick Sort
因此满足nums[lo~lt]<pivot nums[lt+1~gt-1]==pivot nums[gt~hi]>pivot
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public class ThreeWayQuickSort { public static void sortRecursive (int [] nums) { sortRecursive(nums, 0 , nums.length - 1 ); } public static void sort (int [] nums) { sortStack(nums, 0 , nums.length - 1 ); } private static void sortRecursive (int [] nums, int lo, int hi) { if (lo < hi) { int [] range = partition(nums, lo, hi); sortRecursive(nums, lo, range[0 ]); sortRecursive(nums, range[1 ], hi); } } private static void sortStack (int [] nums, int lo, int hi) { LinkedList<int []> stack = new LinkedList <int []>(); if (lo < hi) { stack.push(new int []{lo, hi}); } while (!stack.isEmpty()) { int [] peek = stack.pop(); int peekLo = peek[0 ], peekHi = peek[1 ]; int [] range = partition(nums, peekLo, peekHi); if (peekLo < range[0 ]) { stack.push(new int []{peekLo, range[0 ]}); } if (range[1 ] < peekHi) { stack.push(new int []{range[1 ], peekHi}); } } } private static int [] partition(int [] nums, int lo, int hi) { int lt = lo - 1 ; int gt = hi + 1 ; int pivot = nums[hi]; int j = lo; while (j < gt) { if (nums[j] < pivot) { exchange(nums, ++lt, j++); } else if (nums[j] > pivot) { exchange(nums, --gt, j); } else { j++; } } return new int []{lt, gt}; } }
5.3 Dual-Pivot Quick Sort
因此满足nums[lo~lt]<pivot1 nums[lt+1~le]==pivot1 pivot1<nums[le+1~ge-1]<pivot2 nums[ge~gt-1]==pivot2 nums[gt~hi]>pivot2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public class DualPivotQuickSort { public static void sortRecursive (int [] nums) { sortRecursive(nums, 0 , nums.length - 1 ); } public static void sort (int [] nums) { sortStack(nums, 0 , nums.length - 1 ); } private static void sortRecursive (int [] nums, int lo, int hi) { if (lo < hi) { int [] range = partition(nums, lo, hi); sortRecursive(nums, lo, range[0 ]); sortRecursive(nums, range[1 ] + 1 , range[2 ] - 1 ); sortRecursive(nums, range[3 ], hi); } } private static void sortStack (int [] nums, int lo, int hi) { LinkedList<int []> stack = new LinkedList <int []>(); if (lo < hi) { stack.push(new int []{lo, hi}); } while (!stack.isEmpty()) { int [] peek = stack.pop(); int peekLo = peek[0 ]; int peekHi = peek[1 ]; int [] range = partition(nums, peekLo, peekHi); if (peekLo < range[0 ]) { stack.push(new int []{peekLo, range[0 ]}); } if (range[1 ] + 1 < range[2 ] - 1 ) { stack.push(new int []{range[1 ] + 1 , range[2 ] - 1 }); } if (range[3 ] < peekHi) { stack.push(new int []{range[3 ], peekHi}); } } } private static int [] partition(int [] nums, int lo, int hi) { int lt = lo - 1 ; int le = lt; int gt = hi + 1 ; int ge = gt; int pivot1 = nums[lo]; int pivot2 = nums[hi]; if (pivot1 > pivot2) { exchange(nums, lo, hi); pivot1 = nums[lo]; pivot2 = nums[hi]; } int j = lo; while (j < ge) { if (nums[j] == pivot1) { exchange(nums, ++le, j++); } else if (nums[j] < pivot1) { if (le == lt) { ++lt; ++le; exchange(nums, lt, j++); } else { exchange(nums, ++lt, j); exchange(nums, ++le, j++); } } else if (nums[j] == pivot2) { exchange(nums, --ge, j); } else if (nums[j] > pivot2) { if (ge == gt) { --gt; --ge; exchange(nums, gt, j); } else { exchange(nums, --gt, j); exchange(nums, --ge, j); } } else { j++; 
} } return new int []{lt, le, ge, gt}; } }
关于双轴快排的更多实现可以参考JDK Arrays.sort的源码 SourceAnalysis-DualPivotQuickSort
6 Counting Sort
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class CountSort { public static void sort (int [] nums) { int min = Integer.MAX_VALUE; int max = Integer.MIN_VALUE; for (int num : nums) { if (num < min) { min = num; } if (num > max) { max = num; } } int len = max - min + 1 ; int [] counts = new int [len]; for (int num : nums) { counts[num - min]++; } int index = 0 ; for (int i = 0 ; i < counts.length; i++) { int count = counts[i]; while (count-- > 0 ) { nums[index++] = min + i; } } } public static void main (String[] args) { final int LEN = 500000 ; final int TIMES = 100 ; final Random random = new Random (); long duration = 0 ; for (int t = 0 ; t < TIMES; t++) { int [] nums = new int [LEN]; for (int i = 0 ; i < LEN; i++) { nums[i] = random.nextInt(100000 ); } long start = System.currentTimeMillis(); sort(nums); duration += System.currentTimeMillis() - start; for (int j = 1 ; j < nums.length; j++) { if (nums[j] < nums[j - 1 ]) throw new RuntimeException (); } } System.out.format("%-20s : %d ms\n" , "CountSort" , duration); System.out.println("\n------------------------------------------\n" ); } }
7 Bitonic Sort
Bitonic Sequence
的序列,前半部分递增,后半部分递减(当然也可以反过来)。特别地,当n = 1
由于当$n = 1$时,已经是4个双调序列了,分别为$(3, 7)$、$(4, 8)$、$(6, 2)$、$(1, 5)$。因此,我们下一步需要构造2个$n + 1 = 2$的双调序列。
current distance, cd = 2^(n-1) = 1 | n = 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 group 1 │ group 2 │ group 3 │ group 4 │ │ │ 3 7 │ 4 8 │ 6 2 │ 1 5 │ ▼ group 1 │ group 2 │ 3 7 4 8 │ 6 2 1 5 │ ▲ ▲ │ │ │ ▲ ▲ │ └────┘ └─sw─┘ │ └─sw─┘ └─sw─┘ │ │ │ ▼ │ ▼ 3 7 8 4 │ 2 6 5 1
此时,我们得到了2个$n = 2$的双调序列,分别为$(3, 7, 8, 4)$、$(2, 6, 5, 1)$,因此,我们下一步需要构造1个$n + 1 = 3$的双调序列。
cd = 2^(n-1) = 2 | n = 2
cd = cd / 2 = 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 group 1 │ group 2 │ 3 7 8 4 │ 2 6 5 1 │ ▼ group 1 3 7 8 4 2 6 5 1 │ │ ▲ ▲ ▲ ▲ │ │ └────┼─────┘ │ └────┬─sw──┘ │ │ │ │ │ └────sw────┘ └──────────┘ │ ▼ 3 4 8 7 5 6 2 1 │ ▲ │ ▲ ▲ │ ▲ │ └────┘ └─sw─┘ └─sw─┘ └────┘ │ ▼ 3 4 7 8 6 5 2 1
此时,我们得到了1个$n = 3$的双调序列,即$(3, 4, 7, 8, 6, 5, 2, 1)$,因此,我们下一步需要构造1个$n + 1 = 4$的双调序列。
cd = 2^(n-1) = 4 | n = 3
cd = cd/2 = 2
cd = cd/2 = 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 group 1 3 4 7 8 6 5 2 1 │ ▼ group 1 3 4 7 8 6 5 2 1 x x x x x x x x │ │ │ │ ▲ ▲ ▲ ▲ └────┼─────┼────┼─────┘ │ │ │ │ │ │ │ │ │ └─────┼────┼──────────┘ │ │ │ │ │ │ └────┼────sw──────────┘ │ │ │ └──────────sw─────────┘ │ ▼ 3 4 2 1 6 5 7 8 x x x x x x x x │ │ ▲ ▲ │ │ ▲ ▲ └──sw├─────┘ │ └────┼─────┘ │ │ │ │ │ └────sw────┘ └──────────┘ │ ▼ 2 1 3 4 6 5 7 8 x x x x x x x x │ ▲ │ ▲ │ ▲ │ ▲ └─sw─┘ └────┘ └─sw─┘ └────┘ │ ▼ 1 2 3 4 5 6 7 8 x x x x x x x x
此时,我们得到了一个$n = 4$的双调序列,即$(1, 2, 3, 4, 5, 6, 7, 8, x, x, x, x, x, x, x, x)$
7.1 参考
8 Parallel Merge Sort
8.1 Merge Path
Merge Path - A Visually Intuitive Approach
8.2 Pipelined Multi-staged Merge Sort
Parallel Merge Sort
9 Approx Top K
Efficient Computation of Frequent and Top-k Elements in Data Streams
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

// One monitored slot of the Space-Saving summary: an item and its estimated count.
struct ItemCounter {
    int32_t item;
    size_t counter;
};

// Space-Saving summary that monitors at most k distinct items of a stream.
// When an unmonitored item arrives and every slot is taken, the slot with the smallest
// count is handed to the new item and that count is incremented. A slot's count can
// therefore overestimate the true frequency of its current item.
// Eviction scans all k slots (O(k) per eviction).
class SpaceSaved {
private:
    // item -> index into _counters. Indices (not pointers) stay valid when a SpaceSaved
    // object is copied or moved; copied pointers would point into the source object.
    std::unordered_map<int32_t, size_t> _table;
    std::vector<ItemCounter> _counters;
    // Number of slots in use; slots [0, empty_idx) are valid.
    size_t empty_idx = 0;

public:
    // k: number of slots. A value <= 0 yields a summary that ignores every item.
    SpaceSaved(int32_t k) : _counters(k > 0 ? static_cast<size_t>(k) : 0) {}

    // Feeds one stream element into the summary.
    void process(int32_t item) {
        auto found = _table.find(item);
        if (found != _table.end()) {
            ++_counters[found->second].counter;
            return;
        }
        if (_counters.empty()) {
            return;  // no slots at all: nothing can be monitored
        }
        if (empty_idx < _counters.size()) {
            _counters[empty_idx] = ItemCounter{item, 1};
            _table[item] = empty_idx;
            ++empty_idx;
            return;
        }
        // All slots taken: evict the first slot with the minimum count; the newcomer
        // inherits that count plus one.
        size_t min_idx = 0;
        for (size_t i = 1; i < _counters.size(); ++i) {
            if (_counters[i].counter < _counters[min_idx].counter) {
                min_idx = i;
            }
        }
        ItemCounter& victim = _counters[min_idx];
        _table.erase(victim.item);
        victim.item = item;
        ++victim.counter;
        _table[item] = min_idx;
    }

    // Returns the occupied slots in slot order (at most k entries). Unused slots are
    // omitted rather than reported as item 0 with count 0.
    std::vector<ItemCounter> get_frequent_items() const {
        return std::vector<ItemCounter>(_counters.begin(), _counters.begin() + empty_idx);
    }
};

int main() {
    SpaceSaved ss(5);
    std::vector<int32_t> stream = {1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 6, 7, 7, 7, 8};
    for (int32_t item : stream) {
        ss.process(item);
    }
    auto frequentItems = ss.get_frequent_items();
    for (const auto& entry : frequentItems) {
        std::cout << "Item: " << entry.item << ", Count: " << entry.counter << std::endl;
    }
    return 0;
}
9.1 Two-stage Aggregation
The Space Saving Algorithm is commonly used for estimating the top-K frequent items in a stream of data with limited memory. To implement this as a two-stage aggregate function for a distributed database management system (DBMS), you’ll need to handle the aggregation in two main phases:
Local Aggregation
(First-stage aggregate on each node)
Global Aggregation
(Second-stage aggregate on a single node)
Here’s how you can design and execute the two-stage aggregation:
Local Aggregation (First-stage): Each node will maintain a list of counters based on the Space Saving Algorithm:
For each incoming item in the stream:
If the item is already in the list of counters, increment its count.
If the item is not in the list and there is space available, add it to the list with a count of 1.
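If the item is not in the list and there is no space available, find the item with the smallest count and replace it with the new item. Set the new item's count to that smallest count plus one: the new item inherits the evicted count, so its count may overestimate its true frequency.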
At the end of this phase, each node will have its local top-K counters.
Global Aggregation (Second-stage): After the local aggregation phase, the intermediate counters from all nodes will be sent to a particular aggregation node. On this node:
For each counter from the nodes:
If the item is already in the global list of counters, add the local count to the global count.
If the item is not in the global list and there is space available, add it to the global list with its local count.
If the item is not in the global list and there is no space available, determine if its local count is greater than the smallest global counter. If it is, replace the global counter with the new item and its count. Otherwise, discard the counter.
Once all the local counters have been processed, the global list will contain the estimated top-K frequent items across all the nodes.
Some considerations:
Due to the nature of the Space Saving Algorithm, the accuracy of the results will depend on the number of counters you maintain in your list. The more counters you have, the more accurate the result, but at the cost of increased memory usage.
In the global aggregation phase, you might be merging a lot of counters, especially if you have many nodes. Ensure your global counter list is sufficiently large to maintain accuracy.
Depending on the distribution of your data across nodes, there might be significant overlap between local top-K items. This can make your global results more accurate.
Implementing the Space Saving Algorithm for a distributed DBMS can be a challenging task, but with careful design and attention to detail, it’s possible to get accurate top-K estimations with limited memory.
10 总结