🤔 A minimize Time Series Database, written from scratch as a learning project.
Time series databases (TSDB: Time Series Database) are mostly built to serve monitoring scenarios. Let's first introduce two concepts:
- Point: a data point in time series data is a 2-tuple of (Timestamp:int64, Value:float64).
- Series: each distinct combination of labels (Label) forms a distinct series, for example:
series1: {"__name__": "netspeed", "host": "localhost", "iface": "eth0"}
series2: {"__name__": "netspeed", "host": "localhost", "iface": "eth1"}
Prometheus, InfluxDB, M3 and TimescaleDB are all popular TSDBs today. The compression algorithm for time series data largely determines a TSDB's performance, and all of the projects above draw on the delta algorithm from the paper Facebook published in 2015, 《Gorilla: A fast, scalable, in-memory time series database》. On average, this algorithm compresses a 16-byte data point down to 1.37 bytes; it is explained below.
Who's mando?
Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.
What's mandodb?
作为一å监控系统开å‘人员,自然è¦å¯¹æ—¶åºæ•°æ®åº“有所了解。mandodb æ˜¯æˆ‘åœ¨ç ”ç©¶è¿‡ç¨‹ä¸å®žçŽ°çš„ä¸€ä¸ªæœ€å°åŒ–çš„ TSDB,从概念上æ¥è®²å®ƒè¿˜ç®—ä¸ä¸Šæ˜¯ä¸€ä¸ªå®Œæ•´çš„ TSDBï¼Œå› ä¸ºå®ƒï¼š
- 没有实现自己的查询引擎(实现难度大)
- 缺少ç£ç›˜å½’档文件 Compact æ“作(天气好的è¯ä¼šå®žçŽ°ï¼‰
- 没有 WAL 作为ç¾å¤‡ä¿è¯é«˜å¯ç”¨ï¼ˆå¿ƒæƒ…好的è¯ä¼šå®žçŽ°ï¼‰
mandodb 主è¦å—到了两个项目的å¯å‘。本项目仅é™äºŽå¦ä¹ 用途,未ç»ç”Ÿäº§çŽ¯å¢ƒæµ‹è¯•éªŒè¯ï¼
Fabian Reinartz, a core Prometheus developer, wrote the article 《Writing a Time Series Database from Scratch》 describing how the Prometheus TSDB evolved. It is well worth reading and highly recommended.
- 💡 æ•°æ®æ¨¡åž‹ & API
- 🛠é…置选项
- 🔖 用法示例
- 🧮 Gorilla 差值算法
- 📠数æ®å†™å…¥
- 🖇 Mmap å†…å˜æ˜ å°„
- 📠索引设计
- 🗂 å˜å‚¨å¸ƒå±€
- â“ FAQ
Data model definitions
// Point represents a data point: a (ts, value) 2-tuple
type Point struct {
Ts int64 // in seconds
Value float64
}
// Label represents a single label, i.e. a Name/Value pair
type Label struct {
Name string
Value string
}
// Row is one row of time series data: a data point plus its label set
type Row struct {
Metric string
Labels LabelSet
Point Point
}
// LabelSet represents a combination of Labels
type LabelSet []Label
// LabelMatcher matches Labels; regular expressions are supported
type LabelMatcher struct {
Name string
Value string
IsRegx bool
}
// LabelMatcherSet represents a combination of LabelMatchers
type LabelMatcherSet []LabelMatcher
API
// InsertRows writes data
InsertRows(rows []*Row) error
// QueryRange queries time series data points
QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)
// QuerySeries queries series (label combinations)
QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)
// QueryLabelValues queries label values
QueryLabelValues(label string, start, end int64) []string
Options are set when the TSDB is initialized.
// WithMetaSerializerType sets the serialization type for Metadata
// Currently only BinaryMetaSerializer is provided
WithMetaSerializerType(t MetaSerializerType) Option
// WithMetaBytesCompressorType sets the compression algorithm for byte data
// Currently available:
// * No compression: NoopBytesCompressor (default)
// * ZSTD: ZstdBytesCompressor
// * Snappy: SnappyBytesCompressor
WithMetaBytesCompressorType(t BytesCompressorType) Option
// WithOnlyMemoryMode sets whether data is stored only in memory
// Defaults to false
WithOnlyMemoryMode(memoryMode bool) Option
// WithEnabledOutdated sets whether out-of-order writes are supported; this adds resource overhead but improves data completeness
// Defaults to true
WithEnabledOutdated(outdated bool) Option
// WithMaxRowsPerSegment sets the maximum number of points a single Segment may store
// Defaults to 19960412 (a little personal Easter egg 🐶)
WithMaxRowsPerSegment(n int64) Option
// WithDataPath sets the directory where Segments are persisted
// Defaults to "."
WithDataPath(d string) Option
// WithRetention sets how long persisted Segment data is kept
// Defaults to 7d
WithRetention(t time.Duration) Option
// WithWriteTimeout sets the write timeout threshold
// Defaults to 30s
WithWriteTimeout(t time.Duration) Option
// WithLoggerConfig sets the logger options
// logger: github.com/chenjiandongx/logger
WithLoggerConfig(opt *logger.Options) Option
package main
import (
"fmt"
"time"
"github.com/chenjiandongx/mandodb"
)
func main() {
store := mandodb.OpenTSDB(
mandodb.WithOnlyMemoryMode(true),
mandodb.WithWriteTimeout(10*time.Second),
)
defer store.Close()
// Insert data
_ = store.InsertRows([]*mandodb.Row{
{
Metric: "cpu.busy",
Labels: []mandodb.Label{
{Name: "node", Value: "vm1"},
{Name: "dc", Value: "gz-idc"},
},
Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
},
{
Metric: "cpu.busy",
Labels: []mandodb.Label{
{Name: "node", Value: "vm2"},
{Name: "dc", Value: "sz-idc"},
},
Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
},
})
time.Sleep(time.Millisecond)
// Query time series data
data, _ := store.QueryRange("cpu.busy", nil, 1600000000, 1600000002)
fmt.Printf("data: %+v\n", data)
// output:
// data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]
// 查询 Series
// __name__ 是 metric å称在 TSDB ä¸çš„ Label Key
ser, _ := store.QuerySeries(
mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 1600000000, 1600000002)
for _, d := range ser {
fmt.Printf("data: %+v\n", d)
}
// output:
// data: map[__name__:cpu.busy dc:gz-idc node:vm1]
// data: map[__name__:cpu.busy dc:sz-idc node:vm2]
// Query label values
lvs := store.QueryLabelValues("node", 1600000000, 1600000002)
fmt.Printf("data: %+v\n", lvs)
// output:
// data: [vm1 vm2]
}
䏋颿˜¯æˆ‘对这段时间å¦ä¹ 内容的整ç†ï¼Œå°è¯•完整介ç»å¦‚何从零开始实现一个å°åž‹çš„ TSDB。
我本身并没有数æ®åº“å¼€å‘的背景,æŸäº›æè¿°å¯èƒ½å¹¶ä¸é‚£ä¹ˆå‡†ç¡®ï¼Œæ‰€ä»¥æ¬¢è¿Ž 实å diss 指æ£ã€‚
Section 4.1 of the Gorilla paper describes the compression algorithm. First, an overview of the scheme: T and V are stored right next to each other, and '0'/'10'/'11' denote control bits.
Figure: Gorilla compression algorithm
Timestamp DOD compression:
In time series scenarios every point has a corresponding Timestamp, and the intervals between adjacent points of a series follow a pattern. Monitoring data is generally collected at a fixed interval, so the interval can be recorded as a delta; going one step further, we can record the delta of the delta to reduce storage even more.
t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
--------------------------------------------------------------
// 差值:delta
t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10;
--------------------------------------------------------------
// 差值的差值:delta of delta
t1: 1627401800; dod1: 0; dod2: 0; dod3: 0;
实际环境ä¸å½“ç„¶ä¸å¯èƒ½æ¯ä¸ªé—´éš”都这么å‡åŒ€ï¼Œç”±äºŽç½‘络延迟ç‰å…¶ä»–åŽŸå› ï¼Œå·®å€¼ä¼šæœ‰æ³¢åŠ¨ã€‚
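Value XOR compression:
Figure: IEEE floating-point numbers and their XOR results
When two data points have close values, most of their bits are identical, so their XOR contains long runs of leading and trailing zeros. Exploiting this, we can save space by recording only the number of leading zeros, the number of trailing zeros, and the meaningful bits in between.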
下é¢é€šè¿‡ç®—法实现æ¥ä»‹ç»ï¼Œä»£ç æ¥è‡ªé¡¹ç›® dgryski/go-tsz。代ç 完全按照论文ä¸ç»™å‡ºçš„æ¥éª¤æ¥å®žçŽ°ã€‚
// New åˆå§‹åŒ– block 这里会将第一个原始时间戳写入到 block ä¸
func New(t0 uint32) *Series {
s := Series{
T0: t0,
leading: ^uint8(0),
}
s.bw.writeBits(uint64(t0), 32)
return &s
}
// Push writes one time series data point
func (s *Series) Push(t uint32, v float64) {
// ....
// 如果是第一个数æ®ç‚¹çš„è¯å†™å…¥åŽŸå§‹æ•°æ®åŽç›´æŽ¥è¿”回
if s.t == 0 {
s.t = t
s.val = v
s.tDelta = t - s.T0 // delta from the block header T0 (0 only if t == T0)
// The block header stores the starting time stamp, t-1 (the previous timestamp),
// which is aligned to a two hour window; the first time
// stamp, t0, in the block is stored as a delta from t−1 in 14 bits.
// 用 14 个 bit 写入时间戳差值
s.bw.writeBits(uint64(s.tDelta), 14)
// 原始数æ®ç‚¹å®Œæ•´å†™å…¥
s.bw.writeBits(math.Float64bits(v), 64)
return
}
tDelta := t - s.t
dod := int32(tDelta - s.tDelta) // compute the delta of delta
// From here on, handle every data point after the first
switch {
// If D is zero, then store a single '0' bit (one bit, not one byte)
case dod == 0:
s.bw.writeBit(zero)
// If D is between [-63, 64], store '10' followed by the value (7 bits)
case -63 <= dod && dod <= 64:
s.bw.writeBits(0x02, 2) // control bits '10'
s.bw.writeBits(uint64(dod), 7) // 7 bits cover the range [-63, 64]
// If D is between [-255, 256], store '110' followed by the value (9 bits)
case -255 <= dod && dod <= 256:
s.bw.writeBits(0x06, 3) // control bits '110'
s.bw.writeBits(uint64(dod), 9)
// If D is between [-2047, 2048], store '1110' followed by the value (12 bits)
case -2047 <= dod && dod <= 2048:
s.bw.writeBits(0x0e, 4) // control bits '1110'
s.bw.writeBits(uint64(dod), 12)
// Otherwise store '1111' followed by D using 32 bits
default:
s.bw.writeBits(0x0f, 4) // all other cases use control bits '1111'
s.bw.writeBits(uint64(dod), 32)
}
// 到这里 (T, V) ä¸çš„æ—¶é—´æˆ³å·²ç»å†™å…¥å®Œæ¯•了 æŽ¥ä¸‹æ¥æ˜¯å†™ V 部分
// 先计算两个值的异或结果
vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)
// If XOR with the previous is zero (same value), store single '0' bit
// So a series whose value rarely changes (e.g. one that only reports 1 or 0) takes very little memory
// zero = '0';
if vDelta == 0 {
s.bw.writeBit(zero)
} else { // non-zero XOR: set the control bit to 1
s.bw.writeBit(one)
// 计算å‰ç½® 0 å’ŒåŽç½® 0
leading := uint8(bits.LeadingZeros64(vDelta))
trailing := uint8(bits.TrailingZeros64(vDelta))
// clamp number of leading zeros to avoid overflow when encoding
if leading >= 32 {
leading = 31
}
// (Control bit '0') If the block of meaningful bits
// falls within the block of previous meaningful bits,
// i.e., there are at least as many leading zeros and
// as many trailing zeros as with the previous value,
// use that information for the block position and
// just store the meaningful XORed value.
// 如果å‰ç½® 0 ä¸å°äºŽä¸Šä¸€ä¸ªå€¼è®¡ç®—的异或结果的å‰ç½® 0 且åŽç½® 0 也ä¸å°äºŽä¸Šä¸€ä¸ªå€¼è®¡ç®—的异或结果的åŽç½® 0
if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => æŽ§åˆ¶ä½ '10'
s.bw.writeBit(zero)
// 记录异或值éžé›¶éƒ¨åˆ†
s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
} else { // => control bits '11'
// (Control bit '1') Store the length of the number
// of leading zeros in the next 5 bits, then store the
// length of the meaningful XORed value in the next
// 6 bits. Finally store the meaningful bits of the XORed value.
s.leading, s.trailing = leading, trailing
// 其他情况控制ä½ç½®ä¸º 1 并用接下æ¥çš„ 5bits 记录å‰ç½® 0 个数
s.bw.writeBit(one)
s.bw.writeBits(uint64(leading), 5)
// Then store the length of the meaningful XOR bits in the next 6 bits, followed by those bits
sigbits := 64 - leading - trailing
s.bw.writeBits(uint64(sigbits), 6)
s.bw.writeBits(vDelta>>trailing, int(sigbits))
}
}
// Update state: both T and V have now been compressed into memory
s.tDelta = tDelta
s.t = t
s.val = v
}
// The end of each block is marked with a special end-of-stream record
func finish(w *bstream) {
// write an end-of-stream record
w.writeBits(0x0f, 4)
w.writeBits(0xffffffff, 32)
w.writeBit(zero)
}
论文给出了ä¸åŒ case çš„ buckets å æ¯”分布。
Figure: Timestamp buckets distribution
Figure: Value buckets distribution
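Among Timestamp buckets, the share of points whose delta equals the previous delta (dod = 0) is as high as 96.39%, and among Value buckets the share that needs only a single control bit reaches 59.06%, which shows how high the compression ratio is.
The paper also reaches an important conclusion: the compression ratio increases with the number of points in a block and starts to converge to an optimum at around 120 points.
Figure: compression ratio curve
The Gorilla delta algorithm is also used in another project of mine, chenjiandongx/tszlist, a thread-safe linked list for time series data.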
Time series data follows a "write vertically, query horizontally" pattern: at any given moment, data for many series keeps being appended, but a query usually asks for the points of one series over a span of time.
series
^
│ . . . . . . . . . . . . . . . . . . . . . . {__name__="request_total", method="GET"}
│ . . . . . . . . . . . . . . . . . . . . . . {__name__="request_total", method="POST"}
│ . . . . . . .
│ . . . . . . . . . . . . . . . . . . . ...
│ . . . . . . . . . . . . . . . . . . . . .
│ . . . . . . . . . . . . . . . . . . . . . {__name__="errors_total", method="POST"}
│ . . . . . . . . . . . . . . . . . {__name__="errors_total", method="GET"}
│ . . . . . . . . . . . . . .
│ . . . . . . . . . . . . . . . . . . . ...
│ . . . . . . . . . . . . . . . . . . . .
v
<-------------------- time --------------------->
Time series data is strongly tied to time (otherwise why call it time series? 🧐): most queries only touch recent data, where "recent" is a relative notion. So there's no need to maintain a series' full lifecycle, especially in cloud-native environments like Kubernetes, where Pods can be scaled in or out at any moment and a series may therefore live only briefly. If we kept index entries for every series ever seen, the number of series in the database would grow linearly over time 😱 and query efficiency would suffer badly.
Here we introduce the concept of "series churn": a set of series becomes inactive, i.e. stops receiving data points, and a new set of active series appears in its place.
series
^
│ . . . . . .
│ . . . . . .
│ . . . . . .
│ . . . . . . .
│ . . . . . . .
│ . . . . . . .
│ . . . . . .
│ . . . . . .
│ . . . . .
│ . . . . .
│ . . . . .
v
<-------------------- time --------------------->
We cut the data of many series into small blocks by time span, and each block is essentially an independent, small database. Another advantage of this approach is that purging expired data is trivial: just delete the whole block 👻 (going all-in is a kind of wisdom). The hot data of the last two hours is kept in memory (Memory Segment), and the rest is persisted to disk (Disk Segment).
Figure: series partitioned into blocks
DiskSegments are kept in a list implemented with an AVL Tree, which stays sorted on insertion. Why not the fancier red-black tree? Because it's harder to implement...
当 Memory Segment 达到归档æ¡ä»¶çš„æ—¶å€™ï¼Œä¼šåˆ›å»ºä¸€ä¸ªæ–°çš„内å˜å—并异æ¥å°†åˆšå½’档的å—写入到ç£ç›˜ï¼ŒåŒæ—¶ä¼šä½¿ç”¨ mmap å°†ç£ç›˜æ–‡ä»¶å¥æŸ„æ˜ å°„åˆ°å†…å˜ä¸ã€‚代ç 实现如下。
func (tsdb *TSDB) getHeadPartition() (Segment, error) {
tsdb.mut.Lock()
defer tsdb.mut.Unlock()
if tsdb.segs.head.Frozen() {
head := tsdb.segs.head
tsdb.wg.Add(1) // Add before starting the goroutine so a concurrent Wait cannot miss it
go func() {
defer tsdb.wg.Done()
tsdb.segs.Add(head)
t0 := time.Now()
dn := dirname(head.MinTs(), head.MaxTs())
if err := writeToDisk(head.(*memorySegment)); err != nil {
logger.Errorf("failed to flush data to disk, %v", err)
return
}
fname := path.Join(dn, "data")
mf, err := mmap.OpenMmapFile(fname)
if err != nil {
logger.Errorf("failed to make a mmap file %s, %v", fname, err)
return
}
tsdb.segs.Remove(head)
tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
logger.Infof("write file %s take: %v", fname, time.Since(t0))
}()
tsdb.segs.head = newMemorySegment()
}
return tsdb.segs.head, nil
}
Figure: the two parts of a Memory Segment's data
Writes support timestamps that go backwards, i.e. limited out-of-order writes. For every series that hasn't been archived yet, an extra list (also implemented with an AVL Tree) is kept in memory, and points whose timestamps are not increasing go into that list. Queries merge the two parts, and so does persistence.
mmap is a technique that maps a disk file into a process's virtual address space so the file can be read and modified through memory.
From the Linux point of view, the operating system's memory is split into two parts, "kernel space" and "user space", which differ in size, access permissions, and core responsibilities. Kernel space is the memory used by the operating system itself, while user space is the memory given to individual processes. Because user processes are not allowed to access kernel resources such as hardware directly, a user process that needs a kernel resource must go through a system call.
For details on virtual memory, read the article 《虚拟内存精粹》 (The Essence of Virtual Memory).
Figure: regular file operations vs. mmap
Regular file operations
Reading a file: the user process first calls the read(2) system call, which switches context from user mode to kernel mode. DMA then reads the file data from disk into a kernel buffer, the data is copied from the kernel buffer into a user-space buffer, and finally read(2) returns and the process switches from kernel mode back to user mode.
Writing a file: the user process calls the write(2) system call, switching from user mode to kernel mode, and the data is copied from the user-space buffer into a kernel buffer. write(2) then returns and the process switches back to user mode; the kernel writes the data from its buffer to disk afterwards.
mmap operations
Overall, the mmap memory-mapping process can be divided into three stages:
- The process initiates the mapping and creates a virtual memory area for it in its virtual address space.
- The kernel-space mmap system call establishes a one-to-one mapping between the file's physical addresses and the process's virtual addresses.
- When the process first accesses the mapped area, a page fault is raised and the file content is copied into physical memory.
📣 å°ç»“
常规文件æ“作为了æé«˜è¯»å†™æ•ˆçŽ‡å’Œä¿æŠ¤ç£ç›˜ï¼Œä½¿ç”¨äº†é¡µç¼“å˜æœºåˆ¶ã€‚è¿™æ ·é€ æˆè¯»æ–‡ä»¶æ—¶éœ€è¦å…ˆå°†æ–‡ä»¶é¡µä»Žç£ç›˜æ‹·è´åˆ°é¡µç¼“å˜ä¸ï¼Œç”±äºŽé¡µç¼“å˜å¤„åœ¨å†…æ ¸ç©ºé—´ï¼Œä¸èƒ½è¢«ç”¨æˆ·è¿›ç¨‹ç›´æŽ¥å¯»å€ï¼Œæ‰€ä»¥è¿˜éœ€è¦å°†é¡µç¼“å˜ä¸æ•°æ®é¡µå†æ¬¡æ‹·è´åˆ°å†…å˜å¯¹åº”的用户空间ä¸ã€‚è¿™æ ·ï¼Œé€šè¿‡äº†ä¸¤æ¬¡æ•°æ®æ‹·è´è¿‡ç¨‹ï¼Œæ‰èƒ½å®Œæˆè¿›ç¨‹å¯¹æ–‡ä»¶å†…容的获å–任务。写æ“ä½œä¹Ÿæ˜¯ä¸€æ ·ï¼Œå¾…å†™å…¥çš„ buffer åœ¨å†…æ ¸ç©ºé—´ä¸èƒ½ç›´æŽ¥è®¿é—®ï¼Œå¿…é¡»è¦å…ˆæ‹·è´è‡³å†…æ ¸ç©ºé—´å¯¹åº”çš„ä¸»å˜ï¼Œå†å†™å›žç£ç›˜ä¸ï¼ˆå»¶è¿Ÿå†™å›žï¼‰ï¼Œä¹Ÿæ˜¯éœ€è¦ä¸¤æ¬¡æ•°æ®æ‹·è´ã€‚
而使用 mmap æ“作文件,创建新的虚拟内å˜åŒºåŸŸå’Œå»ºç«‹æ–‡ä»¶ç£ç›˜åœ°å€å’Œè™šæ‹Ÿå†…å˜åŒºåŸŸæ˜ 射这两æ¥ï¼Œæ²¡æœ‰ä»»ä½•æ–‡ä»¶æ‹·è´æ“作。而之åŽè®¿é—®æ•°æ®æ—¶å‘现内å˜ä¸å¹¶æ— æ•°æ®è€Œå‘起的缺页异常过程,å¯ä»¥é€šè¿‡å·²ç»å»ºç«‹å¥½çš„æ˜ 射关系,åªä½¿ç”¨ä¸€æ¬¡æ•°æ®æ‹·è´ï¼Œå°±ä»Žç£ç›˜ä¸å°†æ•°æ®ä¼ 入内å˜çš„用户空间ä¸ï¼Œä¾›è¿›ç¨‹ä½¿ç”¨ã€‚
😅 总而言之,常规文件æ“作需è¦ä»Žç£ç›˜åˆ°é¡µç¼“å˜å†åˆ°ç”¨æˆ·ä¸»å˜çš„ä¸¤æ¬¡æ•°æ®æ‹·è´ã€‚而 mmap æ“æŽ§æ–‡ä»¶åªéœ€è¦ä»Žç£ç›˜åˆ°ç”¨æˆ·ä¸»å˜çš„ä¸€æ¬¡æ•°æ®æ‹·è´è¿‡ç¨‹ã€‚mmap 的关键点是实现了「用户空间ã€å’Œã€Œå†…æ ¸ç©ºé—´ã€çš„æ•°æ®ç›´æŽ¥äº¤äº’而çœåŽ»äº†ä¸åŒç©ºé—´æ•°æ®å¤åˆ¶çš„开销。
A TSDB query uses a Label combination to pin down specific series, then determines the block offsets and retrieves the data.
- Sid (MetricHash/-/LabelHash) is the unique identifier of a Series.
- Label (Name/-/Value) => vm="node1"; vm="node2"; iface="eth0".
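How the two hashes are computed isn't specified here, so the following is only a sketch under assumptions: it uses 64-bit FNV-1a from hash/fnv and sorts labels by name before hashing so the same label set always yields the same Sid. sid and hash64 are hypothetical names, not mandodb functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
)

type Label struct {
	Name  string
	Value string
}

// hash64 returns the 64-bit FNV-1a hash of s (an assumed hash choice).
func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// sid builds "MetricHash/-/LabelHash". Labels are sorted by name first so
// that the same label set always yields the same Sid regardless of order.
func sid(metric string, labels []Label) string {
	ls := append([]Label(nil), labels...)
	sort.Slice(ls, func(i, j int) bool { return ls[i].Name < ls[j].Name })
	var buf []byte
	for _, l := range ls {
		buf = append(buf, l.Name...)
		buf = append(buf, 0xff) // separator byte that never appears in valid UTF-8
		buf = append(buf, l.Value...)
		buf = append(buf, 0xff)
	}
	return strconv.FormatUint(hash64(metric), 10) + "/-/" + strconv.FormatUint(hash64(string(buf)), 10)
}

func main() {
	a := sid("cpu.busy", []Label{{"node", "vm1"}, {"dc", "gz-idc"}})
	b := sid("cpu.busy", []Label{{"dc", "gz-idc"}, {"node", "vm1"}})
	fmt.Println(a == b) // true: label order does not matter
}
```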
In a traditional relational database, the index design might look like this.
Sid (primary key) | Label1 | Label2 | Label3 | Label4 | ... | LabelN |
---|---|---|---|---|---|---|
sid1 | × | × | × | × | ... | × |
sid2 | × | × | × | × | ... | × |
sid3 | × | × | × | × | ... | × |
sid4 | × | × | × | × | ... | × |
Time series data is schemaless (NoSchema), so there's no way to create tables and define the data model in advance 🤔: we must accept data with any Label combination users report, and a fixed schema cannot be extended dynamically to cover that. Perhaps inspiration strikes ✨: then why not concatenate all Labels into a single field, which can be extended without limit? Something like this:
Sid (primary key) | Labels |
---|---|
sid1 | label1, label2, label3, ... |
sid2 | label2, label3, label5, ... |
sid3 | label4, label6, label9, ... |
sid4 | label2, label3, label7, ... |
哟嚯,ä¹ä¸€çœ‹æ²¡æ¯›ç—…,é“仔窃喜。
ä¸å¯¹ï¼Œæœ‰é—®é¢˜ 😨,è¦å®šä½åˆ°å…¶ä¸çš„æŸæ¡æ—¶é—´çº¿ï¼Œé‚£æˆ‘æ˜¯ä¸æ˜¯å¾—全表扫æä¸€è¶Ÿã€‚而且这ç§è®¾è®¡è¿˜æœ‰å¦å¤–ä¸€ä¸ªå¼Šç—…ï¼Œå°±æ˜¯ä¼šå¯¼è‡´å†…å˜æ¿€å¢žï¼ŒLabel çš„ Name å’Œ Value 都å¯èƒ½æ˜¯ç‰¹åˆ«é•¿çš„å—符串。
那怎么办呢(🤡 é“仔沉默...),刹那间我的脑ä¸é—ªè¿‡ä¸€ä¸ªå¸…æ°”çš„èº«å½±ï¼Œæ²¡é”™ï¼Œå°±æ˜¯ä½ ï¼ŒèŠ±æ³½ç±»ã€Œåªè¦å€’立眼泪就ä¸ä¼šæµå‡ºæ¥ã€ã€‚
我悟了ï¼è¦å¦ä¼šé€†å‘æ€ç»´ 🙃,把 Label 当åšä¸»é”®ï¼ŒSid 当åšå…¶å—段ä¸å°±å¥½äº†ã€‚这其实有点类似于 ElasticSearch ä¸çš„倒排索引,主键为 Keywordï¼Œå—æ®µä¸º DocumentID。索引设计如下。
Label(主键) | Sids |
---|---|
label1: {vm="node1"} | sid1, sid2, sid3, ... |
label2: {vm="node2"} | sid2, sid3, sid5, ... |
label3: {iface="eth0"} | sid3, sid5, sid9, ... |
label4: {iface="eth1"} | sid2, sid3, sid7, ... |
With Label as the primary key, it is indexed as a hash key, so lookups can be treated as O(1), and the matching Labels then determine the Sids we want. For example, to find the series {vm="node1", iface="eth0"}, we can quickly locate these Sids (ignoring the other ... sids):
sid1; sid2; sid3
sid3; sid5; sid9
Intersecting the two gives the Sid to query: sid3. 🙂 Nice!
If our queries only supported equality matching, that would be thinking small 🤌. What if the query condition is {vm=~"node.*", iface="eth0"}?
Intersect label1, label2, label3 and label4 all together? Obviously not, because that would give just sid3.
厘清关系就ä¸éš¾çœ‹å‡ºï¼Œåªè¦å¯¹ç›¸åŒçš„ Label Name åšå¹¶é›†ç„¶åŽå†å¯¹ä¸åŒçš„ Label Name 求交集就å¯ä»¥äº†ã€‚è¿™æ ·ç®—çš„æ£ç¡®ç»“果就是 sid3
和 sid5
。实现的时候用到了 Roaring Bitmap,一ç§ä¼˜åŒ–çš„ä½å›¾ç®—法。
Memory Segment index matching
func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string {
// ...
sids := newMemorySidSet()
var got bool
for i := len(lms) - 1; i >= 0; i-- {
tmp := newMemorySidSet()
vs := lvs.Match(lms[i])
// 对相åŒçš„ Label Name 求并集
for _, v := range vs {
midx := mim.idx[joinSeparator(lms[i].Name, v)]
if midx == nil || midx.Size() <= 0 {
continue
}
tmp.Union(midx.Copy())
}
if tmp == nil || tmp.Size() <= 0 {
return nil
}
if !got {
sids = tmp
got = true
continue
}
// 对ä¸åŒçš„ Label Name 求交集
sids.Intersection(tmp.Copy())
}
return sids.List()
}
Disk Segment index matching
func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 {
// ...
lst := make([]*roaring.Bitmap, 0)
for i := len(lms) - 1; i >= 0; i-- {
tmp := make([]*roaring.Bitmap, 0)
vs := lvs.Match(lms[i])
// 对相åŒçš„ Label Name 求并集
for _, v := range vs {
didx := dim.label2sids[joinSeparator(lms[i].Name, v)]
if didx == nil || didx.set.IsEmpty() {
continue
}
tmp = append(tmp, didx.set)
}
union := roaring.ParOr(4, tmp...)
if union.IsEmpty() {
return nil
}
lst = append(lst, union)
}
// 对ä¸åŒçš„ Label Name 求交集
return roaring.ParAnd(4, lst...).ToArray()
}
然而,确定相åŒçš„ LabelName ä¹Ÿæ˜¯ä¸€ä¸ªé—®é¢˜ï¼Œå› ä¸º Label æœ¬èº«å°±ä»£è¡¨ç€ Name:Value
ï¼Œéš¾ä¸æˆæˆ‘还è¦é历所有 label æ‰èƒ½ç¡®å®šå˜›ï¼Œè¿™ä¸å°±åˆæˆäº†å…¨è¡¨æ‰«æï¼Ÿï¼Ÿï¼Ÿ
没有什么问题是一个索引解决ä¸äº†çš„,如果有,那就å†å¢žåŠ ä¸€ä¸ªç´¢å¼•ã€‚ --- é²è¿…。
åªè¦æˆ‘们ä¿å˜ Label çš„ Name 对应的 Value åˆ—è¡¨çš„æ˜ å°„å…³ç³»å³å¯é«˜æ•ˆè§£å†³è¿™ä¸ªé—®é¢˜ã€‚
LabelName | LabelValue |
---|---|
vm | node1, node2, ... |
iface | eth0, eth1, ... |
还是上é¢çš„ {vm=~"node1|node2", iface="eth0"}
查询,第一æ¥é€šè¿‡æ£åˆ™åŒ¹é…确定匹é…到 node1, node2
,第二æ¥åŒ¹é…到 eth0
,å†å°† LabelName å’Œ LabelValue 一拼装,Label 就出æ¥äº†ï¼ŒâœŒï¸ 完事ï¼
桥豆麻袋ï¼è¿˜æœ‰ä¸€ä¸ªç²¾å½©çš„æ£åˆ™åŒ¹é…优化算法没介ç»ã€‚
fastRegexMatcher 是一ç§ä¼˜åŒ–çš„æ£åˆ™åŒ¹é…器,算法æ¥è‡ª Prometheus。
// The idea: run prefix and suffix checks first, and avoid the regex whenever possible
// e.g. for the label expression {vm=~"node.*"},
// if memory currently holds vm=node1, vm=node2, vm=foo, vm=bar, a prefix check alone filters out vm=foo and vm=bar
// After all, prefix and suffix checks run much faster than a regex
type fastRegexMatcher struct {
re *regexp.Regexp
prefix string
suffix string
contains string
}
func newFastRegexMatcher(v string) (*fastRegexMatcher, error) {
re, err := regexp.Compile("^(?:" + v + ")$")
if err != nil {
return nil, err
}
parsed, err := syntax.Parse(v, syntax.Perl)
if err != nil {
return nil, err
}
m := &fastRegexMatcher{
re: re,
}
if parsed.Op == syntax.OpConcat {
m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed)
}
return m, nil
}
// optimizeConcatRegex returns literal prefix/suffix text that can be safely
// checked against the label value before running the regexp matcher.
func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) {
sub := r.Sub
// We can safely remove begin and end text matchers respectively
// at the beginning and end of the regexp.
if len(sub) > 0 && sub[0].Op == syntax.OpBeginText {
sub = sub[1:]
}
if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText {
sub = sub[:len(sub)-1]
}
if len(sub) == 0 {
return
}
// Given Prometheus regex matchers are always anchored to the begin/end
// of the text, if the first/last operations are literals, we can safely
// treat them as prefix/suffix.
if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 {
prefix = string(sub[0].Rune)
}
if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 {
suffix = string(sub[last].Rune)
}
// If contains any literal which is not a prefix/suffix, we keep the
// 1st one. We do not keep the whole list of literals to simplify the
// fast path.
for i := 1; i < len(sub)-1; i++ {
if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 {
contains = string(sub[i].Rune)
break
}
}
return
}
func (m *fastRegexMatcher) MatchString(s string) bool {
if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
return false
}
if m.suffix != "" && !strings.HasSuffix(s, m.suffix) {
return false
}
if m.contains != "" && !strings.Contains(s, m.contains) {
return false
}
return m.re.MatchString(s)
}
既然是数æ®åº“,那么自然少ä¸äº†æ•°æ®æŒä¹…化的特性。了解完索引的设计,å†çœ‹çœ‹è½åˆ°ç£ç›˜çš„å˜å‚¨å¸ƒå±€å°±å¾ˆæ¸…晰了。先跑个示例程åºå†™å…¥ä¸€äº›æ•°æ®çƒçƒèº«ã€‚
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/chenjiandongx/mandodb"
"github.com/satori/go.uuid"
)
// Simulate some monitoring metrics
var metrics = []string{
"cpu.busy", "cpu.load1", "cpu.load5", "cpu.load15", "cpu.iowait",
"disk.write.ops", "disk.read.ops", "disk.used",
"net.in.bytes", "net.out.bytes", "net.in.packages", "net.out.packages",
"mem.used", "mem.idle", "mem.used.bytes", "mem.total.bytes",
}
// Add more Labels
var uid1, uid2, uid3 []string
func init() {
for i := 0; i < len(metrics); i++ {
uid1 = append(uid1, uuid.NewV4().String())
uid2 = append(uid2, uuid.NewV4().String())
uid3 = append(uid3, uuid.NewV4().String())
}
}
func genPoints(ts int64, node, dc int) []*mandodb.Row {
points := make([]*mandodb.Row, 0)
for idx, metric := range metrics {
points = append(points, &mandodb.Row{
Metric: metric,
Labels: []mandodb.Label{
{Name: "node", Value: "vm" + strconv.Itoa(node)},
{Name: "dc", Value: strconv.Itoa(dc)},
{Name: "foo", Value: uid1[idx]},
{Name: "bar", Value: uid2[idx]},
{Name: "zoo", Value: uid3[idx]},
},
Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))},
})
}
return points
}
func main() {
store := mandodb.OpenTSDB()
defer store.Close()
now := time.Now().Unix() - 36000 // 10h ago
for i := 0; i < 720; i++ {
for n := 0; n < 5; n++ {
for j := 0; j < 1024; j++ {
_ = store.InsertRows(genPoints(now, n, j))
}
}
now += 60 // 1min
}
fmt.Println("finished")
select {}
}
Each block is stored in a directory named seg-${mints}-${maxts}, and each directory contains two files, data and meta.json.
- data: stores all of a Segment's data, including data points and index information.
- meta.json: describes the block's number of series, number of data points, and the time span of its data.
⯠🶠tree -h seg-*
seg-1627709713-1627716973
├── [ 28M] data
└── [ 110] meta.json
seg-1627716973-1627724233
├── [ 28M] data
└── [ 110] meta.json
seg-1627724233-1627731493
├── [ 28M] data
└── [ 110] meta.json
seg-1627731493-1627738753
├── [ 28M] data
└── [ 110] meta.json
seg-1627738753-1627746013
├── [ 28M] data
└── [ 110] meta.json
0 directories, 10 files
⯠🶠cat seg-1627709713-1627716973/meta.json -p
{
"seriesCount": 81920,
"dataPointsCount": 9912336,
"maxTs": 1627716973,
"minTs": 1627709713
}
The data block storing 80,000 series with nearly 10 million data points in total takes 28M on disk. At write time, a single data point actually looks like this:
{__name__="cpu.busy", node="vm0", dc="0", foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab", bar="3689df1d-cbf3-4962-abea-6491861e62d2", zoo="9551010d-9726-4b3b-baf3-77e50655b950"} 1627710454 41
Sent over the network as JSON, such a data point would be roughly 200 bytes. A rough calculation:
200 * 9912336 = 1982467200 Byte ≈ 1890M
You can choose ZSTD or Snappy for a second round of compression (disabled by default). This is the same example code as above, except that a compression algorithm is specified when the TSDB is opened.
ZstdBytesCompressor
func main() {
store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor))
defer store.Close()
// ...
}
// 压缩效果 28M -> 25M
⯠🶠ll seg-1627711905-1627719165
Permissions Size User Date Modified Name
.rwxr-xr-x 25M chenjiandongx 1 Aug 00:13 data
.rwxr-xr-x 110 chenjiandongx 1 Aug 00:13 meta.json
SnappyBytesCompressor
func main() {
store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor))
defer store.Close()
// ...
}
// 压缩效果 28M -> 26M
⯠🶠ll seg-1627763918-1627771178
Permissions Size User Date Modified Name
.rwxr-xr-x 26M chenjiandongx 1 Aug 14:39 data
.rwxr-xr-x 110 chenjiandongx 1 Aug 14:39 meta.json
多多少少还是有点效果的 🤪...
åŽ‹ç¼©æ˜¯æœ‰æˆæœ¬çš„ï¼ŒåŽ‹ç¼©ä½“ç§¯çš„åŒæ—¶ä¼šå¢žå¤§ CPU 开销(mbp å¯ä»¥ç…Žé¸¡è›‹äº†ï¼‰ï¼Œå‡ç¼“写入速率。
敲黑æ¿ï¼ŒæŽ¥ä¸‹æ¥å°±è¦æ¥å¥½å¥½è®²è®² data
文件到底写了什么东西。 data
å˜å‚¨å¸ƒå±€å¦‚下。
Figure: Segment Stroage
TOC æè¿°äº† Data Block å’Œ Meta Block(Series Block + Labels Block)的体积,用于åŽé¢å¯¹ data 进行解æžè¯»å–。Data Block å˜å‚¨äº†æ¯æ¡æ—¶é—´çº¿å…·ä½“的数æ®ç‚¹ï¼Œæ—¶é—´çº¿ä¹‹é—´æ•°æ®ç´§æŒ¨å˜å‚¨ã€‚DataContent 就是使用 Gorilla 差值算法压缩的 block。
Figure: Data Block
The Labels Block records the concrete Label values and which Series each Label is associated with.
Figure: Labels Block
The Series Block records the metadata of each series; its fields are:
- SidLength: the length of the Sid.
- Sid: the unique identifier of the series.
- StartOffset: the start offset of the series' data in the Data Block.
- EndOffset: the end offset of the series' data in the Data Block.
- LabelCount: the number of Labels the series has.
- Labels: the indexes of the labels in the Labels Block (only the indexes are stored, not the actual values).
Figure: Series Block
了解完设计,å†çœ‹çœ‹ Meta Block ç¼–ç 和解编ç 的代ç 实现,binaryMetaSerializer 实现了 MetaSerializer
接å£ã€‚
type MetaSerializer interface {
Marshal(Metadata) ([]byte, error)
Unmarshal([]byte, *Metadata) error
}
Encoding Metadata
const (
endOfBlock uint16 = 0xffff
uint16Size = 2
uint32Size = 4
uint64Size = 8
magic = "https://github.com/chenjiandongx/mandodb"
)
func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) {
encf := newEncbuf()
// labels block
labelOrdered := make(map[string]int)
for idx, row := range meta.Labels {
labelOrdered[row.Name] = idx
encf.MarshalUint16(uint16(len(row.Name)))
encf.MarshalString(row.Name)
encf.MarshalUint32(uint32(len(row.Sids)))
encf.MarshalUint32(row.Sids...)
}
encf.MarshalUint16(endOfBlock)
// series block
for idx, series := range meta.Series {
encf.MarshalUint16(uint16(len(series.Sid)))
encf.MarshalString(series.Sid)
encf.MarshalUint64(series.StartOffset, series.EndOffset)
rl := meta.sidRelatedLabels[idx]
encf.MarshalUint32(uint32(rl.Len()))
lids := make([]uint32, 0, rl.Len())
for _, lb := range rl {
lids = append(lids, uint32(labelOrdered[lb.MarshalName()]))
}
sort.Slice(lids, func(i, j int) bool {
return lids[i] < lids[j]
})
encf.MarshalUint32(lids...)
}
encf.MarshalUint16(endOfBlock)
encf.MarshalUint64(uint64(meta.MinTs))
encf.MarshalUint64(uint64(meta.MaxTs))
encf.MarshalString(magic) // <-- magic here
return ByteCompress(encf.Bytes()), nil
}
Decoding Metadata
func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error {
data, err := ByteDecompress(data)
if err != nil {
return ErrInvalidSize
}
if len(data) < len(magic) {
return ErrInvalidSize
}
decf := newDecbuf()
// 检验数æ®å®Œæ•´æ€§
if decf.UnmarshalString(data[len(data)-len(magic):]) != magic {
return ErrInvalidSize
}
// labels block
offset := 0
labels := make([]seriesWithLabel, 0)
for {
var labelName string
labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
offset += uint16Size
if labelLen == endOfBlock {
break
}
labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)])
offset += int(labelLen)
sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
sidLst := make([]uint32, sidCnt)
for i := 0; i < int(sidCnt); i++ {
sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
}
labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst})
}
meta.Labels = labels
// series block
rows := make([]metaSeries, 0)
for {
series := metaSeries{}
sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
offset += uint16Size
if sidLen == endOfBlock {
break
}
series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)])
offset += int(sidLen)
series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
offset += uint64Size
series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
offset += uint64Size
labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
labelLst := make([]uint32, labelCnt)
for i := 0; i < int(labelCnt); i++ {
labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
}
series.Labels = labelLst
rows = append(rows, series)
}
meta.Series = rows
meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
offset += uint64Size
meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
offset += uint64Size
return decf.Err()
}
至æ¤ï¼Œå¯¹ mandodb 的索引和å˜å‚¨æ•´ä½“è®¾è®¡æ˜¯ä¸æ˜¯å°±äº†ç„¶äºŽèƒ¸ã€‚🥺 文档较长,建议 Star æ”¶è—,毕竟æ¥éƒ½æ¥äº†...
Q: Is mandodb cool?
A: 🤠 Not sure.
Q: Is mando awesome?
A: 😎 Definitely YES!
Q: Write performance?
A: 😯 ~400k/s
Q: PRs or Issues?
A: 😉 are welcome.
Q: What's the hardest part of this project?
A: 😂 Writing this document.
Q: Anything else?
A: 🍻 Life is magic. Coding is art. Bilibili!
MIT © chenjiandongx