Building a Consistent Hashing Ring Part3-Part5(构建一个一致性哈希环 Part3-Part5)
本篇是Building a Consistent Hashing Ring 第三到第五部分的翻译,上篇翻译了原文的第一到第三部分,在第三到第五部分中,引入了分区概念,多副本,多可用区,以及权重的概念,更加接近一个高可用的实际一致性环
Part 3
在本系列的第2部分中,我们实现了一种算法,在集群添加新节点时也能够很好地运行。我们使用了可以独立分配给物理节点的1000个虚拟节点,这样我们可以最小化添加节点时移动的数据量。
虚拟节点数量确定了我们可以拥有多少物理节点。例如,如果您有1000个虚拟节点,并且尝试添加一个第1001个物理节点,那么在不替换任何物理节点的情况下是没有虚拟节点可以分配的,最终只会剩下1000个活动的物理节点。
不幸的是,在集群的整个生命周期中,在不进行很多细致的工作的情况下,创建时确定的虚拟节点的数量几乎永远无法改变。例如,可以将原有的一个虚拟节点拆成两个,而且将拆出的两个新虚拟节点分配给同一个物理节点,这样可以实现增加一倍的虚拟节点数量。然而,如果物理节点需要借助虚拟节点的id来更好地存储数据(例如,所有数据可能存储在/[虚拟节点id]/[数据id]目录中),则必须要移动数据来适应虚拟节点数量变化了。而且在移动的过程中,新旧位置都必须要有数据,很难或者基本不可能实现原子性操作。
所以稍微往前想一下:改变虚拟节点的数量可能是一件付出远比收益大的工作,但请记住,对于某些应用程序可能会是很有好处的。
解决这个限制的最简单的方法是使限制足够高,这样就不用怕遇到需要扩充的情况了。例如,如果我们认为集群永远不会拥有超过60000个物理节点,那么我们就可以设置6万个虚拟节点。
此外,我们应该在我们的计算中考虑一下节点的相对大小。例如,一年之后,我们可能会拥有处理当前节点的两倍容量的物理节点。因此,我们希望虚拟节点数量翻倍,这样应该将虚拟节点的估计值提高到120000。
为每个物理节点保留100个虚拟节点会是一个很好的规则。这样可以实现对任何给定节点的负载进行1%的调整,即使在最大容量情况下,这样的调整影响也很小。所以,对于一个最多包含60000个物理节点的集群,我们会有6,000,000个虚拟节点。
600万虚拟节点看起来很多,让人觉得可能会用到太多的内存。但唯一需要存储的结构是虚拟节点到实际节点映射关系。所需的内存大小大概为600万乘以2字节(存储从0到65,535的真实节点ID)大约需要12MB的内存,对于现阶段的机器来说,这并不是一件难事。
即使算上Python存储类型的所有开销,也没有什么可担心的。稍微修改了一下上面的代码,扩展成60000个物理和6,000,000个虚拟节点,将list更改为array(‘H’),最高也只用了27MB的常驻内存,这还包括了两个环。
这时候可以换一种说法了,可以将这些这些虚拟节点叫做“分区”(partitions)。这将使我们更容易辨别之前所讨论的两种节点类型。另外,将它叫做分区也是有道理的,因为它确实是一个不间断的哈希空间的一部分。
我们也会始终保持分区个数为2的幂。这样可以使用哈希的位操作而不是模数来确定分区了。虽然并没有快多少,但好歹快一点点。所以,下面就是更新后的哈希环代码,使用8,388,608 (2**23)个分区和65,536个节点。我们已经提取了样本数据id集,并检查了分布情况,以确保没有破坏任何东西。
from array import array
from hashlib import md5
from struct import unpack_from
PARTITION_POWER = 23
PARTITION_SHIFT = 32 - PARTITION_POWER
NODE_COUNT = 65536
DATA_ID_COUNT = 100000000
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I', md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_id = part2node[part]
node_counts[node_id] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % (min_count, under)
运行结果:
1525: Desired data ids per node
1683: Most data ids on one node, 10.36% over
1360: Least data ids on one node, 10.82% under
额 +-10%似乎有点高,但是我重新以65,536个分区和256个节点的情况运行了一次,得到+-0.4%,所以原因是因为1亿个样本对于8百万的分区数来说太小了。 样本量太大会使运行时间过长,所以我们将数字缩小了点。 (可以肯定的是,我用完整的版本重新运行了一个100亿数据id的样本集,得到+-1%的结果,但是一共花了了6.5个小时)。
Part 4
在本系列的第3部分中,我们进一步讨论了分区(虚拟节点),并基于此清理了代码。现在,我们来谈谈如何提高集群中数据的持久性和可用性。
对于很多的分布式数据存储,数据持久性是非常重要的。无论是RAID阵列或独立的数据副本都是需要的。虽然RAID会增加数据持久性,但它不会增加可用性 - 如果RAID机器崩溃了,数据可能是安全的,但是在修复完成之前,数据是无法被访问的。如果我们在不同机器上保留数据的多个副本,即使机器崩溃,在修复损坏的机器时,其他副本仍然是可用的。
实现这种多副本持久性/可用性的最简单简单方法就是将节点进行分组并使用独立的哈希环。例如,要实现三数据副本的行业标准,您可以将节点分为三组,每组都有自己的环,每个组将收到每个数据项的副本。这可以工作地很好,但是缺点在于如果需要扩充容量,就需要一下子添加三个节点,而且,当有一个节点丢失时,基本上也就相当于少了三倍的容量。
作为替代,我们用一个不同但是通用的方法来满足我们的需求。这可以通过从起点沿着哈希环走动,并寻找其他不同的节点来完成。下面的代码是支持不同副本数量的(测试时将数量设置为了3):
from array import array
from hashlib import md5
from struct import unpack_from
REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
DATA_ID_COUNT = 10000000
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I',
md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_ids = [part2node[part]]
node_counts[node_ids[0]] += 1
# 按顺序往下选三个没有选中的节点
for replica in xrange(1, REPLICAS):
while part2node[part] in node_ids:
part += 1
if part > PARTITION_MAX:
part = 0
node_ids.append(part2node[part])
node_counts[node_ids[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % (min_count, under)
运行结果:
117186: Desired data ids per node
118133: Most data ids on one node, 0.81% over
116093: Least data ids on one node, 0.93% under
很不错,数据不均衡小于1%。但是,还是有几个问题。
首先,由于我们最初将分区对应到物理节点,因此某个物理节点上的所有分区在其他两个物理节点上都有有额外的副本。问题是当一台机器故障时,另两个物理节点上的负载将增加一台机器的量。为了能更好地分发故障转移产生的负载,最好在开始分配分区时将分区打乱。
还有一个比较难解释的问题,就是是处理机器的物理分离。想像一下,一个机架只能放16台机器。因此256个节点需要16个机架。对于当前的代码,如果整个机架出现问题(电源问题,网络问题等),很有可能某些数据的三个副本都在该机架中,变得无法访问。因此我们可以给节点引入一个区域的概念,然后确保每个副本都存储在不同的区域中,从而解决这个缺陷。
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
REPLICAS = 3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 2 ** PARTITION_POWER - 1
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000
node2zone = []
# 将256个节点平均分配到16个不同的区域中
while len(node2zone) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
node2zone.append(zone)
zone += 1
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
part2node.append(part % NODE_COUNT)
# 关键的一步,将所有的分区全部打乱
shuffle(part2node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
part = unpack_from('>I', md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
node_ids = [part2node[part]]
zones = [node2zone[node_ids[0]]]
node_counts[node_ids[0]] += 1
zone_counts[zones[0]] += 1
for replica in xrange(1, REPLICAS):
# 分配副本节点时忽略在同一个区域的节点
while part2node[part] in node_ids and node2zone[part2node[part]] in zones:
part += 1
if part > PARTITION_MAX:
part = 0
node_ids.append(part2node[part])
zones.append(node2zone[node_ids[-1]])
node_counts[node_ids[-1]] += 1
zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % (min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % (max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % (min_count, under)
运行结果:
117186: Desired data ids per node
118782: Most data ids on one node, 1.36% over
115632: Least data ids on one node, 1.33% under
1875000: Desired data ids per zone
1878533: Most data ids in one zone, 0.19% over
1869070: Least data ids in one zone, 0.32% under
可以看到将分区打乱,并且引入区域概念影响了我们的分配,但结果仍然是足够好的。 这个测试在我的机器上运行了大约64秒。
有一种更加常见的方式可以替换上面的代码。 这个方法根本不使用分区,而是将锚点分配给散列空间中的节点。 给定一个散列,只需要沿着锚环就能找到地一个节点,同样的可以继续找到其他节点。 为了达到和虚拟节点相同的效果,每个真实节点会被分配多个锚点。
from bisect import bisect_left
from hashlib import md5
from struct import unpack_from
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000
VNODE_COUNT = 100
node2zone = []
while len(node2zone) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
node2zone.append(zone)
zone += 1
hash2index = []
index2node = []
for node in xrange(NODE_COUNT):
for vnode in xrange(VNODE_COUNT):
hsh = unpack_from('>I', md5(str(node)).digest())[0]
index = bisect_left(hash2index, hsh)
if index > len(hash2index):
index = 0
hash2index.insert(index, hsh)
index2node.insert(index, node)
node_counts = [0] * NODE_COUNT
zone_counts = [0] * ZONE_COUNT
for data_id in xrange(DATA_ID_COUNT):
data_id = str(data_id)
hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
index = bisect_left(hash2index, hsh)
if index >= len(hash2index):
index = 0
node_ids = [index2node[index]]
zones = [node2zone[node_ids[0]]]
node_counts[node_ids[0]] += 1
zone_counts[zones[0]] += 1
for replica in xrange(1, REPLICAS):
while index2node[index] in node_ids and node2zone[index2node[index]] in zones:
index += 1
if index >= len(hash2index):
index = 0
node_ids.append(index2node[index])
zones.append(node2zone[node_ids[-1]])
node_counts[node_ids[-1]] += 1
zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % (max_count, over)
min_count = min(node_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % (min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % (max_count, over)
min_count = min(zone_counts)
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % (min_count, under)
结果:
117186: Desired data ids per node
351282: Most data ids on one node, 199.76% over
15965: Least data ids on one node, 86.38% under
1875000: Desired data ids per zone
2248496: Most data ids in one zone, 19.92% over
1378013: Least data ids in one zone, 26.51% under
运行这个测试需要花上15分钟的时间!然后看起来这个方法对于控制数据分布效果不是很好,所以需要添加更多的虚拟节点,当然这样也会需要消耗更多的内存,而且查找时也会消耗更多时间。对于根据id查找节点这一最常用的操作,还是有改进空间的(例如,将每个虚拟节点的故障转移节点预先分配好),但是从第一部分开始我们都没有做相关的优化,所以暂时还是先保持原样。
在下一节中,我们会将这些封装一下,变成一个Python模块。
Part 5
在第四部分中,我们实现了一个支持复制和区域概念的环,或者说,至少部分实现了。在最后一部分中,我们会将这些封装成一个Pyhton的模块,然后,再加上最后的一个功能。
首先,我们将创建哈希环的操作和测试哈希环的操作分开:
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time
class Ring(object):
def __init__(self, nodes, part2node, replicas):
self.nodes = nodes
self.part2node = part2node
self.replicas = replicas
partition_power = 1
while 2 ** partition_power < len(part2node):
partition_power += 1
if len(part2node) != 2 ** partition_power:
raise Exception("part2node's length is not an "
"exact power of 2")
self.partition_shift = 32 - partition_power
def get_nodes(self, data_id):
data_id = str(data_id)
part = unpack_from('>I',
md5(data_id).digest())[0] >> self.partition_shift
node_ids = [self.part2node[part]]
zones = [self.nodes[node_ids[0]]]
for replica in xrange(1, self.replicas):
while self.part2node[part] in node_ids and \
self.nodes[self.part2node[part]] in zones:
part += 1
if part >= len(self.part2node):
part = 0
node_ids.append(self.part2node[part])
zones.append(self.nodes[node_ids[-1]])
return [self.nodes[n] for n in node_ids]
def build_ring(nodes, partition_power, replicas):
begin = time()
part2node = array('H')
for part in xrange(2 ** partition_power):
part2node.append(part % len(nodes))
shuffle(part2node)
ring = Ring(nodes, part2node, replicas)
print '%.02fs to build ring' % (time() - begin)
return ring
def test_ring(ring):
begin = time()
DATA_ID_COUNT = 10000000
node_counts = {}
zone_counts = {}
for data_id in xrange(DATA_ID_COUNT):
for node in ring.get_nodes(data_id):
node_counts[node['id']] = node_counts.get(node['id'], 0) + 1
zone_counts[node['zone']] = zone_counts.get(node['zone'], 0) + 1
print '%ds to test ring' % (time() - begin)
desired_count = DATA_ID_COUNT / len(ring.nodes) * REPLICAS
print '%d: Desired data ids per node' % desired_count
max_count = max(node_counts.itervalues())
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids on one node, %.02f%% over' % (max_count, over)
min_count = min(node_counts.itervalues())
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids on one node, %.02f%% under' % (min_count, under)
zone_count = len(set(n['zone'] for n in ring.nodes.itervalues()))
desired_count = DATA_ID_COUNT / zone_count * ring.replicas
print '%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts.itervalues())
over = 100.0 * (max_count - desired_count) / desired_count
print '%d: Most data ids in one zone, %.02f%% over' % (max_count, over)
min_count = min(zone_counts.itervalues())
under = 100.0 * (desired_count - min_count) / desired_count
print '%d: Least data ids in one zone, %.02f%% under' % (min_count, under)
if __name__ == '__main__':
PARTITION_POWER = 16
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
nodes = {}
while len(nodes) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
node_id = len(nodes)
nodes[node_id] = {'id': node_id, 'zone': zone}
zone += 1
ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
test_ring(ring)
结果:
0.06s to build ring
82s to test ring
117186: Desired data ids per node
118773: Most data ids on one node, 1.35% over
115801: Least data ids on one node, 1.18% under
1875000: Desired data ids per zone
1878339: Most data ids in one zone, 0.18% over
1869914: Least data ids in one zone, 0.27% under
测试我们的哈系环需要比较长的时间,但这主要是因为我们将很多的变量从数组换成了字典。 用字典存储节点好处是,可以直接附加想要的任何节点信息(IP地址,TCP端口,驱动器路径等)。 我们还会进一步测试中,目前的数据分布情况还是不错的。
最后,我们将最后一个特性添加到哈希环中,也就是权重。权重的用处在于,在一个哈希环的整个生命周期中,后期添加的节点,很可能比前期添加的节点拥有更多的容量,对于这次的测试,我们将模拟所有节点中有一半的节点拥有两倍的权重。所以,我们需要修改build_ring
函数,将节点的权重考虑进去,分配给权重高的节点更多的分区。由于修改比较大,所以直接贴出代码:
from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time
class Ring(object):
def __init__(self, nodes, part2node, replicas):
self.nodes = nodes
self.part2node = part2node
self.replicas = replicas
partition_power = 1
while 2 ** partition_power < len(part2node):
partition_power += 1
if len(part2node) != 2 ** partition_power:
raise Exception("part2node's length is not an "
"exact power of 2")
self.partition_shift = 32 - partition_power
def get_nodes(self, data_id):
data_id = str(data_id)
part = unpack_from('>I',
md5(data_id).digest())[0] >> self.partition_shift
node_ids = [self.part2node[part]]
zones = [self.nodes[node_ids[0]]]
for replica in xrange(1, self.replicas):
while self.part2node[part] in node_ids and \
self.nodes[self.part2node[part]] in zones:
part += 1
if part >= len(self.part2node):
part = 0
node_ids.append(self.part2node[part])
zones.append(self.nodes[node_ids[-1]])
return [self.nodes[n] for n in node_ids]
def build_ring(nodes, partition_power, replicas):
begin = time()
parts = 2 ** partition_power
# 计算总权重
total_weight = float(sum(n['weight'] for n in nodes.itervalues()))
for node in nodes.itervalues():
# 每个节点应该分配的分区数量
node['desired_parts'] = parts / total_weight * node['weight']
part2node = array('H')
for part in xrange(2 ** partition_power):
for node in nodes.itervalues():
# 如果节点还有配额,就直接分配了
if node['desired_parts'] >= 1:
node['desired_parts'] -= 1
part2node.append(node['id'])
break
# 处理不能整除的情况
else:
for node in nodes.itervalues():
if node['desired_parts'] >= 0:
node['desired_parts'] -= 1
part2node.append(node['id'])
break
shuffle(part2node)
ring = Ring(nodes, part2node, replicas)
print '%.02fs to build ring' % (time() - begin)
return ring
def test_ring(ring):
begin = time()
DATA_ID_COUNT = 10000000
node_counts = {}
zone_counts = {}
for data_id in xrange(DATA_ID_COUNT):
for node in ring.get_nodes(data_id):
node_counts[node['id']] = \
node_counts.get(node['id'], 0) + 1
zone_counts[node['zone']] = \
zone_counts.get(node['zone'], 0) + 1
print '%ds to test ring' % (time() - begin)
total_weight = float(sum(n['weight'] for n in
ring.nodes.itervalues()))
max_over = 0
max_under = 0
for node in ring.nodes.itervalues():
desired = DATA_ID_COUNT * REPLICAS * \
node['weight'] / total_weight
diff = node_counts[node['id']] - desired
if diff > 0:
over = 100.0 * diff / desired
if over > max_over:
max_over = over
else:
under = 100.0 * (-diff) / desired
if under > max_under:
max_under = under
print '%.02f%% max node over' % max_over
print '%.02f%% max node under' % max_under
max_over = 0
max_under = 0
for zone in set(n['zone'] for n in
ring.nodes.itervalues()):
zone_weight = sum(n['weight'] for n in
ring.nodes.itervalues() if n['zone'] == zone)
desired = DATA_ID_COUNT * REPLICAS * zone_weight / total_weight
diff = zone_counts[zone] - desired
if diff > 0:
over = 100.0 * diff / desired
if over > max_over:
max_over = over
else:
under = 100.0 * (-diff) / desired
if under > max_under:
max_under = under
print '%.02f%% max zone over' % max_over
print '%.02f%% max zone under' % max_under
if __name__ == '__main__':
PARTITION_POWER = 16
REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
nodes = {}
while len(nodes) < NODE_COUNT:
zone = 0
while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
node_id = len(nodes)
nodes[node_id] = {'id': node_id, 'zone': zone,
'weight': 1.0 + (node_id % 2)}
zone += 1
ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
test_ring(ring)
结果:
0.88s to build ring
86s to test ring
1.66% max over
1.46% max under
0.28% max zone over
0.23% max zone under
所以在引入了不同权重的节点后,结果还是不错的。我用随机的1到100的权重重新运行了一下代码,得到7.35%低于/18.12%高于预期的不均衡度,对于不同区域为0.24%/0.22%,考虑到这样夸张的权重情况下,结果还是非常好的。
Summary
希望这个系列能比较好的介绍清楚如何创建一个哈希环。 此代码也基本上就是OpenStack Swift环的工作原理了,对于Swift来说,它的环还有许多额外的优化,例如把每个副本存储分开,以及许多用于构建,验证和以其他方式处理环的额外功能。