Ceph Monitor标记OSD DOWN分析

在上篇Blog:Ceph OSD的心跳机制分析的最后,我们知道Ceph OSD将心跳检测失败的OSD打包成MOSDFailure消息发送给Monitor,但是还遗留了一个问题,就是Monitor是怎么处理这个消息的?又是在什么的情况下会把这个OSD标记为Down状态?所以这篇就是要把整个流程补完。

还是来看代码,首先需要注意的是,MOSDFailure不是一个原始的消息,我们得先找到这个消息对应得MSG TYPE,这样才能知道Monitor是怎么处理的,还是先看src\messages\MOSDFailure.h,找到MOSDFailure类的定义:

class MOSDFailure : public PaxosServiceMessage {
// ...

  // 只看构造函数,发现初始化的消息类型是MSG_OSD_FAILURE
  MOSDFailure() : PaxosServiceMessage(MSG_OSD_FAILURE, 0, HEAD_VERSION) { }

// ...
};

知道了消息类型,我们就很容易知道Monitor处理的流程了,在src/mon/Monitor.cc中,所有Monitor收到的消息都会由Monitor::dispatch_op(MonOpRequestRef op)进行处理:

void Monitor::dispatch_op(MonOpRequestRef op)
{
// ...

  switch (op->get_req()->get_type()) {

    // OSDs
    // 这里有一大堆消息
    case CEPH_MSG_MON_GET_OSDMAP:
    case CEPH_MSG_POOLOP:
    case MSG_OSD_BEACON:
    case MSG_OSD_MARK_ME_DOWN:
    case MSG_OSD_MARK_ME_DEAD:
    case MSG_OSD_FULL:
    // 我们要找的MSG_OSD_FAILURE在这里
    case MSG_OSD_FAILURE:
    case MSG_OSD_BOOT:
    case MSG_OSD_ALIVE:
    case MSG_OSD_PGTEMP:
    case MSG_OSD_PG_CREATED:
    case MSG_REMOVE_SNAPS:
    case MSG_MON_GET_PURGED_SNAPS:
    case MSG_OSD_PG_READY_TO_MERGE:
      paxos_service[PAXOS_OSDMAP]->dispatch(op);
      return;
// ...
  }
// ...
}

可以看到Monitor中把MSG_OSD_FAILURE消息交给了paxos_service[PAXOS_OSDMAP]处理,很容易的,在Monitor构造函数里我们找到paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));,也就是说,MSG_OSD_FAILURE消息是由OSDMonitor真正处理的。

话不多说,直接转到src/mon/OSDMonitor.cc,这里需要注意的是,OSDMonitor没有直接的dispatch方法,因为这个方法在基类里,这里就不多赘述了,实际dispatch会调用到OSDMonitor::preprocess_query(MonOpRequestRef op)以及OSDMonitor::prepare_update(MonOpRequestRef op)这两个方法:

bool OSDMonitor::preprocess_query(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  Message *m = op->get_req();

  switch (m->get_type()) {

// ...

  // MSG_OSD_FAILURE 由preprocess_failure处理
  case MSG_OSD_FAILURE:
    return preprocess_failure(op);

// ...

  default:
    ceph_abort();
    return true;
  }
}

bool OSDMonitor::prepare_update(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  Message *m = op->get_req();

  switch (m->get_type()) {

// ...

  // MSG_OSD_FAILURE 由prepare_failure处理
  case MSG_OSD_FAILURE:
    return prepare_failure(op);

// ...

  default:
    ceph_abort();
  }

  return false;
}

终于找到最终的处理方法了,先preprocess_failure,后prepare_failure,一般来说,preprocess_query阶段,大多数是只读阶段,如果这个方法返回true,那就意味着消息处理结束了,当然如果不是一个只读操作,那还会继续交由prepare_update阶段进行处理,如果结果是需要更新Paxos,那么就会提交给Paxos。

先继续看OSDMonitor::preprocess_failure(MonOpRequestRef op)的实现吧:

bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  auto m = op->get_req<MOSDFailure>();
  // who is target_osd
  int badboy = m->get_target_osd();

  // check permissions
  // 检查权限
  if (check_source(op, m->fsid))
    goto didit;

  // first, verify the reporting host is valid
  // 检查是不是一个正常源OSD汇报的
  if (m->get_orig_source().is_osd()) {
    int from = m->get_orig_source().num();
    if (!osdmap.exists(from) ||
	!osdmap.get_addrs(from).legacy_equals(m->get_orig_source_addrs()) ||
	(osdmap.is_down(from) && m->if_osd_failed())) {
      dout(5) << "preprocess_failure from dead osd." << from
	      << ", ignoring" << dendl;
      send_incremental(op, m->get_epoch()+1);
      goto didit;
    }
  }


  // weird?
  // 检查汇报的源OSD是不是最新的
  if (osdmap.is_down(badboy)) {
    dout(5) << "preprocess_failure dne(/dup?): osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }
  if (osdmap.get_addrs(badboy) != m->get_target_addrs()) {
    dout(5) << "preprocess_failure wrong osd: report osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << " != map's " << osdmap.get_addrs(badboy)
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }

  // already reported?
  // 检查汇报的目标OSD是不是已经Down了
  if (osdmap.is_down(badboy) ||
      osdmap.get_up_from(badboy) > m->get_epoch()) {
    dout(5) << "preprocess_failure dup/old: osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }

  // 检查汇报的目标OSD是不是可以被Down
  if (!can_mark_down(badboy)) {
    dout(5) << "preprocess_failure ignoring report of osd."
	    << m->get_target_osd() << " " << m->get_target_addrs()
	    << " from " << m->get_orig_source() << dendl;
    goto didit;
  }

  // 如果上述检查都没有问题,说明这是一个新发现的有问题的OSD,那需要进一步处理了
  dout(10) << "preprocess_failure new: osd." << m->get_target_osd()
	   << " " << m->get_target_addrs()
	   << ", from " << m->get_orig_source() << dendl;
  return false;


 didit:
  // 所有检查失败的,不管什么原因,都会忽略这个汇报
  mon->no_reply(op);
  return true;
}

可以看到preprocess_failure里做了很多的校验工作,确保收到的消息是一个正确的消息,并且不是已经处理过的信息,所有检查都通过,说明真的有OSD出问题了,那就需要进一步处理了。于是就进入到prepare_failure的流程:

bool OSDMonitor::prepare_failure(MonOpRequestRef op)
{
// ...

  // 如果收到的消息中标记这个OSD失效了
  if (m->if_osd_failed()) {
    // calculate failure time
    utime_t now = ceph_clock_now();
    utime_t failed_since =
      m->get_recv_stamp() - utime_t(m->failed_for, 0);

    // add a report
    if (m->is_immediate()) {
      mon->clog->debug() << "osd." << m->get_target_osd()
			 << " reported immediately failed by "
			 << m->get_orig_source();
      force_failure(target_osd, reporter);
      return true;
    }
    mon->clog->debug() << "osd." << m->get_target_osd() << " reported failed by "
		      << m->get_orig_source();

    failure_info_t& fi = failure_info[target_osd];
    // 在这个失效OSD的reporters列表里加上汇报的这个OSD
    MonOpRequestRef old_op = fi.add_report(reporter, failed_since, op);
    if (old_op) {
      mon->no_reply(old_op);
    }
    // 判断是不是到阈值了
    return check_failure(now, target_osd, fi);
  } else {
// ..

    // 如果收到的消息中标记这个OSD没有失效
    // 从失效列表里把这个OSD删了
	failure_info.erase(target_osd);

// ...

  return false;
}

可以看到逻辑不复杂,Monitor会维护一个失效列表,收到消息后,经过一些检查,把汇报源加到失效OSD的reporters列表里,最后执行check_failure,那后续的判断就在OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)了,继续:

bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
{
  // already pending failure?
  // 先判断是不是已经在提交Paxos的队列里了,在里面了就自然不用再多处理了
  if (pending_inc.new_state.count(target_osd) &&
      pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
    dout(10) << " already pending failure" << dendl;
    return true;
  }

  // ...

  // 关键逻辑来了,经过一系列计算,如果汇报者的数量超过了mon_osd_min_down_reporters的值,就把OSD状态提交给Paxos
  if (failed_for >= grace &&
      reporters_by_subtree.size() >= g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {
    dout(1) << " we have enough reporters to mark osd." << target_osd
	    << " down" << dendl;
    pending_inc.new_state[target_osd] = CEPH_OSD_UP;

    // ...

    return true;
  }
  return false;
}

刚看到代码的时候,会不会觉得有点奇怪,为啥提交到Paxos的是一个CEPH_OSD_UP的值?理论上不是应该是标记为DOWN么?我们看下new_state的这个定义就知道了:

mempool::osdmap::map<int32_t,uint32_t> new_state;             // XORed onto previous state.

发现注释没,这个new_state是会异或到原值里的,也就是说,其实上面的操作是把OSD的UP状态去掉,没有UP状态,自然这个OSD就是DOWN状态了。

好了,到这里,基本上就能确定一个OSD要被标记为Down状态,只剩下最后一步,因为Monitor是个分布式的系统,需要Paxos保证一致性,而pending_inc队列里的内容,就是要提交给Paxos的修改了,具体Paxos的部分,这里就不再深入分析了。