8000 GitHub - chenjiandongx/mandodb: 🤔 A minimize Time Series Database, written from scratch as a learning project. 从零开始实现一个 TSDB
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

🤔 A minimize Time Series Database, written from scratch as a learning project. 从零开始实现一个 TSDB

License

Notifications You must be signed in to change notification settings

chenjiandongx/mandodb

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

59 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

mandodb

🤔 A minimize Time Series Database, written from scratch as a learning project.

Contributions welcome Go Report Card MIT License GoDoc

æ—¶åºæ•°æ®åº“(TSDB: Time Series Database)大多数时候都是为了满足监控场景的需求,这里先介ç»ä¸¤ä¸ªæ¦‚念:

  • æ•°æ®ç‚¹ï¼ˆPoint): æ—¶åºæ•°æ®çš„æ•°æ®ç‚¹æ˜¯ä¸€ä¸ªåŒ…å« (Timestamp:int64, Value:float64) 的二元组。
  • 时间线(Series): ä¸åŒæ ‡ç­¾ï¼ˆLabel)的组åˆç§°ä¸ºä¸åŒçš„æ—¶é—´çº¿ï¼Œå¦‚
series1: {"__name__": "netspeed", "host": "localhost", "iface": "eth0"}
series2: {"__name__": "netspeed", "host": "localhost", "iface": "eth1"}

Prometheus, InfluxDB, M3, TimescaleDB 都是时下æµè¡Œçš„ TSDBã€‚æ—¶åºæ•°æ®çš„压缩算法很大程度上决定了 TSDB 的性能,以上几个项目的实现都å‚考了 Facebook 2015 å¹´å‘表的论文《Gorilla: A fast, scalable, in-memory time series database》 中æåˆ°çš„差值算法,该算法平å‡å¯ä»¥å°† 16 字节的数æ®ç‚¹åŽ‹ç¼©æˆ 1.37 字节,下文会介ç»ã€‚

Who's mando?

Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

What's mandodb?

作为一å监控系统开å‘人员,自然è¦å¯¹æ—¶åºæ•°æ®åº“有所了解。mandodb 是我在研究过程中实现的一个最å°åŒ–çš„ TSDB,从概念上æ¥è®²å®ƒè¿˜ç®—ä¸ä¸Šæ˜¯ä¸€ä¸ªå®Œæ•´çš„ TSDB,因为它:

  • 没有实现自己的查询引擎(实现难度大)
  • 缺少ç£ç›˜å½’档文件 Compact æ“作(天气好的è¯ä¼šå®žçŽ°ï¼‰
  • 没有 WAL 作为ç¾å¤‡ä¿è¯é«˜å¯ç”¨ï¼ˆå¿ƒæƒ…好的è¯ä¼šå®žçŽ°ï¼‰

mandodb 主è¦å—到了两个项目的å¯å‘。本项目仅é™äºŽå­¦ä¹ ç”¨é€”,未ç»ç”Ÿäº§çŽ¯å¢ƒæµ‹è¯•éªŒè¯ï¼

prometheus 的核心开å‘者 Fabian Reinartz 写了一篇文章 《Writing a Time Series Database from Scratch》 æ¥ä»‹ç» prometheus TSDB 的演å˜è¿‡ç¨‹ï¼Œéžå¸¸å€¼å¾—一读,强烈推è。

📖 TOC

  • 💡 æ•°æ®æ¨¡åž‹ & API
  • 🛠 é…置选项
  • 🔖 用法示例
  • 🧮 Gorilla 差值算法
  • 📠数æ®å†™å…¥
  • 🖇 Mmap 内存映射
  • 📠索引设计
  • 🗂 存储布局
  • â“ FAQ

💡 æ•°æ®æ¨¡åž‹ & API 文档

æ•°æ®æ¨¡åž‹å®šä¹‰

// Point 表示一个数æ®ç‚¹ (ts, value) 二元组
type Point struct {
	Ts    int64 // in seconds
	Value float64
}

// Label 代表一个标签组åˆ
type Label struct {
	Name  string
	Value string
}

// Row ä¸€è¡Œæ—¶åºæ•°æ® 包括数æ®ç‚¹å’Œæ ‡ç­¾ç»„åˆ
type Row struct {
	Metric string
	Labels LabelSet
	Point  Point
}

// LabelSet 表示 Label 组åˆ
type LabelSet []Label

// LabelMatcher Label 匹é…器 æ”¯æŒæ­£åˆ™
type LabelMatcher struct {
	Name   string
	Value  string
	IsRegx bool
}

// LabelMatcherSet 表示 LabelMatcher 组åˆ
type LabelMatcherSet []LabelMatcher

API

// InsertRows 写数æ®
InsertRows(rows []*Row) error 

// QueryRange æŸ¥è¯¢æ—¶åºæ•°æ®ç‚¹
QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)

// QuerySeries 查询时åºåºåˆ—组åˆ
QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)

// QueryLabelValues 查询标签值
QueryLabelValues(label string, start, end int64) []string

🛠 é…置选项

é…置项在åˆå§‹åŒ– TSDB 的时候设置。

// WithMetaSerializerType 设置 Metadata æ•°æ®çš„åºåˆ—化类型
// ç›®å‰åªæä¾›äº† BinaryMetaSerializer
WithMetaSerializerType(t MetaSerializerType) Option 

// WithMetaBytesCompressorType 设置字节数æ®çš„压缩算法
// ç›®å‰æä¾›äº†
// * ä¸åŽ‹ç¼©: NoopBytesCompressor(默认)
// * ZSTD: ZstdBytesCompressor
// * Snappy: SnappyBytesCompressor
WithMetaBytesCompressorType(t BytesCompressorType) Option

// WithOnlyMemoryMode 设置是å¦é»˜è®¤åªå­˜å‚¨åœ¨å†…存中
// 默认为 false
WithOnlyMemoryMode(memoryMode bool) Option

// WithEnabledOutdated è®¾ç½®æ˜¯å¦æ”¯æŒä¹±åºå†™å…¥ 此特性会增加资æºå¼€é”€ 但会æå‡æ•°æ®å®Œæ•´æ€§
// 默认为 true
WithEnabledOutdated(outdated bool) Option

// WithMaxRowsPerSegment è®¾ç½®å• Segment 最大å…许存储的点数
// 默认为 19960412(夹æ‚ç§è´§ ðŸ¶ï¼‰
WithMaxRowsPerSegment(n int64) Option

// WithDataPath 设置 Segment æŒä¹…化存储文件夹
// 默认为 "."
WithDataPath(d string) Option

// WithRetention 设置 Segment æŒä¹…化数æ®ä¿å­˜æ—¶é•¿
// 默认为 7d
WithRetention(t time.Duration) Option

// WithWriteTimeout 设置写入超时阈值
// 默认为 30s
WithWriteTimeout(t time.Duration) Option

// WithLoggerConfig 设置日志é…置项
// logger: github.com/chenjiandongx/logger
WithLoggerConfig(opt *logger.Options) Option

🔖 用法示例

package main

import (
	"fmt"
	"time"

	"github.com/chenjiandongx/mandodb"
)

func main() {
	store := mandodb.OpenTSDB(
		mandodb.WithOnlyMemoryMode(true),
		mandodb.WithWriteTimeout(10*time.Second),
	)
	defer store.Close()

	// æ’入数æ®
	_ = store.InsertRows([]*mandodb.Row{
		{
			Metric: "cpu.busy",
			Labels: []mandodb.Label{
				{Name: "node", Value: "vm1"},
				{Name: "dc", Value: "gz-idc"},
			},
			Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
		},
		{
			Metric: "cpu.busy",
			Labels: []mandodb.Label{
				{Name: "node", Value: "vm2"},
				{Name: "dc", Value: "sz-idc"},
			},
			Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
		},
	})

	time.Sleep(time.Millisecond)

	// æ—¶åºæ•°æ®æŸ¥è¯¢
	data, _ := store.QueryRange("cpu.busy", nil, 1600000000, 1600000002)
	fmt.Printf("data: %+v\n", data)
	// output:
	// data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]

	// 查询 Series
	// __name__ 是 metric å称在 TSDB 中的 Label Key
	ser, _ := store.QuerySeries(
        mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 1600000000, 1600000002)
	for _, d := range ser {
		fmt.Printf("data: %+v\n", d)
	}
	// output:
	// data: map[__name__:cpu.busy dc:gz-idc node:vm1]
	// data: map[__name__:cpu.busy dc:sz-idc node:vm2]

	// 查询标签值
	lvs := store.QueryLabelValues("node", 1600000000, 1600000002)
	fmt.Printf("data: %+v\n", lvs)
	// output:
	// data: [vm1 vm2]
}

䏋颿˜¯æˆ‘对这段时间学习内容的整ç†ï¼Œå°è¯•完整介ç»å¦‚何从零开始实现一个å°åž‹çš„ TSDB。

我本身并没有数æ®åº“å¼€å‘的背景,æŸäº›æè¿°å¯èƒ½å¹¶ä¸é‚£ä¹ˆå‡†ç¡®ï¼Œæ‰€ä»¥æ¬¢è¿Ž 实å diss 指正。

🧮 Gorilla 差值算法

Gorilla 论文 4.1 å°èЂ介ç»äº†åŽ‹ç¼©ç®—æ³•ï¼Œå…ˆæ•´ä½“çœ‹ä¸€ä¸‹åŽ‹ç¼©æ–¹æ¡ˆï¼ŒT/V 是紧挨存储的,'0'/'10'/'11' 表示控制ä½ã€‚

Figure: Gorilla 压缩算法

Timestamp DOD 压缩:

在时åºçš„场景中,æ¯ä¸ªæ—¶åºç‚¹éƒ½æœ‰ä¸€ä¸ªå¯¹åº”çš„ Timestampï¼Œä¸€æ¡æ—¶åºåºåˆ—中相邻数æ®ç‚¹çš„间隔是有规律å¯å¾ªçš„。一般æ¥è®²ï¼Œç›‘控数æ®çš„é‡‡é›†éƒ½æ˜¯ä¼šä» F438 ¥å›ºå®šçš„æ—¶é—´é—´éš”进行的,所以就å¯ä»¥ç”¨å·®å€¼æ¥è®°å½•时间间隔,更进一步,我们å¯ä»¥ç”¨å·®å€¼çš„差值æ¥è®°å½•以此æ¥å‡å°‘存储空间。

t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
--------------------------------------------------------------
// 差值:delta
t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10; 
--------------------------------------------------------------
// 差值的差值:delta of delta
t1: 1627401800; dod1: 0; dod2: 0; dod3: 0; 

实际环境中当然ä¸å¯èƒ½æ¯ä¸ªé—´éš”都这么å‡åŒ€ï¼Œç”±äºŽç½‘络延迟等其他原因,差值会有波动。

Value XOR 压缩:

Figure: IEEE æµ®ç‚¹æ•°ä»¥åŠ XOR 计算结果

当两个数æ®ç‚¹æ•°å€¼å€¼æ¯”较接近的è¯ï¼Œé€šè¿‡å¼‚或æ“作计算出æ¥çš„结果是比较相似的,利用这点就å¯ä»¥é€šè¿‡è®°å½•å‰ç½®é›¶å’ŒåŽç½®é›¶ä¸ªæ•°ä»¥åŠæ•°å€¼éƒ¨åˆ†æ¥è¾¾åˆ°åŽ‹ç¼©ç©ºé—´çš„ç›®çš„ã€‚

下é¢é€šè¿‡ç®—法实现æ¥ä»‹ç»ï¼Œä»£ç æ¥è‡ªé¡¹ç›® dgryski/go-tsz。代ç å®Œå…¨æŒ‰ç…§è®ºæ–‡ä¸­ç»™å‡ºçš„æ­¥éª¤æ¥å®žçŽ°ã€‚

// New åˆå§‹åŒ– block 这里会将第一个原始时间戳写入到 block 中
func New(t0 uint32) *Series {
	s := Series{
		T0:      t0,
		leading: ^uint8(0),
	}

	s.bw.writeBits(uint64(t0), 32)
	return &s
}

// Push è´Ÿè´£å†™å…¥æ—¶åºæ•°æ®
func (s *Series) Push(t uint32, v float64) {
	// ....
	// 如果是第一个数æ®ç‚¹çš„è¯å†™å…¥åŽŸå§‹æ•°æ®åŽç›´æŽ¥è¿”回
	if s.t == 0 {
		s.t = t
		s.val = v
		s.tDelta = t - s.T0 // 实际上这里为 0

		// The block header stores the starting time stamp, t-1(å‰ä¸€ä¸ªæ—¶é—´æˆ³ï¼‰,
		// which is aligned to a two hour window; the first time
		// stamp, t0, in the block is stored as a delta from t−1 in 14 bits.
        
		// 用 14 个 bit 写入时间戳差值
		s.bw.writeBits(uint64(s.tDelta), 14)
		// 原始数æ®ç‚¹å®Œæ•´å†™å…¥
		s.bw.writeBits(math.Float64bits(v), 64)
		return
	}

	tDelta := t - s.t
	dod := int32(tDelta - s.tDelta) // 计算差值的差值 Detla of Delta

	// 下é¢å¼€å§‹å°±å¤„ç†éžç¬¬ä¸€ä¸ªæ•°æ®ç‚¹çš„æƒ…况了
	switch {
		// If D is zero, then store a single ‘0’ bit
		// å¦‚æžœæ˜¯é›¶çš„è¯ é‚£ç›´æŽ¥ç”¨ '0' 一个字节就å¯ä»¥ç›´æŽ¥è¡¨ç¤º
	case dod == 0:
		s.bw.writeBit(zero)

		//  If D is between [-63, 64], store ‘10’ followed by the value (7 bits)
	case -63 <= dod && dod <= 64:
		s.bw.writeBits(0x02, 2) // æŽ§åˆ¶ä½ '10'
		s.bw.writeBits(uint64(dod), 7) // 7bits å¯ä»¥è¡¨ç¤º [-63, 64] 的范围

		// If D is between [-255, 256], store ‘110’ followed by the value (9 bits)
	case -255 <= dod && dod <= 256:
		s.bw.writeBits(0x06, 3) // æŽ§åˆ¶ä½ '110'
		s.bw.writeBits(uint64(dod), 9)

		// if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits)
	case -2047 <= dod && dod <= 2048:
		s.bw.writeBits(0x0e, 4) // æŽ§åˆ¶ä½ '1110'
		s.bw.writeBits(uint64(dod), 12)

		// Otherwise store ‘1111’ followed by D using 32 bits
	default:
		s.bw.writeBits(0x0f, 4) // 其余情况控制ä½å‡ç”¨ '1111'
		s.bw.writeBits(uint64(dod), 32)
	}

	// 到这里 (T, V) 中的时间戳已ç»å†™å…¥å®Œæ¯•了 æŽ¥ä¸‹æ¥æ˜¯å†™ V 部分

	// 先计算两个值的异或结果
	vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)

	// If XOR with the previous is zero (same value), store single ‘0’ bit
	// 如果å‰åŽä¸¤ä¸ªå€¼ç›¸ç­‰çš„è¯ ç›´æŽ¥ç”¨ '0' 1 个 bit å°±å¯ä»¥è¡¨ç¤º
	// æ‰€ä»¥å¦‚æžœä¸ŠæŠ¥çš„æ—¶åºæ•°æ®æ˜¯ 1 或者 0 è¿™ç§çš„è¯ å ç”¨çš„内存会éžå¸¸å°‘

	// zero = '0'; 
	if vDelta == 0 {
		s.bw.writeBit(zero)
	} else {    // éž 0 æƒ…å†µé‚£å°±è¦æŠŠæŽ§åˆ¶ä½ç½®ä¸º 1
		s.bw.writeBit(one)

		// 计算å‰ç½® 0 å’ŒåŽç½® 0
		leading := uint8(bits.LeadingZeros64(vDelta))
		trailing := uint8(bits.TrailingZeros64(vDelta))

		// clamp number of leading zeros to avoid overflow when encoding
		if leading >= 32 {
			leading = 31
		}

		// (Control bit ‘0’) If the block of meaningful bits
		// falls within the block of previous meaningful bits,
		// i.e., there are at least as many leading zeros and
		// as many trailing zeros as with the previous value,
		// use that information for the block position and
		// just store the meaningful XORed value.

		// 如果å‰ç½® 0 ä¸å°äºŽä¸Šä¸€ä¸ªå€¼è®¡ç®—的异或结果的å‰ç½® 0 且åŽç½® 0 也ä¸å°äºŽä¸Šä¸€ä¸ªå€¼è®¡ç®—的异或结果的åŽç½® 0
		if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => æŽ§åˆ¶ä½ '10'
			s.bw.writeBit(zero)
			// 记录异或值éžé›¶éƒ¨åˆ†
			s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
		} else { // => æŽ§åˆ¶ä½ '11'

			// (Control bit ‘1’) Store the length of the number
			// of leading zeros in the next 5 bits, then store the
			// length of the meaningful XORed value in the next
			// 6 bits. Finally store the meaningful bits of the XORed value.
			s.leading, s.trailing = leading, trailing

			// 其他情况控制ä½ç½®ä¸º 1 并用接下æ¥çš„ 5bits 记录å‰ç½® 0 个数
			s.bw.writeBit(one)
			s.bw.writeBits(uint64(leading), 5)

			// ç„¶åŽç”¨æŽ¥ä¸‹æ¥çš„ 6bits 记录异或差值中的éžé›¶éƒ¨åˆ†
			sigbits := 64 - leading - trailing
			s.bw.writeBits(uint64(sigbits), 6)
			s.bw.writeBits(vDelta>>trailing, int(sigbits))
		}
	}

	// çŠ¶æ€æ›´æ–° 至此(T, V)å‡å·²è¢«åŽ‹ç¼©å†™å…¥åˆ°å†…å­˜ä¸­
	s.tDelta = tDelta
	s.t = t
	s.val = v
}

// æ¯ä¸ª block 的结尾会使用特殊标记用于标识
func finish(w *bstream) {
	// write an end-of-stream record
	w.writeBits(0x0f, 4)
	w.writeBits(0xffffffff, 32)
	w.writeBit(zero)
}

论文给出了ä¸åŒ case çš„ buckets å æ¯”分布。

Figure: Timestamp buckets distribution

Figure: Value buckets distribution

Timestamp buckets 中,å‰åŽä¸¤ä¸ªæ—¶é—´æˆ³å·®å€¼ç›¸åŒçš„æ¯”例高达 96.39%,而在 Value buckets 中åªç”¨ä¸€ä¸ªæŽ§åˆ¶ä½çš„å æ¯”也达到了 59.06%,å¯è§å…¶åŽ‹ç¼©æ¯”ä¹‹é«˜ã€‚

论文还给出了一个é‡è¦ç»“论,数æ®åŽ‹ç¼©æ¯”éšç€æ—¶é—´çš„增长而增长,并在 120 个点的时候开始收敛到一个最佳值。

Figure: 压缩率曲线

Gorilla 差值算法也应用于我的å¦å¤–一个项目 chenjiandongx/tszlistï¼Œä¸€ç§æ—¶åºæ•°æ®çº¿ç¨‹å®‰å…¨é“¾è¡¨ã€‚

📠数æ®å†™å…¥

æ—¶åºæ•°æ®å…·æœ‰ã€Œåž‚直写,水平查ã€çš„特性,å³åŒä¸€æ—¶åˆ»æœ‰å¤šæ¡æ—¶é—´çº¿çš„æ•°æ®ä¸æ–­è¢«è¿½åŠ ã€‚ä½†æŸ¥è¯¢çš„æ—¶å€™å¾€å¾€æ˜¯æŸ¥æŸæ¡æ—¶é—´çº¿æŒç»­ä¸€æ®µæ—¶é—´å†…的数æ®ç‚¹ã€‚

series
  ^   
  │   . . . . . . . . . . . . . . . . .   . . . . .   {__name__="request_total", method="GET"}
  │     . . . . . . . . . . . . . . . . . . . . . .   {__name__="request_total", method="POST"}
  │         . . . . . . .
  │       . . .     . . . . . . . . . . . . . . . .                  ... 
  │     . . . . . . . . . . . . . . . . .   . . . .   
  │     . . . . . . . . . .   . . . . . . . . . . .   {__name__="errors_total", method="POST"}
  │           . . .   . . . . . . . . .   . . . . .   {__name__="errors_total", method="GET"}
  │         . . . . . . . . .       . . . . .
  │       . . .     . . . . . . . . . . . . . . . .                  ... 
  │     . . . . . . . . . . . . . . . .   . . . . 
  v
    <-------------------- time --------------------->

æ—¶åºæ•°æ®è·Ÿæ—¶é—´æ˜¯å¼ºç›¸å…³çš„(ä¸ç„¶è¿˜å«æ—¶åºæ•°æ®ï¼ŸðŸ§ï¼‰ï¼Œå³å¤§å¤šæ•°æŸ¥è¯¢å…¶å®žåªä¼šæŸ¥è¯¢æœ€è¿‘时刻的数æ®ï¼Œè¿™é‡Œçš„ã€Œæœ€è¿‘ã€æ˜¯ä¸ªç›¸å¯¹æ¦‚念。所以没必è¦ç»´æŠ¤ä¸€æ¡æ—¶é—´çº¿çš„完整生命周期,特别是在 Kubernetes è¿™ç§äº‘原生场景,Pod éšæ—¶æœ‰å¯èƒ½ä¼šè¢«æ‰©ç¼©å®¹ï¼Œä¹Ÿå°±æ„味ç€ä¸€æ¡æ—¶é—´çº¿çš„生命周期å¯èƒ½ä¼šå¾ˆçŸ­ã€‚å¦‚æžœæˆ‘ä»¬ä¸€ç›´è®°å½•ç€æ‰€æœ‰çš„æ—¶é—´çº¿çš„索引信æ¯ï¼Œé‚£ä¹ˆéšç€æ—¶é—´çš„æŽ¨ç§»ï¼Œæ•°æ®åº“里的时间线的数é‡ä¼šå‘ˆçŽ°ä¸€ä¸ªçº¿æ€§å¢žé•¿çš„è¶‹åŠ¿ 😱,会æžå¤§åœ°å½±å“查询效率。

这里引入一个概念「åºåˆ—分æµã€ï¼Œè¿™ä¸ªæ¦‚念æè¿°çš„æ˜¯ä¸€ç»„æ—¶é—´åºåˆ—å˜å¾—䏿´»è·ƒï¼Œå³ä¸å†æŽ¥æ”¶æ•°æ®ç‚¹ï¼Œå–而代之的是有一组新的活跃的åºåˆ—出现的场景。

series
  ^
  │   . . . . . .
  │   . . . . . .
  │   . . . . . .
  │               . . . . . . .
  │               . . . . . . .
  │               . . . . . . .
  │                             . . . . . .
  │                             . . . . . .
  │                                         . . . . .
  │                                         . . . . .
  │                                         . . . . .
  v
    <-------------------- time --------------------->

æˆ‘ä»¬å°†å¤šæ¡æ—¶é—´çº¿çš„æ•°æ®æŒ‰ä¸€å®šçš„æ—¶é—´è·¨åº¦åˆ‡å‰²æˆå¤šä¸ªå°å—,æ¯ä¸ªå°å—本质就是一个独立å°åž‹çš„æ•°æ®åº“,这ç§å𿳕å¦å¤–一个优势是清除过期æ“作的时候éžå¸¸æ–¹ä¾¿ï¼Œåªè¦å°†æ•´ä¸ªå—给删了就行 ðŸ‘»ï¼ˆæ¢­å“ˆæ˜¯ä¸€ç§æ™ºæ…§ï¼‰ã€‚内存中ä¿ç•™æœ€è¿‘ä¸¤ä¸ªå°æ—¶çš„热数æ®ï¼ˆMemory Segmentï¼‰ï¼Œå…¶ä½™æ•°æ®æŒä¹…化到ç£ç›˜(Disk Segment)。

Figure: åºåˆ—分å—

DiskSegment 使用的是 AVL Tree 实现的列表,å¯åœ¨æ’入时排åºã€‚为什么ä¸ç”¨æ›´åŠ é«˜å¤§ä¸Šçš„çº¢é»‘æ ‘ï¼Ÿå› ä¸ºä¸å¥½å®žçް...

当 Memory Segment 达到归档æ¡ä»¶çš„æ—¶å€™ï¼Œä¼šåˆ›å»ºä¸€ä¸ªæ–°çš„内存å—并异步将刚归档的å—写入到ç£ç›˜ï¼ŒåŒæ—¶ä¼šä½¿ç”¨ mmap å°†ç£ç›˜æ–‡ä»¶å¥æŸ„映射到内存中。代ç å®žçŽ°å¦‚ä¸‹ã€‚

func (tsdb *TSDB) getHeadPartition() (Segment, error) {
	tsdb.mut.Lock()
	defer tsdb.mut.Unlock()

	if tsdb.segs.head.Frozen() {
		head := tsdb.segs.head

		go func() {
			tsdb.wg.Add(1)
			defer tsdb.wg.Done()

			tsdb.segs.Add(head)

			t0 := time.Now()
			dn := dirname(head.MinTs(), head.MaxTs())

			if err := writeToDisk(head.(*memorySegment)); err != nil {
				logger.Errorf("failed to flush data to disk, %v", err)
				return
			}

			fname := path.Join(dn, "data")
			mf, err := mmap.OpenMmapFile(fname)
			if err != nil {
				logger.Errorf("failed to make a mmap file %s, %v", fname, err)
				return
			}

			tsdb.segs.Remove(head)
			tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
			logger.Infof("write file %s take: %v", fname, time.Since(t0))
		}()

		tsdb.segs.head = newMemorySegment()
	}

	return tsdb.segs.head, nil
}

Figure: Memory Segment 两部分数æ®

å†™å…¥çš„æ—¶å€™æ”¯æŒæ•°æ®æ—¶é—´å›žæ‹¨ï¼Œä¹Ÿå°±æ˜¯æ”¯æŒæœ‰é™çš„ä¹±åºæ•°æ®å†™å…¥ï¼Œå®žçŽ°æ–¹æ¡ˆæ˜¯åœ¨å†…å­˜ä¸­å¯¹è¿˜æ²¡å½’æ¡£çš„æ¯æ¡æ—¶é—´çº¿ç»´æŠ¤ä¸€ä¸ªé“¾è¡¨ï¼ˆåŒæ ·ä½¿ç”¨ AVL Tree 实现),当数æ®ç‚¹çš„æ—¶é—´æˆ³ä¸æ˜¯é€’增的时候存储到链表中,查询的时候会将两部分数æ®åˆå¹¶æŸ¥è¯¢ï¼ŒæŒä¹…化的时候也会将两者åˆå¹¶å†™å…¥ã€‚

🖇 Mmap 内存映射

mmap 是一ç§å°†ç£ç›˜æ–‡ä»¶æ˜ å°„到进程的虚拟地å€ç©ºé—´æ¥å®žçŽ°å¯¹æ–‡ä»¶è¯»å–和修改æ“作的技术。

从 Linux 角度æ¥çœ‹ï¼Œæ“作系统的内存空间被分为「内核空间ã€å’Œã€Œç”¨æˆ·ç©ºé—´ã€ä¸¤å¤§éƒ¨åˆ†ï¼Œå…¶ä¸­å†…核空间和用户空间的空间大å°ã€æ“作æƒé™ä»¥åŠæ ¸å¿ƒåŠŸèƒ½éƒ½ä¸ç›¸åŒã€‚这里的内核空间是指æ“作系统本身使用的内存空间,而用户空间则是æä¾›ç»™å„个进程使用的内存空间。由于用户进程ä¸å…·æœ‰è®¿é—®å†…核资æºçš„æƒé™ï¼Œä¾‹å¦‚访问硬件资æºï¼Œå› æ­¤å½“一个用户进程需è¦ä½¿ç”¨å†…核资æºçš„æ—¶å€™ï¼Œå°±éœ€è¦é€šè¿‡ 系统调用 æ¥å®Œæˆã€‚

虚拟内存细节å¯ä»¥é˜…读 《虚拟内存精粹》 这篇文章。

Figure: 常规文件æ“作和 mmap æ“作的区别

image

常规文件æ“作

读文件: 用户进程首先执行 read(2) 系统调用,会进行系统上下文环境切æ¢ï¼Œä»Žç”¨æˆ·æ€åˆ‡æ¢åˆ°å†…æ ¸æ€ï¼Œä¹‹åŽç”± DMA 将文件数æ®ä»Žç£ç›˜è¯»å–到内核缓冲区,å†å°†å†…核空间缓冲区的数æ®å¤åˆ¶åˆ°ç”¨æˆ·ç©ºé—´çš„ç¼“å†²åŒºä¸­ï¼Œæœ€åŽ read(2) 系统调用返回,进程从内核æ€åˆ‡æ¢åˆ°ç”¨æˆ·æ€ï¼Œæ•´ä¸ªè¿‡ç¨‹ç»“æŸã€‚

写文件: 用户进程å‘èµ· write(2) 系统调用,从用户æ€åˆ‡æ¢åˆ°å†…æ ¸æ€ï¼Œå°†æ•°æ®ä»Žç”¨æˆ·ç©ºé—´ç¼“冲区å¤åˆ¶åˆ°å†…æ ¸ç©ºé—´ç¼“å†²åŒºï¼ŒæŽ¥ç€ write(2) ç³»ç»Ÿè°ƒç”¨è¿”å›žï¼ŒåŒæ—¶è¿›ç¨‹ä»Žå†…æ ¸æ€åˆ‡æ¢åˆ°ç”¨æˆ·æ€ï¼Œæ•°æ®ä»Žå†…核缓冲区写入到ç£ç›˜ï¼Œæ•´ä¸ªè¿‡ç¨‹ç»“æŸã€‚

mmap æ“作

mmap 内存映射的实现过程,总的æ¥è¯´å¯ä»¥åˆ†ä¸ºä¸‰ä¸ªé˜¶æ®µï¼š

  1. 进程å¯åŠ¨æ˜ å°„è¿‡ç¨‹ï¼Œå¹¶åœ¨è™šæ‹Ÿåœ°å€ç©ºé—´ä¸­ä¸ºæ˜ å°„创建虚拟映射区域。
  2. 执行内核空间的系统调用函数 mmap,建立文件物ç†åœ°å€å’Œè¿›ç¨‹è™šæ‹Ÿåœ°å€çš„一一映射关系。
  3. 进程å‘起对这片映射空间的访问,引å‘缺页异常,实现文件内容到物ç†å†…存的拷è´ã€‚

📣 å°ç»“

常规文件æ“作为了æé«˜è¯»å†™æ•ˆçŽ‡å’Œä¿æŠ¤ç£ç›˜ï¼Œä½¿ç”¨äº†é¡µç¼“存机制。这样造æˆè¯»æ–‡ä»¶æ—¶éœ€è¦å…ˆå°†æ–‡ä»¶é¡µä»Žç£ç›˜æ‹·è´åˆ°é¡µç¼“存中,由于页缓存处在内核空间,ä¸èƒ½è¢«ç”¨æˆ·è¿›ç¨‹ç›´æŽ¥å¯»å€ï¼Œæ‰€ä»¥è¿˜éœ€è¦å°†é¡µç¼“存中数æ®é¡µå†æ¬¡æ‹·è´åˆ°å†…å­˜å¯¹åº”çš„ç”¨æˆ·ç©ºé—´ä¸­ã€‚è¿™æ ·ï¼Œé€šè¿‡äº†ä¸¤æ¬¡æ•°æ®æ‹·è´è¿‡ç¨‹ï¼Œæ‰èƒ½å®Œæˆè¿›ç¨‹å¯¹æ–‡ä»¶å†…容的获å–任务。写æ“作也是一样,待写入的 buffer 在内核空间ä¸èƒ½ç›´æŽ¥è®¿é—®ï¼Œå¿…é¡»è¦å…ˆæ‹·è´è‡³å†…核空间对应的主存,å†å†™å›žç£ç›˜ä¸­ï¼ˆå»¶è¿Ÿå†™å›žï¼‰ï¼Œä¹Ÿæ˜¯éœ€è¦ä¸¤æ¬¡æ•°æ®æ‹·è´ã€‚

而使用 mmap æ“作文件,创建新的虚拟内存区域和建立文件ç£ç›˜åœ°å€å’Œè™šæ‹Ÿå†…å­˜åŒºåŸŸæ˜ å°„è¿™ä¸¤æ­¥ï¼Œæ²¡æœ‰ä»»ä½•æ–‡ä»¶æ‹·è´æ“作。而之åŽè®¿é—®æ•°æ®æ—¶å‘现内存中并无数æ®è€Œå‘起的缺页异常过程,å¯ä»¥é€šè¿‡å·²ç»å»ºç«‹å¥½çš„æ˜ å°„关系,åªä½¿ç”¨ä¸€æ¬¡æ•°æ®æ‹·è´ï¼Œå°±ä»Žç£ç›˜ä¸­å°†æ•°æ®ä¼ å…¥å†…存的用户空间中,供进程使用。

😅 总而言之,常规文件æ“作需è¦ä»Žç£ç›˜åˆ°é¡µç¼“å­˜å†åˆ°ç”¨æˆ·ä¸»å­˜çš„ä¸¤æ¬¡æ•°æ®æ‹·è´ã€‚而 mmap æ“æŽ§æ–‡ä»¶åªéœ€è¦ä»Žç£ç›˜åˆ°ç”¨æˆ·ä¸»å­˜çš„ä¸€æ¬¡æ•°æ®æ‹·è´è¿‡ç¨‹ã€‚mmap 的关键点是实现了「用户空间ã€å’Œã€Œå†…核空间ã€çš„æ•°æ®ç›´æŽ¥äº¤äº’而çœåŽ»äº†ä¸åŒç©ºé—´æ•°æ®å¤åˆ¶çš„开销。

📠索引设计

TSDB 的查询,是通过 Label ç»„åˆæ¥é”定到具体的时间线进而确定分å—å移检索出数æ®ã€‚

  • Sid(MetricHash/-/LabelHash) 是一个 Series 的唯一标识。
  • Label(Name/-/Value) => vm="node1"; vm="node2"; iface="eth0"。

在传统的关系型数æ®åº“,索引设计å¯èƒ½æ˜¯è¿™æ ·çš„。

Sid(主键) Label1 Label2 Label3 Label4 ... LabelN
sid1 × × × ... ×
sid2 × × × ... ×
sid3 × × × ... ×
sid4 × × × ... ×

æ—¶åºæ•°æ®æ˜¯ NoSchema 的,没办法æå‰å»ºè¡¨å’Œå®šä¹‰æ•°æ®æ¨¡åž‹ ðŸ¤”ï¼Œå› ä¸ºæˆ‘ä»¬è¦æ”¯æŒç”¨æˆ·ä¸ŠæŠ¥ä»»æ„ Label 组åˆçš„æ•°æ®ï¼Œè¿™æ ·çš„è¯å°±æ²¡åŠžæ³•è¿›è¡ŒåŠ¨æ€çš„æ‰©å±•了。或许你会çµå…‰ä¸€çް ✨,既然这样,那把 Labels 放一个字段拼接起æ¥ä¸å°±å¯ä»¥æ— é™æ‰©å±•啦,比如下é¢è¿™ä¸ªæ ·å­ã€‚

Sid(主键) Labels
sid1 label1, label2, label3, ...
sid2 label2, label3, label5, ...
sid3 label4, label6, label9, ...
sid4 label2, label3, label7, ...

哟嚯,ä¹ä¸€çœ‹æ²¡æ¯›ç—…,é“仔窃喜。

ä¸å¯¹ï¼Œæœ‰é—®é¢˜ 😨,è¦å®šä½åˆ°å…¶ä¸­çš„æŸæ¡æ—¶é—´çº¿ï¼Œé‚£æˆ‘æ˜¯ä¸æ˜¯å¾—全表扫æä¸€è¶Ÿã€‚而且这ç§è®¾è®¡è¿˜æœ‰å¦å¤–一个弊病,就是会导致内存激增,Label çš„ Name å’Œ Value 都å¯èƒ½æ˜¯ç‰¹åˆ«é•¿çš„字符串。

那怎么办呢(🤡 é“仔沉默...),刹那间我的脑中闪过一个帅气的身影,没错,就是你,花泽类「åªè¦å€’立眼泪就ä¸ä¼šæµå‡ºæ¥ã€ã€‚

我悟了ï¼è¦å­¦ä¼šé€†å‘æ€ç»´ 🙃,把 Label 当åšä¸»é”®ï¼ŒSid 当åšå…¶å­—段ä¸å°±å¥½äº†ã€‚这其实有点类似于 ElasticSearch 中的倒排索引,主键为 Keyword,字段为 DocumentID。索引设计如下。

Label(主键) Sids
label1: {vm="node1"} sid1, sid2, sid3, ...
label2: {vm="node2"} sid2, sid3, sid5, ...
label3: {iface="eth0"} sid3, sid5, sid9, ...
label4: {iface="eth1"} sid2, sid3, sid7, ...

Label 作为主键时会建立索引(Hashkey),查找的效率å¯è§†ä¸º O(1)ï¼Œå†æ ¹æ®é”定的 Label æ¥æœ€ç»ˆç¡®å®šæƒ³è¦çš„ Sid。举个例å­ï¼Œæˆ‘ä»¬æƒ³è¦æŸ¥æ‰¾ {vm="node1", iface="eth0"} 的时间线的è¯å°±å¯ä»¥å¿«é€Ÿå®šä½åˆ° Sids(忽略其他 ... sid)。

sid1; sid2; sid3
sid2; sid3; sid5

两者求一个交集,就å¯ä»¥å¾—åˆ°æœ€ç»ˆè¦æŸ¥è¯¢çš„ Sid 为 sid2 å’Œ sid3。🙂 Nice!

å‡è®¾æˆ‘ä»¬çš„æŸ¥è¯¢åªæ”¯æŒç›¸ç­‰åŒ¹é…çš„è¯ï¼Œæ ¼å±€æ˜Žæ˜¾å°±å°äº† 🤌。查询æ¡ä»¶æ˜¯ {vm=~"node*", iface="eth0"} 肿么办?对 label1ã€label2ã€label3 å’Œ label4 一起求一个并集å—ï¼Ÿæ˜¾ç„¶ä¸æ˜¯ï¼Œå› ä¸ºè¿™æ ·ç®—çš„è¯é‚£ç»“果就是 sid3。

厘清关系就ä¸éš¾çœ‹å‡ºï¼Œåªè¦å¯¹ç›¸åŒçš„ Label Name åšå¹¶é›†ç„¶åŽå†å¯¹ä¸åŒçš„ Label Name 求交集就å¯ä»¥äº†ã€‚这样算的正确结果就是 sid3 å’Œ sid5。实现的时候用到了 Roaring Bitmap,一ç§ä¼˜åŒ–çš„ä½å›¾ç®—法。

Memory Segment 索引匹é…

func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string {
	// ...
	sids := newMemorySidSet()
	var got bool
	for i := len(lms) - 1; i >= 0; i-- {
		tmp := newMemorySidSet()
		vs := lvs.Match(lms[i])
		// 对相åŒçš„ Label Name 求并集
		for _, v := range vs {
			midx := mim.idx[joinSeparator(lms[i].Name, v)]
			if midx == nil || midx.Size() <= 0 {
				continue
			}

			tmp.Union(midx.Copy())
		}

		if tmp == nil || tmp.Size() <= 0 {
			return nil
		}

		if !got {
			sids = tmp
			got = true
			continue
		}

		// 对ä¸åŒçš„ Label Name 求交集
		sids.Intersection(tmp.Copy())
	}

	return sids.List()
}

Disk Segment 索引匹é…

func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 {
	// ...

	lst := make([]*roaring.Bitmap, 0)
	for i := len(lms) - 1; i >= 0; i-- {
		tmp := make([]*roaring.Bitmap, 0)
		vs := lvs.Match(lms[i])

		// 对相åŒçš„ Label Name 求并集
		for _, v := range vs {
			didx := dim.label2sids[joinSeparator(lms[i].Name, v)]
			if didx == nil || didx.set.IsEmpty() {
				continue
			}

			tmp = append(tmp, didx.set)
		}

		union := roaring.ParOr(4, tmp...)
		if union.IsEmpty() {
			return nil
		}

		lst = append(lst, union)
	}

	// 对ä¸åŒçš„ Label Name 求交集
	return roaring.ParAnd(4, lst...).ToArray()
}

然而,确定相åŒçš„ LabelName 也是一个问题,因为 Label æœ¬èº«å°±ä»£è¡¨ç€ Name:Valueï¼Œéš¾ä¸æˆæˆ‘还è¦é历所有 label æ‰èƒ½ç¡®å®šå˜›ï¼Œè¿™ä¸å°±åˆæˆäº†å…¨è¡¨æ‰«æï¼Ÿï¼Ÿï¼Ÿ

没有什么问题是一个索引解决ä¸äº†çš„,如果有,那就å†å¢žåŠ ä¸€ä¸ªç´¢å¼•ã€‚ --- é²è¿…。

åªè¦æˆ‘们ä¿å­˜ Label çš„ Name 对应的 Value 列表的映射关系å³å¯é«˜æ•ˆè§£å†³è¿™ä¸ªé—®é¢˜ã€‚

LabelName LabelValue
vm node1, node2, ...
iface eth0, eth1, ...

还是上é¢çš„ {vm=~"node1|node2", iface="eth0"} 查询,第一步通过正则匹é…确定匹é…到 node1, node2,第二步匹é…到 eth0,å†å°† LabelName å’Œ LabelValue 一拼装,Label 就出æ¥äº†ï¼ŒâœŒï¸ 完事ï¼

桥豆麻袋ï¼è¿˜æœ‰ä¸€ä¸ªç²¾å½©çš„æ­£åˆ™åŒ¹é…优化算法没介ç»ã€‚

fastRegexMatcher 是一ç§ä¼˜åŒ–的正则匹é…器,算法æ¥è‡ª Prometheus。

// æ€è·¯å°±æ˜¯å°½é‡å…ˆæ‰§è¡Œå‰ç¼€åŒ¹é…å’ŒåŽç¼€åŒ¹é… 能ä¸ç”¨æ­£åˆ™å°±ä¸ç”¨æ­£åˆ™
// 如 label 表达å¼ä¸º {vm="node*"}
// 而我们此时内存中有 vm=node1, vm=node2, vm=foo, vm=bar,那这个时候åªéœ€è¦å‰ç¼€åŒ¹é…就能直接把 vm=foo,vm=bar 给过滤了
// 毕竟å‰ç¼€åŒ¹é…å’ŒåŽç¼€åŒ¹é…的执行效率还是比正则高ä¸å°‘çš„
type fastRegexMatcher struct {
	re       *regexp.Regexp
	prefix   string
	suffix   string
	contains string
}

func newFastRegexMatcher(v string) (*fastRegexMatcher, error) {
	re, err := regexp.Compile("^(?:" + v + ")$")
	if err != nil {
		return nil, err
	}

	parsed, err := syntax.Parse(v, syntax.Perl)
	if err != nil {
		return nil, err
	}

	m := &fastRegexMatcher{
		re: re,
	}

	if parsed.Op == syntax.OpConcat {
		m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed)
	}

	return m, nil
}

// optimizeConcatRegex returns literal prefix/suffix text that can be safely
// checked against the label value before running the regexp matcher.
func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) {
	sub := r.Sub

	// We can safely remove begin and end text matchers respectively
	// at the beginning and end of the regexp.
	if len(sub) > 0 && sub[0].Op == syntax.OpBeginText {
		sub = sub[1:]
	}
	if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText {
		sub = sub[:len(sub)-1]
	}

	if len(sub) == 0 {
		return
	}

	// Given Prometheus regex matchers are always anchored to the begin/end
	// of the text, if the first/last operations are literals, we can safely
	// treat them as prefix/suffix.
	if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 {
		prefix = string(sub[0].Rune)
	}
	if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 {
		suffix = string(sub[last].Rune)
	}

	// If contains any literal which is not a prefix/suffix, we keep the
	// 1st one. We do not keep the whole list of literals to simplify the
	// fast path.
	for i := 1; i < len(sub)-1; i++ {
		if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 {
			contains = string(sub[i].Rune)
			break
		}
	}

	return
}

func (m *fastRegexMatcher) MatchString(s string) bool {
	if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
		return false
	}

	if m.suffix != "" && !strings.HasSuffix(s, m.suffix) {
		return false
	}

	if m.contains != "" && !strings.Contains(s, m.contains) {
		return false
	}
	return m.re.MatchString(s)
}

🗂 存储布局

既然是数æ®åº“,那么自然少ä¸äº†æ•°æ®æŒä¹…化的特性。了解完索引的设计,å†çœ‹çœ‹è½åˆ°ç£ç›˜çš„存储布局就很清晰了。先跑个示例程åºå†™å…¥ä¸€äº›æ•°æ®çƒ­çƒ­èº«ã€‚

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/chenjiandongx/mandodb"
	"github.com/satori/go.uuid"
)

// 模拟一些监控指标
var metrics = []string{
	"cpu.busy", "cpu.load1", "cpu.load5", "cpu.load15", "cpu.iowait",
	"disk.write.ops", "disk.read.ops", "disk.used",
	"net.in.bytes", "net.out.bytes", "net.in.packages", "net.out.packages",
	"mem.used", "mem.idle", "mem.used.bytes", "mem.total.bytes",
}

// 增加 Label æ•°é‡
var uid1, uid2, uid3 []string

func init() {
	for i := 0; i < len(metrics); i++ {
		uid1 = append(uid1, uuid.NewV4().String())
		uid2 = append(uid2, uuid.NewV4().String())
		uid3 = append(uid3, uuid.NewV4().String())
	}
}

func genPoints(ts int64, node, dc int) []*mandodb.Row {
	points := make([]*mandodb.Row, 0)
	for idx, metric := range metrics {
		points = append(points, &mandodb.Row{
			Metric: metric,
			Labels: []mandodb.Label{
				{Name: "node", Value: "vm" + strconv.Itoa(node)},
				{Name: "dc", Value: strconv.Itoa(dc)},
				{Name: "foo", Value: uid1[idx]},
				{Name: "bar", Value: uid2[idx]},
				{Name: "zoo", Value: uid3[idx]},
			},
			Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))},
		})
	}

	return points
}

func main() {
	store := mandodb.OpenTSDB()
	defer store.Close()

	now := time.Now().Unix() - 36000 // 10h ago

	for i := 0; i < 720; i++ {
		for n := 0; n < 5; n++ {
			for j := 0; j < 1024; j++ {
				_ = store.InsertRows(genPoints(now, n, j))
			}
		}

		now += 60 //1min
	}

	fmt.Println("finished")

	select {}
}

æ¯ä¸ªåˆ†å—ä¿å­˜åœ¨å字为 seg-${mints}-${maxts} 文件夹里,æ¯ä¸ªæ–‡ä»¶å¤¹å«æœ‰ data å’Œ meta.json 两个文件。

  • data: 存储了一个 Segment 的所有数æ®ï¼ŒåŒ…括数æ®ç‚¹å’Œç´¢å¼•ä¿¡æ¯ã€‚
  • meta.json: æè¿°äº†åˆ†å—的时间线数é‡ï¼Œæ•°æ®ç‚¹æ•°é‡ä»¥åŠè¯¥å—çš„æ•°æ®æ—¶é—´è·¨åº¦ã€‚
⯠🶠tree -h seg-*
seg-1627709713-1627716973
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627716973-1627724233
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627724233-1627731493
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627731493-1627738753
├── [ 28M]  data
└── [ 110]  meta.json
seg-1627738753-1627746013
├── [ 28M]  data
└── [ 110]  meta.json

0 directories, 10 files

⯠🶠cat seg-1627709713-1627716973/meta.json -p
{
    "seriesCount": 81920,
    "dataPointsCount": 9912336,
    "maxTs": 1627716973,
    "minTs": 1627709713
}

存储 8 ä¸‡æ¡æ—¶é—´çº¿å…±æŽ¥è¿‘ 1 åƒä¸‡çš„æ•°æ®ç‚¹çš„æ•°æ®å—å ç”¨ç£ç›˜ 28Mã€‚å®žé™…ä¸Šåœ¨å†™å…¥çš„æ—¶å€™ï¼Œä¸€æ¡æ•°æ®æ˜¯è¿™ä¸ªæ ·å­çš„。

{__name__="cpu.busy", node="vm0", dc="0", foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab", bar="3689df1d-cbf3-4962-abea-6491861e62d2", zoo="9551010d-9726-4b3b-baf3-77e50655b950"} 1627710454 41

è¿™æ ·ä¸€æ¡æ•°æ®æŒ‰ç…§ JSON æ ¼å¼è¿›è¡Œç½‘络通信的è¯ï¼Œå¤§æ¦‚是 200Byte,åˆç•¥è®¡ç®—一下。

200 * 9912336 = 1982467200Byte = 1890M

å¯ä»¥é€‰æ‹© ZSTD 或者 Snappy 算法进行二次压缩(默认ä¸å¼€å¯ï¼‰ã€‚还是上é¢çš„示例代ç ï¼Œä¸è¿‡åœ¨ TSDB å¯åŠ¨çš„æ—¶å€™æŒ‡å®šäº†åŽ‹ç¼©ç®—æ³•ã€‚

ZstdBytesCompressor

func main() {
	store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor))
	defer store.Close()
	// ...
}

// 压缩效果 28M -> 25M
⯠🶠ll seg-1627711905-1627719165
Permissions Size User          Date Modified Name
.rwxr-xr-x   25M chenjiandongx  1 Aug 00:13  data
.rwxr-xr-x   110 chenjiandongx  1 Aug 00:13  meta.json

SnappyBytesCompressor

func main() {
	store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor))
	defer store.Close()
	// ...
}

// 压缩效果 28M -> 26M
⯠🶠ll seg-1627763918-1627771178
Permissions Size User          Date Modified Name
.rwxr-xr-x   26M chenjiandongx  1 Aug 14:39  data
.rwxr-xr-x   110 chenjiandongx  1 Aug 14:39  meta.json

多多少少还是有点效果的 🤪...

åŽ‹ç¼©æ˜¯æœ‰æˆæœ¬çš„ï¼ŒåŽ‹ç¼©ä½“ç§¯çš„åŒæ—¶ä¼šå¢žå¤§ CPU 开销(mbp å¯ä»¥ç…Žé¸¡è›‹äº†ï¼‰ï¼Œå‡ç¼“写入速率。

敲黑æ¿ï¼ŒæŽ¥ä¸‹æ¥å°±è¦æ¥å¥½å¥½è®²è®² data 文件到底写了什么东西。 data 存储布局如下。

Figure: Segment Stroage

TOC æè¿°äº† Data Block å’Œ Meta Block(Series Block + Labels Block)的体积,用于åŽé¢å¯¹ data 进行解æžè¯»å–。Data Block å­˜å‚¨äº†æ¯æ¡æ—¶é—´çº¿å…·ä½“的数æ®ç‚¹ï¼Œæ—¶é—´çº¿ä¹‹é—´æ•°æ®ç´§æŒ¨å­˜å‚¨ã€‚DataContent 就是使用 Gorilla 差值算法压缩的 block。

Figure: Data Block

Labels Block 记录了具体的 Label 值以åŠå¯¹åº” Label 与哪些 Series 相关è”。

Figure: Labels Block

Series Block è®°å½•äº†æ¯æ¡æ—¶é—´çº¿çš„元数æ®ï¼Œå­—段解释如下。

  • SidLength: Sid 的长度。
  • Sid: 时间线的唯一标识。
  • StartOffset: 时间线数æ®å—在 Data Block 中的起始å移。
  • EndOffset: 时间线数æ®å—在 Data Block 中的终止å移。
  • LabelCount: 时间线包å«çš„ Label æ•°é‡ã€‚
  • Labels: 标签在 Labels Block 中的åºå·ï¼ˆä»…记录åºå·ï¼Œä¸è®°å½•具体值)。

Figure: Series Block

了解完设计,å†çœ‹çœ‹ Meta Block ç¼–ç å’Œè§£ç¼–ç çš„代ç å®žçŽ°ï¼ŒbinaryMetaSerializer 实现了 MetaSerializer 接å£ã€‚

type MetaSerializer interface {
	Marshal(Metadata) ([]byte, error)
	Unmarshal([]byte, *Metadata) error
}

ç¼–ç  Metadata

const (
	endOfBlock uint16 = 0xffff
	uint16Size        = 2
	uint32Size        = 4
	uint64Size        = 8

	magic = "https://github.com/chenjiandongx/mandodb"
)

func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) {
	encf := newEncbuf()

	// labels block
	labelOrdered := make(map[string]int)
	for idx, row := range meta.Labels {
		labelOrdered[row.Name] = idx
		encf.MarshalUint16(uint16(len(row.Name)))
		encf.MarshalString(row.Name)
		encf.MarshalUint32(uint32(len(row.Sids)))
		encf.MarshalUint32(row.Sids...)
	}
	encf.MarshalUint16(endOfBlock)

	// series block
	for idx, series := range meta.Series {
		encf.MarshalUint16(uint16(len(series.Sid)))
		encf.MarshalString(series.Sid)
		encf.MarshalUint64(series.StartOffset, series.EndOffset)

		rl := meta.sidRelatedLabels[idx]
		encf.MarshalUint32(uint32(rl.Len()))

		lids := make([]uint32, 0, rl.Len())
		for _, lb := range rl {
			lids = append(lids, uint32(labelOrdered[lb.MarshalName()]))
		}

		sort.Slice(lids, func(i, j int) bool {
			return lids[i] < lids[j]
		})
		encf.MarshalUint32(lids...)
	}
	encf.MarshalUint16(endOfBlock)

	encf.MarshalUint64(uint64(meta.MinTs))
	encf.MarshalUint64(uint64(meta.MaxTs))
	encf.MarshalString(magic)   // <-- magic here

	return ByteCompress(encf.Bytes()), nil
}

è§£ç  Metadata

func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error {
	data, err := ByteDecompress(data)
	if err != nil {
		return ErrInvalidSize
	}

	if len(data) < len(magic) {
		return ErrInvalidSize
	}

	decf := newDecbuf()
	// 检验数æ®å®Œæ•´æ€§
	if decf.UnmarshalString(data[len(data)-len(magic):]) != magic {
		return ErrInvalidSize
	}

	// labels block
	offset := 0
	labels := make([]seriesWithLabel, 0)
	for {
		var labelName string
		labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
		offset += uint16Size

		if labelLen == endOfBlock {
			break
		}

		labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)])
		offset += int(labelLen)
		sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
		offset += uint32Size

		sidLst := make([]uint32, sidCnt)
		for i := 0; i < int(sidCnt); i++ {
			sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
			offset += uint32Size
		}
		labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst})
	}
	meta.Labels = labels

	// series block
	rows := make([]metaSeries, 0)
	for {
		series := metaSeries{}
		sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
		offset += uint16Size

		if sidLen == endOfBlock {
			break
		}

		series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)])
		offset += int(sidLen)
		series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
		offset += uint64Size
		series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
		offset += uint64Size
		labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
		offset += uint32Size

		labelLst := make([]uint32, labelCnt)
		for i := 0; i < int(labelCnt); i++ {
			labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
			offset += uint32Size
		}
		series.Labels = labelLst
		rows = append(rows, series)
	}
	meta.Series = rows

	meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
	offset += uint64Size
	meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
	offset += uint64Size

	return decf.Err()
}

至此,对 mandodb çš„ç´¢å¼•å’Œå­˜å‚¨æ•´ä½“è®¾è®¡æ˜¯ä¸æ˜¯å°±äº†ç„¶äºŽèƒ¸ã€‚🥺 文档较长,建议 Star æ”¶è—,毕竟æ¥éƒ½æ¥äº†...

â“ FAQ

Q: Is mandodb cool?

A: 🤭 Not sure.

Q: Is mando awesome?

A: 😎 Definitely YES!

Q: Write performance?

A: 😯 ~40w/s

Q: PRs or Issues?

A: 😉 are welcome.

Q: What's the hardest part of this project?

A: 😂 Writing this document.

Q:Anything else?

A: 🻠Life is magic. Coding is art. Bilibili!

bilibili

📑 License

MIT ©chenjiandongx

About

🤔 A minimize Time Series Database, written from scratch as a learning project. 从零开始实现一个 TSDB

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages

0