• 哈希, Hash,一般翻译做散列,或音译为哈希,是把任意长度的输入(又叫做预映射pre-image)通过散列算法变换成固定长度的输出,该输出就是散列值。这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,所以不可能从散列值来确定唯一的输入值。简单的说就是一种将任意长度的消息压缩到某一固定长度的消息摘要的函数,在分布式缓存服务中,经常需要对服务进行节点添加和删除操作,我们希望的是节点添加和删除操作尽量减少数据-节点之间的映射关系更新。假如我们使用的是哈希取模( hash(key)%nodes ) 算法作为路由策略,哈希取模的缺点在于如果有节点的删除和添加操作,对 hash(key)%nodes 结果影响范围太大了,造成大量的请求无法命中从而导致缓存数据被重新加载。基于上面的缺点提出了一种新的算法:一致性哈希。

  • 首先对节点进行哈希计算,哈希值通常在 2^32-1 范围内。然后将 2^32-1 这个区间首尾连接抽象成一个环并将节点的哈希值映射到环上,当我们要查询 key 的目标节点时,同样的我们对 key 进行哈希计算,然后顺时针查找到的第一个节点就是目标节点。一致性哈希可以实现节点删除和添加只会影响一小部分数据的映射关系,由于这个特性哈希算法也常常用于各种均衡器中实现系统流量的平滑迁移。假如环上的节点数量非常少,那么非常有可能造成数据分布不平衡,本质上是环上的区间分布粒度太粗。粒度太粗吗,那就加入更多的节点,这就引出了一致性哈希的虚拟节点概念,虚拟节点的作用在于让环上的节点区间分布粒度变细。一个真实节点对应多个虚拟节点,将虚拟节点的哈希值映射到环上,查询 key 的目标节点我们先查询虚拟节点再找到真实节点即可。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package hash

import (
	"fmt"
	"sort"
	"strconv"
	"sync"

	"github.com/tal-tech/go-zero/core/lang"
	"github.com/tal-tech/go-zero/core/mapping"
)

const (
	// TopWeight is the top weight that one entry might set.
	TopWeight = 100

	minReplicas = 100
	prime       = 16777619
)

type (
	// Func defines the hash method.
	//哈希函数
	Func func(data []byte) uint64

	// A ConsistentHash is a ring hash implementation.
	//一致性哈希
	ConsistentHash struct {
		//哈希函数
		hashFunc Func
		//虚拟节点放大因子
		//确定node的虚拟节点数量
		replicas int
		//虚拟节点列表
		keys []uint64
		//虚拟节点到物理节点的映射
		ring map[uint64][]interface{}
		//物理节点映射,快速判断是否存在node
		nodes map[string]lang.PlaceholderType
		//读写锁
		lock sync.RWMutex
	}
)

// NewConsistentHash returns a ConsistentHash.
//默认构造器
func NewConsistentHash() *ConsistentHash {
	return NewCustomConsistentHash(minReplicas, Hash)
}

// NewCustomConsistentHash returns a ConsistentHash with given replicas and hash func.
//有参构造器
func NewCustomConsistentHash(replicas int, fn Func) *ConsistentHash {
	if replicas < minReplicas {
		replicas = minReplicas
	}

	if fn == nil {
		fn = Hash
	}

	return &ConsistentHash{
		hashFunc: fn,
		replicas: replicas,
		ring:     make(map[uint64][]interface{}),
		nodes:    make(map[string]lang.PlaceholderType),
	}
}

// Add adds the node with the number of h.replicas,
// the later call will overwrite the replicas of the former calls.
//扩容操作,增加物理节点
func (h *ConsistentHash) Add(node interface{}) {
	h.AddWithReplicas(node, h.replicas)
}

// AddWithReplicas adds the node with the number of replicas,
// replicas will be truncated to h.replicas if it's larger than h.replicas,
// the later call will overwrite the replicas of the former calls.
//扩容操作,增加物理节点
func (h *ConsistentHash) AddWithReplicas(node interface{}, replicas int) {
	//支持可重复添加
	//先执行删除操作
	h.Remove(node)
	//不能超过放大因子上限
	if replicas > h.replicas {
		replicas = h.replicas
	}
	//node key
	nodeRepr := repr(node)
	h.lock.Lock()
	defer h.lock.Unlock()
	//添加node map映射
	h.addNode(nodeRepr)
	for i := 0; i < replicas; i++ {
		//创建虚拟节点
		hash := h.hashFunc([]byte(nodeRepr + strconv.Itoa(i)))
		//添加虚拟节点
		h.keys = append(h.keys, hash)
		//映射虚拟节点-真实节点
		//注意hashFunc可能会出现哈希冲突,所以采用的是追加操作
		//虚拟节点-真实节点的映射对应的其实是个数组
		//一个虚拟节点可能对应多个真实节点,当然概率非常小
		h.ring[hash] = append(h.ring[hash], node)
	}
	//排序
	//后面会使用二分查找虚拟节点
	sort.Slice(h.keys, func(i, j int) bool {
		return h.keys[i] < h.keys[j]
	})
}

// AddWithWeight adds the node with weight, the weight can be 1 to 100, indicates the percent,
// the later call will overwrite the replicas of the former calls.
//按权重添加节点
//通过权重来计算方法因子,最终控制虚拟节点的数量
//权重越高,虚拟节点数量越多
func (h *ConsistentHash) AddWithWeight(node interface{}, weight int) {
	// don't need to make sure weight not larger than TopWeight,
	// because AddWithReplicas makes sure replicas cannot be larger than h.replicas
	replicas := h.replicas * weight / TopWeight
	h.AddWithReplicas(node, replicas)
}

// Get returns the corresponding node from h base on the given v.
//根据v顺时针找到最近的虚拟节点
//再通过虚拟节点映射找到真实节点
func (h *ConsistentHash) Get(v interface{}) (interface{}, bool) {
	h.lock.RLock()
	defer h.lock.RUnlock()
	//当前哈希还没有物理节点
	if len(h.ring) == 0 {
		return nil, false
	}
	//计算哈希值
	hash := h.hashFunc([]byte(repr(v)))
	//二分查找
	//因为每次添加节点后虚拟节点都会重新排序
	//所以查询到的第一个节点就是我们的目标节点
	//取余则可以实现环形列表效果,顺时针查找节点
	index := sort.Search(len(h.keys), func(i int) bool {
		return h.keys[i] >= hash
	}) % len(h.keys)
	//虚拟节点->物理节点映射
	nodes := h.ring[h.keys[index]]
	switch len(nodes) {
	//不存在真实节点
	case 0:
		return nil, false
	//只有一个真实节点,直接返回
	case 1:
		return nodes[0], true
	//存在多个真实节点意味这出现哈希冲突
	default:
		//此时我们对v重新进行哈希计算
		//对nodes长度取余得到一个新的index
		innerIndex := h.hashFunc([]byte(innerRepr(v)))
		pos := int(innerIndex % uint64(len(nodes)))
		return nodes[pos], true
	}
}

// Remove removes the given node from h.
//删除物理节点
func (h *ConsistentHash) Remove(node interface{}) {
	//节点的string
	nodeRepr := repr(node)
	//并发安全
	h.lock.Lock()
	defer h.lock.Unlock()
	//节点不存在
	if !h.containsNode(nodeRepr) {
		return
	}
	//移除虚拟节点映射
	for i := 0; i < h.replicas; i++ {
		//计算哈希值
		hash := h.hashFunc([]byte(nodeRepr + strconv.Itoa(i)))
		//二分查找到第一个虚拟节点
		index := sort.Search(len(h.keys), func(i int) bool {
			return h.keys[i] >= hash
		})
		//切片删除对应的元素
		if index < len(h.keys) && h.keys[index] == hash {
			//定位到切片index之前的元素
			//将index之后的元素(index+1)前移覆盖index
			h.keys = append(h.keys[:index], h.keys[index+1:]...)
		}
		//虚拟节点删除映射
		h.removeRingNode(hash, nodeRepr)
	}
	//删除真实节点
	h.removeNode(nodeRepr)
}

//删除虚拟-真实节点映射关系
//hash - 虚拟节点
//nodeRepr - 真实节点
func (h *ConsistentHash) removeRingNode(hash uint64, nodeRepr string) {
	//map使用时应该校验一下
	if nodes, ok := h.ring[hash]; ok {
		//新建一个空的切片,容量与nodes保持一致
		newNodes := nodes[:0]
		//遍历nodes
		for _, x := range nodes {
			//如果序列化值不相同,x是其他节点
			//不能删除
			if repr(x) != nodeRepr {
				newNodes = append(newNodes, x)
			}
		}
		//剩余节点不为空则重新绑定映射关系
		if len(newNodes) > 0 {
			h.ring[hash] = newNodes
		} else {
			//否则删除即可
			delete(h.ring, hash)
		}
	}
}

func (h *ConsistentHash) addNode(nodeRepr string) {
	h.nodes[nodeRepr] = lang.Placeholder
}

//节点是否已存在
func (h *ConsistentHash) containsNode(nodeRepr string) bool {
	//nodes本身是一个map,时间复杂度是O1效率非常高
	_, ok := h.nodes[nodeRepr]
	return ok
}

//删除node
func (h *ConsistentHash) removeNode(nodeRepr string) {
	delete(h.nodes, nodeRepr)
}

//返回node的string值
//在遇到哈希冲突时需要重新对key进行哈希计算
//为了减少冲突的概率前面追加了一个质数 prime来减小冲突的概率
func innerRepr(v interface{}) string {
	return fmt.Sprintf("%d:%v", prime, v)
}

//返回node的字符串
//如果让node强制实现String()会不会更好一些?
func repr(node interface{}) string {
	return mapping.Repr(node)
}