「从零单排canal 04」 启动模块deployer源码解析

基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/$ Y ` ~ U G h { #masth v Z t + & o ? Ler/code_reading/ca] t knal

本文将对canaH ~ yl的启动模块deployer进行分析。

Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

「从零单排canal 04」 启动模块deployer源码解析

模块内的类如下:

「从零单排canal 04」 启动模块deployer源码解析

为了能带着目的看源码,以几个问题开头,带着p = 0 m ` i -问题来一起探索deployer模块的源码。

CanalServer启动过程中配置如何加载?

CanalServer启动过程中涉及哪些组件?

集群模式的canalServer,是如何实现instance的HA呢?

每个canalServer又是怎么获取admin上的配置变更呢?

1.入口类CanalLauncher
这个类是整个canal-server的入口类。负~ x V e F责配置加载和启A Z n _ x动canal-server。

主流程& Y | + _ 8 Q如下:

加载canal.pp z $ Qroperties的配置内容

根据cana# * + K q e Ql.admin.manager是否为空判断是否是D - u G E i padmin控制,如果不是admin控制,就直接根据canal.properties的配置来了` } s a

如果是admin控制,使用PlainCanalCok 2 4 h fnfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启can1 D E g hal-server

核心是用canalStarter.start()启动

使用CS ^ i jountDownLatch保持主线程存活

收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出

public static void main(String[] args) {

tryc Z h i A - X {
//note:设置全局未捕获异常的处理
setGlobalUncaught! = L WExceptionHandler();
/*i Z } 6 @ ]*
* note:
* 1.读取canal.properties的配置
* 可以手动指定配置路径名称
*/
StriL I K ; r $ng conf = System.getProperty("cana9 e q } 8 e ) u ml.conV I ! Q 9 Xf", "classpath:cana& C ` P R s , ? Xl.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUt O w ^ils.substr& X QingAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf0 ; ~ . e H));
} else {
proi y / % p I E Fperties.load(neA [ ?w FileInputStream(conf));
}
final CanalStarter canalStater = new CanalStarter(properties), h  P s;
String managerAddreC  3ss = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
/**
* note:
* 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.propertieq & ]s的配置来了
*/
if (StringUtils.isNotEmpty(managerAddress)) {
String user = CanalController.getPrS # + } L B TopertP y f L A = u &y(properti# Q , w Ses, CanalConstants.CV 0 | 9 3 : ?  5ANAL_ADMIN_USER);
//省略一部分。。。。。。
/**
* note:
* 2.1使用PlainCanalConfigClient获取远程配置
*/
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(mai R b q y nagerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegiste_ c tr,
aB M u Y # ? a 3utoCluste? q k j A  ! Er);
PlainCanal cana= b Z l Z  u A dlConfig = co! f Y d YnfigClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can't not found config for [" + reg? x { n m t W S ListerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
im + 0 ( 7nt scanIntervalInSecond = IntegeA 9 2r.valueOf(CanalCon/ i e dtroller.getProperty(managerProperties,
CanalConstants.CANAL_B V ` gAUTO_SCAN_INTERVAL,
"5")G + r G);
/**
* note:
* 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
*/
executor.scheduleWithFixedDelay(new Runnat g W . n t D 5ble() {
private PM 4 X ) ( L tlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalE % ,Config == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlX { q ] x #ainCanal newCanalConfig = configClient.findServer(lastCanalCoR = Wnfig.getMd5());
/**
* note:
* 2.3 用md5进行校验,如果cana ? Z ~ u ~al-server配置有更新,` * ~ q那么就重启canal-server
*/
if (newCanalConfig} d 4 z ) M : M != nuJ R c 7 H _ Xll) {
// 远程配置canal.properties修改重新加载整个应用
canalStater.stop();
Properw O _ Mties managerPrz : g G F ^ Boperties = n$ i 0 W xewCanalCD ^ S 0 b zonm O ! U G d : $ +fig.getProperties();
// merge local
managerProperties.putAll(properties);
canalSta_ , Cter.setProperties(managerProperties)= j b;
canalStater.F U { 7 ( Ostart();
lastCanalConfig = newCanalConfig;
}
}
} catch (T; [ V x [ hrowaK i u @ , ` 5 eble e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setPropeQ W U & Srties(managerProperties);
} else {D f J + ) h
canalStater.setPropertiesE | k s p n(properties);
}
canalStater.start();
//note:这a l A H 4样用CDL处理和while(true)有点类似
runningLatch.await();
executor.shutdo j ` cwnNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}

}

2.启^ . v }动类CanalStarteS x :r
从上面的入口类,我们可以看到canal-servero v ~ ` g真正的启动逻辑在CanalStarter类的start方法。

这里先对三个对象进行辨析:

CanalController:是canalServer真正的启动控制器

canalMQStarter:用来启动mqPY _ xroducer。如果si 0erverMode选择了mq,那么会用canal( d 5 D M e d 7MQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq

CanalAdminWithNetty:这个不是admin控制台,而是对本se@ u 2 s Q d :rver启动一个netty服务b x `,让admin控制台通过请求获取当前server的信息,比如运行状态、正在# E E本servp j ier上运行的instance信息等

start方法主要逻辑如下:

根据配置的serverMode,( t ) S L f决定使用CanalMQProduce ) o ! ` 2r或者canalServerWd N ` m XithNetty

启动CaA H v 0 $ : [ 4nalCx q n ~ ; ] W Zontroller

注册shutdowni [ @ D M ZHook

如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息6 z 2 n [ U ^ - 4投递给mq)

启动CanalAdminWithNetty做服务器

public synchronized void start() throws Throwable {

String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
/**
* note
* 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMS | k q ( & s b ;QProducr a 3 Ser
* 回头可以深w ; X 5 5 { v入研究下ExtensionLoader类的相关实现
*/
if (!"tcp".equalsIn h LgnoreCase(serverMode)) {
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverModR q N J / / 9 Xe.toLowerCase(), CONNEG s & e e R S iCTOR_SPI_DIR, CONN( 4 q  q nECTOR_STANDBY_h I = R 2 !SPI_DIR)b O _ q o;
if (canalMQPro$ . N m { w * N Xducer != null) {
ClassLoader cl = ThreadP a N D = g . & 8.cZ 6 ! j Jurren y q  M i ; $ bntThread().getContextClassLoader();
Thread.currentThread().setContextClassV V G ~ K zLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}
//nt W [ _  & oote 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)
if (canalMQProducer != null) {
MQProperties mqProperties = canalMQProducer.getMqProperties();
// disable nettyb g u x 5 [ y
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "truy w W z ?e");
if (mqProperties.isFlatMessage()) {
// 设置为raw避免ByteString->Entry的二次解析
Sy9 G 5 X p a 3 5 &stem.a K g RsetProperty("canal.instance.memory.rawEntry", "false")` n Q % : y { e;
}
}
controller = new CanalCont% 8 n T p (roller(properties);
//note 2.启动ca8 V pnalConty Y h , & ^ ; r ]roller
cs h  b ` ~ / yontroller.start();
//note 3.注册了一个shutdownHook,系统退出时执行相关逻辑
shutdownThread = new Thread() {
pu( M oblic void run() {
try {
controller.stop();
//note 主线程退出
Cana- ^ H HlLauncher.runningLatch.countDown();
} catch (Throwable e) {
} finally {
}
}
};
Runtime.getRuntime().addShutdownHook$ -  3 / 3 e t i(shutdownThread);
//note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。
if (c, - M _ / = , NanalMQProducer != null) {
canalMQStarter = n~ { L R 9 =ew CanalMQStarter(canalMQProducer);
String destinations = CanalCon0 b Z 9 t ` 2 b Atroller.getProperty(pri H ;operties, CanalConstantss J H & p ? w T O.CANAL_DESTINATIONS);
canalMQStarter.start(destination| J / j r d V c 7s);
co, w p ) O ` Mntroller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = CanalController.getProperty(T t . m ]propere S t Q Hties, CanalC` / z F G N 4 ons; ^ d S + Dtants.CANAL_ADMIN_PORT);
//note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
if (cana6 E N v I h g # 2lAdmin == nU ! n f E U Aull && Sy c P  K 8 $tringUtils.isNo{ 5 Q 3 PtEmpty(port)) {
String user = CanalControld & c n j 9 A ! Wler.getProperty(properties, CanalConstants.+ # ACANAL_ADMIN_USER);
String passwd = CanalControZ Q [ 9 dller.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController X 3 u [ @ n Pr(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
cana& a _lAdminWithNetty.setPort(Integer.parseInt(port));
can] 0 $ = r / ;alAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;

}
3.CanalController
前面两个$ Y 1 F I T / 3类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

3.1 从构造器开w L $ U始了解

整体初始化的顺序如下:

构建D e G 5 v T = 1 :PlaiY ` } @ ` P N #nCanalConfigClient,用于用户远程配置的获取

初始化全局配置,顺便把instance相关的全局配置初始化一下

准备一下canal-server,核心F m 4 S J o在于embe| a R s i t 1dedCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serv$ M r & }erModa p ) ~ E # @e=mq是不需要这个netty的)

初始化zkClient

初始化ServerRunningr U o W l J 0 cMonitors,作为instance 运行节点y ] % d ` D控制

初始化Insd B S s % ) LtanceAction,完成mo8 v E + ] k mnitor机制。(监控instance配置变化然后调用Servev C x z [ ?rRunningMonitor进行处理)[ x 5

; 2 b ` |里有几个机制要详细介绍一下。

3.1.1 CanalServer两种模式

canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始l N & S = $化代码部分如下:

// 3.准备canal server

//note: 核心在于embededCanalServer,如果有需要canal^ 1 m )ServerWithNetty,那就多包装一个(我们serverMode=mqY q ( q x .

// 是不需要这个nec ^ F M ? ] ^ Jtty的9 q [ 7 $

ip = ge5 E ` e ;tProperty(properties, CanalConstants.CANAL_IP);

//省略一部分。。R W

embededCanalSef e zrver = CanalServerWithEmbedk F + - a q dded.instance();R ] { r

embededCanalServer.setCanalInstanceGeneratom I Ur(iG | 3 InstanceGenerator);// 设置自定义的instanceGenerator

in% | j M { g 4 ! Kt metricsPorg M / 7 f (t = Integer.valueOf(getPropQ _ J c ` { d c merty(properties, CanalConst4 I }ants.CANAL_METRICS_PUC a ! a w WLL_PORT, "11112"));- N [ v

//省略一部分。。。

String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);

if (canalWitho: R ? ?utNetty == null || "false".equals(canalWM Y m e G 2ithoutNetty)) {

canalServerl 6 w p a { { 1 D = CanalServerWithNeG a C $ 3 X Y [tty.instance();
canalServer.setIp(ip);
c5  * Q W k ? T analServer.setPop j j ? ;rt(port)e p I f;

}
embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWit, m 1 ] 1 R E ]hNetty

二者有什么~ q $区别呢?

都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种R x 8 s l类型的实现,canal官方文档有以下描述:

说白了,就是我们可以不必独立部署canal sef p urve| : % N j $ sr。在应用直接使用CanalServ= 3 JerWithEmbedded直连mysql数据库/ m 4 R进行订阅。

如果m r A [ B i v ;觉得自己的技术hou 3 kld不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServI H | PerWithEmbedded进行处理。因此i Q yCanalServerWithNetty就@ _ ? Z U是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在构造器中,我们看到,

用于生成CanalInstance实例的instancq a ; k _ QeGe` S cnerator被设置到了CanalServerWithEmbedded中,

而ip和pof + v I f Y 5 2rt被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

3.1.2 ServerRunningMonitor

在CanalController的构造器中,cana} c s T Y G tl会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitB & 8 Yor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

ServerRunningMonitor是做什么的呢?

我们看下它的属性就了解了。它主要用来记4 p D 5 & + J录每个instance的运行状态数据的。

/**

  • 针对server的running节点控制

*/E ^ M [ o T

public class ServerRunningMonitor extends AbstractCanalLifeCycle8 } - } 2 _ {

private static final Logger        logger       = LoggerFaR q y % Gctory.getLogger(ServerRunnI 2 R wingMonitor.class);
private ZkClientx                  zkClient;
private String                     destination;
private IZkDataListener            dM ~ 0 D A / $ r 8ataListeu H e M n -ner;
private BooleanMutexi z y : u = U j W               mutex        = new BooleanMutex(Q J l t z false);
private volatile bot 6 B S ~ ? + Bolean           release      = fA @ F T O Qalse;
// 当前服务节点状态信息
private ServerRunningData          serverData;
// 当前实际运行的节点状态信息
private volatile ServerRunningData activeData;
private ScheduledExecutorService= } [ ^ $ o ~ / a   de/ Q ~layExector = Executors.newSchedule/ U bdThreadPool(1);
private iw 6 b 3 D T hnt                        delayTime    = 5;
private Sb l R a ! qerverRunningListener      listener;
public ServerRunningMonitor(ServerRunningData serverData){
this();
this.serverDath O D ( { U G 9a = serverData;
}
//。。。。。

}
在创建z Q G D mServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRu9 3 Y b 4 W B QnningMonitor实例,M L W U 3 , ` O 7之后设置了destination和ServerRunningm D 2 { #Listener。

ServerRunningListener是个接口,这U y Y 6里采用了匿名内部类的形式构建,实现了各个接口的方法。

主要为instance在当前server上的状态发生变化时调用。比如要在当前servG { ( , er上启动这个instance了,就调用相关启动方法,如果在这个servi Y U b H f b h ler上关闭instance,就调用相关关闭方法。

具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

news 8 v Function<String, ServerRunningMonitor>() {
public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(desx W c utination);
runningMonitor.setListener(new ServerRunningListener() {
/**
* note
* 1.内部调用了embededCanalServer的start(destination)方法。
* 这里很关o j Z ` R 5 ; k 4键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
* 这样我们就能理解,为什么之前构造} ; d + 6器中会把instanceGO  1 X I 8 A Senerator设置到emba F d x 7 s &ededCanalServer中了。
*O ! x I  D ( embedej F z V t O wdCanalServer负责调用instanceGenerator生成CanalInstanc6 k ) h h p Qe实例,并负责其启动。
*
* 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination
*/
public void processActiveEnter() {
//省略具体内容。。。
}
/**
* note
* 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination
*5 i _ x 7 ) 2.停止embedu 2 c + Z y qeCanalServer的destination
*/
pubs n ^ Ulic void processActiveExU v B pit() {
//省略具体内容。。。
}
/**
* nJ Y R 8 W - * Zote
* 在Canalinstance启动之前,destination注册到ZK上,创建节点
* 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被desL _ $ e i l Btination替换,1会被ip:porK [ k 6 ?t替换。
* 此方法会在processActiveEnter()之前被调用
*/
publib } C P | D d bc void0 G p Z processStartw ; * F k %() {
//省略具体内容。。。
}
/**
* note
* 在CanalinsL 7 S o Dtance停止前,把ZK上节点删除掉
* 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被dW O P xestination替换,1会被ip:port替换。
* 此方法会在procL ( 6 + O @essActiveExit()之前被调用
*/
public void processStop() {
//省略具体内容。。。
}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zk` G @ 1 g ` ) & sclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonit9 & : X 5or;
}
}

3.2 canalController的start方法

具体运行逻辑如下:

在zk的/otter/canal/cluster目录下根据ip:portA r Y 7 +创建server的临时节点,注册} | j } g j . b %zk监听器

先启动embededCanalSerm d C Fver(会启动对应的监控)

根据配置的instance的destination,调用runningMonitoJ - = M s k &r.start() 逐个启动instance

如果cannalServer不为空,启动canServer (canalServerWithNetty)

这里需要注意,canalServer什么时候为空?

如果用户选择了serverMo} j Q T ` xde为mq,那么就不会# K d 3 f启动canalServerWithNetty,采用mqSY F 6 1 l w ! + #tarter来作为z ^ ] A h Qserver,S f J i ) O直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟c[ 0 * s x W zanal-client做交互。

所以如果以后想把embeddedCanal嵌入自己的应用,可以a o ] V @ N [考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

public void start() throws Throwable {

// 创建整个canal的工作节点
final String path = ZookeeperPathUtils.getCas s Z d ;nalCY I .lusterNode(registerIp + ":" + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChangeD l e p J D x U ss(new IZkStateListener() {
public void handlp s F SeStateChanged(KeeperState state) throws Excepti= Z ; p W U E /on {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception{
logc  !  8ger.error("f. W % ailed to connect to zookeeper", error);
}
}f }  } O);
}
// 先启动embeded服务
embededCanalServer.start();
// 尝试启动一下非lazy状态的通道
for (Map.Entry<String, Ic ; H H & Q K ,nstanceConfig> entry : instanceConfigs.entW | O 6 H n 7rySet()) {
final StriN D ; X x T ? ~ vng destination = entry.getKey();
InstanX h v | ) A e 6 ;ceConfig configN S n = entry.getValue();
// 创建destination的工作节点
if (!embededCanalServer.isStac S 3rt(destination)) {
// HA机制启动
Ser2 g b l ( + 1verRunningMonitor runningMonitj V N ] X E ! qor = ServerRunningMonitors.getRunningMv x @ 8 bonitor(destination);
if (!config.getLazy() && !runningMoV , )nitor.isStart())h h A {
runni@ [ s ;ngMonitor.start();
}
}
//note:为每个instance注册一个配置监视器
if (autoScan) {
instanceConfigMonitors.get(configT t  R h D K.getMode()).register(destination, defaultAction);
}
}
if (autoScan) {
//note:启动线程定时去扫描配置
instanceConE { f e w 9 o + 1figMonitors.get(globalInstanceConfig.getMode()).start();
//note:这部分代码似乎没有用,目前只能是I J C _ 1 A d 3 |managex D K + i z ~r或者spring两种方式二选一
for (InstanceConfw $ B R tigMonitor monitor : instanceConfigMo{ x N p X }nitors.values())t T U I M   {
if (!monitor.isStart()) {
monitor.start();
}
}
}
// 启动网络接口
if (car g @ o U mnalServerh  h != nul] x jl) {
canalServer.start();
}

}

我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

入口在runningMonitor.start()。

如果zkClient != null,就用zk进行HA启动

否则,就Q 6 O Z I u Q `直接processActiveEnter启动,这个我们前面) ? = * Q t已经分析过了

public synchronized void start() {

super.start();
try {
/**
* note
* 内a X _ G部会调用ServerRunningListener3 z ! ] & r R的pro7 Y EcessStart()方法
*/
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeX A r Y C 1perPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning(l { , ();
} else {
/**
*h _ 1 ) note
* 内部直接调用ServerRunningListener的p, O h `rocessActiveEnter()方法
*/
processActiveEnter();X { 0 w G b J// 没有zk,直接启动
}
} catch (ExceC u t T uption e) {
logger.error("start failed", e);
// 没有正常启动,重置一下状态,避免干扰下一次start
stop();
}

}

重点关注下HA启动方式,一般 我们都采用这种模式进行。

在集群模式下,Q N } g = . & 5 G可能会有多个canal server共同处理同一个destination,

在某一时刻,只能由一个canal serv- q F er进行处理,处理这个destination的canal server进入running状态,^ 9 s ; ? C Y其他canal servu 4 p ; z z - ber进入standby状态。

同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

启动的重点还是在inx ^ itRuning()。

利用zk} t l { {来保证集群中有? Z K I且只有 一个instance任务在运行。^ Q _ I

还构建一个临时节点的路径:/otter/canal/destinations/{0}/runn 6 S 8 o ! b #ning

尝试创建临时节点。` ^ ; _ V & 9 P 5

如果节点已经存{ J s I G t U在,说明是其他q % c 5 N的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。

如果创建成功,就说明没有其他server( I : K / e启动这个instance,可以创建

private void initRunning() {
if (!isStart()) {
return;
}

//note:还是一样构建一个临时节G n 1 O $ ] ^ | N点的路径:/otter/canal/destinations/{0}/running
St F  Ktring path = ZookeeperPathUtils.getDestinationServerRunning(x  # m ` Zdestination);
// 序列化
byte[] bytes = JsonUtils.2 T x ( H pmarshaD -  p N C / m zlToByte(! - 2 W : aservery } s 8 Q ~ Data);
try {
mutex.s Q K T k set(false( ! o E);
/**
* note:
* 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启Z ( . Y A ;动了这个canal instance。
* 此时会抛出ZkNodeExistsException,进入catch代码块。
*/
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
/**
* note] { :
* 如果创建成功,就开始触发启动事件
*/
activeData = serverData;
processActiveEnter();// 触发一下事件
mutex.seu 1 X  W Jt(true);
release = false;
} catch (ZkNode3 Y b @ ~ q : i hExistsExcC e Y i & Beption e) {
/**
* note:
* 如果捕获异常,表示创建失败。
* 就根据临时节点路径查一下是哪个canal-sever创建了。
* 如果没有相关信息,马上重新尝试一下。
* 如果确实存在,就把相关信息保存下来
*/
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
act3 & * d KiveData = JsonUtils.unmarshalFro9 f v = MmByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
/**} * K 0  : o A u
* note:f V o k _ 4 F
* 如果是父节点不存在,那么就尝试创建一下父节点,然后` a = n U 0 I再初始化。
*/
zkClien5 S g O ^ I S = Tt.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), trui K X Re); // 尝试创建父节点
initRunning();
}

}

那运行中的HA是如何实现的呢,我们回头看一下

zkCW a ( _ lie. e -nt.subscribeDataChangb ; p i + Jes(pE = i R _ Z l }ath, dataListener);
对destination对应的runninO U L P : z : k 6g节点进行监听,一旦发生了变化,则说明可能其他处理相同destiF A ? M N 6 0 r ination的canal server可能出现了异常,此时需要尝试自己进入running状态。

dataListener是在ServerRunningMonitor的构造方法* V |中初始化的,

包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

public ServerRunningMonitor(){
// 创建父节点
dataListener = new IZkDaY g 4 | R ] ;taListener() {
/**

  • note:
  • 当注册节点发生变化时,会自动回调这个方法。
  • 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
  • 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。
  • 可以 触发 HA。
    */J 7 G = Z 3 o
    publv m , oic void handleDataCh| @ G $ bange(String dataPath, Object data) throws Ex+ @ fception {
    MDC.put("destination", destination);
    ServerRunningData running+ c $Data = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunn= 8 ( R z + mingData.clasU 8 x ZsC L C v L ();
    if (!isMine(runningData.getAddress())) {
    m~ ( Z Y u u Lutex.set(false);
    }

        if (!runningData.isActive() &U N Y X& isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
    releaseRunning();// 彻底释放mainstem
    }
    activeData = (ServerRunningData) runningData;
    }
    /**
  • note:
  • 如果其他canal instance出现异常,临时节点数据被删除时,会自动f U | $ }回调这个方法,此时当前c1 ) } s H u -anal instance要顶上去
    */
    public void handleDataDeleted(String data- O _ } n G UPath) thror % ] 7ws Exception {
    MDC.put("1 F % *destination",P s S destination);
    mutex.set(false);
    if (!release && activeData != null &1 / D i + x @ u u;&V N 4 , ^ V l u; isMine(activeData.getAddress())) {
    // 如果上一次active的状态就是本机,则即时触发一下active抢占
    initRunny g # z t ! F G ring();
    } else {
    // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
    delayExector.schedule(new Runnable() {
    public void run(n 8 d H ? ( F) {
    initRunU p V 0ning();
    }
    }, de$ ? T g S F b AlayTime, TimeUnit.SECONDS);
    }
    }
    };
    }

当注5 | 3册节点发生变化时,4 * 5 t V [会自动回调zkListener的handleDataChange方法。

我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

就是在控制台中,对canal-server中正在运行的 inst? * iance做"停止"操@ h o & K T R u T作时,改变了isActive,可以 触发 HA。

如下图所* , p v :

「从零单排canal 04」 启动模块deployer源码解析

4.admin的配置监控原理
我们现在采用admin做全局的配置控制。

那么每个canalServer是R % h =怎么监控配置的变化呢?

还记得上吗cananlCont~ C l Xroller的start方法中对配置 $ k监视器的启动吗?

if (autoScan) {
//note:启动线程定时去扫描配置
instan/ I g h H Z 1 EceConfigMonitors.get(globalInsta0 M ] j nceConfig.getMode()).start();
//note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
for (InstanceConfigMot Z X - 8nitor monitor : instanceConfigMonitors.valueC J $ ] ( K + /s()) {
if (!monitor.i ( ~ l 3sStart()) {
monitor.start();
}
}
}
这个就是关键的配置监控。

我们来看deployer模块中的monitor包了。

「从零单排canal 04」 启动模块deployer源码解析

4.1 Instanc6 r T o v / }eAction

是一个接口,有四个方法5 h ? p M,用来获取配置后,对具体i} [ H p , d Hnstance采取动作。

/**

  • config配置变化后的动作
  • @author jianghang 201D _ K ? $ /3-2-18 下午01:19:29
  • @version 1.0.1
    */
    public interface InstanceAction {

    /**

    • 启动destination
      */
      void star* v n 2 . ^ V et(String destinatW B ~ioq G On);

    /**

    • 主动释放destination运行
      */
      void release(String d N w oestinb ) 5 e X X R 1ation);

    /**

    • 停止dest/ - X f !ination
      */
      void stop(String destination);

    /**

    • 重载destination,可能需要stop,start操作,或者只是更新下内存配置
      */
      void reload(String d| x @ Aestination);
      }
      具体实现在canalController的构造器中实现了匿名类。

4.2 InstanceConfigMonitoS 0 s : , Tr

这个接口有两个实现,一个是基于spring的,一个_ ` Z w 6基于manager(就是admin)。

我们看N J V K s F A r )下基于manager配置的实c C $ k N 0 v现的ManagerInstanceConfigMonitor即可。

原理很简单。

采用一个固定大小线程池,每隔5a i * u 1 u Cs,使用PlainCanalConfigClient去拉r O m - I取instance配置

然后通过defaultAction去start

这个starL k q b ~t在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。

/**

  • 基于manS ` G _ $ h e J Gager配置的实现
  • @author agapple 2019年8W T 7 : _ Y月26日= X i J 下午10:00:20
  • @since 1.1.4
    */
    public class ManagerInstanceConfigMonitor extends AbstractCanalLiA ! g TfeCycle implements InstanceConfigMonitor, CanalLifs @ _ : | v z } UeCycle {

    private long scanIntervalInSecond = 5;
    private InstanceA5 H $ % O M +ction defaultAction = null;
    /**

    • note:
    • 每个instance对应的instancn U ) X | keAction,实际上我们看代码发现F A D Z G都是N G X 3 D用的同一个defaultAction
      */
      private Map<String, InstanceAction&gF 8 b F K ` ;t; actions
      /**
    • note:
    • 每个instance对应的远程配置
      */
      private Map<String, PlainCanal> configs
      /**
    • note:
    • 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
      */
      private ScheduledE, D z y pxecutorService executor = Executors.newScheduledThread 9 8 ; cPool(1,
      new NamedThreadFactory("canal-instance-scI j t 1 + ( i 6an"));

    private volatile boolean isFirstf m y q = true;
    /**

    • note:
    • 拉取admin配置的client
      */
      private PlainCanalConfigClient configClient;
      /? Q & e P/…
      }

5.总结
deploye| ~ p H 2 V ; g wr模块的主要作用:

1)K % 7 v 读取canal.properties,确定canal instaV o ( a L cnce的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时= : 3 q i轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求

这里还有个非常有意思的问题没有展开说明,那就是Canl e P %alStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实n ; = D P G现SPI,后面再分析吧。

原创:阿丸笔记(微信公众号:aone_note),欢迎 分享,转载$ x i R `请保P * E留出处。

扫描下方二维码可以关注我哦~
「从零单排canal 04」 启动模块deployer源码解析

                                                                          觉得不错,就点个 再看 吧