OpenStack Swift object replicator code analysis

In OpenStack Swift, the object replicator's job is to bring the system back to a consistent state after events such as transient network outages or disk failures. It compares the local data with every remote copy to make sure they all hold the latest version. Below is a brief walk through the object replicator code to understand the overall replication workflow.

First, the executable bin/swift-object-replicator imports the relevant packages, adds a few command-line options, and exposes a command for the user to invoke. What it actually calls are the run_once (single pass) and run_forever (run as a daemon) methods of the ObjectReplicator class.
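
Roughly, that executable looks like the sketch below (paraphrased rather than copied verbatim; the exact option list varies between Swift releases). parse_options and run_daemon are Swift's standard helpers for turning a Daemon subclass into a command-line tool:

from optparse import OptionParser

from swift.common.daemon import run_daemon
from swift.common.utils import parse_options
from swift.obj.replicator import ObjectReplicator

if __name__ == '__main__':
    parser = OptionParser("%prog CONFIG [options]")
    # Optional filters so a single pass can be limited to specific
    # devices or partitions (comma-separated lists).
    parser.add_option('-d', '--devices',
                      help='Replicate only the given devices')
    parser.add_option('-p', '--partitions',
                      help='Replicate only the given partitions')
    conf_file, options = parse_options(parser, once=True)
    # run_daemon instantiates ObjectReplicator and calls run_once or
    # run_forever depending on the parsed options.
    run_daemon(ObjectReplicator, conf_file, **options)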

Let's go straight to run_once. The ObjectReplicator class is defined in swift/obj/replicator.py.
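
A paraphrased sketch of run_once (the option handling differs slightly across releases; list_from_csv is the comma-separated-list helper from swift.common.utils, and time is the stdlib module imported at module level):

def run_once(self, *args, **kwargs):
    self._zero_stats()
    self.logger.info("Running object replicator in script mode.")
    start = time.time()
    # Turn the optional -d/-p/--policies filters into the override
    # arguments that replicate() understands; an empty filter becomes
    # None so that collect_jobs() does not skip everything.
    self.replicate(
        override_devices=list_from_csv(kwargs.get('devices')) or None,
        override_partitions=list_from_csv(kwargs.get('partitions')) or None,
        override_policies=list_from_csv(kwargs.get('policies')) or None)
    total = (time.time() - start) / 60
    self.logger.info(
        "Object replication complete (once). (%.02f minutes)", total)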

run_once's implementation is simple: it mainly gathers the override options and then runs replicate, which contains the main logic:

def replicate(self, override_devices=None, override_partitions=None,
                  override_policies=None):
        """Run a replication pass"""
        # some code omitted
        try:
            self.run_pool = GreenPool(size=self.concurrency)
            # collect all replication jobs
            jobs = self.collect_jobs(override_devices=override_devices,
                                     override_partitions=override_partitions,
                                     override_policies=override_policies)
            for job in jobs:
                # code omitted: a few quick checks on whether the job still needs to run
                if job['delete']:  # handoff job: the local copy is removed after syncing, handled by update_deleted
                    self.run_pool.spawn(self.update_deleted, job)
                else:  # normal job: handled by update
                    self.run_pool.spawn(self.update, job)
            current_nodes = None
            with Timeout(self.lockup_timeout):  # guard the whole pass with a lockup timeout
                self.run_pool.waitall()
        except (Exception, Timeout):
            # failure handling (omitted)
        finally:
            stats.kill()
            lockup_detector.kill()
            self.stats_line()
            self.stats['attempted'] = self.replication_count
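
The concurrency model here comes straight from eventlet: a GreenPool caps how many jobs run at once, and a Timeout aborts the pass if it hangs. A minimal standalone illustration of the pattern (not Swift code):

from eventlet import GreenPool, Timeout, sleep

pool = GreenPool(size=4)       # like self.concurrency: at most 4 jobs in flight
for job in range(10):
    pool.spawn(sleep, 0.1)     # each job runs in its own green thread
with Timeout(5):               # like self.lockup_timeout: bail out if waitall hangs
    pool.waitall()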

Now let's look at how the jobs are collected:

def collect_jobs(self, override_devices=None, override_partitions=None,
                    override_policies=None):
    jobs = []
    ips = whataremyips(self.bind_ip)
    for policy in POLICIES:
        if policy.policy_type == REPL_POLICY:
            if (override_policies is not None and
                    str(policy.idx) not in override_policies):
                continue
            # for each policy, load its ring and collect jobs via build_replication_jobs
            # ensure rings are loaded for policy
            self.load_object_ring(policy)
            jobs += self.build_replication_jobs(
                policy, ips, override_devices=override_devices,
                override_partitions=override_partitions)
    random.shuffle(jobs)  # shuffle the jobs
    if self.handoffs_first:
        # Move the handoff parts to the front of the list
        jobs.sort(key=lambda job: not job['delete'])
    self.job_count = len(jobs)
    return jobs
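
One detail worth spelling out: the sort key not job['delete'] works because False sorts before True, so handoff jobs (delete=True) move to the front of the list when handoffs_first is enabled. A tiny illustration:

jobs = [{'delete': False}, {'delete': True}, {'delete': False}]
jobs.sort(key=lambda job: not job['delete'])
# -> [{'delete': True}, {'delete': False}, {'delete': False}]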

def build_replication_jobs(self, policy, ips, override_devices=None,
                            override_partitions=None):
        # some of the setup code omitted

        # walk the mounted device directly and list all its directories, i.e. the partition directories
        for partition in os.listdir(obj_path):
            if (override_partitions is not None
                    and partition not in override_partitions):
                continue

            if (partition.startswith('auditor_status_') and
                    partition.endswith('.json')):
                # ignore auditor status files
                continue

            part_nodes = None
            try:
                job_path = join(obj_path, partition)
                # ask the ring which nodes this partition should live on
                part_nodes = policy.object_ring.get_part_nodes(
                    int(partition))
                # pick out the nodes that are not this machine
                nodes = [node for node in part_nodes
                            if node['id'] != local_dev['id']]
                jobs.append(
                    dict(path=job_path,
                            device=local_dev['device'],
                            obj_path=obj_path,
                            nodes=nodes,
                            delete=len(nodes) > len(part_nodes) - 1,  # True only if this device is not a primary for the partition (handoff)
                            policy=policy,
                            partition=partition,
                            region=local_dev['region']))
            except ValueError:
                # failure handling: log and skip this partition
                continue
        if not found_local:
            self.logger.error("Can't find itself in policy with index %d with"
                              " ips %s and with port %s in ring file, not"
                              " replicating",
                              int(policy), ", ".join(ips), self.port)
        return jobs
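
The delete flag deserves a concrete example. The ring returns the primary nodes for each partition, and the local device is filtered out of nodes, so the comparison is True only when the local device is not a primary at all, i.e. the partition is a handoff that should be removed once it has been pushed to its real owners. Assuming 3 replicas:

part_nodes = [{'id': 1}, {'id': 2}, {'id': 3}]    # primaries from the ring

local_dev = {'id': 2}                             # this device is a primary
nodes = [n for n in part_nodes if n['id'] != local_dev['id']]
print(len(nodes) > len(part_nodes) - 1)           # 2 > 2 -> False, keep the local copy

local_dev = {'id': 7}                             # this device is not a primary (handoff)
nodes = [n for n in part_nodes if n['id'] != local_dev['id']]
print(len(nodes) > len(part_nodes) - 1)           # 3 > 2 -> True, delete after syncing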

Now the jobs are actually executed and the replication is carried out:

def update_deleted(self, job):
    # handoff job: sync to the target nodes, then delete the local copy
    # some initialization code omitted
    try:
        responses = []
        suffixes = tpool.execute(tpool_get_suffixes, job['path'])
        synced_remote_regions = {}
        delete_objs = None
        if suffixes:
            # replicate to every target node
            for node in job['nodes']:
                # sync the suffixes to this node
                success, candidates = self.sync(
                    node, job, suffixes, **kwargs)
                if success:
                    with Timeout(self.http_timeout):
                        # on success, send a REPLICATE request so the remote node recalculates the hashes of the affected suffix directories
                        conn = http_connect(
                            node['replication_ip'],
                            node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '/' + '-'.join(suffixes), headers=headers)
                        conn.getresponse().read()
                    if node['region'] != job['region']:
                        synced_remote_regions[node['region']] = viewkeys(
                            candidates)
                else:
                    failure_devs_info.add((node['replication_ip'],
                                            node['device']))
                responses.append(success)
            for cand_objs in synced_remote_regions.values():
                if delete_objs is None:
                    delete_objs = cand_objs
                else:
                    delete_objs = delete_objs & cand_objs

        if self.handoff_delete:
            # handoff_delete is set: the handoff counts as synced once at least that many nodes succeeded
            delete_handoff = len([resp for resp in responses if resp]) >= \
                self.handoff_delete
        else:
            # delete handoff if all syncs were successful
            delete_handoff = len(responses) == len(job['nodes']) and \
                all(responses)
        if delete_handoff:
            self.stats['remove'] += 1
            if (self.conf.get('sync_method', 'rsync') == 'ssync' and
                    delete_objs is not None):
                self.logger.info(_("Removing %s objects"),
                                    len(delete_objs))
                _junk, error_paths = self.delete_handoff_objs(
                    job, delete_objs)
                # if replication works for a hand-off device and it failed,
                # the remote devices which are target of the replication
                # from the hand-off device will be marked. Because cleanup
                # after replication failed means replicator needs to
                # replicate again with the same info.
                if error_paths:
                    failure_devs_info.update(
                        [(failure_dev['replication_ip'],
                            failure_dev['device'])
                            for failure_dev in job['nodes']])
            else:
                self.delete_partition(job['path'])
                handoff_partition_deleted = True
        elif not suffixes:
            self.delete_partition(job['path'])
            handoff_partition_deleted = True
    except (Exception, Timeout):
        self.logger.exception(_("Error syncing handoff partition"))
        self._add_failure_stats(failure_devs_info)
    finally:
        target_devs_info = set([(target_dev['replication_ip'],
                                    target_dev['device'])
                                for target_dev in job['nodes']])
        self.stats['success'] += len(target_devs_info - failure_devs_info)
        if not handoff_partition_deleted:
            self.handoffs_remaining += 1
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.delete.timing', begin)
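
The tpool_get_suffixes function used via tpool.execute above simply lists the three-character suffix subdirectories of a partition directory; running it through eventlet's tpool keeps the listdir from blocking the green-thread hub. Paraphrased, it is essentially:

def tpool_get_suffixes(path):
    # isdir and join come from os.path; e.g.
    # /srv/node/sdb1/objects/121 -> ['a83', '0f1', ...]
    return [suff for suff in os.listdir(path)
            if len(suff) == 3 and isdir(join(path, suff))]

Next, update handles the normal (non-handoff) jobs: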

def update(self, job):
    # normal replication: the local data is kept
    # setup code omitted
    try:
        # compute the suffix hashes of this partition (the hashes.pkl file)
        hashed, local_hash = tpool_reraise(
            df_mgr._get_hashes, job['device'],
            job['partition'], job['policy'],
            do_listdir=_do_listdir(
                int(job['partition']),
                self.replication_cycle))
        self.suffix_hash += hashed
        self.logger.update_stats('suffix.hashes', hashed)
        attempts_left = len(job['nodes'])
        synced_remote_regions = set()
        random.shuffle(job['nodes'])
        nodes = itertools.chain(
            job['nodes'],
            job['policy'].object_ring.get_more_nodes(
                int(job['partition'])))
        while attempts_left > 0:
            # If this throws StopIteration it will be caught way below
            node = next(nodes)
            target_devs_info.add((node['replication_ip'], node['device']))
            attempts_left -= 1
            # if we have already synced to this remote region,
            # don't sync again on this replication pass
            if node['region'] in synced_remote_regions:
                continue
            try:
                with Timeout(self.http_timeout):
                    # first ask the remote node for its suffix hashes of this partition
                    resp = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '', headers=headers).getresponse()
                    if resp.status == HTTP_INSUFFICIENT_STORAGE:
                        self.logger.error(
                            _('%(replication_ip)s/%(device)s '
                                'responded as unmounted'), node)
                        attempts_left += 1
                        failure_devs_info.add((node['replication_ip'],
                                                node['device']))
                        continue
                    if resp.status != HTTP_OK:
                        self.logger.error(_("Invalid response %(resp)s "
                                            "from %(ip)s"),
                                            {'resp': resp.status,
                                            'ip': node['replication_ip']})
                        failure_devs_info.add((node['replication_ip'],
                                                node['device']))
                        continue
                    remote_hash = pickle.loads(resp.read())
                    del resp
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                if not suffixes:
                    self.stats['hashmatch'] += 1
                    continue
                # recalculate the hashes of the differing suffix directories (hashes.pkl)
                hashed, recalc_hash = tpool_reraise(
                    df_mgr._get_hashes,
                    job['device'], job['partition'], job['policy'],
                    recalculate=suffixes)
                self.logger.update_stats('suffix.hashes', hashed)
                local_hash = recalc_hash
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                self.stats['rsync'] += 1
                # local and remote suffix hashes still differ, so call sync to push the data
                success, _junk = self.sync(node, job, suffixes)
                with Timeout(self.http_timeout):
                    # tell the remote node to recalculate the hashes of those suffixes
                    conn = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '/' + '-'.join(suffixes),
                        headers=headers)
                    conn.getresponse().read()
                if not success:
                    failure_devs_info.add((node['replication_ip'],
                                            node['device']))
                # add only remote region when replicate succeeded
                if success and node['region'] != job['region']:
                    synced_remote_regions.add(node['region'])
                self.suffix_sync += len(suffixes)
                self.logger.update_stats('suffix.syncs', len(suffixes))
            except (Exception, Timeout):
                failure_devs_info.add((node['replication_ip'],
                                        node['device']))
                self.logger.exception(_("Error syncing with node: %s") %
                                        node)
        self.suffix_count += len(local_hash)
    except (Exception, Timeout):
        failure_devs_info.update(target_devs_info)
        self._add_failure_stats(failure_devs_info)
        self.logger.exception(_("Error syncing partition"))
    finally:
        self.stats['success'] += len(target_devs_info - failure_devs_info)
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.update.timing', begin)
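
The comparison at the heart of update is just a dictionary diff: both sides map each suffix directory to a hash derived from the object files underneath it (cached in hashes.pkl), and a suffix needs syncing when the values disagree or the remote side lacks it entirely. For example (hash values abbreviated):

local_hash = {'a83': 'd41d8cd9...', '0f1': '9e107d9d...', '3c2': 'e4d909c2...'}
remote_hash = {'a83': 'd41d8cd9...', '0f1': '00000000...'}
suffixes = [s for s in local_hash
            if local_hash[s] != remote_hash.get(s, -1)]
# -> ['0f1', '3c2']: '0f1' differs, '3c2' is missing on the remote node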

The sync method simply dispatches to whichever sync_method is configured (rsync by default, with ssync as the alternative). Here we look at rsync, which is straightforward enough to speak for itself:

def rsync(self, node, job, suffixes):
        """
        Uses rsync to implement the sync method. This was the first
        sync method in Swift.
        """
        if not os.path.exists(job['path']):
            return False, {}
        args = [
            'rsync',
            '--recursive',
            '--whole-file',
            '--human-readable',
            '--xattrs',
            '--itemize-changes',
            '--ignore-existing',
            '--timeout=%s' % self.rsync_io_timeout,
            '--contimeout=%s' % self.rsync_io_timeout,
            '--bwlimit=%s' % self.rsync_bwlimit,
            '--exclude=.*.%s' % ''.join('[0-9a-zA-Z]' for i in range(6))
        ]
        if self.rsync_compress and \
                job['region'] != node['region']:
            # Allow for compression, but only if the remote node is in
            # a different region than the local one.
            args.append('--compress')
        rsync_module = rsync_module_interpolation(self.rsync_module, node)
        had_any = False
        for suffix in suffixes:
            spath = join(job['path'], suffix)
            if os.path.exists(spath):
                args.append(spath)
                had_any = True
        if not had_any:
            return False, {}
        data_dir = get_data_dir(job['policy'])
        args.append(join(rsync_module, node['device'],
                    data_dir, job['partition']))
        return self._rsync(args) == 0, {}
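
Putting the pieces together: for a hypothetical partition 121 on device sdb1 with a single differing suffix a83, default timeouts and bandwidth limit, and the default rsync_module of {replication_ip}::object, the args list handed to self._rsync would look roughly like:

['rsync', '--recursive', '--whole-file', '--human-readable', '--xattrs',
 '--itemize-changes', '--ignore-existing', '--timeout=30', '--contimeout=30',
 '--bwlimit=0',
 '--exclude=.*.[0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z][0-9a-zA-Z]',
 '/srv/node/sdb1/objects/121/a83',
 '10.0.0.2::object/sdb1/objects/121']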

Having read the code, we can summarize the replicator's overall flow. It scans every partition on the local machine and asks the ring for each partition's target nodes. There are then two cases. If the local device is not one of the partition's primary nodes (a handoff, i.e. the data needs to be moved off this machine), the data is synced straight to the target nodes, each target is told to recalculate its hashes, and once enough syncs are confirmed the local copy is deleted. Otherwise, the replicator asks the other nodes for their suffix hashes, compares them with the local ones, syncs any suffixes that differ, and then tells the target nodes to recalculate the hashes of those suffixes. All in all, the flow is quite clear.