Ceph PG scrub source code analysis

Introduction to Ceph scrub

  1. Scrub scheduling
    1.1 Related data structures
    1.2 How scrub scheduling is implemented
    1.2.1 OSD::sched_scrub
    1.2.2 PG::sched_scrub()
    1.3 Scrub resource reservation message flow
  2. Scrub implementation
    2.1 Related data structures
    2.1.1 Scrubber
    2.1.2 ScrubMap
    2.2 Scrub control flow
    2.2.1 chunky_scrub()
    2.3 Building the ScrubMap
    2.3.1 build_scrub_map_chunk
    2.3.2 PGBackend::be_scan_list
    2.3.3 ReplicatedBackend::be_deep_scrub
    2.4 Replica-side handling: building the ScrubMap
    2.5 Comparing replicas
    2.5.1 scrub_compare_maps
    2.5.2 be_compare_scrubmaps
    2.5.3 be_select_auth_object
    2.6 Finishing the scrub

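Introduction to Ceph scrub

Ceph uses scrub to keep data consistent. Scrub works on one PG at a time, chunk by chunk. For each PG, Ceph examines the objects in it and builds a data structure called a ScrubMap that summarizes their metadata, such as object size and attributes. It then compares the ScrubMaps of all replicas, selects an authoritative (auth) copy of each object, and by comparing the other copies against it finds the missing or corrupted objects so that they can be repaired. A regular scrub (daily) compares object sizes and attributes. A deep scrub (weekly) also reads the object data and compares checksums (CRC32C, as the ScrubMap digest fields indicate) to verify it. Each pass only compares one chunk of objects (the chunk size is configurable through Ceph options). While a chunk is being verified its objects must not be modified, so write requests to them are blocked. Once the chunk has been scrubbed, the blocked requests are put back on the queue and processed. A scrub can be triggered manually, and it is also triggered periodically (daily by default) depending on the configuration and the system load.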
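
The blocking rule described above can be pictured with a small sketch. It is not Ceph's implementation: object names are plain strings instead of hobject_t, and ChunkRange and write_blocked_by_scrub are hypothetical names chosen for illustration. A write has to wait only while its target falls inside the half-open range [start, end) of the chunk being scrubbed.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the chunk that is currently being scrubbed.
// Ceph orders objects by hobject_t; plain strings are used here for brevity.
struct ChunkRange {
  std::string start;  // first object of the chunk (inclusive)
  std::string end;    // one past the last object (exclusive)
  bool active;        // true while a chunk is being scrubbed
};

// Returns true when a write to `oid` has to wait for the chunk to finish.
inline bool write_blocked_by_scrub(const ChunkRange& chunk,
                                   const std::string& oid) {
  if (!chunk.active) {
    return false;  // no scrub in progress, nothing is blocked
  }
  return chunk.start <= oid && oid < chunk.end;
}
```

Objects outside the current chunk stay writable, which is why scrubbing in chunks keeps the impact on client I/O small.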

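1. Scrub scheduling

Scrub scheduling decides when a PG starts its scrub. There are three main ways:

  • Start a scrub immediately by hand;
  • Run in the background at a configured interval, for example once a day;
  • Restrict scrubs to a configured time window, usually a period when the system load is light.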
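
Of the three, the time window is the least obvious because it may wrap around midnight. A minimal sketch, assuming the window is given as a begin hour and an end hour in the range 0 to 23; the helper name hour_in_scrub_window is hypothetical and this is not the Ceph function itself:

```cpp
#include <cassert>

// Hypothetical helper: is `hour` inside the window [begin_hour, end_hour)?
// When begin_hour > end_hour the window wraps around midnight, e.g. 22 -> 6.
// begin_hour == end_hour is treated as "no restriction" (an assumption).
inline bool hour_in_scrub_window(int hour, int begin_hour, int end_hour) {
  if (begin_hour == end_hour) {
    return true;
  }
  if (begin_hour < end_hour) {
    return hour >= begin_hour && hour < end_hour;
  }
  // Wrapped window: late evening or early morning.
  return hour >= begin_hour || hour < end_hour;
}
```

With a window of 22 to 6, hour 23 and hour 3 are allowed while hour 12 is not.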

Figure 1: scrub scheduling

1.1 Related data structures

class OSDService {
// -- scrub scheduling --
Mutex sched_scrub_lock; // lock protecting the scrub-related variables
int scrubs_pending; // PGs whose reservation succeeded and that are waiting to scrub
int scrubs_active; // PGs that are currently scrubbing
}
struct ScrubJob { // wraps the scrub-related parameters of one PG
CephContext* cct;
/// pg to be scrubbed
spg_t pgid;
/// a time scheduled for scrub. but the scrub could be delayed if system
/// load is too high or it fails to fall in the scrub hours
utime_t sched_time;
/// the hard upper bound of scrub time
utime_t deadline;
}
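
The scheduler always wants the job that is due first. One way to get that, sketched here under the assumption that jobs live in an ordered set keyed by sched_time: times are plain doubles, PG ids are plain strings, and SimpleScrubJob and first_due are illustrative names rather than Ceph's own.

```cpp
#include <cassert>
#include <set>
#include <string>

// Simplified job record; the real ScrubJob uses spg_t and utime_t.
struct SimpleScrubJob {
  std::string pgid;
  double sched_time;  // earliest time the scrub may start
  double deadline;    // hard upper bound for starting it

  // Order by scheduled time first, then by PG id to break ties.
  bool operator<(const SimpleScrubJob& rhs) const {
    if (sched_time != rhs.sched_time) {
      return sched_time < rhs.sched_time;
    }
    return pgid < rhs.pgid;
  }
};

// Returns true and fills *out with the earliest job if the set is not empty.
inline bool first_due(const std::set<SimpleScrubJob>& jobs,
                      SimpleScrubJob* out) {
  if (jobs.empty()) {
    return false;
  }
  *out = *jobs.begin();
  return true;
}
```

Because the set is sorted, a scan can stop at the first job whose sched_time lies in the future.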

1.2 How scrub scheduling is implemented

This timer fires OSD::tick_without_osd_lock() roughly once per second. The Luminous (L) release adds a jitter factor, so the interval varies slightly around 1 s:

int OSD::init()
{
{
Mutex::Locker l(tick_timer_lock);
tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
new C_Tick_WithoutOSDLock(this));
}
}
double OSD::get_tick_interval() const
{
// vary +/- 5% to avoid scrub scheduling livelocks
constexpr auto delta = 0.05;
std::default_random_engine rng{static_cast<unsigned>(whoami)};
return (OSD_TICK_INTERVAL *
std::uniform_real_distribution<>{1.0 - delta, 1.0 + delta}(rng));
}
class OSD::C_Tick_WithoutOSDLock : public Context {
OSD *osd;
public:
explicit C_Tick_WithoutOSDLock(OSD *o) : osd(o) {}
void finish(int r) override {
osd->tick_without_osd_lock();
}
};
void OSD::tick_without_osd_lock()
{
if (is_active()) {
if (!scrub_random_backoff()) {
sched_scrub(); // schedule scrubs
}
}
}

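1.2.1 OSD::sched_scrub

This function controls when the scrub of a PG is started.

  • Check whether the quota allows another scrub to start;
  • If the OSD is recovering, do not scrub (unless osd_scrub_during_recovery is set);
  • Check whether the current time is inside the allowed scrub window;
  • Check whether the current system load permits a scrub;
  • Fetch the first ScrubJob that is waiting to scrub;
  • Only scrub a PG whose backend supports scrub and that is in the active state.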
void OSD::sched_scrub()
{
// if not permitted, fail fast
if (!service.can_inc_scrubs_pending()) {
return;
}
// no scrubs while the OSD is recovering
if (!cct->_conf->osd_scrub_during_recovery && service.is_recovery_active()) { // osd_scrub_during_recovery=false
dout(20) << __func__ << " not scheduling scrubs due to active recovery" << dendl;
return;
}
utime_t now = ceph_clock_now();
bool time_permit = scrub_time_permit(now); // is now inside the allowed scrub window?
bool load_is_low = scrub_load_below_threshold(); // is the current system load low enough?
dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
OSDService::ScrubJob scrub;
if (service.first_scrub_stamp(&scrub)) { // fetch the first ScrubJob waiting to scrub
do {
dout(30) << "sched_scrub examine " << scrub.pgid << " at " << scrub.sched_time << dendl;
if (scrub.sched_time > now) { // not due yet, so stop looking at further jobs
// save ourselves some effort
dout(10) << "sched_scrub " << scrub.pgid << " scheduled at " << scrub.sched_time
<< " > " << now << dendl;
break;
}
if ((scrub.deadline >= now) && !(time_permit && load_is_low)) {
dout(10) << __func__ << " not scheduling scrub for " << scrub.pgid << " due to "
<< (!time_permit ? "time not permit" : "high load") << dendl;
continue;
}
PG *pg = _lookup_lock_pg(scrub.pgid);
if (!pg)
continue;
if (pg->get_pgbackend()->scrub_supported() && pg->is_active()) { // only scrub PGs that support it and are active
dout(10) << "sched_scrub scrubbing " << scrub.pgid << " at " << scrub.sched_time
<< (pg->scrubber.must_scrub ? ", explicitly requested" :
(load_is_low ? ", load_is_low" : " deadline < now"))
<< dendl;
if (pg->sched_scrub()) { // start the scrub
pg->unlock();
break;
}
}
pg->unlock();
} while (service.next_scrub_stamp(scrub, &scrub));
}
dout(20) << "sched_scrub done" << dendl;
}
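
The interplay of sched_time, deadline, the time window and the load check in the loop above is easy to misread. The sketch below restates only that decision as a pure function. ScrubDecision and decide are hypothetical names, and times are plain doubles instead of utime_t.

```cpp
#include <cassert>

// Outcome of looking at one scrub job at time `now`.
enum class ScrubDecision { NotDueYet, Postponed, Eligible };

inline ScrubDecision decide(double sched_time, double deadline, double now,
                            bool time_permit, bool load_is_low) {
  if (sched_time > now) {
    return ScrubDecision::NotDueYet;  // the loop breaks out at this point
  }
  // Before the deadline, both the time window and the load must allow it.
  if (deadline >= now && !(time_permit && load_is_low)) {
    return ScrubDecision::Postponed;  // the loop continues with the next job
  }
  // Either conditions are good, or the deadline has already passed.
  return ScrubDecision::Eligible;
}
```

The last case is the important one: once the deadline has passed, the job becomes eligible even outside the time window and under high load.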

1.2.2 PG::sched_scrub()

  • The scrub must be driven by the primary OSD, and the PG must be active, clean, and not already scrubbing. If any of these conditions is false, scheduling returns immediately;
  • Set deep_scrub_interval. If the pool does not set it, fall back to osd_deep_scrub_interval (7 days);
  • time_for_deep decides whether a deep scrub is run;
  • Like recovery, scrub consumes a lot of system resources, so resources must be reserved on every OSD that holds the PG. If scrubber.reserved is false, the reservation has not been made yet and has to be done first:
    • set scrubber.reserved = true locally
    • add itself to scrubber.reserved_peers
    • call scrub_reserve_replicas to send reservation requests to the other OSDs.

The messages involved are MOSDScrubReserve::REQUEST, MOSDScrubReserve::GRANT, MOSDScrubReserve::REJECT and MOSDScrubReserve::RELEASE.
See section 1.3, "Scrub resource reservation message flow".

  • When scrubber.reserved_peers.size() == acting.size(), every OSD has granted the reservation. The function then decides whether the scrub will be deep and calls PG::queue_scrub() to put the PG on op_wq, which triggers the scrub.
bool PG::sched_scrub()
{
bool nodeep_scrub = false;
assert(is_locked());
// must be primary, active, clean and not already scrubbing; otherwise bail out
if (!(is_primary() && is_active() && is_clean() && !is_scrubbing())) {
return false;
}
double deep_scrub_interval = 0;
pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
if (deep_scrub_interval <= 0) {
deep_scrub_interval = cct->_conf->osd_deep_scrub_interval; // 7 days
}
bool time_for_deep = ceph_clock_now() >=
info.history.last_deep_scrub_stamp + deep_scrub_interval;
bool deep_coin_flip = false;
// Only add random deep scrubs when NOT user initiated scrub
if (!scrubber.must_scrub) // must_scrub is set when the user requested the scrub manually
deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100; // osd_deep_scrub_randomize_ratio = 0.15
// sample log line: sched_scrub: time_for_deep=0 deep_coin_flip=0
dout(20) << __func__ << ": time_for_deep=" << time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
time_for_deep = (time_for_deep || deep_coin_flip);
//NODEEP_SCRUB so ignore time initiated deep-scrub
if (osd->osd->get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
time_for_deep = false;
nodeep_scrub = true;
}
if (!scrubber.must_scrub) {
assert(!scrubber.must_deep_scrub);
//NOSCRUB so skip regular scrubs
if ((osd->osd->get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) && !time_for_deep) {
if (scrubber.reserved) {
// cancel scrub if it is still in scheduling,
// so pgs from other pools where scrub are still legal
// have a chance to go ahead with scrubbing.
clear_scrub_reserved();
scrub_unreserve_replicas();
}
return false;
}
}
if (cct->_conf->osd_scrub_auto_repair/*default false*/
&& get_pgbackend()->auto_repair_supported()
&& time_for_deep
// respect the command from user, and not do auto-repair
&& !scrubber.must_repair
&& !scrubber.must_scrub
&& !scrubber.must_deep_scrub) {
dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
scrubber.auto_repair = true;
} else {
// this happens when user issue the scrub/repair command during
// the scheduling of the scrub/repair (e.g. request reservation)
scrubber.auto_repair = false;
}
bool ret = true;
if (!scrubber.reserved) { // resources are not reserved yet
assert(scrubber.reserved_peers.empty());
if ((cct->_conf->osd_scrub_during_recovery/*false*/ || !osd->is_recovery_active()) &&
osd->inc_scrubs_pending()) {
dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
scrubber.reserved = true;
scrubber.reserved_peers.insert(pg_whoami);
scrub_reserve_replicas(); // send reservation requests to the other OSDs
} else {
dout(20) << __func__ << ": failed to reserve locally" << dendl;
ret = false;
}
}
if (scrubber.reserved) {
if (scrubber.reserve_failed) {
dout(20) << "sched_scrub: failed, a peer declined" << dendl;
clear_scrub_reserved();
scrub_unreserve_replicas();
ret = false;
} else if (scrubber.reserved_peers.size() == acting.size()) { // every replica granted the reservation
dout(20) << "sched_scrub: success, reserved self and replicas" << dendl;
if (time_for_deep) {
dout(10) << "sched_scrub: scrub will be deep" << dendl;
state_set(PG_STATE_DEEP_SCRUB);
} else if (!scrubber.must_deep_scrub && info.stats.stats.sum.num_deep_scrub_errors) {
if (!nodeep_scrub) {
osd->clog->info() << "osd." << osd->whoami
<< " pg " << info.pgid
<< " Deep scrub errors, upgrading scrub to deep-scrub";
state_set(PG_STATE_DEEP_SCRUB);
} else if (!scrubber.must_scrub) {
osd->clog->error() << "osd." << osd->whoami
<< " pg " << info.pgid
<< " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
clear_scrub_reserved();
scrub_unreserve_replicas();
return false;
} else {
osd->clog->error() << "osd." << osd->whoami
<< " pg " << info.pgid
<< " Regular scrub request, deep-scrub details will be lost";
}
}
queue_scrub(); // put this PG on the op_wq work queue to trigger the scrub
} else {
// none declined, since scrubber.reserved is set
dout(20) << "sched_scrub: reserved " << scrubber.reserved_peers << ", waiting for replicas" << dendl;
}
}
return ret;
}
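
The deep-scrub decision near the top of the function combines three inputs: the elapsed interval, a random coin flip, and the nodeep-scrub flags. A sketch under simplifying assumptions: times are plain seconds, the coin flip is passed in by the caller so the function stays deterministic, and want_deep_scrub is a hypothetical name.

```cpp
#include <cassert>

// Simplified restatement of how time_for_deep is computed.
inline bool want_deep_scrub(double now, double last_deep_scrub_stamp,
                            double deep_scrub_interval, bool user_requested,
                            bool coin_flip, bool nodeep_flag) {
  bool time_for_deep = now >= last_deep_scrub_stamp + deep_scrub_interval;
  // Random deep scrubs are only added to scrubs the user did not request.
  bool deep_coin_flip = !user_requested && coin_flip;
  time_for_deep = time_for_deep || deep_coin_flip;
  // The nodeep-scrub flag (cluster-wide or per pool) overrides both.
  if (nodeep_flag) {
    time_for_deep = false;
  }
  return time_for_deep;
}
```

An explicitly requested deep scrub (must_deep_scrub) is not covered here; in the source it is applied later, in PG::queue_scrub().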
// Call chain: PG::sched_scrub() -> PG::queue_scrub() -> PG::requeue_scrub() -> queue_for_scrub() -> PGQueueable::RunVis::operator() -> pg->scrub(op.epoch_queued, handle)
bool PG::queue_scrub()
{
assert(is_locked());
if (is_scrubbing()) {
return false;
}
scrubber.priority = scrubber.must_scrub ?
cct->_conf->osd_requested_scrub_priority/*120*/ : get_scrub_priority();
scrubber.must_scrub = false;
state_set(PG_STATE_SCRUBBING);
if (scrubber.must_deep_scrub) {
state_set(PG_STATE_DEEP_SCRUB);
scrubber.must_deep_scrub = false;
}
if (scrubber.must_repair || scrubber.auto_repair) {
state_set(PG_STATE_REPAIR);
scrubber.must_repair = false;
}
requeue_scrub();
return true;
}
bool PG::requeue_scrub(bool high_priority)
{
assert(is_locked());
if (scrub_queued) {
dout(10) << __func__ << ": already queued" << dendl;
return false;
} else {
dout(10) << __func__ << ": queueing" << dendl;
scrub_queued = true;
osd->queue_for_scrub(this, high_priority);
return true;
}
}
void queue_for_scrub(PG *pg, bool with_high_priority) {
unsigned scrub_queue_priority = pg->scrubber.priority;
if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
scrub_queue_priority = cct->_conf->osd_client_op_priority;
}
enqueue_back(
pg->info.pgid,
PGQueueable(
PGScrub(pg->get_osdmap()->get_epoch()),
cct->_conf->osd_scrub_cost,
scrub_queue_priority,
ceph_clock_now(),
entity_inst_t(),
pg->get_osdmap()->get_epoch()));
}
void PGQueueable::RunVis::operator()(const PGScrub &op) {
pg->scrub(op.epoch_queued, handle);
}

1.3 Scrub resource reservation message flow

Figure 2: scrub message dispatch

Figure 3: scrub resource reservation message flow

void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
{
OSDMapRef next_map = get_nextmap_reserved();
// service map is always newer/newest
assert(from_epoch <= next_map->get_epoch());
if (next_map->is_down(peer) ||
next_map->get_info(peer).up_from > from_epoch) {
m->put();
release_map(next_map);
return;
}
const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
ConnectionRef peer_con = osd->cluster_messenger->get_connection(peer_inst);
share_map_peer(peer, peer_con.get(), next_map);
peer_con->send_message(m); // send the message to the other node
release_map(next_map);
}
int AsyncConnection::send_message(Message *m)
{
FUNCTRACE();
lgeneric_subdout(async_msgr->cct, ms,
1) << "-- " << async_msgr->get_myaddr() << " --> "
<< get_peer_addr() << " -- "
<< *m << " -- " << m << " con "
<< m->get_connection().get()
<< dendl;
// optimistic think it's ok to encode(actually may broken now)
if (!m->get_priority())
m->set_priority(async_msgr->get_default_send_priority());
m->get_header().src = async_msgr->get_myname();
m->set_connection(this);
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
std::lock_guard<std::mutex> l(write_lock);
if (can_write != WriteStatus::CLOSED) {
dispatch_queue->local_delivery(m, m->get_priority());
} else {
ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
<< " Drop message " << m << dendl;
m->put();
}
return 0;
}
last_active = ceph::coarse_mono_clock::now();
// we don't want to consider local message here, it's too lightweight which
// may disturb users
logger->inc(l_msgr_send_messages);
bufferlist bl;
uint64_t f = get_features();
// TODO: Currently not all messages supports reencode like MOSDMap, so here
// only let fast dispatch support messages prepare message
bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m);
if (can_fast_prepare)
prepare_send_message(f, m, bl);
std::lock_guard<std::mutex> l(write_lock);
// "features" changes will change the payload encoding
if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
// ensure the correctness of message encoding
bl.clear();
m->get_payload().clear();
ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
<< f << " != " << get_features() << dendl;
}
if (can_write == WriteStatus::CLOSED) {
ldout(async_msgr->cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
m->put();
} else {
m->trace.event("async enqueueing message");
out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
if (can_write != WriteStatus::REPLACING)
center->dispatch_event_external(write_handler);
}
return 0;
}
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
Worker *w)
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
}
class C_handle_read : public EventCallback {
AsyncConnectionRef conn;
public:
explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
void do_request(int fd_or_id) override {
conn->process(); // called on read events
}
};
class C_handle_write : public EventCallback {
AsyncConnectionRef conn;
public:
explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
void do_request(int fd) override {
conn->handle_write(); // called on write events
}
};
void AsyncConnection::handle_write()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
ssize_t r = 0;
write_lock.lock();
if (can_write == WriteStatus::CANWRITE) {
if (keepalive) {
_append_keepalive_or_ack();
keepalive = false;
}
auto start = ceph::mono_clock::now();
bool more;
do {
bufferlist data;
Message *m = _get_next_outgoing(&data);
if (!m)
break;
if (!policy.lossy) {
// put on sent list
sent.push_back(m);
m->get();
}
more = _has_next_outgoing();
write_lock.unlock();
// send_message or requeue messages may not encode message
if (!data.length())
prepare_send_message(get_features(), m, data);
r = write_message(m, data, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
write_lock.lock();
if (r > 0)
break;
} while (can_write == WriteStatus::CANWRITE);
write_lock.unlock();
uint64_t left = ack_left;
if (left) {
ceph_le64 s;
s = in_seq;
outcoming_bl.append(CEPH_MSGR_TAG_ACK);
outcoming_bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
ack_left -= left;
left = ack_left;
r = _try_send(left);
} else if (is_queued()) {
r = _try_send();
}
logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
} else {
write_lock.unlock();
lock.lock();
write_lock.lock();
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
} else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send(); // send the message
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
write_lock.unlock();
fault();
lock.unlock();
return ;
}
}
write_lock.unlock();
lock.unlock();
}
return ;
fail:
lock.lock();
fault();
lock.unlock();
}
void AsyncConnection::process()
{
......
do {
......
prev_state = state;
switch (state) {
......
case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH:
{
Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer,
front, middle, data, this);
if (delay_state) {
utime_t release = message->get_recv_stamp();
double delay_period = 0;
if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
release += delay_period;
ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
<< message << " " << *message << dendl;
}
delay_state->queue(delay_period, release, message);
} else if (async_msgr->ms_can_fast_dispatch(message)) {
lock.unlock();
dispatch_queue->fast_dispatch(message);
recv_start_time = ceph::mono_clock::now();
logger->tinc(l_msgr_running_fast_dispatch_time,
recv_start_time - fast_dispatch_time);
lock.lock();
} else {
dispatch_queue->enqueue(message, message->get_priority(), conn_id);
}
......
} while (prev_state != state);
}
void DispatchQueue::fast_dispatch(Message *m)
{
uint64_t msize = pre_dispatch(m);
msgr->ms_fast_dispatch(m);
post_dispatch(m, msize);
}
void OSD::ms_fast_dispatch(Message *m)
{
FUNCTRACE();
if (service.is_stopping()) {
m->put();
return;
}
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
#endif
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
if (m->trace)
op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
// note sender epoch, min req'd epoch
op->sent_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch();
op->min_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_min_epoch();
assert(op->min_epoch <= op->sent_epoch); // sanity check!
service.maybe_inject_dispatch_delay();
if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
m->get_type() != CEPH_MSG_OSD_OP) {
// queue it directly
enqueue_op(
static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
op,
static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch()); // enqueue
} else {
// legacy client, and this is an MOSDOp (the *only* fast dispatch
// message that didn't have an explicit spg_t); we need to map
// them to an spg_t while preserving delivery order.
// compatibility path for old clients; newer versions should take the branch above
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
op->get();
session->waiting_on_map.push_back(*op);
OSDMapRef nextmap = service.get_nextmap_reserved();
dispatch_session_waiting(session, nextmap);
service.release_map(nextmap);
}
session->put();
}
}
OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
utime_t latency = ceph_clock_now() - op->get_req()->get_recv_stamp();
dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
<< " cost " << op->get_req()->get_cost()
<< " latency " << latency
<< " epoch " << epoch
<< " " << *(op->get_req()) << dendl;
op->osd_trace.event("enqueue op");
op->osd_trace.keyval("priority", op->get_req()->get_priority());
op->osd_trace.keyval("cost", op->get_req()->get_cost());
op->mark_queued_for_pg();
logger->tinc(l_osd_op_before_queue_op_lat, latency);
op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
}
// scrub resource reservation messages go through here
void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
osd->dequeue_op(pg, op, handle);
}
void PGQueueable::RunVis::operator()(const PGScrub &op) {
pg->scrub(op.epoch_queued, handle);
}
void OSD::dequeue_op(
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle)
{
FUNCTRACE();
OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false);
utime_t now = ceph_clock_now();
op->set_dequeued_time(now);
utime_t latency = now - op->get_req()->get_recv_stamp();
dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
<< " cost " << op->get_req()->get_cost()
<< " latency " << latency
<< " " << *(op->get_req())
<< " pg " << *pg << dendl;
logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
Session *session = static_cast<Session *>(
op->get_req()->get_connection()->get_priv());
if (session) {
maybe_share_map(session, op, pg->get_osdmap());
session->put();
}
if (pg->deleting)
return;
op->mark_reached_pg();
op->osd_trace.event("dequeue_op");
pg->do_request(op, handle); // continues in PrimaryLogPG::do_request()
// finish
dout(10) << "dequeue_op " << op << " finish" << dendl;
OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_END", false);
}
void PrimaryLogPG::do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle)
{
......
switch (op->get_req()->get_type()) {
......
case MSG_OSD_SCRUB_RESERVE:
{
const MOSDScrubReserve *m =
static_cast<const MOSDScrubReserve*>(op->get_req());
switch (m->type) {
case MOSDScrubReserve::REQUEST:
handle_scrub_reserve_request(op); // handle a reservation request
break;
case MOSDScrubReserve::GRANT:
handle_scrub_reserve_grant(op, m->from); // handle a granted reservation
break;
case MOSDScrubReserve::REJECT:
handle_scrub_reserve_reject(op, m->from);
break;
case MOSDScrubReserve::RELEASE:
handle_scrub_reserve_release(op);
break;
}
}
break;
......
}
}
void PG::handle_scrub_reserve_request(OpRequestRef op)
{
dout(7) << __func__ << " " << *op->get_req() << dendl;
op->mark_started();
if (scrubber.reserved) {
dout(10) << __func__ << " ignoring reserve request: Already reserved"
<< dendl;
return;
}
if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
osd->inc_scrubs_pending()) {
scrubber.reserved = true;
} else {
dout(20) << __func__ << ": failed to reserve remotely" << dendl;
scrubber.reserved = false;
}
if (op->get_req()->get_type() == MSG_OSD_SCRUB_RESERVE) {
const MOSDScrubReserve *m =
static_cast<const MOSDScrubReserve*>(op->get_req());
Message *reply = new MOSDScrubReserve(
spg_t(info.pgid.pgid, primary.shard),
m->map_epoch,
scrubber.reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
pg_whoami); // the request is answered with GRANT, or with REJECT if the local reservation failed
osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
} else {
// for jewel compat only
const MOSDSubOp *req = static_cast<const MOSDSubOp*>(op->get_req());
assert(req->get_type() == MSG_OSD_SUBOP);
MOSDSubOpReply *reply = new MOSDSubOpReply(
req, pg_whoami, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(scrubber.reserved, reply->get_data());
osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
}
}
void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
{
dout(7) << __func__ << " " << *op->get_req() << dendl;
op->mark_started();
if (!scrubber.reserved) {
dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
return;
}
if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
dout(10) << " already had osd." << from << " reserved" << dendl;
} else {
dout(10) << " osd." << from << " scrub reserve = success" << dendl;
scrubber.reserved_peers.insert(from);
sched_scrub(); // this peer granted the reservation; re-enter PG::sched_scrub()
}
}
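
Taken together, the request, grant and reject handling amounts to a small bookkeeping protocol on the primary. The following is a simplified model only: shard ids are ints, and ReservationTracker with its members start, on_grant, on_reject and all_reserved are hypothetical names. It leaves out messaging, epochs and the RELEASE path.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Simplified primary-side view of a scrub reservation round.
struct ReservationTracker {
  std::set<int> reserved_peers;  // shards that have granted so far
  bool reserved = false;         // local reservation held?
  bool reserve_failed = false;   // did any peer decline?
  std::size_t acting_size = 0;   // number of shards in the acting set

  // The primary reserves locally, then waits for the replicas to answer.
  void start(int self, std::size_t acting) {
    reserved = true;
    reserve_failed = false;
    acting_size = acting;
    reserved_peers.clear();
    reserved_peers.insert(self);
  }

  // Returns true if the grant was new; obsolete or duplicate grants are ignored.
  bool on_grant(int from) {
    if (!reserved) {
      return false;
    }
    return reserved_peers.insert(from).second;
  }

  void on_reject() {
    if (reserved) {
      reserve_failed = true;
    }
  }

  // The scrub may be queued once every shard of the acting set has granted.
  bool all_reserved() const {
    return reserved && !reserve_failed &&
           reserved_peers.size() == acting_size;
  }
};
```

In the source, each new grant re-enters PG::sched_scrub(), which performs the equivalent of the all_reserved() check before calling queue_scrub().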

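2. Scrub implementation

The scrub itself roughly works as follows: it verifies metadata and data by comparing the metadata and data of each object across its OSD replicas. The core flow is controlled by the function PG::chunky_scrub.

Figure 4: scrub implementation

2.1 Related data structures

Scrub relies mainly on two data structures. Scrubber is the context of one scrub operation and controls the scrub of one PG. ScrubMap holds the metadata and the data digests of the objects that are to be compared.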

2.1.1 Scrubber

struct Scrubber { // controls the scrub of one PG
// metadata
set<pg_shard_t> reserved_peers; // shards that have granted the reservation
bool reserved, reserve_failed; // reserved locally? did a reservation fail?
epoch_t epoch_start; // epoch in which the scrub started
// common to both scrubs
bool active; // has the scrub started?
set<pg_shard_t> waiting_on_whom; // replicas we are still waiting for
int shallow_errors; // errors found by the shallow scrub
int deep_errors; // errors found by the deep scrub
int large_omap_objects = 0;
int fixed; // number of objects repaired
ScrubMap primary_scrubmap; // ScrubMap of the primary
ScrubMapBuilder primary_scrubmap_pos;
epoch_t replica_scrub_start = 0;
ScrubMap replica_scrubmap;
ScrubMapBuilder replica_scrubmap_pos;
map<pg_shard_t, ScrubMap> received_maps; // ScrubMaps received from the replicas
OpRequestRef active_rep_scrub;
utime_t scrub_reg_stamp;  // stamp we registered for
// For async sleep
bool sleeping = false;
bool needs_sleep = true;
utime_t sleep_start;
// flags to indicate explicitly requested scrubs (by admin)
bool must_scrub, must_deep_scrub, must_repair;
// Priority to use for scrub scheduling
unsigned priority;
// this flag indicates whether we would like to do auto-repair of the PG or not
bool auto_repair; // repair automatically?
// Maps from objects with errors to missing/inconsistent peers
map<hobject_t, set<pg_shard_t>> missing; // missing objects found by the scan
map<hobject_t, set<pg_shard_t>> inconsistent; // inconsistent objects found by the scan
// Map from object with errors to good peers
map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >> authoritative; // for objects whose replicas disagree, the OSDs that hold a good copy
// digest updates which we are waiting on
int num_digest_updates_pending; // objects whose digest update is still pending
// chunky scrub
hobject_t start, end;    // [start,end): first and last object of the chunk being scanned
hobject_t max_end;       // Largest end that may have been sent to replicas
eversion_t subset_last_update; // newest version among the objects in the chunk
// chunky scrub state
enum State {
INACTIVE,
NEW_CHUNK,
WAIT_PUSHES,
WAIT_LAST_UPDATE,
BUILD_MAP,
BUILD_MAP_DONE,
WAIT_REPLICAS,
COMPARE_MAPS,
WAIT_DIGEST_UPDATES,
FINISH,
BUILD_MAP_REPLICA,
} state;
std::unique_ptr<Scrub::Store> store;
// deep scrub
bool deep; // is this a deep scrub?
int preempt_left;
int preempt_divisor;
} scrubber;
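
The State enum reads more easily as a sequence. The sketch below lists one plausible primary-side ordering of the states for a single chunk. The ordering is an assumption inferred from the state names, not something shown in the excerpt above, and next_state is a hypothetical helper. BUILD_MAP_REPLICA is left out because it belongs to the replica side.

```cpp
#include <cassert>

enum class State {
  INACTIVE, NEW_CHUNK, WAIT_PUSHES, WAIT_LAST_UPDATE, BUILD_MAP,
  BUILD_MAP_DONE, WAIT_REPLICAS, COMPARE_MAPS, WAIT_DIGEST_UPDATES, FINISH
};

// Assumed happy-path successor of each state on the primary.
// `more_chunks` decides whether another chunk follows the digest updates.
inline State next_state(State s, bool more_chunks) {
  switch (s) {
    case State::INACTIVE:            return State::NEW_CHUNK;
    case State::NEW_CHUNK:           return State::WAIT_PUSHES;
    case State::WAIT_PUSHES:         return State::WAIT_LAST_UPDATE;
    case State::WAIT_LAST_UPDATE:    return State::BUILD_MAP;
    case State::BUILD_MAP:           return State::BUILD_MAP_DONE;
    case State::BUILD_MAP_DONE:      return State::WAIT_REPLICAS;
    case State::WAIT_REPLICAS:       return State::COMPARE_MAPS;
    case State::COMPARE_MAPS:        return State::WAIT_DIGEST_UPDATES;
    case State::WAIT_DIGEST_UPDATES: return more_chunks ? State::NEW_CHUNK
                                                        : State::FINISH;
    case State::FINISH:              return State::INACTIVE;
  }
  return State::INACTIVE;  // unreachable for valid enum values
}
```

The real state machine also has waiting and preemption paths that this linear walk ignores.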

2.1.2 ScrubMap

Holds the objects that are to be verified and their verification data.

/*
* summarize pg contents for purposes of a scrub
*/
struct ScrubMap { // holds the objects to be verified and their verification data
struct object {
map<string,bufferptr> attrs; // object attributes
uint64_t size; // object size
__u32 omap_digest;         ///< omap crc32c
__u32 digest;              ///< data crc32c
bool negative:1;
bool digest_present:1; // set when the data digest has been computed
bool omap_digest_present:1; // set when the omap digest has been computed
bool read_error:1; // set when reading the object data failed
bool stat_error:1; // set when stat() on the object metadata failed
bool ec_hash_mismatch:1;
bool ec_size_mismatch:1;
bool large_omap_object_found:1;
uint64_t large_omap_object_key_count = 0;
uint64_t large_omap_object_value_size = 0;
object() :
// Init invalid size so it won't match if we get a stat EIO error
size(-1), omap_digest(0), digest(0),
negative(false), digest_present(false), omap_digest_present(false),
read_error(false), stat_error(false), ec_hash_mismatch(false),
ec_size_mismatch(false), large_omap_object_found(false) {}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
void dump(Formatter *f) const;
static void generate_test_instances(list<object*>& o);
};
WRITE_CLASS_ENCODER(object)
map<hobject_t,object> objects; // object to be verified -> its verification data
eversion_t valid_through;
eversion_t incr_since;
bool has_large_omap_object_errors:1;
boost::optional<bool> has_builtin_csum;
}

2.2 Scrub control flow

Scrub work is carried out by the OSD's work queue (OpWq), which calls pg->scrub(handle) on the corresponding PG.
PG::scrub ultimately calls PG::chunky_scrub(), which drives the state transitions and the core processing of the scrub operation.

// Call path: PG::requeue_scrub() -> queue_for_scrub() -> PGQueueable::RunVis::operator() -> pg->scrub(op.epoch_queued, handle)
void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
{
......
assert(scrub_queued);
scrub_queued = false;
scrubber.needs_sleep = true;
// for the replica
if (!is_primary() &&
scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
chunky_scrub(handle); // replica side: handles state BUILD_MAP_REPLICA
return;
}
// skip the scrub if this OSD is not primary, or the PG is not active, not clean, or not scrubbing
if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
state_clear(PG_STATE_SCRUBBING);
state_clear(PG_STATE_REPAIR);
state_clear(PG_STATE_DEEP_SCRUB);
publish_stats_to_osd();
return;
}
if (!scrubber.active) {
assert(backfill_targets.empty());
scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
dout(10) << "starting a new chunky scrub" << dendl;
}
chunky_scrub(handle); // start the main scrub flow
}

2.2.1 chunky_scrub()

  • 1. If a scrub has already started but epoch_start != same_interval_since, abort and return immediately.
  • 2. The initial Scrubber state is PG::Scrubber::INACTIVE. It is handled as follows:
    • Set scrubber.epoch_start to info.history.same_interval_since.
    • Set scrubber.active to true.
    • Set scrubber.state to PG::Scrubber::NEW_CHUNK.
  • 3. PG::Scrubber::NEW_CHUNK is handled as follows:
    • Call objects_list_partial to list a group of objects starting from start. The number of objects listed in one pass lies between min and max, which are derived from osd_scrub_chunk_min (5) and osd_scrub_chunk_max (25).
    • Compute the chunk boundary. Objects that belong together share the same hash, so the list is searched backwards from its end for an object with a different hash, and the boundary is placed there. This keeps all objects related to one object (snapshot objects, rollback objects) within a single scrub pass.
    • _range_available_for_scrub checks the range of objects in the list. If any object in it is blocked, done is set to true and this pass of the PG scrub ends.
    • Find the latest version in the PG log that affects the chunk and store it in scrubber.subset_last_update.
    • Call _request_scrub_map to send a message to every other replica, requesting the verification information of its scrubmap.
    • Set the state to PG::Scrubber::WAIT_PUSHES.
  • 4. PG::Scrubber::WAIT_PUSHES is handled as follows:
    • If active_pushes is 0, move straight to the next state, PG::Scrubber::WAIT_LAST_UPDATE. If it is not 0, the PG is recovering: set done to true and end this pass.
  • 5. PG::Scrubber::WAIT_LAST_UPDATE is handled as follows:
    • If last_update_applied < scrubber.subset_last_update, some operations have been written to the log but not yet applied to the objects. Because the scrub reads the objects later, it has to wait until the log has been applied, so this pass ends here.
    • Otherwise set the state to PG::Scrubber::BUILD_MAP.
  • 6. PG::Scrubber::BUILD_MAP is handled as follows:
    • Call build_scrub_map_chunk to build the primary OSD's scrubmap locally. The verification information of every object from start to end is built and stored in the scrubmap structure.
    • Set the state to PG::Scrubber::BUILD_MAP_DONE.
  • 7. PG::Scrubber::BUILD_MAP_DONE is handled as follows:
    • Set scrubber.state = PG::Scrubber::WAIT_REPLICAS.
  • 8. PG::Scrubber::WAIT_REPLICAS is handled as follows:
    • If waiting_on_whom is not empty, some OSDs have not finished building their scrubmap, so this scrub pass ends.
    • Otherwise set the state to PG::Scrubber::COMPARE_MAPS.
  • 9. PG::Scrubber::COMPARE_MAPS is handled as follows:
    • Call scrub_compare_maps to compare the verification information of all replicas.
    • Update scrubber.start = scrubber.end.
    • Call requeue_ops to requeue the ops that were blocked by the scrub. They were parked in PrimaryLogPG::do_op() via waiting_for_scrub.push_back(op).
    • Set the state to PG::Scrubber::WAIT_DIGEST_UPDATES.
  • 10. PG::Scrubber::WAIT_DIGEST_UPDATES is handled as follows:
    • If scrubber.num_digest_updates_pending is non-zero, wait for the data digest or omap digest updates.
    • If end is not max, the PG still has objects that have not been scrubbed: set the state to NEW_CHUNK and call requeue_scrub to continue.
    • Otherwise set the state to PG::Scrubber::FINISH.
  • 11. PG::Scrubber::FINISH is handled as follows:
    • Set the state to PG::Scrubber::INACTIVE and done = true; the scrub is complete.
  • 12. PG::Scrubber::BUILD_MAP_REPLICA is handled as follows:
    • The request sent in step 3 reaches the other OSD replicas. Each replica calls build_scrub_map_chunk to build its scrubmap, then calls osd->send_message_osd_cluster(reply) to return it to the primary OSD.
/*
* Chunky scrub scrubs objects one chunk at a time with writes blocked for that
* chunk.
*
* The object store is partitioned into chunks which end on hash boundaries. For
* each chunk, the following logic is performed:
*
*  (1) Block writes on the chunk
*  (2) Request maps from replicas
*  (3) Wait for pushes to be applied (after recovery)
*  (4) Wait for writes to flush on the chunk
*  (5) Wait for maps from replicas
*  (6) Compare / repair all scrub maps
*  (7) Wait for digest updates to apply
*
* This logic is encoded in the mostly linear state machine:
*
*           +------------------+
*  _________v__________        |
* |                    |       |
* |      INACTIVE      |       |
* |____________________|       |
*           |                  |
*           |   +----------+   |
*  _________v___v______    |   |
* |                    |   |   |
* |      NEW_CHUNK     |   |   |
* |____________________|   |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |     WAIT_PUSHES    |   |   |
* |____________________|   |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |  WAIT_LAST_UPDATE  |   |   |
* |____________________|   |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |      BUILD_MAP     |   |   |
* |____________________|   |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |    WAIT_REPLICAS   |   |   |
* |____________________|   |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |    COMPARE_MAPS    |   |   |
* |____________________|   |   |
*           |              |   |
*           |              |   |
*  _________v__________    |   |
* |                    |   |   |
* |WAIT_DIGEST_UPDATES |   |   |
* |____________________|   |   |
*           |   |          |   |
*           |   +----------+   |
*  _________v__________        |
* |                    |       |
* |       FINISH       |       |
* |____________________|       |
*           |                  |
*           +------------------+
*/
void PG::chunky_scrub(ThreadPool::TPHandle &handle)
{
// check for map changes
if (scrubber.is_chunky_scrub_active()) { // i.e. state != INACTIVE: a scrub is in progress
if (scrubber.epoch_start != info.history.same_interval_since) { // the interval changed after the scrub started, so abort
dout(10) << "scrub  pg changed, aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
return;
}
}
bool done = false;
int ret;
// For a deep scrub, an object of 4169728 bytes needs 4169728/524288 ≈ 7.95, i.e. 8 reads, before it is finished and the next object starts; each of those passes re-enters the loop here.
while (!done) {
dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
<< " [" << scrubber.start << "," << scrubber.end << ")"
<< " max_end " << scrubber.max_end << dendl;
switch (scrubber.state) {
case PG::Scrubber::INACTIVE:
dout(10) << "scrub start" << dendl;
assert(is_primary());
publish_stats_to_osd();
scrubber.epoch_start = info.history.same_interval_since;
scrubber.active = true;
osd->inc_scrubs_active(scrubber.reserved);
if (scrubber.reserved) {
scrubber.reserved = false;
scrubber.reserved_peers.clear();
}
{
ObjectStore::Transaction t;
scrubber.cleanup_store(&t);
scrubber.store.reset(Scrub::Store::create(osd->store, &t,
info.pgid, coll));
osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
}
// Don't include temporary objects when scrubbing
scrubber.start = info.pgid.pgid.get_hobj_start();
scrubber.state = PG::Scrubber::NEW_CHUNK;
{
bool repair = state_test(PG_STATE_REPAIR);
bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
stringstream oss;
oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
osd->clog->debug(oss);
}
scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
"osd_scrub_max_preemptions");
scrubber.preempt_divisor = 1;
break;
case PG::Scrubber::NEW_CHUNK:
scrubber.primary_scrubmap = ScrubMap();
scrubber.received_maps.clear();
// begin (possible) preemption window
if (scrub_preempted) {
scrubber.preempt_left--;
scrubber.preempt_divisor *= 2;
dout(10) << __func__ << " preempted, " << scrubber.preempt_left
<< " left" << dendl;
scrub_preempted = false;
}
scrub_can_preempt = scrubber.preempt_left > 0;
{
int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
scrubber.preempt_divisor);
int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
scrubber.preempt_divisor);
hobject_t start = scrubber.start;
hobject_t candidate_end;
vector<hobject_t> objects;
osr->flush();
ret = get_pgbackend()->objects_list_partial(
start,
min,
max,
&objects,
&candidate_end);
assert(ret >= 0);
if (!objects.empty()) {
hobject_t back = objects.back();
while (candidate_end.has_snapset() &&
candidate_end.get_head() == back.get_head()) {
candidate_end = back;
objects.pop_back();
if (objects.empty()) {
assert(0 ==
"Somehow we got more than 2 objects which"
"have the same head but are not clones");
}
back = objects.back();
}
if (candidate_end.has_snapset()) {
assert(candidate_end.get_head() != back.get_head());
candidate_end = candidate_end.get_object_boundary();
}
} else {
assert(candidate_end.is_max());
}
if (!_range_available_for_scrub(scrubber.start, candidate_end)) { // if any object in the range is blocked, stop here
// we'll be requeued by whatever made us unavailable for scrub
dout(10) << __func__ << ": scrub blocked somewhere in range "
<< "[" << scrubber.start << ", " << candidate_end << ")"
<< dendl;
done = true;
break;
}
scrubber.end = candidate_end;
if (scrubber.end > scrubber.max_end)
scrubber.max_end = scrubber.end;
}
// walk the log to find the latest update that affects our chunk
scrubber.subset_last_update = eversion_t();
for (auto p = projected_log.log.rbegin();
p != projected_log.log.rend();
++p) {
if (p->soid >= scrubber.start &&
p->soid < scrubber.end) {
scrubber.subset_last_update = p->version;
break;
}
}
if (scrubber.subset_last_update == eversion_t()) {
for (list<pg_log_entry_t>::const_reverse_iterator p =
pg_log.get_log().log.rbegin();
p != pg_log.get_log().log.rend();
++p) {
if (p->soid >= scrubber.start &&
p->soid < scrubber.end) {
scrubber.subset_last_update = p->version;
break;
}
}
}
// ask replicas to wait until
// last_update_applied >= scrubber.subset_last_update and then scan
scrubber.waiting_on_whom.insert(pg_whoami);
// request maps from replicas
for (set<pg_shard_t>::iterator i = actingbackfill.begin();
i != actingbackfill.end(); // send a message to every replica to request its scrubmap
++i) {
if (*i == pg_whoami) continue;
_request_scrub_map(*i, scrubber.subset_last_update,
scrubber.start, scrubber.end, scrubber.deep,
scrubber.preempt_left > 0);
scrubber.waiting_on_whom.insert(*i); // add every replica we are waiting for to the waiting_on_whom set
}
dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
<< dendl;
scrubber.state = PG::Scrubber::WAIT_PUSHES;
break;
case PG::Scrubber::WAIT_PUSHES:
if (active_pushes == 0) {
scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
} else {
dout(15) << "wait for pushes to apply" << dendl;
done = true;
}
break;
case PG::Scrubber::WAIT_LAST_UPDATE:
if (last_update_applied < scrubber.subset_last_update) { // operations are in the log but not yet applied to the objects; the scrub reads the objects later, so wait for the log to be applied and end this pass
// will be requeued by op_applied
dout(15) << "wait for writes to flush" << dendl;
done = true;
break;
}
scrubber.state = PG::Scrubber::BUILD_MAP;
scrubber.primary_scrubmap_pos.reset();
break;
case PG::Scrubber::BUILD_MAP:
assert(last_update_applied >= scrubber.subset_last_update);
// build my own scrub map
if (scrub_preempted) {
dout(10) << __func__ << " preempted" << dendl;
scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
break;
}
ret = build_scrub_map_chunk(
scrubber.primary_scrubmap,
scrubber.primary_scrubmap_pos,
scrubber.start, scrubber.end,
scrubber.deep,
handle); // builds the primary OSD's scrubmap: the verification information of every object in [start, end)
if (ret == -EINPROGRESS) {
requeue_scrub();
done = true;
break;
}
scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
break;
case PG::Scrubber::BUILD_MAP_DONE:
if (scrubber.primary_scrubmap_pos.ret < 0) {
dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
<< ", aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
return;
}
dout(10) << __func__ << " waiting_on_whom was "
<< scrubber.waiting_on_whom << dendl;
assert(scrubber.waiting_on_whom.count(pg_whoami));
scrubber.waiting_on_whom.erase(pg_whoami);
scrubber.state = PG::Scrubber::WAIT_REPLICAS;
break;
case PG::Scrubber::WAIT_REPLICAS:
if (!scrubber.waiting_on_whom.empty()) { // some OSDs have not finished building their scrubmap, so end this pass
// will be requeued by sub_op_scrub_map
dout(10) << "wait for replicas to build scrub map" << dendl;
done = true;
break;
}
// end (possible) preemption window
scrub_can_preempt = false;
if (scrub_preempted) {
dout(10) << __func__ << " preempted, restarting chunk" << dendl;
scrubber.state = PG::Scrubber::NEW_CHUNK;
} else {
scrubber.state = PG::Scrubber::COMPARE_MAPS;
}
break;
case PG::Scrubber::COMPARE_MAPS:
assert(last_update_applied >= scrubber.subset_last_update);
assert(scrubber.waiting_on_whom.empty());
scrub_compare_maps(); // compare the verification information of all replicas
scrubber.start = scrubber.end;
scrubber.run_callbacks();
// requeue the writes from the chunk that just finished
requeue_ops(waiting_for_scrub); // requeue the ops blocked by the scrub; they were parked in PrimaryLogPG::do_op() via waiting_for_scrub.push_back(op)
scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
// fall-thru
case PG::Scrubber::WAIT_DIGEST_UPDATES:
if (scrubber.num_digest_updates_pending) { // wait for the data digest or omap digest updates
dout(10) << __func__ << " waiting on "
<< scrubber.num_digest_updates_pending
<< " digest updates" << dendl;
done = true;
break;
}
scrubber.preempt_left = cct->_conf->get_val<uint64_t>(
"osd_scrub_max_preemptions");
scrubber.preempt_divisor = 1;
if (!(scrubber.end.is_max())) { // the PG still has objects to scrub: go back to NEW_CHUNK and requeue the scrub
scrubber.state = PG::Scrubber::NEW_CHUNK;
requeue_scrub();
done = true;
} else {
scrubber.state = PG::Scrubber::FINISH;
}
break;
case PG::Scrubber::FINISH:
scrub_finish();
scrubber.state = PG::Scrubber::INACTIVE;
done = true;
if (!snap_trimq.empty()) {
dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
snap_trimmer_scrub_complete();
}
break;
case PG::Scrubber::BUILD_MAP_REPLICA:
// build my own scrub map
if (scrub_preempted) {
dout(10) << __func__ << " preempted" << dendl;
ret = 0;
} else {
ret = build_scrub_map_chunk(
scrubber.replica_scrubmap,
scrubber.replica_scrubmap_pos,
scrubber.start, scrubber.end,
scrubber.deep,
handle);
}
if (ret == -EINPROGRESS) {
requeue_scrub();
done = true;
break;
}
// reply
if (HAVE_FEATURE(acting_features, SERVER_LUMINOUS)) { // Luminous release
MOSDRepScrubMap *reply = new MOSDRepScrubMap(
spg_t(info.pgid.pgid, get_primary().shard),
scrubber.replica_scrub_start,
pg_whoami);
reply->preempted = scrub_preempted;
::encode(scrubber.replica_scrubmap, reply->get_data());
osd->send_message_osd_cluster(
get_primary().osd, reply,
scrubber.replica_scrub_start);
} else { // Jewel release
// for jewel compatibility
vector<OSDOp> scrub(1);
scrub[0].op.op = CEPH_OSD_OP_SCRUB_MAP;
hobject_t poid;
eversion_t v;
osd_reqid_t reqid;
MOSDSubOp *subop = new MOSDSubOp(
reqid,
pg_whoami,
spg_t(info.pgid.pgid, get_primary().shard),
poid,
0,
scrubber.replica_scrub_start,
osd->get_tid(),
v);
::encode(scrubber.replica_scrubmap, subop->get_data());
subop->ops = scrub;
osd->send_message_osd_cluster(
get_primary().osd, subop,
scrubber.replica_scrub_start);
}
scrub_preempted = false;
scrub_can_preempt = false;
scrubber.state = PG::Scrubber::INACTIVE;
scrubber.replica_scrubmap = ScrubMap();
scrubber.replica_scrubmap_pos = ScrubMapBuilder();
scrubber.start = hobject_t();
scrubber.end = hobject_t();
scrubber.max_end = hobject_t();
done = true;
break;
default:
ceph_abort();
}
}
dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
<< " [" << scrubber.start << "," << scrubber.end << ")"
<< " max_end " << scrubber.max_end << dendl;
}

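2.3 Building the Scrubmap

2.3.1 build_scrub_map_chunk

Builds the verification information of every object from start to end and stores it in the scrubmap structure. In chunky_scrub, both the primary and the replica OSDs call this function to build their Scrubmap.

  • 1. Call get_pgbackend()->objects_list_range to list the objects from start to end into the ls and rollback_obs lists. ls holds the head and snap objects (the default case); rollback_obs holds the objects used for rollback.
  • 2. Call get_pgbackend()->be_scan_list(map, pos) to scan the objects and build the scrubmap structure.
  • 3. Once a [start,end) range has been built, PG::_repair_oinfo_oid is called. If the soid stored in an object's oi (decoded from o.attrs) does not match the hoid in the scrubmap, a repair is needed: the hoid from the scrubmap is taken as correct and the soid in oi is fixed.
  • 4. Call _scan_snaps to repair the snap information in the SnapMapper. In this function that happens only on a replica, as the !is_primary() check in the code shows.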
// Builds the verification information of every object in [start, end) and stores it in the scrubmap
int PG::build_scrub_map_chunk(
ScrubMap &map,
ScrubMapBuilder &pos,
hobject_t start,
hobject_t end,
bool deep,
ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << " [" << start << "," << end << ") "
<< " pos " << pos
<< dendl;
// sample log: build_scrub_map_chunk [2:60000000::::head,2:7341ac06:::rbd_data.6ed5b6b8b4567.00000000000000c5:0)  pos (2/24 2:61948a30:::rbd_data.6ed5b6b8b4567.000000000000006d:head deep)
// start
while (pos.empty()) { // pos.empty() is true in the initial state
pos.deep = deep;
map.valid_through = info.last_update;
osr->flush();
// objects
vector<ghobject_t> rollback_obs;
pos.ret = get_pgbackend()->objects_list_range(
start,
end,
0,
&pos.ls,
&rollback_obs); // list the objects in the range: ls holds head and snap objects, rollback_obs holds the objects used for rollback
if (pos.ret < 0) {
dout(5) << "objects_list_range error: " << pos.ret << dendl;
return pos.ret;
}
if (pos.ls.empty()) {
break;
}
_scan_rollback_obs(rollback_obs, handle);
pos.pos = 0;
return -EINPROGRESS;
}
// scan objects
while (!pos.done()) {
int r = get_pgbackend()->be_scan_list(map, pos); // scan the objects and build the scrubmap structure
if (r == -EINPROGRESS) {
return r;
}
}
// finish
dout(20) << __func__ << " finishing" << dendl;
assert(pos.done());
_repair_oinfo_oid(map);
if (!is_primary()) {
ScrubMap for_meta_scrub;
// In case we restarted smaller chunk, clear old data
scrubber.cleaned_meta_map.clear_from(scrubber.start);
scrubber.cleaned_meta_map.insert(map);
scrubber.clean_meta_map(for_meta_scrub);
_scan_snaps(for_meta_scrub);
}
dout(20) << __func__ << " done, got " << map.objects.size() << " items"
<< dendl;
return 0;
}

2.3.2 PGBackend::be_scan_list

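Builds the verification information of the objects in the scrubmap.

  • 1. Call store->stat to fetch the object's stat information from the store layer and keep it in st. The call actually lands in BlueStore::stat().
  • 2. Set o.size to st.st_size, and call store->getattrs to save the object's attr information in o.attrs.
  • 3. If deep is true, call be_deep_scrub to obtain the digests of the object's omap and data.
  • Note: when the scrubmap is built for a plain scrub, only size and attrs are compared; for a deep-scrub, omap and data are compared as well.
  • 4. Move on to the next object.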

    int PGBackend::be_scan_list(
    ScrubMap &map,
    ScrubMapBuilder &pos)
    {
    dout(10) << __func__ << " " << pos << dendl;
    // sample log: be_scan_list (4/24 2:625a7837:::rbd_data.6ed5b6b8b4567.000000000000001c:head)
    assert(!pos.done());
    assert(pos.pos < pos.ls.size());
    hobject_t& poid = pos.ls[pos.pos];
    struct stat st;
    int r = store->stat(
    ch,
    ghobject_t(
    poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    &st,
    true); // BlueStore::stat
    if (r == 0) {
    ScrubMap::object &o = map.objects[poid]; // reference into the scrubmap; a plain scrub fills in only size and attrs, a deep-scrub adds the omap and data digests
    o.size = st.st_size;
    assert(!o.negative);
    store->getattrs(
    ch,
    ghobject_t(
    poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    o.attrs);
    dout(10) << __func__ << " wds-1: o.size:" << o.size << ", o.attrs:" << o.attrs << dendl;
    if (pos.deep) {
    r = be_deep_scrub(poid, map, pos, o); // computes the omap and data digests
    }
    dout(25) << __func__ << "  " << poid << dendl;
    dout(10) << __func__ << " wds-3: o.size:" << o.size << ", o.attrs:" << o.attrs << dendl;
    } else if (r == -ENOENT) {
    dout(25) << __func__ << "  " << poid << " got " << r
    << ", skipping" << dendl;
    } else if (r == -EIO) {
    dout(25) << __func__ << "  " << poid << " got " << r
    << ", stat_error" << dendl;
    ScrubMap::object &o = map.objects[poid];
    o.stat_error = true;
    } else {
    derr << __func__ << " got: " << cpp_strerror(r) << dendl;
    ceph_abort();
    }
    if (r == -EINPROGRESS) { // the object has been read only partially: be_deep_scrub returned -EINPROGRESS, so return to the caller without calling pos.next_object()
    return -EINPROGRESS;
    }
    pos.next_object(); // move to the next object only once this one has been read completely
    return 0;
    }

2.3.3 ReplicatedBackend::be_deep_scrub

  • 1. Call store->read repeatedly to read the object's data; each read fetches cct->_conf->osd_deep_scrub_stride bytes (512 KB).
  • 2. The data_hash is computed object by object, 512 KB per read. For an object of 4169728 bytes, 4169728/524288 ≈ 7.95, so 8 reads are needed: each of the first 7 full-stride reads logs 'more data, digest so far', and the 8th, shorter read logs 'done with data, digest 0x', after which the next object starts.
  • 3. Compute the CRC of the data.
  • 4. Compute the CRC of the omap header.
  • 5. Compute the CRC of the omap.
int ReplicatedBackend::be_deep_scrub(
const hobject_t &poid,
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o)
{
dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
int r;
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;
utime_t sleeptime;
sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep); // osd_debug_deep_scrub_sleep=0
if (sleeptime != utime_t()) {
lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
sleeptime.sleep();
}
assert(poid == pos.ls[pos.pos]);
// The data_hash is computed object by object, 512 KB per read. For an object of 4169728 bytes, 4169728/524288 ≈ 7.95, so 8 reads are needed: 7 full-stride reads log 'more data, digest so far', then the 8th read logs 'done with data, digest 0x' and the next object starts.
if (!pos.data_done()) {
if (pos.data_pos == 0) {
pos.data_hash = bufferhash(-1);
}
bufferlist bl;
r = store->read(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos.data_pos,
cct->_conf->osd_deep_scrub_stride/*512k*/, bl,
fadvise_flags);
if (r < 0) {
dout(20) << __func__ << "  " << poid << " got "
<< r << " on read, read_error" << dendl;
o.read_error = true;
return 0;
}
if (r > 0) {
pos.data_hash << bl;
}
pos.data_pos += r;
if (r == cct->_conf->osd_deep_scrub_stride) {
dout(20) << __func__ << "  " << poid << " more data, digest so far 0x"
<< std::hex << pos.data_hash.digest() << std::dec << dendl;
return -EINPROGRESS;
}
// done with bytes
pos.data_pos = -1;
o.digest = pos.data_hash.digest(); // the CRC of the data
o.digest_present = true;
dout(20) << __func__ << "  " << poid << " done with data, digest 0x"
<< std::hex << o.digest << std::dec << dendl;
}
// omap header
if (pos.omap_pos.empty()) {
pos.omap_hash = bufferhash(-1);
bufferlist hdrbl;
r = store->omap_get_header(
coll,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
&hdrbl, true);
if (r == -EIO) {
dout(20) << __func__ << "  " << poid << " got "
<< r << " on omap header read, read_error" << dendl;
o.read_error = true;
return 0;
}
if (r == 0 && hdrbl.length()) {
dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
<< dendl;
pos.omap_hash << hdrbl; // fold the omap header into the CRC
}
}
// omap
ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
coll,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
assert(iter);
if (pos.omap_pos.length()) {
iter->lower_bound(pos.omap_pos);
} else {
iter->seek_to_first();
}
int max = g_conf->osd_deep_scrub_keys; // 1024
while (iter->status() == 0 && iter->valid()) {
pos.omap_bytes += iter->value().length();
++pos.omap_keys;
--max;
// fixme: we can do this more efficiently.
bufferlist bl;
::encode(iter->key(), bl);
::encode(iter->value(), bl);
pos.omap_hash << bl;
iter->next();
if (iter->valid() && max == 0) {
pos.omap_pos = iter->key();
return -EINPROGRESS;
}
if (iter->status() < 0) {
dout(25) << __func__ << "  " << poid
<< " on omap scan, db status error" << dendl;
o.read_error = true;
return 0;
}
}
if (pos.omap_keys > cct->_conf->
osd_deep_scrub_large_omap_object_key_threshold/*2000000*/ ||
pos.omap_bytes > cct->_conf->
osd_deep_scrub_large_omap_object_value_sum_threshold/*1G*/) {
dout(25) << __func__ << " " << poid
<< " large omap object detected. Object has " << pos.omap_keys
<< " keys and size " << pos.omap_bytes << " bytes" << dendl;
o.large_omap_object_found = true;
o.large_omap_object_key_count = pos.omap_keys;
o.large_omap_object_value_size = pos.omap_bytes;
map.has_large_omap_object_errors = true;
}
o.omap_digest = pos.omap_hash.digest(); // the CRC of the omap
o.omap_digest_present = true;
dout(20) << __func__ << " done with " << poid << " omap_digest "
<< std::hex << o.omap_digest << std::dec << dendl;
// done!
return 0;
}

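2.4 Replica-side processing: building the scrubmap

In section 2.2.1, the primary's PG::chunky_scrub sends a MOSDRepScrub message (type MSG_OSD_REP_SCRUB) to the other replicas to obtain their scrubmaps.

When a replica receives the MOSDRepScrub message from the primary, PrimaryLogPG::do_request handles the MSG_OSD_REP_SCRUB type and calls PG::replica_scrub to start collecting the scrubmap information on that replica.

  • 1. Assert that no replica scrub is already in progress.
  • 2. Make sure the incoming scrub message is not stale.
  • 3. If the operations the replica has applied from its log lag behind the version the primary is scrubbing to, wait until they catch up.
  • 4. If a recovery operation is in progress, wait.
  • 5. Set the state to Scrubber::BUILD_MAP_REPLICA.
  • 6. Call requeue_scrub to put the PG on this replica OSD's queue: PG::scrub -> if (!is_primary() && scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {chunky_scrub(handle)} -> case PG::Scrubber::BUILD_MAP_REPLICA, which then runs build_scrub_map_chunk on the replica.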
/* replica_scrub
 *
 * Wait for last_update_applied to match msg->scrub_to as above. Wait
 * for pushes to complete in case of recent recovery. Build a single
 * scrubmap of objects that are in the range [msg->start, msg->end).
 */
void PG::replica_scrub(
  OpRequestRef op,
  ThreadPool::TPHandle &handle)
{
  const MOSDRepScrub *msg = static_cast<const MOSDRepScrub *>(op->get_req());
  assert(!scrubber.active_rep_scrub); // no replica scrub may already be pending
  dout(7) << "replica_scrub" << dendl;
  if (msg->map_epoch < info.history.same_interval_since) { // the incoming scrub request is stale
    dout(10) << "replica_scrub discarding old replica_scrub from "
             << msg->map_epoch << " < " << info.history.same_interval_since
             << dendl;
    return;
  }
  assert(msg->chunky);
  if (last_update_applied < msg->scrub_to) { // the replica's applied log lags behind the version the primary scrubs to; wait until they agree
    dout(10) << "waiting for last_update_applied to catch up" << dendl;
    scrubber.active_rep_scrub = op;
    return;
  }
  if (active_pushes > 0) { // recovery is in progress; wait
    dout(10) << "waiting for active pushes to finish" << dendl;
    scrubber.active_rep_scrub = op;
    return;
  }
  scrubber.state = Scrubber::BUILD_MAP_REPLICA;
  scrubber.replica_scrub_start = msg->min_epoch;
  scrubber.start = msg->start;
  scrubber.end = msg->end;
  scrubber.max_end = msg->end;
  scrubber.deep = msg->deep;
  scrubber.epoch_start = info.history.same_interval_since;
  if (msg->priority) {
    scrubber.priority = msg->priority;
  } else {
    scrubber.priority = get_scrub_priority();
  }
  scrub_can_preempt = msg->allow_preemption;
  scrub_preempted = false;
  scrubber.replica_scrubmap_pos.reset();
  // Queue the PG: PG::scrub -> if (!is_primary() && scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {chunky_scrub(handle)}
  // -> case PG::Scrubber::BUILD_MAP_REPLICA, which then runs build_scrub_map_chunk on the replica
  requeue_scrub(msg->high_priority);
}
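The order of the checks in PG::replica_scrub can be modelled in isolation. The following is a minimal sketch, not Ceph code: ReplicaState, ScrubRequest, Decision and decide_replica_scrub are hypothetical names, and versions and epochs are reduced to plain integers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-ins for the PG state and the MOSDRepScrub fields.
struct ReplicaState {
  uint64_t same_interval_since;  // epoch at which the current interval began
  uint64_t last_update_applied;  // last log version applied locally
  int active_pushes;             // recovery pushes still in flight
};

struct ScrubRequest {
  uint64_t map_epoch;  // epoch in which the primary sent the request
  uint64_t scrub_to;   // version the primary is scrubbing up to
};

enum class Decision { Discard, WaitForApply, WaitForPushes, BuildMap };

// Same order of checks as PG::replica_scrub: stale requests are dropped,
// then the replica waits for log application and for recovery pushes, and
// only then does it start building its scrub map.
inline Decision decide_replica_scrub(const ReplicaState& pg,
                                     const ScrubRequest& req) {
  if (req.map_epoch < pg.same_interval_since) return Decision::Discard;
  if (pg.last_update_applied < req.scrub_to) return Decision::WaitForApply;
  if (pg.active_pushes > 0) return Decision::WaitForPushes;
  return Decision::BuildMap;
}
```

In the real function the two waiting cases park the request in scrubber.active_rep_scrub so that it can be retried later; the sketch only returns the decision.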

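2.5 Comparing replicas

Once the primary and the replicas have each built the check information for their objects and stored it in a ScrubMap, the next step is to compare that information across replicas to complete the consistency check.

First, an authoritative object is chosen using the information each copy carries about itself; the authoritative object is then compared against the other copies.

2.5.1 scrub_compare_maps

This function checks whether the data on the different replicas is consistent.

  • 1. Put the scrubmap of every OSD in actingbackfill into maps.
  • 2. Put the objects of all replicas into master_set.
  • 3. Call be_compare_scrubmaps to compare the copies of each object, and store the shards that hold an intact copy in authoritative.
  • 4. Call _scan_snaps to check consistency among snap objects.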
// Check whether the data on the different replicas is consistent
void PG::scrub_compare_maps()
{
  dout(10) << __func__ << " has maps, analyzing" << dendl;
  // construct authoritative scrub map for type specific scrubbing
  scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
  map<hobject_t,
      pair<boost::optional<uint32_t>,
           boost::optional<uint32_t>>> missing_digest;
  map<pg_shard_t, ScrubMap *> maps;
  maps[pg_whoami] = &scrubber.primary_scrubmap;
  for (const auto& i : actingbackfill) { // e.g. acting set [8,0,3]: put the scrubmap of each OSD in actingbackfill into maps
    if (i == pg_whoami) continue;
    // sample log: scrub_compare_maps replica 0 has 24 items
    // sample log: scrub_compare_maps replica 3 has 24 items
    dout(2) << __func__ << " replica " << i << " has "
            << scrubber.received_maps[i].objects.size()
            << " items" << dendl;
    maps[i] = &scrubber.received_maps[i];
  }
  set<hobject_t> master_set; // union of the objects on all replicas
  // Construct master set
  for (const auto map : maps) {
    for (const auto i : map.second->objects) { // put the objects of every replica into master_set
      master_set.insert(i.first);
    }
  }
  stringstream ss;
  get_pgbackend()->be_large_omap_check(maps, master_set,
                                       scrubber.large_omap_objects, ss);
  if (!ss.str().empty()) {
    osd->clog->warn(ss);
  }
  if (acting.size() > 1) {
    dout(10) << __func__ << "  comparing replica scrub maps" << dendl;
    // Map from object with errors to good peer
    map<hobject_t, list<pg_shard_t>> authoritative; // shards that hold an intact copy of the object
    // sample log: scrub_compare_maps   osd.8 has 24 items
    dout(2) << __func__ << "   osd." << acting[0] << " has "
            << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
    ss.str("");
    ss.clear();
    get_pgbackend()->be_compare_scrubmaps( // compare the copies; shards with an intact copy end up in authoritative
      maps,
      master_set,
      state_test(PG_STATE_REPAIR),
      scrubber.missing,
      scrubber.inconsistent,
      authoritative,
      missing_digest,
      scrubber.shallow_errors,
      scrubber.deep_errors,
      scrubber.store.get(),
      info.pgid, acting,
      ss);
    dout(2) << ss.str() << dendl;
    if (!ss.str().empty()) {
      osd->clog->error(ss);
    }
    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
         i != authoritative.end();
         ++i) {
      list<pair<ScrubMap::object, pg_shard_t> > good_peers;
      for (list<pg_shard_t>::const_iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        good_peers.push_back(make_pair(maps[*j]->objects[i->first], *j)); // (object, osd)
      }
      scrubber.authoritative.insert(
        make_pair(
          i->first,
          good_peers)); // object -> list of (object, osd)
    }
    for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
         i != authoritative.end();
         ++i) {
      scrubber.cleaned_meta_map.objects.erase(i->first);
      scrubber.cleaned_meta_map.objects.insert(
        *(maps[i->second.back()]->objects.find(i->first))
        );
    }
  }
  ScrubMap for_meta_scrub;
  scrubber.clean_meta_map(for_meta_scrub);
  // ok, do the pg-type specific scrubbing
  scrub_snapshot_metadata(for_meta_scrub, missing_digest);
  // Called here on the primary can use an authoritative map if it isn't the primary
  _scan_snaps(for_meta_scrub); // check consistency among snap objects
  if (!scrubber.store->empty()) {
    if (state_test(PG_STATE_REPAIR)) {
      dout(10) << __func__ << ": discarding scrub results" << dendl;
      scrubber.store->flush(nullptr);
    } else {
      dout(10) << __func__ << ": updating scrub object" << dendl;
      ObjectStore::Transaction t;
      scrubber.store->flush(&t);
      osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
    }
  }
}
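The construction of master_set is a plain set union over the per-shard maps. A minimal sketch of that step follows, assuming a hypothetical SimpleScrubMap (object name to size) in place of ScrubMap and integer OSD ids in place of pg_shard_t; build_master_set is an illustrative name, not a Ceph function.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical simplification: a scrub map is just object name -> size,
// and a shard is identified by its OSD id.
using SimpleScrubMap = std::map<std::string, unsigned long>;

// Union of the object names seen on any shard, which is what
// scrub_compare_maps collects into master_set.
inline std::set<std::string> build_master_set(
    const std::map<int, const SimpleScrubMap*>& maps) {
  std::set<std::string> master_set;
  for (const auto& shard : maps) {
    for (const auto& obj : *shard.second) {
      master_set.insert(obj.first);
    }
  }
  return master_set;
}
```

Because master_set is a union, an object that exists on only one replica still gets checked, and the other shards are then reported as missing it.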

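2.5.2 be_compare_scrubmaps

This function compares the replicas for consistency.

  • 1. Iterate over every object in master_set and check consistency object by object.
  • 2. Call be_select_auth_object to choose the replica (auth) that holds the authoritative copy. If no authoritative copy can be chosen, call set_auth_missing to mark the object missing and, depending on the case, update deep_errors and shallow_errors.
  • 3. Call be_compare_scrub_objects to compare the object on each shard against the auth object, including the data digest, the omap digest and the attrs.
  • 4. If the result is clean, the copy matches the authoritative object in every respect, and the shard is added to auth_list.
  • 5. If the result is not clean, add the shard to cur_inconsistent and update deep_errors or shallow_errors accordingly.
  • 6. If the object does not exist on some OSD (j), add that OSD to cur_missing and increment shallow_errors.
  • 7. Check the comparison result for the object: if cur_missing is not empty, add it to missing; if cur_inconsistent is not empty, add it to inconsistent.
  • 8. If the object has damaged or missing copies, record the shards without problems in authoritative.
  • 9. If object_info carries no data digest or no omap digest, update is set to MAYBE. If the data_digest or omap_digest recorded in auth_oi differs from the one actually computed for auth_object, update is set to FORCE in repair mode. In both cases missing_digest is built so that the digest can be rewritten.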
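Step 9 amounts to a small decision table. The sketch below is a simplification under stated assumptions: it tracks a single digest instead of separate data and omap digests, and DigestView, classify and should_write_digest are hypothetical names. The 7200-second value in the usage note is taken from the /*2_hr*/ annotation on osd_deep_scrub_update_digest_min_age in the listing.

```cpp
#include <cassert>
#include <cstdint>

enum class DigestUpdate { No, Maybe, Force };

// Hypothetical flattened view of what the real code reads from auth_oi
// (digest recorded in object_info_t) and auth_object (digest computed by scrub).
struct DigestView {
  bool recorded_present;  // object_info_t carries a digest
  uint32_t recorded;
  bool computed_present;  // the scrub computed a digest
  uint32_t computed;
};

inline DigestUpdate classify(const DigestView& d, bool repair) {
  DigestUpdate u = DigestUpdate::No;
  // A digest was computed but none is recorded: candidate for a lazy update.
  if (d.computed_present && !d.recorded_present) u = DigestUpdate::Maybe;
  // Recorded and computed digests disagree: rewritten only in repair mode.
  if (d.recorded_present && d.computed_present && d.recorded != d.computed &&
      repair) {
    u = DigestUpdate::Force;
  }
  return u;
}

// MAYBE updates are applied only to objects older than min_age_sec;
// FORCE updates are applied regardless of age.
inline bool should_write_digest(DigestUpdate u, double age_sec,
                                double min_age_sec) {
  if (u == DigestUpdate::No) return false;
  return u == DigestUpdate::Force || age_sec > min_age_sec;
}
```

For example, should_write_digest(DigestUpdate::Maybe, 60.0, 7200.0) is false: a recently modified object is left alone, presumably because it may still be changing.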
// Compare the copies of each object across the replicas
void PGBackend::be_compare_scrubmaps(
  const map<pg_shard_t,ScrubMap*> &maps,
  const set<hobject_t> &master_set,
  bool repair,
  map<hobject_t, set<pg_shard_t>> &missing,
  map<hobject_t, set<pg_shard_t>> &inconsistent,
  map<hobject_t, list<pg_shard_t>> &authoritative,
  map<hobject_t, pair<boost::optional<uint32_t>,
                      boost::optional<uint32_t>>> &missing_digest,
  int &shallow_errors, int &deep_errors,
  Scrub::Store *store,
  const spg_t& pgid,
  const vector<int> &acting,
  ostream &errorstream)
{
  utime_t now = ceph_clock_now();
  // Check maps against master set and each other
  for (set<hobject_t>::const_iterator k = master_set.begin();
       k != master_set.end();
       ++k) { // walk every object in master_set and check them one by one
    object_info_t auth_oi;
    map<pg_shard_t, shard_info_wrapper> shard_map;
    inconsistent_obj_wrapper object_error{*k};
    bool digest_match;
    map<pg_shard_t, ScrubMap *>::const_iterator auth =
      be_select_auth_object(*k, maps, &auth_oi, shard_map, digest_match,
                            pgid, errorstream); // pick the replica holding the authoritative copy; a failure is counted as an error below
    list<pg_shard_t> auth_list;
    set<pg_shard_t> object_errors;
    if (auth == maps.end()) { // no authoritative object was found
      object_error.set_version(0);
      object_error.set_auth_missing(*k, maps, shard_map, shallow_errors,
                                    deep_errors, get_parent()->whoami_shard());
      if (object_error.has_deep_errors())
        ++deep_errors;
      else if (object_error.has_shallow_errors())
        ++shallow_errors;
      store->add_object_error(k->pool, object_error);
      errorstream << pgid.pgid << " soid " << *k
                  << " : failed to pick suitable object info\n";
      continue;
    }
    object_error.set_version(auth_oi.user_version);
    ScrubMap::object& auth_object = auth->second->objects[*k]; // auth_object is the authoritative object
    set<pg_shard_t> cur_missing;
    set<pg_shard_t> cur_inconsistent;
    bool fix_digest = false;
    for (auto j = maps.cbegin(); j != maps.cend(); ++j) {
      if (j == auth)
        shard_map[auth->first].selected_oi = true;
      if (j->second->objects.count(*k)) { // the object exists on this shard
        shard_map[j->first].set_object(j->second->objects[*k]);
        // Compare
        stringstream ss;
        bool found = be_compare_scrub_objects(auth->first,
                                              auth_object,
                                              auth_oi,
                                              j->second->objects[*k],
                                              shard_map[j->first],
                                              object_error,
                                              ss,
                                              k->has_snapset()); // compare this shard's copy with the auth object: data digest, omap digest and attrs
        /*
         * sample log:
         *be_compare_scrubmaps replicated shards 3 digest_match
         *be_compare_scrubmaps replicated shards 3 digest_match
         *be_compare_scrubmaps replicated auth shards 3 digest_match
         */
        dout(20) << __func__ << (repair ? " repair " : " ") << (parent->get_pool().is_replicated() ? "replicated " : "")
                 << (j == auth ? "auth " : "") << "shards " << shard_map.size() << (digest_match ? " digest_match " : " ")
                 << (shard_map[j->first].has_data_digest_mismatch_info() ? "info_mismatch " : "")
                 << (shard_map[j->first].only_data_digest_mismatch_info() ? "only" : "")
                 << dendl;
        if (cct->_conf->osd_distrust_data_digest/*false*/) {
          if (digest_match && parent->get_pool().is_replicated()
              && shard_map[j->first].has_data_digest_mismatch_info()) {
            fix_digest = true;
          }
          shard_map[j->first].clear_data_digest_mismatch_info();
        // If all replicas match, but they don't match object_info we can
        // repair it by using missing_digest mechanism
        } else if (repair && parent->get_pool().is_replicated() && j == auth && shard_map.size() > 1
                   && digest_match && shard_map[j->first].only_data_digest_mismatch_info()
                   && auth_object.digest_present) {
          // Set in missing_digests
          fix_digest = true;
          // Clear the error
          shard_map[j->first].clear_data_digest_mismatch_info();
          errorstream << pgid << " soid " << *k << " : repairing object info data_digest" << "\n";
        }
        // Some errors might have already been set in be_select_auth_object()
        if (shard_map[j->first].errors != 0) { // this shard has errors recorded
          cur_inconsistent.insert(j->first);
          if (shard_map[j->first].has_deep_errors())
            ++deep_errors;
          else
            ++shallow_errors;
          // Only true if be_compare_scrub_objects() found errors and put something
          // in ss.
          if (found)
            errorstream << pgid << " shard " << j->first << " soid " << *k
                        << " : " << ss.str() << "\n";
        } else if (found) { // a mismatch was reported
          // Track possible shard to use as authoritative, if needed
          // There are errors, without identifying the shard
          object_errors.insert(j->first);
          errorstream << pgid << " soid " << *k << " : " << ss.str() << "\n";
        } else { // nothing abnormal: only then is the OSD added to auth_list
          // XXX: The auth shard might get here that we don't know
          // that it has the "correct" data.
          auth_list.push_back(j->first);
        }
      } else { // the object does not exist on this OSD (j)
        cur_missing.insert(j->first); // add the OSD to cur_missing
        shard_map[j->first].set_missing();
        shard_map[j->first].primary = (j->first == get_parent()->whoami_shard());
        // Can't have any other errors if there is no information available
        ++shallow_errors;
        errorstream << pgid << " shard " << j->first << " " << *k << " : missing\n";
      }
      object_error.add_shard(j->first, shard_map[j->first]);
    }
    if (auth_list.empty()) {
      if (object_errors.empty()) {
        errorstream << pgid.pgid << " soid " << *k
                    << " : failed to pick suitable auth object\n";
        goto out;
      }
      // Object errors exist and nothing in auth_list
      // Prefer the auth shard otherwise take first from list.
      pg_shard_t shard;
      if (object_errors.count(auth->first)) { // the auth OSD has object errors and is not in auth_list yet: use it
        shard = auth->first;
      } else {
        shard = *(object_errors.begin());
      }
      auth_list.push_back(shard);
      object_errors.erase(shard);
    }
    // At this point auth_list is populated, so we add the object errors shards
    // as inconsistent.
    cur_inconsistent.insert(object_errors.begin(), object_errors.end());
    if (!cur_missing.empty()) {
      missing[*k] = cur_missing; // add to the missing list
    }
    if (!cur_inconsistent.empty()) {
      inconsistent[*k] = cur_inconsistent; // add to the inconsistent list
    }
    if (fix_digest) { // whether the digest should be repaired
      boost::optional<uint32_t> data_digest, omap_digest;
      assert(auth_object.digest_present);
      data_digest = auth_object.digest;
      if (auth_object.omap_digest_present) {
        omap_digest = auth_object.omap_digest;
      }
      missing_digest[*k] = make_pair(data_digest, omap_digest);
    }
    // Special handling of this particular type of inconsistency
    // This can over-ride a data_digest or set an omap_digest
    // when all replicas match but the object info is wrong.
    if (!cur_inconsistent.empty() || !cur_missing.empty()) {
      authoritative[*k] = auth_list; // the object has damaged or missing copies: record the good shards in authoritative
    } else if (!fix_digest && parent->get_pool().is_replicated()) {
      enum {
        NO = 0,
        MAYBE = 1,
        FORCE = 2,
      } update = NO;
      // object_info carries no data digest or no omap digest: update becomes MAYBE
      if (auth_object.digest_present && !auth_oi.is_data_digest()) {
        dout(20) << __func__ << " missing data digest on " << *k << dendl;
        update = MAYBE;
      }
      if (auth_object.omap_digest_present && !auth_oi.is_omap_digest()) {
        dout(20) << __func__ << " missing omap digest on " << *k << dendl;
        update = MAYBE;
      }
      // recorded digest != actual digest?
      if (auth_oi.is_data_digest() && auth_object.digest_present &&
          auth_oi.data_digest != auth_object.digest) { // the digest recorded in auth_oi differs from the one computed for auth_object
        assert(cct->_conf->osd_distrust_data_digest
               || shard_map[auth->first].has_data_digest_mismatch_info());
        errorstream << pgid << " recorded data digest 0x"
                    << std::hex << auth_oi.data_digest << " != on disk 0x"
                    << auth_object.digest << std::dec << " on " << auth_oi.soid
                    << "\n";
        if (repair)
          update = FORCE;
      }
      if (auth_oi.is_omap_digest() && auth_object.omap_digest_present &&
          auth_oi.omap_digest != auth_object.omap_digest) {
        assert(shard_map[auth->first].has_omap_digest_mismatch_info());
        errorstream << pgid << " recorded omap digest 0x"
                    << std::hex << auth_oi.omap_digest << " != on disk 0x"
                    << auth_object.omap_digest << std::dec
                    << " on " << auth_oi.soid << "\n";
        if (repair)
          update = FORCE;
      }
      if (update != NO) {
        utime_t age = now - auth_oi.local_mtime;
        if (update == FORCE ||
            age > cct->_conf->osd_deep_scrub_update_digest_min_age/*2_hr*/) {
          boost::optional<uint32_t> data_digest, omap_digest;
          if (auth_object.digest_present) {
            data_digest = auth_object.digest;
            dout(20) << __func__ << " will update data digest on " << *k << dendl;
          }
          if (auth_object.omap_digest_present) {
            omap_digest = auth_object.omap_digest;
            dout(20) << __func__ << " will update omap digest on " << *k << dendl;
          }
          missing_digest[*k] = make_pair(data_digest, omap_digest);
        } else {
          dout(20) << __func__ << " missing digest but age " << age
                   << " < " << cct->_conf->osd_deep_scrub_update_digest_min_age
                   << " on " << *k << dendl;
        }
      }
    }
out:
    if (object_error.has_deep_errors())
      ++deep_errors;
    else if (object_error.has_shallow_errors())
      ++shallow_errors;
    if (object_error.errors || object_error.union_shards.errors) {
      store->add_object_error(k->pool, object_error);
    }
  }
}
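Stripped of digests and error reporting, the inner loop sorts each shard into one of three buckets for the object under test. The following sketch illustrates only that bookkeeping; ShardResult, ObjectVerdict and classify_shards are hypothetical names, shards are plain OSD ids, and the deep/shallow accounting for inconsistent copies is left out.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Hypothetical per-shard outcome for a single object.
enum class ShardResult { Clean, Inconsistent, Missing };

struct ObjectVerdict {
  std::vector<int> auth_list;      // shards whose copy matched the auth object
  std::set<int> cur_inconsistent;  // shards whose copy has errors
  std::set<int> cur_missing;       // shards that lack the object entirely
  int shallow_errors = 0;

  // The object is recorded in `authoritative` only when some copy is bad.
  bool needs_repair() const {
    return !cur_inconsistent.empty() || !cur_missing.empty();
  }
};

inline ObjectVerdict classify_shards(const std::map<int, ShardResult>& results) {
  ObjectVerdict v;
  for (const auto& r : results) {
    switch (r.second) {
      case ShardResult::Clean:
        v.auth_list.push_back(r.first);
        break;
      case ShardResult::Inconsistent:
        v.cur_inconsistent.insert(r.first);
        break;
      case ShardResult::Missing:
        v.cur_missing.insert(r.first);
        ++v.shallow_errors;  // a missing copy always counts as a shallow error
        break;
    }
  }
  return v;
}
```

When needs_repair() is true, auth_list is what the real function stores in authoritative[*k], that is, the set of shards a later repair could copy from.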

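2.5.3 be_select_auth_object

This function selects an authoritative object, auth_obj, from the copies held by the replicas. The principle is that each copy is checked for completeness against the redundant information it carries itself.

  • 1. Put the OSDs that hold obj into the shards list, with the primary OSD at the head.
  • 2. Iterate over the shards list and evaluate the shards one by one.
  • 3. If reading the object's data or metadata failed (stat_error), stop checking this shard.
  • 4. Make sure the OI_ATTR attribute is present and decodes correctly into an object_info_t (oi), then verify that the size stored in oi matches the size found by the scan. If it does not, keep looking for a better copy.
  • 5. If an auth has already been chosen tentatively (auth_version != eversion_t()) and its digest differs from the digest of the copy under test, set digest_match = false.
  • 6. If shard_info.errors is non-zero, the copy has an error and will not be selected as auth.
  • 7. Only when shard_info.errors is 0 is the copy eligible to become auth. The selection then proceeds as follows:
    • First pass (primary): auth_version is 0'0, so auth_version == eversion_t() holds; auth is set and auth_version = oi.version.
    • Second pass (replica): oi.version == auth_version, so the first two conditions fail. The third condition, dcount(oi, oi_prio) > dcount(*auth_oi, auth_prio), prefers the OSD whose object carries both the data CRC and the omap CRC. If the object on the first OSD lacks the data or omap CRC, the second OSD's count is larger and the second OSD becomes auth.
    • Third pass (replica): same as the second.
    • Note: if the first copy is complete, the first OSD is still chosen as auth even when the other copies are complete as well.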
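The selection rule in step 7 can be written as a single comparison. Below is a minimal sketch under stated assumptions: Candidate, digest_count and select_auth are hypothetical names, oi.version is reduced to an integer, and the oi_prio/auth_prio input of the real dcount() is ignored.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical candidate: one shard's copy of the object.
struct Candidate {
  int osd;
  bool has_errors;       // stands in for shard_info.errors != 0
  uint64_t version;      // stands in for oi.version
  bool has_data_digest;
  bool has_omap_digest;
};

// Number of digests a copy carries, in the spirit of dcount().
inline int digest_count(const Candidate& c) {
  return (c.has_data_digest ? 1 : 0) + (c.has_omap_digest ? 1 : 0);
}

// Returns the index of the chosen candidate, or -1 if none qualifies.
// Candidates are expected in shard order, primary first, so that the primary
// wins whenever all other things are equal.
inline int select_auth(const std::vector<Candidate>& shards) {
  int auth = -1;
  for (int i = 0; i < static_cast<int>(shards.size()); ++i) {
    const Candidate& c = shards[i];
    if (c.has_errors) continue;  // a copy with errors is never eligible
    if (auth < 0 || c.version > shards[auth].version ||
        (c.version == shards[auth].version &&
         digest_count(c) > digest_count(shards[auth]))) {
      auth = i;
    }
  }
  return auth;
}
```

The strict `>` comparisons are what keep the primary as auth when every copy is equally good: a later shard must be strictly newer or strictly more complete to take over.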
// Select an authoritative object, auth_obj, from the copies held by the replicas
map<pg_shard_t, ScrubMap *>::const_iterator
  PGBackend::be_select_auth_object(
  const hobject_t &obj,
  const map<pg_shard_t,ScrubMap*> &maps,
  object_info_t *auth_oi,
  map<pg_shard_t, shard_info_wrapper> &shard_map,
  bool &digest_match,
  spg_t pgid,
  ostream &errorstream)
{
  eversion_t auth_version;
  bool auth_prio = false;
  // Create list of shards with primary first so it will be auth copy all
  // other things being equal.
  list<pg_shard_t> shards;
  for (map<pg_shard_t, ScrubMap *>::const_iterator j = maps.begin();
       j != maps.end();
       ++j) {
    if (j->first == get_parent()->whoami_shard())
      continue;
    shards.push_back(j->first);
  }
  shards.push_front(get_parent()->whoami_shard()); // e.g. [8,0,3] with osd.8 as primary: the primary OSD goes to the head of shards
  map<pg_shard_t, ScrubMap *>::const_iterator auth = maps.end();
  digest_match = true;
  for (auto &l : shards) {
    bool oi_prio = false;
    ostringstream shard_errorstream;
    bool error = false;
    map<pg_shard_t, ScrubMap *>::const_iterator j = maps.find(l);
    map<hobject_t, ScrubMap::object>::iterator i =
      j->second->objects.find(obj);
    if (i == j->second->objects.end()) { // the object to check is not on this shard
      continue;
    }
    auto& shard_info = shard_map[j->first]; // per-OSD info for this object
    if (j->first == get_parent()->whoami_shard())
      shard_info.primary = true; // this OSD is the primary
    if (i->second.read_error) { // the object to check had a read error
      shard_info.set_read_error();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had a read error";
    }
    if (i->second.ec_hash_mismatch) {
      shard_info.set_ec_hash_mismatch();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had an ec hash mismatch";
    }
    if (i->second.ec_size_mismatch) {
      shard_info.set_ec_size_mismatch();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had an ec size mismatch";
    }
    object_info_t oi;
    bufferlist bl;
    map<string, bufferptr>::iterator k;
    SnapSet ss;
    bufferlist ss_bl, hk_bl;
    if (i->second.stat_error) {
      shard_info.set_stat_error();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had a stat error";
      // With stat_error no further checking
      // We don't need to also see a missing_object_info_attr
      goto out; // reading the object's data or metadata failed: stop checking this shard
    }
    // We won't pick an auth copy if the snapset is missing or won't decode.
    if (obj.is_head() || obj.is_snapdir()) {
      k = i->second.attrs.find(SS_ATTR);
      if (k == i->second.attrs.end()) {
        shard_info.set_snapset_missing();
        if (error)
          shard_errorstream << ", ";
        error = true;
        shard_errorstream << "candidate had a missing snapset key";
      } else {
        ss_bl.push_back(k->second);
        try {
          bufferlist::iterator bliter = ss_bl.begin();
          ::decode(ss, bliter);
        } catch (...) {
          // invalid snapset, probably corrupt
          shard_info.set_snapset_corrupted();
          if (error)
            shard_errorstream << ", ";
          error = true;
          shard_errorstream << "candidate had a corrupt snapset";
        }
      }
    }
    if (parent->get_pool().is_erasure()) {
      ECUtil::HashInfo hi;
      k = i->second.attrs.find(ECUtil::get_hinfo_key());
      if (k == i->second.attrs.end()) {
        shard_info.set_hinfo_missing();
        if (error)
          shard_errorstream << ", ";
        error = true;
        shard_errorstream << "candidate had a missing hinfo key";
      } else {
        hk_bl.push_back(k->second);
        try {
          bufferlist::iterator bliter = hk_bl.begin();
          decode(hi, bliter);
        } catch (...) {
          // invalid snapset, probably corrupt
          shard_info.set_hinfo_corrupted();
          if (error)
            shard_errorstream << ", ";
          error = true;
          shard_errorstream << "candidate had a corrupt hinfo";
        }
      }
    }
    k = i->second.attrs.find(OI_ATTR);
    if (k == i->second.attrs.end()) {
      // no object info on object, probably corrupt
      shard_info.set_info_missing();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had a missing info key";
      goto out;
    }
    bl.push_back(k->second);
    try {
      bufferlist::iterator bliter = bl.begin();
      ::decode(oi, bliter); // OI_ATTR is present; decode it into an object_info_t (oi)
    } catch (...) {
      // invalid object info, probably corrupt
      shard_info.set_info_corrupted();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate had a corrupt info";
      goto out;
    }
    // This is automatically corrected in PG::_repair_oinfo_oid()
    assert(oi.soid == obj);
    dout(10) << __func__ << "wds-2: l:" << l << ", i->first:" << i->first << ", i->second.size:" << i->second.size << ", oi.size:" << oi.size << dendl;
    if (i->second.size != be_get_ondisk_size(oi.size)) { // the size stored in oi must match the scanned size; if not, keep looking for a better copy
      shard_info.set_obj_size_info_mismatch();
      if (error)
        shard_errorstream << ", ";
      error = true;
      shard_errorstream << "candidate size " << i->second.size << " info size "
                        << oi.size << " mismatch";
    }
    // digest_match will only be true if computed digests are the same
    if (auth_version != eversion_t()
        && auth->second->objects[obj].digest_present
        && i->second.digest_present
        && auth->second->objects[obj].digest != i->second.digest) { // an auth is already chosen tentatively and its digest differs from this copy's digest
      digest_match = false;
      dout(10) << __func__ << " digest_match = false, " << obj << " data_digest 0x" << std::hex << i->second.digest
               << " != data_digest 0x" << auth->second->objects[obj].digest << std::dec
               << dendl;
    }
    // Don't use this particular shard due to previous errors
    // XXX: For now we can't pick one shard for repair and another's object info or snapset
    if (shard_info.errors) // a copy with any error is never selected as auth
      goto out;
    // XXX: Do I want replicated only?
    if (parent->get_pool().is_replicated() && cct->_conf->osd_distrust_data_digest/*false*/) {
      // This is a boost::optional<bool> so see if option set AND it has the value true
      // We give priority to a replica where the ObjectStore like BlueStore has builtin checksum
      if (j->second->has_builtin_csum && j->second->has_builtin_csum == true) {
        oi_prio = true;
      }
    }
    dout(10) << __func__ << " wds-4: auth_version:" << auth_version << "oi.version:" << oi.version << " j->first:" << j->first << dendl;
    /*
     * First pass: auth_version is 0'0, so auth_version == eversion_t() holds; auth is set and auth_version = oi.version.
     * Second pass: oi.version == auth_version, so the first two conditions fail. The third condition,
     *   dcount(oi, oi_prio) > dcount(*auth_oi, auth_prio), prefers the OSD whose object carries both the data CRC
     *   and the omap CRC. If the object on the first OSD lacks the data or omap CRC, the second OSD's count is
     *   larger and the second OSD becomes auth.
     * Third pass: same as the second.
     * Note: if the first copy is complete, the first OSD is still chosen as auth even when the other copies are complete too.
     */
    if (auth_version == eversion_t() || oi.version > auth_version ||
        (oi.version == auth_version && dcount(oi, oi_prio) > dcount(*auth_oi, auth_prio))) {
      auth = j;
      *auth_oi = oi;
      auth_version = oi.version;
      auth_prio = oi_prio;
      dout(10) << __func__ << " wds-5: auth->first:" << auth->first << dendl;
    }
out:
    if (error)
      errorstream << pgid.pgid << " shard " << l << " soid " << obj
                  << " : " << shard_errorstream.str() << "\n";
    // Keep scanning other shards
  }
  dout(10) << __func__ << ": selecting osd " << auth->first
           << " for obj " << obj
           << " with oi " << *auth_oi
           << dendl;
  /*
   * sample log:
   *2020-08-28 09:53:18.636882 7f60d52e4700 10 osd.8 pg_epoch: 147 pg[2.6( v 147'409 (0'0,147'409] local-lis/les=146/147 n=37 ec=131/131 lis/c 146/146 les/c/f 147/147/0 146/146/146) [8,0,3] r=
   *0 lpr=146 crt=147'409 lcod 147'408 mlcod 147'408 active+clean+scrubbing+deep] be_select_auth_object: selecting osd 8 for obj 2:62b05854:::rbd_data.6ed5b6b8b4567.000000000000006e:head with
   *oi 2:62b05854:::rbd_data.6ed5b6b8b4567.000000000000006e:head(147'378 osd.8.0:5 dirty|data_digest|omap_digest s 3866624 uv 372 dd 210c09f1 od ffffffff alloc_hint [4194304 4194304 0])
   */
  return auth;
}

2.6 Finishing the scrub

PG::scrub_finish() ends the scrub process.

  • 1. Call scrub_process_inconsistent to repair the objects that the scrubber marked as missing or inconsistent. This ends up in repair_object, which only marks the object as missing in peer_missing and missing. has_error is true only when authoritative is non-empty and repair is true.
  • 2. Set the relevant PG state and statistics.
  • 3. If has_error is true, send a DoRecovery event to the PG state machine to start the actual object repair. In practice, a plain deep-scrub runs with repair set to 0, so no repair is triggered.
  • 4. Once the PG is back to normal, call PG::share_pg_info() to send the new pg_info to the other replicas.
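The statistics update in step 2 treats shallow and deep scrubs differently, which is easy to miss in the listing. A minimal sketch of that accounting follows; ScrubStats and record_scrub_result are hypothetical names, and the boolean clean_stamp_updated stands in for writing info.history.last_clean_scrub_stamp.

```cpp
#include <cassert>

// Hypothetical subset of the counters kept in info.stats.stats.sum.
struct ScrubStats {
  int num_shallow_scrub_errors = 0;
  int num_deep_scrub_errors = 0;
  int num_scrub_errors = 0;
  bool clean_stamp_updated = false;  // stands in for last_clean_scrub_stamp = now
};

// A shallow scrub cannot observe deep errors, so it leaves the previous deep
// error count untouched and judges cleanliness on shallow errors only.
inline void record_scrub_result(ScrubStats& s, bool deep_scrub,
                                int shallow_errors, int deep_errors) {
  s.num_shallow_scrub_errors = shallow_errors;
  if (deep_scrub) {
    s.num_deep_scrub_errors = deep_errors;
    s.clean_stamp_updated = (shallow_errors == 0 && deep_errors == 0);
  } else {
    s.clean_stamp_updated = (shallow_errors == 0);
  }
  s.num_scrub_errors = s.num_shallow_scrub_errors + s.num_deep_scrub_errors;
}
```

One consequence, noted by the XXX comment in the source, is that a clean shallow scrub refreshes the clean stamp even while deep-scrub errors from an earlier run are still counted.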
// the part that actually finalizes a scrub
void PG::scrub_finish()
{
  bool repair = state_test(PG_STATE_REPAIR);
  // if the repair request comes from auto-repair and large number of errors,
  // we would like to cancel auto-repair
  if (repair && scrubber.auto_repair
      && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
    state_clear(PG_STATE_REPAIR);
    repair = false;
  }
  bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
  const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
  // type-specific finish (can tally more errors)
  _scrub_finish();
  // has_error is true only when authoritative is non-empty and repair is true
  // scrub_process_inconsistent repairs the objects the scrubber marked missing or inconsistent; it ends up in
  // repair_object, which only marks the object as missing in peer_missing and missing
  bool has_error = scrub_process_inconsistent();
  {
    stringstream oss;
    oss << info.pgid.pgid << " " << mode << " ";
    int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
    if (total_errors)
      oss << total_errors << " errors";
    else
      oss << "ok"; // e.g. "2.6 deep-scrub ok"
    if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
      oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
          << " remaining deep scrub error details lost)";
    if (repair)
      oss << ", " << scrubber.fixed << " fixed";
    if (total_errors)
      osd->clog->error(oss);
    else
      osd->clog->debug(oss);
  }
  // finish up
  unreg_next_scrub();
  utime_t now = ceph_clock_now();
  info.history.last_scrub = info.last_update; // update the pg info
  info.history.last_scrub_stamp = now;
  if (scrubber.deep) {
    info.history.last_deep_scrub = info.last_update;
    info.history.last_deep_scrub_stamp = now;
  }
  // Since we don't know which errors were fixed, we can only clear them
  // when every one has been fixed.
  if (repair) {
    if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
      assert(deep_scrub);
      scrubber.shallow_errors = scrubber.deep_errors = 0;
    } else {
      // Deep scrub in order to get corrected error counts
      scrub_after_recovery = true;
    }
  }
  if (deep_scrub) {
    if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
      info.history.last_clean_scrub_stamp = now;
    info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
    info.stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
    info.stats.stats.sum.num_large_omap_objects = scrubber.large_omap_objects;
  } else {
    info.stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
    // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
    // because of deep-scrub errors
    if (scrubber.shallow_errors == 0)
      info.history.last_clean_scrub_stamp = now;
  }
  info.stats.stats.sum.num_scrub_errors =
    info.stats.stats.sum.num_shallow_scrub_errors +
    info.stats.stats.sum.num_deep_scrub_errors;
  reg_next_scrub();
  {
    ObjectStore::Transaction t;
    dirty_info = true;
    write_if_dirty(t);
    int tr = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
    assert(tr == 0);
  }
  if (has_error) { // send DoRecovery to the PG state machine to start the actual object repair; for a plain deep-scrub has_error is false
    queue_peering_event(
      CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
          get_osdmap()->get_epoch(),
          get_osdmap()->get_epoch(),
          DoRecovery())));
  }
  scrub_clear_state();
  scrub_unreserve_replicas();
  if (is_active() && is_primary()) {
    share_pg_info();
  }
}
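The cluster-log line that scrub_finish emits can be reproduced with a few lines of string formatting. This sketch is illustrative only: scrub_summary is a hypothetical helper, the PG id is passed as a string, and the "remaining deep scrub error details lost" clause of the real message is omitted.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Builds the "<pgid> <mode> <result>" summary in the same order as the
// stringstream code in PG::scrub_finish.
inline std::string scrub_summary(const std::string& pgid, bool repair,
                                 bool deep_scrub, int shallow_errors,
                                 int deep_errors, int fixed) {
  const char* mode = repair ? "repair" : (deep_scrub ? "deep-scrub" : "scrub");
  std::ostringstream oss;
  oss << pgid << " " << mode << " ";
  const int total_errors = shallow_errors + deep_errors;
  if (total_errors)
    oss << total_errors << " errors";
  else
    oss << "ok";
  if (repair)
    oss << ", " << fixed << " fixed";
  return oss.str();
}
```

With the values from the sample log in this article, scrub_summary("2.6", false, true, 0, 0, 0) yields "2.6 deep-scrub ok", which is the message noted in the comment next to `oss << "ok"`.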

Note: this article draws on the book series 《ceph 源码分析》 (Ceph Source Code Analysis).