OpenStack Swift Object Replicator Code Analysis
In OpenStack Swift, the object replicator's job is to bring the system back into a consistent state after events such as temporary network outages or disk failures. It does this by comparing the local data with each remote copy to make sure all of them hold the latest version. Below is a brief walk through the object replicator code to see how a full replication pass works.
First, the executable bin/swift-object-replicator imports the relevant packages, sets up a few command-line options, and exposes a command for the user to invoke. What it actually calls are the run_once (run a single pass) and run_forever (run as a daemon) methods of the ObjectReplicator class.
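For reference, that executable is just a thin shim; it looks roughly like the sketch below (reconstructed from Swift's usual daemon wiring, so details may vary between versions):

#!/usr/bin/env python
# Sketch of bin/swift-object-replicator (reconstructed; may differ by version)
from optparse import OptionParser

from swift.common.daemon import run_daemon
from swift.common.utils import parse_options
from swift.obj.replicator import ObjectReplicator

if __name__ == '__main__':
    parser = OptionParser("%prog CONFIG [options]")
    parser.add_option('-d', '--devices',
                      help='Replicate only given devices (comma-separated)')
    parser.add_option('-p', '--partitions',
                      help='Replicate only given partitions (comma-separated)')
    conf_file, options = parse_options(parser, once=True)
    run_daemon(ObjectReplicator, conf_file, **options)

run_daemon instantiates the daemon class and dispatches to run_once or run_forever depending on whether the once option is set.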
Here we go straight to run_once. The ObjectReplicator class is defined in swift/obj/replicator.py.
The implementation of run_once is simple: it mostly just reads the relevant options and then calls the replicate method, which holds the main logic:
def replicate(self, override_devices=None, override_partitions=None,
              override_policies=None):
    """Run a replication pass"""
    # some setup code elided
    try:
        self.run_pool = GreenPool(size=self.concurrency)
        # Collect all the replication jobs
        jobs = self.collect_jobs(override_devices=override_devices,
                                 override_partitions=override_partitions,
                                 override_policies=override_policies)
        for job in jobs:
            # Code elided: quick checks that decide whether this job
            # needs to run at all
            if job['delete']:
                # A job that deletes the local copy after syncing:
                # handled by update_deleted
                self.run_pool.spawn(self.update_deleted, job)
            else:
                # Otherwise handled by update
                self.run_pool.spawn(self.update, job)
        current_nodes = None
        with Timeout(self.lockup_timeout):  # guard against lockups
            self.run_pool.waitall()
    except (Exception, Timeout):
        ...  # failure handling elided
    finally:
        stats.kill()
        lockup_detector.kill()
        self.stats_line()
        self.stats['attempted'] = self.replication_count
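The concurrency here comes from eventlet: each job runs as a green thread in a GreenPool, and waitall() blocks until every spawned job has finished. A minimal standalone sketch of that pattern (job names are made up for illustration):

from eventlet import GreenPool

def do_job(name):
    print('replicating', name)

pool = GreenPool(size=4)            # at most 4 jobs run concurrently
for name in ('job-1', 'job-2', 'job-3'):
    pool.spawn(do_job, name)        # schedule one green thread per job
pool.waitall()                      # block until all jobs complete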
Next, let's look at how the jobs are collected:
def collect_jobs(self, override_devices=None, override_partitions=None,
                 override_policies=None):
    jobs = []
    ips = whataremyips(self.bind_ip)
    for policy in POLICIES:
        if policy.policy_type == REPL_POLICY:
            if (override_policies is not None and
                    str(policy.idx) not in override_policies):
                continue
            # For each policy, load its ring, then call
            # build_replication_jobs to collect that policy's jobs
            # ensure rings are loaded for policy
            self.load_object_ring(policy)
            jobs += self.build_replication_jobs(
                policy, ips, override_devices=override_devices,
                override_partitions=override_partitions)
    random.shuffle(jobs)  # randomize the job order
    if self.handoffs_first:
        # Move the handoff parts to the front of the list
        jobs.sort(key=lambda job: not job['delete'])
    self.job_count = len(jobs)
    return jobs
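Note the sort key not job['delete']: handoff jobs carry delete=True, which the key maps to False, and False sorts before True, so handoffs end up at the front of the list; since Python's sort is stable, the shuffled order within each group is preserved. A quick illustration with made-up jobs:

jobs = [{'partition': '10', 'delete': False},
        {'partition': '11', 'delete': True},
        {'partition': '12', 'delete': False},
        {'partition': '13', 'delete': True}]
jobs.sort(key=lambda job: not job['delete'])
print([j['partition'] for j in jobs])  # ['11', '13', '10', '12']

collect_jobs delegates the per-device work to build_replication_jobs: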
def build_replication_jobs(self, policy, ips, override_devices=None,
                           override_partitions=None):
    # Preparatory code elided (among other things it sets found_local
    # when the local device is found in the ring)
    # Walk the mounted device directly and list every directory under
    # it, i.e. every partition directory
    for partition in os.listdir(obj_path):
        if (override_partitions is not None
                and partition not in override_partitions):
            continue
        if (partition.startswith('auditor_status_') and
                partition.endswith('.json')):
            # ignore auditor status files
            continue
        part_nodes = None
        try:
            job_path = join(obj_path, partition)
            # Look up in the ring which nodes this partition
            # should live on
            part_nodes = policy.object_ring.get_part_nodes(
                int(partition))
            # Pick out the nodes that are not this machine
            nodes = [node for node in part_nodes
                     if node['id'] != local_dev['id']]
            jobs.append(
                dict(path=job_path,
                     device=local_dev['device'],
                     obj_path=obj_path,
                     nodes=nodes,
                     # True when this device holds an extra copy (it
                     # is not a primary for this partition): sync the
                     # data out, then delete the local copy
                     delete=len(nodes) > len(part_nodes) - 1,
                     policy=policy,
                     partition=partition,
                     region=local_dev['region']))
        except ValueError:
            # failure handling elided: log and move on
            continue
    if not found_local:
        self.logger.error("Can't find itself in policy with index %d with"
                          " ips %s and with port %s in ring file, not"
                          " replicating",
                          int(policy), ", ".join(ips), self.port)
    return jobs
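The delete flag deserves a second look. When the local device is one of the partition's primaries, it appears in part_nodes, so filtering it out leaves len(part_nodes) - 1 entries and delete is False; when the partition is only a handoff on this device, filtering removes nothing and delete becomes True. A toy example with made-up node ids:

# Hypothetical: the ring says partition 1234 lives on devices 1, 2 and 3
part_nodes = [{'id': 1}, {'id': 2}, {'id': 3}]

local_dev = {'id': 2}   # this machine is a primary
nodes = [n for n in part_nodes if n['id'] != local_dev['id']]
print(len(nodes) > len(part_nodes) - 1)  # False: keep the local copy

local_dev = {'id': 7}   # this machine is only a handoff location
nodes = [n for n in part_nodes if n['id'] != local_dev['id']]
print(len(nodes) > len(part_nodes) - 1)  # True: sync out, then delete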
Now the jobs get executed and the actual copying takes place:
def update_deleted(self, job):
    # Sync the data to the proper nodes, then delete the local copy
    # some initialization code elided
    try:
        responses = []
        suffixes = tpool.execute(tpool_get_suffixes, job['path'])
        synced_remote_regions = {}
        delete_objs = None
        if suffixes:
            # Replicate to every target node
            for node in job['nodes']:
                # call sync to do the actual copying
                success, candidates = self.sync(
                    node, job, suffixes, **kwargs)
                if success:
                    with Timeout(self.http_timeout):
                        # On success, send a REPLICATE request to the
                        # target node so it recalculates the hashes of
                        # the synced suffix directories
                        conn = http_connect(
                            node['replication_ip'],
                            node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '/' + '-'.join(suffixes), headers=headers)
                        conn.getresponse().read()
                    if node['region'] != job['region']:
                        synced_remote_regions[node['region']] = viewkeys(
                            candidates)
                else:
                    failure_devs_info.add((node['replication_ip'],
                                           node['device']))
                responses.append(success)
            for cand_objs in synced_remote_regions.values():
                if delete_objs is None:
                    delete_objs = cand_objs
                else:
                    delete_objs = delete_objs & cand_objs
        if self.handoff_delete:
            # delete the local files once we have had at least
            # handoff_delete successful syncs
            delete_handoff = len([resp for resp in responses if resp]) >= \
                self.handoff_delete
        else:
            # delete handoff if all syncs were successful
            delete_handoff = len(responses) == len(job['nodes']) and \
                all(responses)
        if delete_handoff:
            self.stats['remove'] += 1
            if (self.conf.get('sync_method', 'rsync') == 'ssync' and
                    delete_objs is not None):
                self.logger.info(_("Removing %s objects"),
                                 len(delete_objs))
                _junk, error_paths = self.delete_handoff_objs(
                    job, delete_objs)
                # if replication works for a hand-off device and it failed,
                # the remote devices which are target of the replication
                # from the hand-off device will be marked. Because cleanup
                # after replication failed means replicator needs to
                # replicate again with the same info.
                if error_paths:
                    failure_devs_info.update(
                        [(failure_dev['replication_ip'],
                          failure_dev['device'])
                         for failure_dev in job['nodes']])
            else:
                self.delete_partition(job['path'])
                handoff_partition_deleted = True
        elif not suffixes:
            self.delete_partition(job['path'])
            handoff_partition_deleted = True
    except (Exception, Timeout):
        self.logger.exception(_("Error syncing handoff partition"))
        self._add_failure_stats(failure_devs_info)
    finally:
        target_devs_info = set([(target_dev['replication_ip'],
                                 target_dev['device'])
                                for target_dev in job['nodes']])
        self.stats['success'] += len(target_devs_info - failure_devs_info)
        if not handoff_partition_deleted:
            self.handoffs_remaining += 1
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.delete.timing', begin)
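The decision of whether it is safe to delete the handoff is worth spelling out: with handoff_delete set to some N, N successful syncs are enough; otherwise every target must have succeeded. A condensed sketch of just that logic, with made-up values:

responses = [True, True, False]   # per-node sync results for one job
handoff_delete = 2                # conf value; unset means all must succeed
if handoff_delete:
    delete_handoff = len([r for r in responses if r]) >= handoff_delete
else:
    delete_handoff = len(responses) == 3 and all(responses)
print(delete_handoff)             # True: two successful copies suffice

Next, the update method handles the normal case, where the local copy is kept: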
def update(self, job):
    # Normal replication; the local copy is kept
    # preparation code elided
    try:
        # Compute the hashes of the current partition's suffix
        # directories (cached in the hashes.pkl file)
        hashed, local_hash = tpool_reraise(
            df_mgr._get_hashes, job['device'],
            job['partition'], job['policy'],
            do_listdir=_do_listdir(
                int(job['partition']),
                self.replication_cycle))
        self.suffix_hash += hashed
        self.logger.update_stats('suffix.hashes', hashed)
        attempts_left = len(job['nodes'])
        synced_remote_regions = set()
        random.shuffle(job['nodes'])
        nodes = itertools.chain(
            job['nodes'],
            job['policy'].object_ring.get_more_nodes(
                int(job['partition'])))
        while attempts_left > 0:
            # If this throws StopIteration it will be caught way below
            node = next(nodes)
            target_devs_info.add((node['replication_ip'], node['device']))
            attempts_left -= 1
            # if we have already synced to this remote region,
            # don't sync again on this replication pass
            if node['region'] in synced_remote_regions:
                continue
            try:
                with Timeout(self.http_timeout):
                    # First fetch the remote node's hashes for the
                    # current partition
                    resp = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '', headers=headers).getresponse()
                    if resp.status == HTTP_INSUFFICIENT_STORAGE:
                        self.logger.error(
                            _('%(replication_ip)s/%(device)s '
                              'responded as unmounted'), node)
                        attempts_left += 1
                        failure_devs_info.add((node['replication_ip'],
                                               node['device']))
                        continue
                    if resp.status != HTTP_OK:
                        self.logger.error(_("Invalid response %(resp)s "
                                            "from %(ip)s"),
                                          {'resp': resp.status,
                                           'ip': node['replication_ip']})
                        failure_devs_info.add((node['replication_ip'],
                                               node['device']))
                        continue
                    remote_hash = pickle.loads(resp.read())
                    del resp
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                if not suffixes:
                    self.stats['hashmatch'] += 1
                    continue
                # Recalculate the hashes of the differing suffix
                # directories under this partition (refreshing
                # hashes.pkl)
                hashed, recalc_hash = tpool_reraise(
                    df_mgr._get_hashes,
                    job['device'], job['partition'], job['policy'],
                    recalculate=suffixes)
                self.logger.update_stats('suffix.hashes', hashed)
                local_hash = recalc_hash
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                self.stats['rsync'] += 1
                # Local and remote suffix hashes still differ: call
                # sync to replicate those suffix directories
                success, _junk = self.sync(node, job, suffixes)
                with Timeout(self.http_timeout):
                    # Tell the target node to recalculate the hashes
                    # of the synced suffix directories
                    conn = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '/' + '-'.join(suffixes),
                        headers=headers)
                    conn.getresponse().read()
                if not success:
                    failure_devs_info.add((node['replication_ip'],
                                           node['device']))
                # add only remote region when replicate succeeded
                if success and node['region'] != job['region']:
                    synced_remote_regions.add(node['region'])
                self.suffix_sync += len(suffixes)
                self.logger.update_stats('suffix.syncs', len(suffixes))
            except (Exception, Timeout):
                failure_devs_info.add((node['replication_ip'],
                                       node['device']))
                self.logger.exception(_("Error syncing with node: %s") %
                                      node)
        self.suffix_count += len(local_hash)
    except (Exception, Timeout):
        failure_devs_info.update(target_devs_info)
        self._add_failure_stats(failure_devs_info)
        self.logger.exception(_("Error syncing partition"))
    finally:
        self.stats['success'] += len(target_devs_info - failure_devs_info)
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.update.timing', begin)
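The heart of update is the suffix-hash comparison: each side keeps, per partition, a mapping from three-character suffix directory to a hash of its contents (the hashes.pkl file), and only suffixes whose hashes differ get synced. A toy illustration with fabricated hash values:

local_hash = {'abc': 'd41d8cd9...', '07a': '9e107d9d...'}
remote_hash = {'abc': 'd41d8cd9...'}   # remote lacks suffix '07a'
suffixes = [s for s in local_hash
            if local_hash[s] != remote_hash.get(s, -1)]
print(suffixes)  # ['07a'] -- only this suffix directory is synced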
As for the sync method, it simply delegates to the configured sync backend, which by default is rsync, so let's look at rsync directly; it is simple enough to need little commentary:
def rsync(self, node, job, suffixes):
    """
    Uses rsync to implement the sync method. This was the first
    sync method in Swift.
    """
    if not os.path.exists(job['path']):
        return False, {}
    args = [
        'rsync',
        '--recursive',
        '--whole-file',
        '--human-readable',
        '--xattrs',
        '--itemize-changes',
        '--ignore-existing',
        '--timeout=%s' % self.rsync_io_timeout,
        '--contimeout=%s' % self.rsync_io_timeout,
        '--bwlimit=%s' % self.rsync_bwlimit,
        '--exclude=.*.%s' % ''.join('[0-9a-zA-Z]' for i in range(6))
    ]
    if self.rsync_compress and \
            job['region'] != node['region']:
        # Allow for compression, but only if the remote node is in
        # a different region than the local one.
        args.append('--compress')
    rsync_module = rsync_module_interpolation(self.rsync_module, node)
    had_any = False
    for suffix in suffixes:
        spath = join(job['path'], suffix)
        if os.path.exists(spath):
            args.append(spath)
            had_any = True
    if not had_any:
        return False, {}
    data_dir = get_data_dir(job['policy'])
    args.append(join(rsync_module, node['device'],
                     data_dir, job['partition']))
    return self._rsync(args) == 0, {}
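Putting it together, the argument list handed to self._rsync for one job ends up looking roughly like this (the paths, address, timeout, and bwlimit values below are illustrative, not actual defaults):

# Illustrative only: a possible args list for one replication job
args = [
    'rsync', '--recursive', '--whole-file', '--human-readable', '--xattrs',
    '--itemize-changes', '--ignore-existing',
    '--timeout=30', '--contimeout=30', '--bwlimit=0',
    # skip rsync's own in-flight temp files, named .<name>.XXXXXX
    '--exclude=.*.[0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z]'
    '[0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z]',
    # one source path per suffix directory that needs syncing
    '/srv/node/sda/objects/1234/abc',
    '/srv/node/sda/objects/1234/def',
    # destination: the rsync module on the target node
    '10.0.0.2::object/sdb/objects/1234',
]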
Having read through the code, we can now summarize the replicator's workflow. It first scans every partition on the local machine and looks up each partition's target nodes in the ring. There are then two cases. If the local device is not one of the partition's target nodes, meaning the data needs to be moved off this machine, the data is synced directly to the target nodes, each of which is then told to recalculate its hashes, and once enough syncs are confirmed the local copy is deleted. Otherwise, the replicator asks the other nodes for their hashes, compares them with the local ones, syncs whichever suffix directories differ, and afterwards tells the target nodes to recalculate their hashes. All in all, the flow is quite clear.