An Analysis of the Ceph OSD Heartbeat Mechanism

The heartbeat mechanism plays a critical role in Ceph: every OSD uses heartbeats to confirm the state of its peers, so that an OSD that loses connectivity or crashes is detected promptly, removed from the cluster, and the resulting data rebalance is triggered, keeping data safe. It is therefore well worth understanding how the current heartbeat mechanism works and walking through the path an OSD takes from failing to being kicked out of the cluster.

Heartbeat Initialization & Sending

We start from the main() function of the Ceph OSD process, in src/ceph_osd.cc:
int main(int argc, const char **argv)
{
  // ...
  // First create the four heartbeat Messengers: front (public network) and
  // back (cluster network), each with a client (send) and a server (receive) side
  Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
                                                   entity_name_t::OSD(whoami), "hb_back_client",
                                                   nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
                                                    entity_name_t::OSD(whoami), "hb_front_client",
                                                    nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
                                                   entity_name_t::OSD(whoami), "hb_back_server",
                                                   nonce, Messenger::HEARTBEAT);
  Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
                                                    entity_name_t::OSD(whoami), "hb_front_server",
                                                    nonce, Messenger::HEARTBEAT);
  // ...
  // Bind the front/back addresses to the corresponding Messengers
  // (port 0 lets each Messenger pick an ephemeral port)
  entity_addrvec_t hb_front_addrs = public_addrs;
  for (auto& a : hb_front_addrs.v) {
    a.set_port(0);
  }
  if (ms_hb_front_server->bindv(hb_front_addrs) < 0)
    forker.exit(1);
  if (ms_hb_front_client->client_bind(hb_front_addrs.front()) < 0)
    forker.exit(1);

  entity_addrvec_t hb_back_addrs = cluster_addrs;
  for (auto& a : hb_back_addrs.v) {
    a.set_port(0);
  }
  if (ms_hb_back_server->bindv(hb_back_addrs) < 0)
    forker.exit(1);
  if (ms_hb_back_client->client_bind(hb_back_addrs.front()) < 0)
    forker.exit(1);
  // ...
  // Create the OSD instance
  osdptr = new OSD(g_ceph_context,
                   store,
                   whoami,
                   ms_cluster,
                   ms_public,
                   ms_hb_front_client,
                   ms_hb_back_client,
                   ms_hb_front_server,
                   ms_hb_back_server,
                   ms_objecter,
                   &mc,
                   data_path,
                   journal_path);
  // ...
  // At this point the heartbeat is not running yet; the real heartbeat
  // initialization happens in OSD::init()
  err = osdptr->init();
}
The heartbeat initialization logic lives in OSD::init(), so we turn to src/osd/OSD.cc:
int OSD::init()
{
  // ...
  // Register a dispatcher on each heartbeat Messenger. It handles both the
  // replies to our own pings and the pings other OSDs send to us; the exact
  // flow is analyzed below.
  hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
  // ...
  // start the heartbeat thread
  heartbeat_thread.create("osd_srv_heartbt");
}
Next, let's see how heartbeat_thread is defined, in src/osd/OSD.h:
struct T_Heartbeat : public Thread {
  OSD *osd;
  explicit T_Heartbeat(OSD *o) : osd(o) {}
  void *entry() override {
    osd->heartbeat_entry();
    return 0;
  }
} heartbeat_thread;
Very simple: it just calls osd->heartbeat_entry(). So let's look at what OSD::heartbeat_entry() does, again implemented in src/osd/OSD.cc:
void OSD::heartbeat_entry()
{
  std::unique_lock l(heartbeat_lock);
  if (is_stopping())
    return;
  while (!heartbeat_stop) {
    // Call OSD::heartbeat()
    heartbeat();

    // Wait for a (by default randomized) interval, then heartbeat again
    double wait;
    if (cct->_conf.get_val<bool>("debug_disable_randomized_ping")) {
      wait = (float)cct->_conf->osd_heartbeat_interval;
    } else {
      wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
    }
    auto w = ceph::make_timespan(wait);
    dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
    heartbeat_cond.wait_for(l, w);
    if (is_stopping())
      return;
    dout(30) << "heartbeat_entry woke up" << dendl;
  }
}

void OSD::heartbeat()
{
  // ...
  // Sample the current system load; heartbeat messages carry the load of the
  // machine this OSD runs on. The formula below keeps a running average over
  // roughly one day's worth of samples.
  // get CPU load avg
  double loadavgs[1];
  int hb_interval = cct->_conf->osd_heartbeat_interval;
  int n_samples = 86400;
  if (hb_interval > 1) {
    n_samples /= hb_interval;
    if (n_samples < 1)
      n_samples = 1;
  }
  if (getloadavg(loadavgs, 1) == 1) {
    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
    dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
  }
  // ...
  utime_t now = ceph_clock_now();
  auto mnow = service.get_mnow();
  utime_t deadline = now;
  deadline += cct->_conf->osd_heartbeat_grace;

  // Iterate over all peers that need heartbeat checks
  // send heartbeats
  for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
       i != heartbeat_peers.end();
       ++i) {
    // ...
    // Send an MOSDPing message over the cluster (back) network
    i->second.con_back->send_message(
      new MOSDPing(monc->get_fsid(),
                   service.get_osdmap_epoch(),
                   MOSDPing::PING,
                   now,
                   mnow,
                   mnow,
                   service.get_up_epoch(),
                   cct->_conf->osd_heartbeat_min_size,
                   delta_ub));

    // If the public (front) network is separate, also send an MOSDPing over it
    if (i->second.con_front)
      i->second.con_front->send_message(
        new MOSDPing(monc->get_fsid(),
                     service.get_osdmap_epoch(),
                     MOSDPing::PING,
                     now,
                     mnow,
                     mnow,
                     service.get_up_epoch(),
                     cct->_conf->osd_heartbeat_min_size,
                     delta_ub));
  }

  logger->set(l_osd_hb_to, heartbeat_peers.size());

  // Special case: if there are no peers at all, just wait for an OSDMap update
  // hmm.. am i all alone?
  dout(30) << "heartbeat lonely?" << dendl;
  if (heartbeat_peers.empty()) {
    if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
      last_mon_heartbeat = now;
      dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
      osdmap_subscribe(get_osdmap_epoch() + 1, false);
    }
  }
  dout(30) << "heartbeat done" << dendl;
}
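Two small details in the code above deserve a concrete example. The randomized sleep makes each iteration wait between 0.5 and 0.5 + 0.9 * osd_heartbeat_interval seconds, which spreads pings out instead of having every OSD ping on the exact same cadence. And daily_loadavg is a running average weighted over roughly one day of heartbeat samples, so a single load spike barely moves it. Here is a minimal standalone sketch of both (not Ceph code; the interval and load values are made up for illustration):

#include <cstdio>

int main() {
  // Assumed values, for illustration only
  const int hb_interval = 6;                    // pretend osd_heartbeat_interval is 6 seconds
  const int n_samples   = 86400 / hb_interval;  // samples per day = 14400
  double daily_loadavg  = 1.0;                  // previous running average
  double loadavg_now    = 2.0;                  // current getloadavg() sample

  // Same update as in OSD::heartbeat(): the new sample only carries a
  // 1/n_samples weight, so a short spike barely changes the daily average.
  daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavg_now) / n_samples;
  printf("daily_loadavg after one sample: %.6f\n", daily_loadavg);  // ~1.000069

  // The randomized ping interval from heartbeat_entry():
  // wait is one of {0.5, 0.5 + 0.1*I, ..., 0.5 + 0.9*I} with I = osd_heartbeat_interval,
  // i.e. anywhere from 0.5s to 5.9s with the value assumed here.
  return 0;
}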
Determining the Heartbeat Peers

The heartbeat logic itself is easy to follow; it is essentially a for loop. One detail deserves attention though: where does heartbeat_peers come from? Which OSDs end up in this list, and is it all of them? The list is maintained in OSD::maybe_update_heartbeat_peers():
void OSD::maybe_update_heartbeat_peers()
{
  // ...
  // First, add the replica OSDs of every PG this OSD serves
  // build heartbeat from set
  if (is_active()) {
    vector<PGRef> pgs;
    _get_pgs(&pgs);
    for (auto& pg : pgs) {
      pg->with_heartbeat_peers([&](int peer) {
        if (get_osdmap()->is_up(peer)) {
          _add_heartbeat_peer(peer);
        }
      });
    }
  }

  // Then look at the OSDMap and pick the neighboring up OSDs (next and previous)
  // include next and previous up osds to ensure we have a fully-connected set
  set<int> want, extras;
  const int next = get_osdmap()->get_next_up_osd_after(whoami);
  if (next >= 0)
    want.insert(next);
  int prev = get_osdmap()->get_previous_up_osd_before(whoami);
  if (prev >= 0 && prev != next)
    want.insert(prev);

  // Depending on configuration, also pick some random up OSDs from the OSDMap
  // make sure we have at least **min_down** osds coming from different
  // subtree level (e.g., hosts) for fast failure detection.
  auto min_down = cct->_conf.get_val<uint64_t>("mon_osd_min_down_reporters");
  auto subtree = cct->_conf.get_val<string>("mon_osd_reporter_subtree_level");
  auto limit = std::max(min_down, (uint64_t)cct->_conf->osd_heartbeat_min_peers);
  get_osdmap()->get_random_up_osds_by_subtree(
    whoami, subtree, limit, want, &want);

  // Add all of these OSDs to the extras set and to the peer list
  for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
    dout(10) << " adding neighbor peer osd." << *p << dendl;
    extras.insert(*p);
    _add_heartbeat_peer(*p);
  }

  // Double-check: drop any peer that is no longer up
  // remove down peers; enumerate extras
  map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
  while (p != heartbeat_peers.end()) {
    if (!get_osdmap()->is_up(p->first)) {
      int o = p->first;
      ++p;
      _remove_heartbeat_peer(o);
      continue;
    }
    if (p->second.epoch < get_osdmap_epoch()) {
      extras.insert(p->first);
    }
    ++p;
  }

  // Finally, adjust the number of peers according to the configuration so it
  // stays within a reasonable range
  // too few?
  for (int n = next; n >= 0; ) {
    if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
      break;
    if (!extras.count(n) && !want.count(n) && n != whoami) {
      dout(10) << " adding random peer osd." << n << dendl;
      extras.insert(n);
      _add_heartbeat_peer(n);
    }
    n = get_osdmap()->get_next_up_osd_after(n);
    if (n == next)
      break;  // came full circle; stop
  }

  // too many?
  for (set<int>::iterator p = extras.begin();
       (int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
       ++p) {
    if (want.count(*p))
      continue;
    _remove_heartbeat_peer(*p);
  }

  dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;

  // clean up stale failure pending
  for (auto it = failure_pending.begin(); it != failure_pending.end();) {
    if (heartbeat_peers.count(it->first) == 0) {
      send_still_alive(get_osdmap_epoch(), it->first, it->second.second);
      failure_pending.erase(it++);
    } else {
      it++;
    }
  }
}
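Before moving on to the receive path, it helps to have a picture of what a HeartbeatInfo entry in heartbeat_peers holds, since the reply handling below updates its fields. The following is a simplified reconstruction based only on the members referenced in this article; the real struct in src/osd/OSD.h has more members and helper methods:

// Simplified sketch of the per-peer bookkeeping, not the full definition
struct HeartbeatInfo {
  int peer;                         // peer osd id
  ConnectionRef con_back;           // heartbeat connection on the cluster (back) network
  ConnectionRef con_front;          // heartbeat connection on the public (front) network, may be null
  utime_t first_tx, last_tx;        // first / most recent ping sent to this peer
  utime_t last_rx_back;             // last reply received on the back connection
  utime_t last_rx_front;            // last reply received on the front connection
  epoch_t epoch;                    // osdmap epoch when the peer was (re)added
  // ping send time -> (grace deadline, number of connections that have not acked it yet)
  std::map<utime_t, std::pair<utime_t, int>> ping_history;
};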
Heartbeat Reception

At the very start of OSD initialization we saw that the four heartbeat Messengers register heartbeat_dispatcher as their dispatcher for incoming messages. That is a thin wrapper which ultimately calls osd->heartbeat_dispatch(m), also defined in src/osd/OSD.cc:
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {
  // ...
  // MSG_OSD_PING messages are handed over to handle_osd_ping()
  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;
  }
  return true;
}

void OSD::handle_osd_ping(MOSDPing *m)
{
  // ...
  switch (m->op) {

  // We received a PING from a peer
  case MOSDPing::PING:
    {
      // Send the reply
      Message *r = new MOSDPing(monc->get_fsid(),
                                curmap->get_epoch(),
                                MOSDPing::PING_REPLY,
                                m->ping_stamp,
                                m->mono_ping_stamp,
                                mnow,
                                service.get_up_epoch(),
                                cct->_conf->osd_heartbeat_min_size,
                                sender_delta_ub);
      con->send_message(r);
      // ...
      else if (!curmap->exists(from) ||
               curmap->get_down_at(from) > m->map_epoch) {
        /*
         * The sender is already marked down in our map: answer with a
         * MOSDPing::YOU_DIED message.
         * tell them they have died
         */
        Message *r = new MOSDPing(monc->get_fsid(),
                                  curmap->get_epoch(),
                                  MOSDPing::YOU_DIED,
                                  m->ping_stamp,
                                  m->mono_ping_stamp,
                                  mnow,
                                  service.get_up_epoch(),
                                  cct->_conf->osd_heartbeat_min_size);
        con->send_message(r);
      }
    }
    break;

  // We received a reply to one of our own pings
  case MOSDPing::PING_REPLY:
    {
      map<int, HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
      // If the sender is in our heartbeat peer list, update the last-received
      // stamps for the connection the reply arrived on: with only a back
      // connection both stamps are updated, with separate front and back
      // connections each one updates its own stamp.
      if (i != heartbeat_peers.end()) {
        auto acked = i->second.ping_history.find(m->ping_stamp);
        if (acked != i->second.ping_history.end()) {
          int &unacknowledged = acked->second.second;
          if (con == i->second.con_back) {
            i->second.last_rx_back = now;
            ceph_assert(unacknowledged > 0);
            --unacknowledged;
            // if there is no front con, set both stamps.
            if (i->second.con_front == NULL) {
              i->second.last_rx_front = now;
              ceph_assert(unacknowledged > 0);
              --unacknowledged;
            }
          } else if (con == i->second.con_front) {
            dout(25) << "handle_osd_ping got reply from osd." << from
                     << " first_tx " << i->second.first_tx
                     << " last_tx " << i->second.last_tx
                     << " last_rx_back " << i->second.last_rx_back
                     << " last_rx_front " << i->second.last_rx_front
                     << " -> " << now
                     << dendl;
            i->second.last_rx_front = now;
            ceph_assert(unacknowledged > 0);
            --unacknowledged;
          }
          // ... some ping-history bookkeeping follows; it is not analyzed here.

          // If the peer is healthy again, clear any failure state we had
          // queued or already reported for it
          if (i->second.is_healthy(now)) {
            // Cancel false reports
            auto failure_queue_entry = failure_queue.find(from);
            if (failure_queue_entry != failure_queue.end()) {
              dout(10) << "handle_osd_ping canceling queued "
                       << "failure report for osd." << from << dendl;
              failure_queue.erase(failure_queue_entry);
            }
            auto failure_pending_entry = failure_pending.find(from);
            if (failure_pending_entry != failure_pending.end()) {
              dout(10) << "handle_osd_ping canceling in-flight "
                       << "failure report for osd." << from << dendl;
              send_still_alive(curmap->get_epoch(),
                               from,
                               failure_pending_entry->second.second);
              failure_pending.erase(failure_pending_entry);
            }
          }
        } else {
          // old replies, deprecated by newly sent pings.
          dout(10) << "handle_osd_ping no pending ping(sent at " << m->ping_stamp
                   << ") is found, treat as covered by newly sent pings "
                   << "and ignore"
                   << dendl;
        }
      }
    }
    break;

  // We received a MOSDPing::YOU_DIED message: a peer thinks we are down,
  // so subscribe to a newer osdmap
  case MOSDPing::YOU_DIED:
    dout(10) << "handle_osd_ping " << m->get_source_inst()
             << " says i am down in " << m->map_epoch << dendl;
    osdmap_subscribe(curmap->get_epoch() + 1, false);
    break;
  }

  heartbeat_lock.unlock();
  m->put();
}
Timeout Detection and Reporting

With the heartbeat send and receive paths covered, what remains is timeout detection and reporting. The timeout check is done in OSD::heartbeat_check():
void OSD::heartbeat_check()
{
  ceph_assert(ceph_mutex_is_locked(heartbeat_lock));
  utime_t now = ceph_clock_now();

  // check for incoming heartbeats (move me elsewhere?)
  for (map<int, HeartbeatInfo>::iterator p = heartbeat_peers.begin();
       p != heartbeat_peers.end();
       ++p) {

    // If we have not even sent a ping to this peer yet, skip it
    if (p->second.first_tx == utime_t()) {
      dout(25) << "heartbeat_check we haven't sent ping to osd." << p->first
               << " yet, skipping" << dendl;
      continue;
    }

    // The peer has timed out
    if (p->second.is_unhealthy(now)) {
      utime_t oldest_deadline = p->second.ping_history.begin()->second.first;
      if (p->second.last_rx_back == utime_t() ||
          p->second.last_rx_front == utime_t()) {
        // fail
        // We never received a single reply: put the peer on failure_queue
        failure_queue[p->first] = p->second.first_tx;
      } else {
        // fail
        // We did get replies before, but the peer has now timed out:
        // put it on failure_queue as well
        failure_queue[p->first] = std::min(p->second.last_rx_back, p->second.last_rx_front);
      }
    }
  }
}
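The implementation of is_unhealthy() is not shown above, but the pieces it relies on are: heartbeat() (in the part elided earlier) records each outgoing ping in ping_history together with the deadline derived from osd_heartbeat_grace, and handle_osd_ping() drops entries once they are fully acknowledged. Under those assumptions, a rough sketch of the two checks looks like this (simplified; see HeartbeatInfo in src/osd/OSD.h for the exact logic):

bool HeartbeatInfo::is_unhealthy(utime_t now) const {
  // Nothing outstanding: either we never pinged, or every ping has been acked
  if (ping_history.empty())
    return false;
  // ping_history is ordered by send time, so the first entry carries the
  // oldest still-unacknowledged ping and its grace deadline
  utime_t oldest_deadline = ping_history.begin()->second.first;
  return now > oldest_deadline;
}

bool HeartbeatInfo::is_healthy(utime_t now) const {
  // Never report healthy before at least one reply arrived on each connection
  if (last_rx_back == utime_t() || last_rx_front == utime_t())
    return false;
  return !is_unhealthy(now);
}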
The check itself is fairly simple; the interesting question is when the result is reported to the Monitor. Let's take it step by step. In OSD::init(), the following is set up:
int OSD::init()
{
  // ...
  tick_timer_without_osd_lock.init();
  // ...
  {
    std::lock_guard l(tick_timer_lock);
    // Schedule C_Tick_WithoutOSDLock to run periodically
    tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
                                                new C_Tick_WithoutOSDLock(this));
  }
}

class OSD::C_Tick_WithoutOSDLock : public Context {
  OSD *osd;
public:
  explicit C_Tick_WithoutOSDLock(OSD *o) : osd(o) {}
  void finish(int r) override {
    // What actually runs is OSD::tick_without_osd_lock()
    osd->tick_without_osd_lock();
  }
};
OSD::tick_without_osd_lock() does quite a lot; among other things it runs the heartbeat timeout check and reports failed OSDs to the Monitor:
void OSD::tick_without_osd_lock()
{
  // ...
  if (is_active() || is_waiting_for_healthy()) {
    {
      std::lock_guard l{heartbeat_lock};
      // Check for heartbeat timeouts
      heartbeat_check();
    }

    map_lock.lock_shared();
    std::lock_guard l(mon_report_lock);

    // mon report?
    // Report to the Monitor when needed, and at the latest every
    // osd_mon_report_interval seconds
    utime_t now = ceph_clock_now();
    if (service.need_fullness_update() ||
        now - last_mon_report > cct->_conf->osd_mon_report_interval) {
      last_mon_report = now;
      send_full_update();
      // Report the OSDs we consider failed
      send_failures();
    }
  }

  // Schedule the next tick
  tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
                                              new C_Tick_WithoutOSDLock(this));
}
Now let's look at OSD::send_failures():
void OSD::send_failures()
{
  // ...
  while (!failure_queue.empty()) {
    int osd = failure_queue.begin()->first;
    if (!failure_pending.count(osd)) {
      int failed_for = (int)(double)(now - failure_queue.begin()->second);
      // Send the Monitor an MOSDFailure message carrying the details
      // of the failed OSD
      monc->send_mon_message(
        new MOSDFailure(
          monc->get_fsid(),
          osd,
          osdmap->get_addrs(osd),
          failed_for,
          osdmap->get_epoch()));
      failure_pending[osd] = make_pair(failure_queue.begin()->second,
                                       osdmap->get_addrs(osd));
    }
    failure_queue.erase(osd);
  }
}
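Putting the pieces together, a failing peer moves through two containers: heartbeat_check() puts it into failure_queue, the next tick's send_failures() sends an MOSDFailure to the Monitor and moves the entry into failure_pending, and if a PING_REPLY arrives afterwards, handle_osd_ping() calls send_still_alive() and erases the pending entry. A compact view of that flow (a summary of the code above, not new code):

// heartbeat_check():  ping grace deadline missed
//                       -> failure_queue[osd] = time of last contact
// send_failures():    called from tick_without_osd_lock()
//                       -> MOSDFailure sent to the Monitor
//                       -> entry moved from failure_queue to failure_pending
// handle_osd_ping():  PING_REPLY arrives while a report is queued or pending
//                       -> failure_queue entry erased, or
//                          send_still_alive() sent and failure_pending entry erased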
That essentially concludes the OSD side of the heartbeat mechanism. What remains is to see how the Monitor handles the MOSDFailure message it receives.