阿里云InfluxDB®之snapshot及其内存优化

简介

作为阿里在APM和IOT领域的重要布局,时序数据库承载着阿里对于物理网和未来应用监控市场的未来和排头兵,作为业内排名第一的时序数据库InfluxDB,其在国内和国际都拥有了大量的用户,阿里适逢其时,重磅推出了阿里云 InfluxDB®。
         限于篇幅,本文仅就InfluxDB的其中一个模块:snapshot,对其机制和内存使用的优化进行分析。

为什么要做snapshot

InfluxDB采用的是TSM引擎,TSM 存储引擎主要由几个部分组成: cache、wal、tsm file、compactor
阿里云InfluxDB®之snapshot及其内存优化

TSM存储引擎,其核心思想类似于LSM Tree,它会将最近的数据缓存在磁盘中,在达到预设的阈值之后就会触发snapshot,也就是我们常说的快照刷盘。
内存的作用是为了缓存,加速查询。snapshot主要是解决数据持久化落盘问题。

Snapshot的工作机制

由于snapshot是将cache中的数据刷到磁盘,那么首先,我们来看一下Cache的内部结构。

Cache的内部结构

阿里云InfluxDB®之snapshot及其内存优化

如上图所示,每一个cache内部划分成了16个partition。每1个partition内部包含一个map,所有的map其key为SeriesKey,value为entry。

// Value represents a TSM-encoded value.
type Value interface {
// UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
UnixNano() int64
// Value returns the underlying value.
Value() interface{}
// Size returns the number of bytes necessary to represent the value and its timestamp.
Size() int
// String returns the string representation of the value and its timestamp.
String() string
// internalOnly is unexported to ensure implementations of Value
// can only originate in this package.
internalOnly()
}
// Values represents a slice of  values.
type Values []Value
// entry is a set of values and some metadata.
type entry struct {
mu     sync.RWMutex
values Values // All stored values.
vtype byte
}

entry是一个Value类型的数组,Value本身是一个接口,按照值类型的不同分为:FloatValue、StringValue、BooleanValue、IntegerValue、FloatValue、StringValue。 
以FloatValue为例,每一种类型的Value包含了一个int类型的时间戳和具体的值value。

type FloatValue struct {
unixnano int64
value    float64
}

Snapshot的流程

从代码层面上来讲,从总体的概要流程如下:
阿里云InfluxDB®之snapshot及其内存优化

下面让我们来逐个分析下:

Snapshot的入口

if e.enableCompactionsOnOpen {
e.SetCompactionsEnabled(true)
}

Snapshot的机制

// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {
t := time.NewTicker(time.Second)
defer t.Stop()
for {
e.mu.RLock()
quit := e.snapDone
e.mu.RUnlock()
select {
case <-quit:
tsdb.UpdateCacheSize(e.id, 0, e.logger)
return
case <-t.C:
e.Cache.UpdateAge()
tsdb.UpdateCacheSize(e.id, e.Cache.Size(), e.logger)
if e.ShouldCompactCache(time.Now()) {
start := time.Now()
e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
err := e.WriteSnapshot()
if err != nil && err != errCompactionsDisabled {
e.logger.Info("Error writing snapshot", zap.Error(err))
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
}
atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
}
}
}
}

每隔1秒钟检查一次,是否达到snapshot的条件。
snapshot的条件有两个:
是否达到配置的阈值。(默认情况下是25M)
离上次snapshot的间隔是否超越:cache-snapshot-write-cold-duration的配置。(默认情况下是10min)

Snapshot的具体实现

那么,这里涉及两个问题:

1、落盘的文件格式如何?
2、snapshot刷盘的过程本身是如何进行的?
我们先看落盘的文件格式:

/>

TSM文件包括了三个部分:Series Data Section、Series Index Section、 Footer。

1、Series Data Section生成:
Series Data Section有若干个Series Data Block组成。
其中对于Series Data Block,这个是在内存中完成组装的,具体是有cacheKeyIterator.encode函数完成,代码如下:

func (c *cacheKeyIterator) encode() {
concurrency := runtime.GOMAXPROCS(0)
n := len(c.ready)
// Divide the keyset across each CPU
chunkSize := 1
idx := uint64(0)
for i := 0; i < concurrency; i++ {
// Run one goroutine per CPU and encode a section of the key space concurrently
go func() {
tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)
defer putTimeEncoder(tenc)
defer putFloatEncoder(fenc)
defer putBooleanEncoder(benc)
defer putUnsignedEncoder(uenc)
defer putStringEncoder(senc)
defer putIntegerEncoder(ienc)
for {
i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize
if i >= n {
break
}
key := c.order[i]
values := c.cache.values(key)
for len(values) > 0 {
end := len(values)
if end > c.size {
end = c.size
}
minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
var b []byte
var err error
switch values[0].(type) {
case FloatValue:
b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
case IntegerValue:
b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
case UnsignedValue:
b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
case BooleanValue:
b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
case StringValue:
b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
default:
b, err = Values(values[:end]).Encode(nil)
}
values = values[end:]
c.blocks[i] = append(c.blocks[i], cacheBlock{
k:       key,
minTime: minTime,
maxTime: maxTime,
b:       b,
err:     err,
})
if err != nil {
c.err = err
}
}
// Notify this key is fully encoded
c.ready[i] <- struct{}{}
}
}()
}
}

其中针对几个不同的数据类型分别通过不同Encoder来进行组装,最后是形成了一个cacheBlock的二维数组,并保存在iter当中。
接下来的问题就是如何将这些二维数组刷盘。

2、依次刷盘
前面我们已知iter中保留有这些cacheBlock,我们只需要遍历迭代器就可以将这些数据刷盘。但还有一个问题,Series Index Section如何生成?
因为Series Index Section最终由IndexEntry构成,而IndexEntry中minTime和maxTime、Size都可以由cacheBlock的数据得到,关键是Offset。
其实Offset的计算是随着迭代的过程,不断地往前走,就像在一个Buffer中,填满了一个Series Data Block,就会更新一次Offset。

关键的代码如下:

    n, err := t.w.Write(block) // Write的过程中,会更新t.n,也就是offset
if err != nil {
return err
}
n += len(checksum)
// Record this block in index
t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n)) //t.n 就是offset

总结以上所述,大体的过程是,encode生层Series Data Block, 在迭代过程中,生成了Series Index Section,最终,将Series Index Section Append到Series Data Block 就生成了TSM文件。

那么问题来了Series Index Section的保存时需要空间的,如果是Series Index Section占用的内存过大,则可能会因此加大了程序OOME的风险。

snapshot内存使用的优化

如果有n个cache,同时做snapshot。 则耗费的内存为: n IndexSize。 
例如:5db
4 retention, IndexSize = 50m。 则节省:5 4 50 = 1G的内存使用量。

所以,我们可以想到的一个优化点是:在snapshot过程当中利用文件来做Series Index Section的暂存区,从而节省这一部分内存。

验证

当我们利用磁盘来做Index的缓冲区时,系统在snapshot的过程中,会生成1个临时的索引文件,如下图所示。
阿里云InfluxDB®之snapshot及其内存优化

而不用磁盘来做Index缓冲区的时候,则不会生成这个文件,如下图所示。
阿里云InfluxDB®之snapshot及其内存优化

经过我们的长时间的稳定性测试,证实,在利用磁盘来做Index的缓冲区时,能有效降低系统大压力下的OOME概率。

商业化

阿里云InfluxDB®现已正式商业化,欢迎访问购买页面(https://common-buy.aliyun.com/?commodityCode=hitsdb_influxdb_pre#/buy)与文档(https://help.aliyun.com/document_detail/113093.html?spm=a2c4e.11153940.0.0.57b04a02biWzGa)。