GO CountMinSketch计数器(布隆过滤器思想的近似计数器)

 更新时间:2022年09月14日 08:34:00   作者:jiaxwu​​​​​​  
这篇文章主要介绍了GO CountMinSketch计数器(布隆过滤器思想的近似计数器),文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下

简介

CountMinSketch是一种计数器,用来统计一个元素的计数,它能够以一个非常小的空间统计大量元素的计数,同时保证高的性能及准确性。

与布隆过滤器类似,由于它是基于概率的,因此它所统计的计数是有一定概率存在误差的,也就是可能会比真实的计数大。比如一个元素实际的计数是10,但是计算器的计算结果可能比10大。因此适合能够容忍计数存在一定误差的场景,比如社交网络中推文的访问次数。

它一秒能够进行上百万次操作(主要取决于哈希函数的速度),并且如果我们每天有一个长度为100亿的数据流需要进行计数,计数值允许的误差范围是100,允许的错误率是0.1%,计数器大小是32位,只需要7.2GB内存,这完全可以单机进行计数。

原理

数据结构

CountMinSketch计数器的数据结构是一个二维数组,每一个元素都是一个计数器,计数器可以使用一个数值类型进行表示,比如无符号int

增加计数

每个元素会通过不同的哈希函数映射到每一行的某个位置,并增加对应位置上的计数:

估算计数

估算计数也是如上图流程,根据哈希映射到每一行的对应位置,然后读取所有行的计数,返回其中最小的一个。

返回最小的一个是因为其他其他元素也可能会映射到自身所映射位置上面,导致计数比真实计数大,因此最小的一个计数最可能是真实计数:

比如上图元素123映射到了元素abc第一行的相同位置,因此这个位置的计数累加了元素abc和元素123的计数和。但是只要我们取三行里面最小的一个计数,那么就能容忍这种情况。

当然,如果一个元素的每一行的对应位置都被其他元素所映射,那么这个估算的计数就会比真实计数大。

哈希函数

CountMinSketch计数器里面的哈希函数需要是彼此独立且均匀分布(类似于哈希表的哈希函数),而且需要尽可能的快,比如murmur3就是一个很好的选择。

CountMinSketch计数器的性能严重依赖于哈希函数的性能,而一般哈希函数的性能则依赖于输入串(一般为字节数组)的长度,因此为了提高CountMinSketch计数器的性能建议减少输入串的长度。

下面是一个简单的性能测试,单位是字节,可以看到时间的消耗随着元素的增大基本是线性增长的:

pkg: github.com/jiaxwu/gommon/counter/cm
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkAddAndEstimate/1-8              2289142               505.9 ns/op         1.98 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/2-8              2357380               513.7 ns/op         3.89 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/4-8              2342382               496.9 ns/op         8.05 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/8-8              2039792               499.7 ns/op        16.01 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/16-8             2350281               526.8 ns/op        30.37 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/32-8             2558060               444.3 ns/op        72.03 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/64-8             2540272               459.5 ns/op       139.29 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/128-8            1919720               538.6 ns/op       237.67 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/256-8            1601738               720.6 ns/op       355.28 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/512-8             950584              1599 ns/op         320.18 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/1024-8            363592              3169 ns/op         323.17 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/2048-8            187500              5888 ns/op         347.81 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/4096-8            130425              8825 ns/op         464.15 MB/s           0 B/op          0 allocs/op
BenchmarkAddAndEstimate/8192-8             67198             17460 ns/op         469.18 MB/s           0 B/op          0 allocs/op

数组大小、哈希函数数量、错误范围、错误率

数组大小、哈希函数数量、错误范围和错误率之间是互相影响的,如果我们想减少错误率和错误范围,则需要更大的数组和更多的哈希函数。但是我们很难直观的计算出这些参数,还好有两个公式可以帮助我们计算出准确的数值:

在我们可以确定我们的数据流大小和能够容忍的错误范围错误率的情况下,我们可以根据下面公式计算数组大小哈希函数数量

n = 数据流大小 
m = 数组大小
k = 哈希函数数量 
eRange = 错误范围(ErrorRange)
eRate = 错误率(ErrorRate)
ceil() = 向上取整操作
E = 2.718281828459045(自然常数)

m = ceil(E/(eRange/n))
k = ceil(ln(1/eRate))

应用

TopK(海量数据计数器)

对于海量数据流中频率最高的K个数,如果使用常规的map<key, uint>,由于内存大小限制,一般情况下单机无法完成计算,需要把数据路由到多台机器上进行计数。

而如果我们使用CountMinSketch则能够在单机情况下处理大量的数据,比如开头所提到对于一个长度为100亿的数据流进行计数,只需要7.2GB内存。这个计数结果可能存在一定误差,不过我们可以在这个基础上再进行过滤。

TinyLFU

TinyLFU是一个缓存淘汰策略,它里面有LFU策略的思想,LFU是一个基于访问频率的淘汰策略,因此需要统计每个元素被访问的次数。如果对每个元素使用一个独立的计数器,那么这个成本会很大,而且对于一个缓存淘汰策略来说,我们并不需要这个计数器非常大且非常准确。

因此TinyLFU使用一个计数器长度为4位的CountMinSketch计数器统计每个元素的频率,减少计数所消耗的内存空间,同时还引入了计数衰减机制避免某些之前热门但是当前已经很少被访问的元素很难被淘汰。

实现

这里给出一个Golang的泛型实现,这个实现支持uint8uint16uint32uint64等基本类型计数器,实际上还可以实现比如长度为2bit4bit6bit的计数器,但是代码会稍微复杂一点(特别是非2的次方的计数器)。

package cm
import (
	"math"

	"github.com/jiaxwu/gommon/hash"
	mmath "github.com/jiaxwu/gommon/math"
	"github.com/jiaxwu/gommon/mem"
	"golang.org/x/exp/constraints"
)

// Count-Min Sketch 计数器,原理类似于布隆过滤器,根据哈希映射到多个位置,然后在对应位置进行计数
// 读取时拿对应位置最小的
// 适合需要一个比较小的计数,而且不需要这个计数一定准确的情况
// 可以减少空间消耗
// https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.591.8351&rep=rep1&type=pdf
type Counter[T constraints.Unsigned] struct {
	counters    [][]T
	countersLen uint64       // 计数器长度
	hashs       []*hash.Hash // 哈希函数列表
	maxCount    T            // 最大计数值
}

// 创建一个计数器
// size:数据流大小
// errorRange:计数值误差范围(会超过真实计数值)
// errorRate:错误率
func New[T constraints.Unsigned](size uint64, errorRange T, errorRate float64) *Counter[T] {
	// 计数器长度
	countersLen := uint64(math.Ceil(math.E / (float64(errorRange) / float64(size))))
	// 哈希个数
	hashsCnt := int(math.Ceil(math.Log(1.0 / errorRate)))
	hashs := make([]*hash.Hash, hashsCnt)
	counters := make([][]T, hashsCnt)
	for i := 0; i < hashsCnt; i++ {
		hashs[i] = hash.New()
		counters[i] = make([]T, countersLen)
	}
	return &Counter[T]{
		counters:    counters,
		countersLen: countersLen,
		hashs:       hashs,
		maxCount:    T(0) - 1,
	}
}

// 增加元素的计数
func (c *Counter[T]) Add(b []byte, val T) {
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		if c.counters[i][index]+val <= c.counters[i][index] {
			c.counters[i][index] = c.maxCount
		} else {
			c.counters[i][index] += val
		}
	}
}

// 增加元素的计数
// 等同于Add(b, 1)
func (c *Counter[T]) Inc(b []byte) {
	c.Add(b, 1)
}

// 增加元素的计数
// 字符串类型
func (c *Counter[T]) AddString(s string, val T) {
	c.Add([]byte(s), val)
}

// 增加元素的计数
// 等同于Add(b, 1)
// 字符串类型
func (c *Counter[T]) IncString(s string) {
	c.Add([]byte(s), 1)
}

// 估算元素的计数
func (c *Counter[T]) Estimate(b []byte) T {
	minCount := c.maxCount
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		count := c.counters[i][index]
		if count == 0 {
			return 0
		}
		minCount = mmath.Min(minCount, count)
	}
	return minCount
}

// 估算元素的计数
// 字符串类型
func (c *Counter[T]) EstimateString(s string) T {
	return c.Estimate([]byte(s))
}

// 计数衰减
// 如果factor为0则直接清空
func (c *Counter[T]) Attenuation(factor T) {
	for _, counter := range c.counters {
		if factor == 0 {
			mem.Memset(counter, 0)
		} else {
			for j := uint64(0); j < c.countersLen; j++ {
				counter[j] /= factor
			}
		}
	}
}

数据结构

这里的数据结构核心是一个k*m的二维数组counters,k是哈希函数数量,m是数组每一行的长度;countersLen其实就是m;hashs是哈希函数列表;maxCount是当前类型的最大值,比如uint8就是255,下面的计算需要用到它。

type Counter[T constraints.Unsigned] struct {
	counters    [][]T
	countersLen uint64       // 计数器长度
	hashs       []*hash.Hash // 哈希函数列表
	maxCount    T            // 最大计数值
}

初始化

我们首先使用上面提到的两个公式计算数组每一行长度和哈希函数的数量,然后初始化哈希函数列表和二维数组。

// 创建一个计数器
// size:数据流大小
// errorRange:计数值误差范围(会超过真实计数值)
// errorRate:错误率
func New[T constraints.Unsigned](size uint64, errorRange T, errorRate float64) *Counter[T] {
	// 计数器长度
	countersLen := uint64(math.Ceil(math.E / (float64(errorRange) / float64(size))))
	// 哈希个数
	hashsCnt := int(math.Ceil(math.Log(1.0 / errorRate)))
	hashs := make([]*hash.Hash, hashsCnt)
	counters := make([][]T, hashsCnt)
	for i := 0; i < hashsCnt; i++ {
		hashs[i] = hash.New()
		counters[i] = make([]T, countersLen)
	}
	return &Counter[T]{
		counters:    counters,
		countersLen: countersLen,
		hashs:       hashs,
		maxCount:    T(0) - 1,
	}
}

增加计数

对于一个元素,我们需要把它根据每个哈希函数计算出它在每一行数组的位置,然后增加对应位置计数器的计数值。

这里需要注意的是,计数值可能会溢出,因此我们首先判断是否溢出,如果溢出则设置为最大值。

// 增加元素的计数
func (c *Counter[T]) Add(b []byte, val T) {
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		if c.counters[i][index]+val <= c.counters[i][index] {
			c.counters[i][index] = c.maxCount
		} else {
			c.counters[i][index] += val
		}
	}
}

估算计数

同增加计数原理,把元素根据哈希函数映射到每一行数组的对应位置,然后选择所有行中最小的那个计数值。

// 估算元素的计数
func (c *Counter[T]) Estimate(b []byte) T {
	minCount := c.maxCount
	for i, h := range c.hashs {
		index := h.Sum64(b) % c.countersLen
		count := c.counters[i][index]
		if count == 0 {
			return 0
		}
		minCount = mmath.Min(minCount, count)
	}
	return minCount
}

到此这篇关于GO CountMinSketch计数器(布隆过滤器思想的近似计数器)的文章就介绍到这了,更多相关GO CountMinSketch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • golang使用mapstructure解析json

    golang使用mapstructure解析json

    mapstructure 是一个 Go 库,用于将通用映射值解码为结构,这篇文章主要来和大家介绍一下golang如何使用mapstructure解析json,需要的可以参考下
    2023-12-12
  • Golang并发操作中常见的读写锁详析

    Golang并发操作中常见的读写锁详析

    Golang中的锁机制主要包含互斥锁和读写锁互斥锁互斥锁是传统并发程序对共享资源进行控制访问的主要手段,这篇文章主要给大家介绍了关于Golang并发操作中常见的读写锁的相关资料,需要的朋友可以参考下
    2021-08-08
  • go for range遍历二维数组的示例

    go for range遍历二维数组的示例

    今天小编就为大家分享一篇关于go for range遍历二维数组的示例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-04-04
  • go reflect要不要传指针原理详解

    go reflect要不要传指针原理详解

    这篇文章主要为大家介绍了go reflect要不要传指针原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Go并发原语之SingleFlight请求合并方法实例

    Go并发原语之SingleFlight请求合并方法实例

    本文我们来学习一下 Go 语言的扩展并发原语:SingleFlight,SingleFlight 的作用是将并发请求合并成一个请求,以减少重复的进程来优化 Go 代码
    2023-12-12
  • 基于HLS创建Golang视频流服务器的优缺点

    基于HLS创建Golang视频流服务器的优缺点

    HLS 是 HTTP Live Streaming 的缩写,是苹果开发的一种基于 HTTP 的自适应比特率流媒体传输协议。这篇文章主要介绍了基于 HLS 创建 Golang 视频流服务器,需要的朋友可以参考下
    2021-08-08
  • Go编写定时器与定时任务详解(附第三方库gocron用法)

    Go编写定时器与定时任务详解(附第三方库gocron用法)

    当需要每天执行定时任务的时候就需要定时器来处理了,周期任务,倒计时任务,定点任务等,下面这篇文章主要给大家介绍了关于Go编写定时器与定时任务的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • 解决go echo后端处理跨域的两种操作方式

    解决go echo后端处理跨域的两种操作方式

    这篇文章主要介绍了解决go echo后端处理跨域的两种操作方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • go语言的工作空间和GOPATH环境变量介绍

    go语言的工作空间和GOPATH环境变量介绍

    这篇文章主要介绍了go语言的工作空间和GOPATH环境变量介绍,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • 在go语言中安装与使用protobuf的方法详解

    在go语言中安装与使用protobuf的方法详解

    protobuf以前只支持C++, Python和Java等语言, Go语言出来后, 作为亲儿子, 那有不支持的道理呢? 这篇文章主要给大家介绍了关于在go语言中使用protobuf的相关资料,文中介绍的非常详细,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-08-08

最新评论