时序数据库 DolphinDB 线程简介

本文基于 DolphinDB server 最新版 2.00.X,从任务管理、存储引擎、流数据、集群管理、高可用几个方面,为大家简单介绍 DolphinDB 在运行中可能使用到的各种线程,及其相关的配置项和函数,以便用户掌握 DolphinDB 的线程运行情况。

1.任务管理

工作线程(Worker)接收客户端请求,将任务分解为多个小任务,根据任务的粒度自己执行或者发送给执行线程 localExecutor 或 remoteExecutor 执行。

Worker

常规交互作业的工作线程。每个节点都存在 Worker 线程,可以分为以下几类。

  • ZeroWorker, FirstWorker, SecondWorker, ThirdWorker, ForthWorker

客户端提交至节点的作业为0级,由 ZeroWorker 处理。根据作业所涉及到的分区,ZeroWorker 将其分解为多个子任务。其中本地节点上的子任务由 ZeroWorker 与 localExecutor 并行执行;需要由远程节点执行的子任务则降低为1级,并通过 remoteExecutor 发送到对应节点上的 FirstWorker 处理。

以此类推,若某个级别的子任务需要进一步拆解,则拆分出来的由远程节点执行的子任务降低一级,发送至远程节点上对应层级的 Worker 处理。

ZeroWorker 和 FirstWorker 的数量由 workerNum 决定,默认值为机器上的 CPU 核数,最大值不超过 license 中的最大 核数。其余层级的 Work 数量为上级的0.75倍,最小个数为1。

  • UrgentWorker

处理紧急任务,只接收一些特殊的系统级任务,譬如登录,取消作业等。由 urgentWorkerNum 配置,默认值为1,最大值为 CPU 内核数。

  • WebWorker

处理 HTTP 请求,由 webWorkerNum 配置。默认为1,最大值为 CPU 内核数。

  • InfraWorker

开启高可用后,用于接收 raft 心跳汇报的线程,防止集群负载大时,心跳信息无法及时汇报。默认有2个该线程。

LocalExecutor

本地执行线程。Worker 拆解完任务后生成的本地子任务队列,由同一节点下的 localExecutor 执行。所有 Worker共享本地执行线程,每个 localExecutor 一次只能处理一个子任务。通过​​localExecutors​​配置线程个数,默认为机器上的 CPU 核数减1。

RemoteExecutor

将远程任务发送到远程节点的线程,在非 single 模式的节点上可以通过​​remoteExecutors​​配置线程个数。默认值为集群中节点个数和本地 Worker 的较小值。

AsynchronousRemoteExecutor

接收对其他节点发起的远程调用(Remote Procedure Call, RPC)任务的线程,并将收到的远程调用放到 RemoteExecutor 的任务队列中。每个非 single 模式的节点上有且仅有一个该线程。

RemoteTaskDispatcher

在远程调用出错需要重试时,或者一个被关闭的连接上仍有未完成的任务时,这些任务会先放到一个队列里,由 RemoteTaskDispatcher 从这个队列取任务并重新交由 AsynchronousRemoteExecutor 去发起远程调用。

DynamicWorkerManager 和 DynamicWorker

DynamicWorker 是动态工作线程,作为 Worker 的补充。DynamicWorkerManager 是创建 DynamicWorker 的线程,每个节点有且仅有一个该线程。如果所有的工作线程被占满,有新任务到来时,通过该线程创建 DynamicWorker 来执行新任务。根据系统并发任务的繁忙程度,总共可以创建三组动态工作线程,每一个级别可以创建 maxDynamicWorker 个动态工作线程。

动态工作线程在任务执行完后若闲置60秒则会被系统自动回收,不再占用系统资源。maxDynamicWorker 的默认值为 workerNum。

DynamicExecutorManager 和 DynamicExecutor

与 DynamicWorkerManager 和 DynamicWorker 类似,DynamicExecutor 是由 DynamicWorkerManager 动态创建的执行线程。DynamicExecutor 线程数量上限由​​maxDynamicLocalExecutor​​决定,默认值为 localExecutors。DynamicWorkerManager 最多可以创建3组动态执行线程,每组线程最多为 maxDynamicLocalExecutor 个。

DynamicExecutor 在闲置60秒后被自动回收。

BlockIOWorker

执行对硬盘读写任务的线程。通过​​diskIOConcurrencyLevel​​控制线程数量,默认值为1。

BatchJobWorker

执行批处理作业任务的工作线程。其上限通过​​maxBatchJobWorker​​设置,默认值是 workerNum。该线程在任务执行完后若闲置60秒会被系统自动回收,不再占用系统资源。

2.存储引擎

存储相关的线程在 server 启动过程中创建。其主要负责数据的写入与落盘,并在各种情况下(如节点宕机,磁盘损坏导致数据损坏)维护各节点间数据的完整与一致性。

2.1 预写日志

在数据节点上,OLAP 和 TSDB 都有两个写预写日志(WAL)的线程 RedoLogHeadWriter 和 RedoLogDataWriter,分别负责写 WAL 的元数据和数据。

存储 OLAP 和 TSDB WAL 日志的目录分别由​​redoLogDir​​,​​TSDBRedoLogDir​​配置。

  • RedoLogHeadWriter
    事务发生时通过 RedoLogHeadWriter 同步写事务元数据信息到 redoLog 目录下的 header.log 中。
  • RedoLogDataWriter
    事务的数据会通过 RedoLogDataWriter 异步写入 redoLog 目录下的 tid.log 中。

2.2 OLAP 引擎

在开启了 OLAP 的 cacheEngine 后,会创建一个 ChunkCacheEngineGCWorker 线程。

  • ChunkCacheEngineGCWorker
    负责将 cacheEngine 中的数据写入磁盘的线程。用于清理 cacheEngine,并将磁盘的随机写变成顺序写的线程。BackgroundRoutineService 每隔60秒,或者写入时 cacheEngine 的占用量超过 OLAPCacheEngineSize 的30%,就会触发 ChunkCacheEngineGCWorker 将 cacheEngine 中的数据写入磁盘。cacheEngine 中一张表的一个分区数据称为一个 tabletCache,ChunkCacheEngineGCWorker 写入磁盘时会根据每个 tabletCache 的大小和在 cacheEngine 中存在时间决定写入磁盘优先级,tabletCache 的大小越大、存在时间越久写入磁盘优先级越高。

2.3 TSDB 引擎(1.30.X 版本 server 无此类线程)

除了将数据写入磁盘外,TSDB 引擎的线程还要负责 cacheEngine 中数据的排序与合并,并维护磁盘上的 levelFile,以提高读写性能。这些线程也仅在数据节点上存在。

  • TableAsyncSortRunner异步地对 TSDB cacheEngine 中的表进行排序的线程。TSDB 在写入 cacheEngine 时,如果 cacheEngine 中的表太大,会影响查询性能,因此需要进行排序。但若同步进行排序,会影响写入性能。所以 DolphinDB 提供此线程异步地对表进行排序。可以通过​​TSDBAsyncSortingWorkerNum​​来控制排序线程的数量,默认值为1。也可以通过函数​​disableTSDBAsyncSorting​​和​​enableTSDBAsyncSorting​​,来手动开启和关闭异步排序功能。
  • CacheDumpTaskDispatcher
    分配 cacheEngine 写入磁盘任务的线程。线程数量固定为1。 当 cacheEngine 的内存占用大于 TSDBCacheEngineSize 时,系统会对 cacheEngine 做一次快照,并将快照送到 CacheDumpTaskDispatcher 线程准备写入磁盘。CacheDumpTaskDispatcher 线程将任务分配给 ChunkCacheDumpRunner 线程,由该线程写入磁盘。若磁盘上存在需要合并的 levelFile,则交由 MergeRunner 线程进行合并。
  • ChunkCacheDumpRunner
    将 cacheEngine 中的数据写入磁盘的线程。线程的个数等于​​volumes​​的配置值。
  • MergeRunner
    对磁盘上 levelFile 进行合并的线程,线程的个数等于​​volumes​​的配置值。
  • DelLevelFileRunner
    检查并删除无效的 levelFile(即已经被合并的较小 size 的文件)的线程。线程数量固定为1。每隔30秒会自动执行一次。

2.4 数据恢复

数据恢复(recovery)相关线程负责节点宕机,或者数据损坏时,数据副本间的数据恢复。

  • RecoveryReportingService
    在数据节点上,任何一个 chunk 发生数据错误或者版本号不一致都通过该线程来向控制节点汇报。每个数据节点有且仅有一个该线程。
  • RecoveryWorker
    发生 recovery 时,数据恢复的源节点将数据发送给目标节点的线程。该线程仅存在于数据节点,个数可由​​recoveryWorkers​​配置,默认值为1。可以通过​​resetRecoveryWorkerNum​​函数动态修改线程个数,通过​​getRecoveryWorkerNum​​函数获取实际 RecoveryWorker 线程数量。
  • RecoverMetaLogWriter 和 RecoverRedoLogDataWriter
    在线恢复(onlineRecovery)过程中,为了避免节点宕机或离线影响恢复过程,会分别通过 RecoverMetaLogWriter 和 RecoverRedoLogDataWriter 写 recover redoLog 的元数据(Metadata)和数据(data)。与 redoLog 不同的是,recover redoLog 的 Metadata 和 data 需要进行写磁盘时才能开始 recovery。通过​​enableDfsRecoverRedo​​配置是否开启 recover redoLog,默认是开启。开启后,在每个数据节点上存在一个相应的线程。recover redoLog 的文件目录也可以通过​​recoverLogDir​​配置,默认在节点根目录下的​​log/recoverLog​​中。
  • DFSChunkRecoveryWorker
    在控制节点上处理 recovery 任务的线程。同时进行 recovery 任务的数量默认为集群中数据节点个数的2倍,可由​​dfsRecoveryConcurrency​​配置。

2.5 事务相关

如果集群中的某个节点在处理事务的过程中宕机了,那么重启后仅依靠该节点有可能无法确定事务的最终状态,需要在集群中进行事务决议来确定。

  • UnresolvedTransactionReporter
    在数据节点启动时,如果数据节点自己不能判断某些事务的状态,通过该线程来向控制节点汇报并发起事务决议,判断事务最终处于回滚还是完成状态。该线程只有一个,且在所有需要决议的事务决议后结束。
  • DFSTransactionResolutionWorker
    该线程处理由数据节点发起的事务决议、控制节点启动时回放元数据后无法决定状态的事务或运行时超时未更新状态的事务。在控制节点上存在一个该线程。
  • ChunkNodeLogWriter
    数据节点写元数据的线程。元数据默认在各个数据节点根目录下的​​storage/CHUNK_METADATA​​中,可以通过配置项​​chunkMetaDir​​修改。在数据节点上存在一个该线程。
  • EditLogBatchWriter
    控制节点写元数据的线程。由于对控制节点上元数据的修改比较频繁,所以由该线程统一将写入缓冲区的数据写入磁盘并同步,同时还对写元数据失败的情况进行回滚处理。在控制节点上存在一个该线程。

2.6 其他

  • SnapshotEngineWorker
    为减少开启快照引擎对写入的影响,而将分布式表数据异步写入快照引擎的线程。在数据节点上存在一个该线程。可以通过函数​​registerSnapshotEngine​​和​​unregisterSnapshotEngine​​对一个分布式表注册和取消注册快照引擎。
  • DFSChunkRebalanceWorker
    节点间平衡数据或者多块磁盘间平衡数据的任务,均交由控制节点上的 DFSChunkRebalanceWorker 线程处理。在控制节点上存在一个该线程。同时发起的数据平衡任务数量默认为集群中数据节点个数的两倍。可由​​dfsRebalanceConcurrency​​配置。通过函数​​rebalanceChunksWithinDataNode​​和​​rebalanceChunksAmongDataNodes​​手动触发节点或磁盘间的数据平衡。

3.流数据

本节通过发布订阅、计算引擎和高可用三个模块介绍流数据相关线程。这些线程都仅在数据节点或单节点上存在。

3.1 发布订阅

以下为数据节点上普通流表的订阅发布流程中涉及到的线程。

  • MessageThrottle
    实现流数据订阅 throttle 参数功能的线程,数量为1。系统每隔一段时间检查当前节点上是否存在经过 throttle 时间但仍未达到 batchSize 的订阅(​​subscribeTable​​函数中指定了 batchSize 和 throttle )。如果存在,则触发一次订阅的消息处理。通过​​subThrottle​​配置触发检查的间隔时间,默认值为1000,单位为毫秒。
  • AsynchronousPublisher
    在 AsynchronousPublisher 线程中检查每个发布节点对每个订阅节点建立的连接。如果这个连接对应的发布队列有更新,就将更新的数据发布到订阅端。通过​​maxPubConnections​​配置发布节点连接的订阅节点数量上限,默认值为0,表示不可以作为发布节点,即不会创建 AsynchronousPublisher 线程,大于0时会创建一个该线程。
  • AsynchronousSubscriber
    监听所有的订阅连接,接收、解析连接上收到的数据,并发送到相应的订阅消息队列。配置了subPort之后,会创建一个该线程。 通过​​maxSubConnections​​配置一个订阅节点可以连接的发布节点数量上限,默认值为64。
  • LocalSubscriberImp
    在 LocalSubscriberImp 线程中直接检查有数据更新的本地订阅,并将符合条件的本地订阅中的数据发送到订阅消息队列中。配置 subPort 后,会创建一个该线程。
  • StreamExecutor
    StreamExecutor 线程从订阅消息队列中取出数据,写入相应订阅的 handler 中,同时维护订阅的偏移量、消息总数等信息。每个订阅消息队列对应一个 StreamExecutor 线程,数量由配置项 subExecutors 决定,默认值为1,最大不超过 CPU 核数。
  • PersistenceWorker
    以异步方式持久化的流表会通过 PersistenceWorker 线程将数据写到磁盘上。​​persistenceWorkerNum​​控制持久化线程的数量,默认为1。由​​persistenceDir​​配置开启持久化的流表的保存路径。
  • AsynchronousReconnector
    针对所有设置参数 reconnect=true 的订阅,系统会在非正常中断后通过该线程尝试自动重连。在配置了​​subPort​​之后,会创建一个该线程。

3.2 计算引擎

创建计算引擎时,若配置了如下参数,便会创建两个线程:CheckTimeExecutor 和 SystemTimeExecutor。

  • CheckTimeExecutor
    包括 TimeSeriesCheckTimeExecutor, SessionCheckTimeExecutor, CSEngineCheckTimeExecutor, AsofJoinCheckTimeExecutor 和 LookupJoinCheckTimeExecutor。
    在创建 TimeSeriesEngine 时设置了 updateTime、创建 SessionWindowEngine 时设置了 forceTriggerTime、创建 CrossSectionalEngine 时设置了 triggeringPattern=“interval”、创建 AsofJoinEngine 时设置了 delayedTime、创建 LookupJoinEngine 时设置了 checkTimes,那么每个引擎就会创建一个 CheckTimeExecutor 线程,表示如果经过了参数设置的时间还未触发计算,则强制触发一次引擎的计算。
  • SystemTimeExecutor
    包括 TimeSeriesSystemTimeExecutor, SessionSystemTimeExecutor, CrossSectionalEngineExecutor 和 WindowJoinSystemTimeExecutor。
    在创建 TimeSeriesEngine, SessionWindowEngine, CrossSectionalEngine 和 WindowJoinEngine 时,如果设置了 useSystemTime=true,那么每个引擎就会创建一个 SystemTimeExecutor 线程,表示每隔固定的时间触发一次引擎的计算。

3.3 流数据高可用

配置项​​streamingRaftGroups​​中每个 group 都会在 group 内的节点上生成下述的三个线程。

  • StreamingDataFileWriter
    在 raft 的 leader 节点上向流表写数据时,要通过该线程应用 leader 上写数据的 entryLog,向流表写数据。
  • StreamingRaftReplayWorker
    当一个节点成为某个 group 的 leader 时,就会通过该线程回放此 group 的 raftLog。
  • StreamingHA::CkptWorker
    为节点上的 raftLog 做 checkpoint 以回收垃圾的线程。垃圾回收的间隔可由​​streamingHAPurgeInterval​​设置,默认值为300,单位是秒。

4.集群管理

在集群中控制节点通过心跳监控其他节点的存活状态。

  • HeartBeatSender
    控制数据节点或计算节点向控制节点每隔0.5秒发送一次心跳的线程。心跳信息中同时还会汇报节点当前的一些信息(如 CPU、内存、磁盘占用)给控制节点。在数据节点或计算节点上存在一个该线程。 通过 lanCluster 控制心跳采用 udp 或 tcp 协议,当为 true 时使用 udp,false 时使用 tcp,默认值为true。
  • HeartBeatReceiver
    仅当 lanCluster=true 时,在控制节点和数据节点、计算节点上存在的接收 udp 心跳的线程。
  • HeartBeatMonitor
    仅在控制节点存在的线程。每隔一秒检查一次是否收到集群中数据节点或计算节点的心跳信息。如果一个节点连续3次检查都没有心跳,就认为这个节点已经宕机了。 如果数据节点配置了 datanodeRestartInterval(值大于0),那么当节点宕机时间超过设置值,就会通过 agent 重启该数据节点。该配置项默认值为0。
  • ServiceMgmtCenter
    仅在控制节点存在的线程。当一个代理节点重新上线时,通过该线程将公钥信息保存到代理节点上。当一个数据节点重新上线时,会让其汇报节点的所有 chunk 信息,并且在数据节点上删除控制节点上不存在的chunk。

5.控制节点高可用

本节简述开启控制节点高可用之后,raft 相关的线程。对于每种线程,在 raft group 内的每个控制节点上都有且仅有一个。

  • RaftTimer
    负责计时(心跳发送间隔和发起选举时间)的线程。leader 通过该线程每隔一段时间向 follower 发送心跳信息,follower 如果一段时间没有收到 leader 的心跳,将发起选举。 通过​​raftElectionTick​​可以设置在 [raftElectionTick, 2*raftElectionTick] 之间的一个随机时间后未收到 leader 的心跳将发起选举,默认值为800,单位是10ms。
  • RaftInputWorker
    从输入消息队列取出消息应用到当前节点的线程。
  • RaftOutputWorker
    从输出消息队列取出消息并应用到相应节点的线程。
  • RaftProposeWorker
    处理对 raftLog 读写请求的线程。
  • SnapshotSender
    将 leader 当前状态的快照发送给其他节点的线程。
  • RaftLeaderSwitchWorker
    执行 raft 节点角色切换的线程。
  • DFSRaftReplayWorker
    将记录的 raftLog 应用到当前节点的线程。

6.其他

  • ThreadPoolSocketGroup
    在 server 的端口上监听收到的消息请求,并交由相应的工作队列处理。每个节点有且仅有一个线程。
  • BackgroundRoutineService
    server 的后台线程,每个节点会生成 4 个该线程。server 会在该线程中注册一些函数,这些函数会在BackgroundRoutineService 线程运行过程中每隔一段时间就被调用一次。
  • LogWriter
    将节点运行过程中生成的 log 写入文件的线程。每个节点都有一个该线程。
  • StdConsole
    启动 server 后在命令行窗口接收命令的线程。在 server 启动参数中如果设置 console=true,那么就会启动一个该线程。