Elastic-Job Source Code Walkthrough

Preface

The job scheduling tool we currently use at my company is Elastic-Job, version 2.1.5. In March, a failover configuration caused a production incident. While troubleshooting it I skimmed the source code, and took the opportunity to gain a deeper understanding of Elastic-Job.

Overall Architecture

[Figure: overall architecture of Elastic-Job-Lite]
Note: image from https://github.com/elasticjob/elastic-job-lite

Overview

Elastic-Job is a distributed scheduling solution consisting of two independent sub-projects, Elastic-Job-Lite and Elastic-Job-Cloud.
Elastic-Job-Lite is positioned as a lightweight, decentralized solution that provides distributed job coordination services in the form of a jar package.
Elastic-Job-Cloud uses a Mesos + Docker solution and additionally provides resource management, application distribution, and process isolation services. (Not covered in this article.)

Elastic-Job's core components are Quartz and ZooKeeper (a minimal wiring sketch follows the list below):

  • Quartz schedules the jobs on each machine, i.e. it decides when each machine's sharding items run
  • ZooKeeper acts as the distributed coordination center
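
To make the rest of the walkthrough concrete, here is a minimal sketch of how a Lite job is typically wired up against the 2.x API. MyJob (a user job, sketched later in the SimpleJobExecutor section), the ZooKeeper address, and the namespace are illustrative placeholders; imports from com.dangdang.ddframe.job.* are omitted, matching the other listings.

public class JobDemo {
    
    public static void main(final String[] args) {
        // Registry center; address and namespace are placeholders
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
        regCenter.init();
        // Job configuration: name, cron expression, and total sharding count
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("myJob", "0/10 * * * * ?", 3).failover(true).build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MyJob.class.getCanonicalName());
        LiteJobConfiguration liteJobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        // JobScheduler#init() is the entry point analyzed below
        new JobScheduler(regCenter, liteJobConfig).init();
    }
}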

Features

Elastic-Job-Lite provides:
  • Distributed scheduling coordination
  • Elastic scale-out and scale-in
  • Failover
  • Re-triggering of missed (misfired) job executions
  • Sharding consistency, guaranteeing that each sharding item has exactly one execution instance in the distributed environment
  • Self-diagnosis and repair of problems caused by distributed instability
  • Parallel scheduling support
  • Job lifecycle operations
  • Rich job types
  • Spring integration and namespace support
  • An operations console

Source Code Walkthrough

Job Initialization

JobScheduler

public class JobScheduler {
    
    /**
     * Two fixed keys stored in Quartz's JobDetail#JobDataMap.
     */
    public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
    
    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
    
    ...
    
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // Register the job instance
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // Job configuration
        this.liteJobConfig = liteJobConfig;
        // Registry center
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        // Provides before/after extension points around a distributed job's start and completion;
        // users can hook in logic specific to job start and job completion here
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // Job facade, one jobFacade per job; wraps job start, failover, misfire re-execution, job events, etc.
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
    
    private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
        GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName());
        for (ElasticJobListener each : elasticJobListeners) {
            if (each instanceof AbstractDistributeOnceElasticJobListener) {
                ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
            }
        }
    }
    
    /**
     * Initialize the job.
     */
    public void init() {
        // Push the job configuration to ZK
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        // Set the total sharding count
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        // Create the job schedule controller
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        // Locally associate the job with the jobScheduleController and the registry center;
        // meanwhile a node named after jobName is created in ZK
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        // Register job startup information
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        // Schedule the job
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }
    
    /**
     * Configure Quartz.
     * This is where Elastic-Job and Quartz meet: the actual Quartz scheduling call is
     * scheduler.scheduleJob(jobDetail, createTrigger(cron)). When the configured fire time arrives,
     * Quartz invokes org.quartz.Job#execute(org.quartz.JobExecutionContext);
     * Elastic-Job's Job implementation is LiteJob.
     */
    private JobDetail createJobDetail(final String jobClass) {
        // Quartz's interface describing a scheduled job
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        // Note: this jobFacade is the same instance that LiteJob receives
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                // Note: this elasticJob is the same instance that LiteJob receives;
                // it is used to determine the job type at dispatch time
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
    
    ...
}

Job Triggering

Within a job instance, triggering is handled by Quartz: the job fires at the times defined by its cron expression, and Quartz invokes the Job implementation's LiteJob#execute method.
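
For readers less familiar with Quartz, the scheduling call inside JobScheduleController boils down to roughly the following sketch (simplified; exception handling is omitted, and from memory the real code sets the misfire policy shown here and reuses the JobDetail built in createJobDetail):

// Roughly what JobScheduleController#scheduleJob does under the hood
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity("myJob").build();
CronTrigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("myJob_Trigger")
        .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?").withMisfireHandlingInstructionDoNothing())
        .build();
scheduler.scheduleJob(jobDetail, trigger);
// Once started, Quartz instantiates LiteJob at each fire time and calls its execute method
scheduler.start();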

LiteJob

public final class LiteJob implements Job {
    
    /**
     * Note the two static keys in JobScheduler: at initialization, JobScheduler stores these two
     * objects in Quartz's JobDetail#JobDataMap. When Quartz instantiates LiteJob, it injects both
     * values from the JobDataMap; see org.quartz.simpl.PropertySettingJobFactory#newJob(org.quartz.spi.TriggerFiredBundle, org.quartz.Scheduler).
     * public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
     * private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
     */
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        // Entry point of Quartz-triggered job execution
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
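
JobExecutorFactory.getJobExecutor selects the executor by the job's type. As far as I recall, in 2.1.5 it is essentially the following (lombok annotations omitted):

public final class JobExecutorFactory {
    
    /**
     * Pick the executor implementation according to the job type.
     * A null elasticJob means a script job, which is driven purely by configuration.
     */
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }
}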

AbstractElasticJobExecutor

public abstract class AbstractElasticJobExecutor {
    
    ... // fields and constructor omitted
    
    /**
     * Execute the job.
     */
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
        }
        // If the job is still running, mark all sharding items assigned to this instance as misfired
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format(
                        "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                        shardingContexts.getShardingItemParameters().keySet()));
            }
            return;
        }
        try {
            // Before-job extension point
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
        // 1. Execute the job
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
        // 2. Re-execute misfired items
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
        }
        // 3. Failover
        jobFacade.failoverIfNecessary();
        try {
            // After-job extension point
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
    
    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        // Record that the job has started
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_RUNNING, "");
        }
        try {
            // Run the job
            process(shardingContexts, executionSource);
        } finally {
            // TODO Consider adding a job-failure state, and think through the overall failure-handling loop
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }
    
    private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        // Single sharding item: this job instance holds only one item
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        final CountDownLatch latch = new CountDownLatch(items.size());
        // Multiple sharding items: one thread per item
        for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            // Wait until all sharding items have finished
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(startEvent);
        }
        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
        JobExecutionEvent completeEvent;
        try {
            // Process a single sharding item; this ultimately dispatches to the user-defined job logic
            process(new ShardingContext(shardingContexts, item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(completeEvent);
            }
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(cause);
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtil.transform(cause));
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
    
    protected abstract void process(ShardingContext shardingContext);
}

SimpleJobExecutor

Here we take SimpleJobExecutor as an example. AbstractElasticJobExecutor has two further implementations, ScriptJobExecutor and DataflowJobExecutor, which follow the same pattern and are not repeated here.

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        // User-defined job logic
        simpleJob.execute(shardingContext);
    }
}
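
On the user side, a job only needs to implement SimpleJob and branch on the sharding item. MyJob below is a made-up example matching the wiring sketch from the overview:

public class MyJob implements SimpleJob {
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        // Each instance receives only the items assigned to it; with 3 instances and
        // shardingTotalCount = 3, every instance typically processes exactly one item
        switch (shardingContext.getShardingItem()) {
            case 0:
                // process data slice 0
                break;
            case 1:
                // process data slice 1
                break;
            case 2:
                // process data slice 2
                break;
            default:
                break;
        }
    }
}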

Sharding Strategy

/**
 * Sharding strategy based on average allocation.
 *
 * <p>
 * If the items cannot be divided evenly, the remainder is appended, one item at a time,
 * to the servers with the lowest sequence numbers.
 * For example:
 * 1. With 3 servers and 9 items, each server gets: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
 * 2. With 3 servers and 8 items, each server gets: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
 * 3. With 3 servers and 10 items, each server gets: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>
 *
 * @author zhangliang
 */
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        // Evenly divisible part: distributed equally across machines
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        // Remainder: handed out one per machine, starting from the first, until exhausted
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        // Starting from the first machine, append one leftover item per machine until all are assigned
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
}
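
The examples in the javadoc are easy to verify by invoking the strategy directly. The sketch below assumes JobInstance exposes a single-argument constructor taking the instance id (which it does in 2.1.5, as far as I can tell, via lombok):

// With 3 instances and 8 items, prints 0,1,6 / 2,3,7 / 4,5 as in example 2 above
List<JobInstance> instances = Arrays.asList(new JobInstance("ip1-@-@1"), new JobInstance("ip2-@-@2"), new JobInstance("ip3-@-@3"));
Map<JobInstance, List<Integer>> result = new AverageAllocationJobShardingStrategy().sharding(instances, "myJob", 8);
for (Map.Entry<JobInstance, List<Integer>> entry : result.entrySet()) {
    System.out.println(entry.getKey().getJobInstanceId() + " -> " + entry.getValue());
}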

Distributed Coordination

Elastic-Job uses ZooKeeper as its registry center: job instance information, sharding information, configuration, and job runtime state are all stored as ZooKeeper nodes. Distributed job coordination is driven by ZooKeeper node-change events; node creation, update, and removal events are delivered to every job instance in the cluster in near real time. Elastic-Job ships a set of listeners to handle these events, all built on the parent types AbstractJobListener and TreeCacheListener.
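
For orientation, the node layout under a single job looks roughly like this in 2.x (simplified from memory; some nodes are created lazily):

/{namespace}/{jobName}
    config                    # job configuration, stored as JSON
    instances/{instanceId}    # one ephemeral node per live job instance (ip-@-@pid)
    servers/{ip}              # server-level state, e.g. disabled
    sharding/{item}/instance  # the instance a sharding item is assigned to
    sharding/{item}/running   # present while the item is executing
    sharding/{item}/misfire   # present when the item misfired
    sharding/{item}/failover  # present when the item has been failed over
    leader/election           # leader election
    leader/sharding           # resharding flags
    leader/failover/items     # items waiting to be failed over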

TreeCacheListener

The node-change listener interface, provided by Apache Curator's TreeCache recipe on top of ZooKeeper

/**
 * Listener for {@link TreeCache} changes
 */
public interface TreeCacheListener
{
    /**
     * Called when a change has occurred
     *
     * @param client the client
     * @param event  describes the change
     * @throws Exception errors
     */
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
}
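
Elastic-Job registers these listeners through Curator's TreeCache. Registering one by hand looks like the following sketch (exception handling omitted; the address and the watched path are placeholders):

CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
// Cache the job's whole subtree and get notified of any node change beneath it
TreeCache cache = new TreeCache(client, "/elastic-job-demo/myJob");
cache.getListenable().addListener(new TreeCacheListener() {
    
    @Override
    public void childEvent(final CuratorFramework curator, final TreeCacheEvent event) {
        System.out.println(event.getType() + " " + (null == event.getData() ? "" : event.getData().getPath()));
    }
});
cache.start();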

AbstractJobListener

Elastic-Job's base job listener, wrapping TreeCacheListener

public abstract class AbstractJobListener implements TreeCacheListener {
    
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    
    // Abstract hook, implemented by concrete listeners as needed
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}

JobCrashedJobListener

The failover listener, triggered when a job instance goes offline

class JobCrashedJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        // Conditions: 1) failover is enabled; 2) the event is NODE_REMOVED, i.e. a server went offline;
        // 3) the path is an instance path, i.e. under jobName/instances
        if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
            // path has the form jobName/instances/ip-@-@pid,
            // so jobInstanceId has the form ip-@-@pid
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            // If jobInstanceId is the current instance itself, skip
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            // Fetch the items already failed over to the crashed instance, i.e. items whose
            // jobName/sharding/{item}/failover node holds this instance id
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
            if (!failoverItems.isEmpty()) {
                // The crashed instance held failover items
                for (int each : failoverItems) {
                    // Store each item under leader/failover/items, then perform the failover
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            } else {
                // Otherwise take the items assigned to the crashed instance, store them under
                // leader/failover/items/{item}, and fail them over.
                // At first glance it seems failover always runs whenever a server goes down, but not quite:
                // shardingService.getShardingItems(jobInstanceId) checks whether the server is still
                // available and returns an empty item list if it is not. However, for a brief
                // unavailability (e.g. one caused by a heap dump), this check can go wrong;
                // this is exactly where our jobs started firing abnormally.
                for (int each : shardingService.getShardingItems(jobInstanceId)) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}
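
The guard mentioned in the comments above lives in ShardingService#getShardingItems. Reconstructed from memory of 2.1.5 (treat it as a sketch, not the authoritative source), it looks roughly like this, which explains both the normal case and our incident:

public List<Integer> getShardingItems(final String jobInstanceId) {
    JobInstance jobInstance = new JobInstance(jobInstanceId);
    // A permanently dead server fails this check and yields no items, so no failover happens twice.
    // A server that was only briefly unavailable (e.g. paused by a heap dump) may already be back
    // online when the NODE_REMOVED event is processed, so the check passes and its items get
    // failover-flagged even though the instance is alive; this was the root of our incident.
    if (!serverService.isAvailable(jobInstance.getIp())) {
        return Collections.emptyList();
    }
    List<Integer> result = new LinkedList<>();
    int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
    for (int i = 0; i < shardingTotalCount; i++) {
        // Collect items whose sharding/{item}/instance node points at the given instance
        if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
            result.add(i);
        }
    }
    return result;
}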

FailoverSettingsChangedJobListener

Listener for failover-setting changes; it handles failover being switched off from the console (if failover is switched on, nothing needs to happen locally)

class FailoverSettingsChangedJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        // When failover is switched off via the console, clear the local failover info
        if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
            failoverService.removeFailoverInfo();
        }
    }
}

Miscellaneous

  • JobRegistry: job registry; a per-JVM singleton that records the mapping between jobs and the registry center, job status, and job instances
  • SchedulerFacade: job scheduling facade, one per job
  • JobNodeStorage: job node access
  • ShardingNode: ZK node naming rules
  • JobNodePath: job node path construction

Summary

This article walked through the Elastic-Job source code from four angles: job initialization, job triggering, sharding strategy, and distributed coordination. It serves partly as my own notes and partly, I hope, as a way to help other developers quickly understand how Elastic-Job works.