0%

Golang WAL 设计与实现

Write Ahead Log(预写日志) 简称 WAL,用于记录程序内部操作,在程序奔溃恢复后,可以根据 WAL 来恢复程序原先的状态。

WAL 与日志框架设计有异曲同工之处,两者目标都是为了持久化数据,只不过两者用处不同罢了。本文会对 Golang 的 WAL 库进行分析,从而来学习 WAL 如何设计与优化。

需求分析

对于 WAL 库,最基本的功能就是向外部提供读写的接口,用户读取时根据 log offset 读取,写入时则追加到WAL 文件的尾部。WAL 与日志框架一样,最终都将数据持久到文件系统中,所以需要对文件分段(segment)设计,分段之后可以有利于并发读写。对于文件的写入操作,批处理机制可以将多个操作合并成一个操作,从而将多次系统调用减少成一次,这样可以极大的提高性能。谈及数据持久化时,则需要对 entry 进行序列化与反序列化操作。

不断的向 WAL 写入日志,会使得 WAL 大小不断的增大,分段设计虽然能解决单个的文件很大的问题,但无法释放原先存储空间。此时需要对 WAL 进行 GC(垃圾收集) 操作,回收不在使用的 WAL,从而减少存储空间。

综合以上场景,一个 WAL 库应该具备如下的功能:

  1. 基本的读写接口
  2. 日志文件分段
  3. 批处理
  4. 序列化与反序列化
  5. 垃圾回收

Struct 设计

对于库来说,拥有一个优秀的 struct 设计,使得后续的实现更加的事半功倍,同时可以让模块之间职责清晰,分工明确。

在 Golang WAL 中,我们重点关注 Log、segment、batch、Options 这些 struct。Options 可以影响 Log 内部行为,比如 Segment 大小、序列化方式、写同步。Log 由多个 segment 组成,同时会持有整个 WAL 第一条与最后一条日志的 index,写入数据时,直接追加到 sfile(WAL 中尾部 segment)文件尾部,Log 中的 LRU 的 cache,用于日志的读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Log struct {
mu sync.RWMutex
path string // absolute path to log directory
opts Options // log options
closed bool // log is closed
corrupt bool // log may be corrupt
segments []*segment // all known log segments
firstIndex uint64 // index of the first entry in log
lastIndex uint64 // index of the last entry in log
sfile *os.File // tail segment file handle
wbatch Batch // reusable write batch
scache tinylru.LRU // segment entries cache
}
1
2
3
4
5
6
type segment struct {
path string // path of segment file
index uint64 // first index of segment
ebuf []byte // cached entries buffer
epos []bpos // cached entries positions in buffer
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Options struct {
// NoSync disables fsync after writes. This is less durable and puts the
// log at risk of data loss when there's a server crash.
NoSync bool
// SegmentSize of each segment. This is just a target value, actual size
// may differ. Default is 20 MB.
SegmentSize int
// LogFormat is the format of the log files. Default is Binary.
LogFormat LogFormat
// SegmentCacheSize is the maximum number of segments that will be held in
// memory for caching. Increasing this value may enhance performance for
// concurrent read operations. Default is 1
SegmentCacheSize int
// NoCopy allows for the Read() operation to return the raw underlying data
// slice. This is an optimization to help minimize allocations. When this
// option is set, do not modify the returned data because it may affect
// other Read calls. Default false
NoCopy bool
// Perms represents the datafiles modes and permission bits
DirPerms os.FileMode
FilePerms os.FileMode
}

源码阅读

Open

Open 方法接收 string 与 Options 参数,返回 Log struct,首先对传入的 Options 进行校验,如果不符合要求,那么设置默认的参数。

倘若传入的 path 目录不存在,说明当前 WAL 并无数据,那么则需要创建对应的 path 目录。最后调用 load 方法来加载 path 目录下所有的 segment,并更新 Log 属性。

load 方法会获取 path 所有的文件,文件名由 20、24、26 个字符组成,该文件名前 20 个字符对应的 segment 开始的 index。首先将文件名前 20 个字符并解析为 uint64,然后根据 segment index 与 filename(path+name)创建 segment 追加到 Log 的 segments 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// wal.go load
for _, fi := range fis {
name := fi.Name()
if fi.IsDir() || len(name) < 20 {
continue
}
index, err := strconv.ParseUint(name[:20], 10, 64)
if err != nil || index == 0 {
continue
}
isStart := len(name) == 26 && strings.HasSuffix(name, ".START")
isEnd := len(name) == 24 && strings.HasSuffix(name, ".END")
if len(name) == 20 || isStart || isEnd {
if isStart {
startIdx = len(l.segments)
} else if isEnd && endIdx == -1 {
endIdx = len(l.segments)
}
l.segments = append(l.segments, &segment{
index: index,
path: filepath.Join(l.path, name),
})
}
}

如果 path 目录下包含了 .START 与 .END 结尾的文件,那么进行清除的工作,这操作实现原理在此不进行讲解。之后会打开 WAL 最后一个 segment,并且将该文件的 position 偏移到 2。然后对 Log 中的 firstIndex 与 lastIndex 进行更新操作,前者为 segments 第一个元素的 index,后者为 segments 最后一个元素的 index 加上 epos 值减去 1,从而推导出最后一个 entry 的 index。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
l.firstIndex = l.segments[0].index
// Open the last segment for appending
lseg := l.segments[len(l.segments)-1]
l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, l.opts.FilePerms)
if err != nil {
return err
}
if _, err := l.sfile.Seek(0, 2); err != nil {
return err
}
// Load the last segment entries
if err := l.loadSegmentEntries(lseg); err != nil {
return err
}
l.lastIndex = lseg.index + uint64(len(lseg.epos)) - 1

到此为止,整个 open 操作都已经完成了,后续对读写操作进行讲解。

Write

Write 方法接收 uint64 与 []byte 参数,前者为entry 的index,后者为 entry 的数据。首先需要对 WAL 进行加锁操作,然后判断当前 WAL 状态,决定是否继续之后的操作。状态校验通过之后,则清除 wbatch 中的数据,调用其 write 写入数据到 batch 中,最后调用 Log 的 writeBatch 方法将 batch 中数据写入。

1
2
3
4
5
6
7
8
9
10
11
12
func (l *Log) Write(index uint64, data []byte) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.corrupt {
return ErrCorrupt
} else if l.closed {
return ErrClosed
}
l.wbatch.Clear()
l.wbatch.Write(index, data)
return l.writeBatch(&l.wbatch)
}

writeBatch 实现并不复杂,先对 tail segment 中的 ebuf 进行校验,如果 ebuf 大小超过了 segmentSize,那么需要创建一个新的 segment。

然后遍历 batch 中的 entries,并调用 appendEntry 方法追加数据到 tail segment 中的 ebuf 中,appendEntry 会涉及一个序列化操作,采用了 Json 内部调用appendJSONEntry,binary 则调用 appendBinaryEntry。在遍历过程中,每向 ebuf 追加一次数据,那么就会对 ebuf 大小进行校验,判断是否超过了 segmentSize。如果超过将 ebuf 数据写入到 Log sfile ,并更新 lastIndex 与 tail segment。

遍历完 entries 后,则将 ebuf 中 mark 后面的数据写入到 sfile ,并更新 lastIndex。如果开启了 sync,写入之后需要调用 File 的 sync 方法将缓冲区数据刷新到 disk,最后将 batch 数据清空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (l *Log) writeBatch(b *Batch) error {
// check that all indexes in batch are sane
for i := 0; i < len(b.entries); i++ {
if b.entries[i].index != l.lastIndex+uint64(i+1) {
return ErrOutOfOrder
}
}
// load the tail segment
s := l.segments[len(l.segments)-1]
if len(s.ebuf) > l.opts.SegmentSize {
// tail segment has reached capacity. Close it and create a new one.
if err := l.cycle(); err != nil {
return err
}
s = l.segments[len(l.segments)-1]
}

mark := len(s.ebuf)
datas := b.datas
// 遍历 entries,并将数据 append 到 ebuf 中
for i := 0; i < len(b.entries); i++ {
data := datas[:b.entries[i].size]
var epos bpos
s.ebuf, epos = l.appendEntry(s.ebuf, b.entries[i].index, data)
s.epos = append(s.epos, epos)
if len(s.ebuf) >= l.opts.SegmentSize {
// segment has reached capacity, cycle now
if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
return err
}
l.lastIndex = b.entries[i].index
if err := l.cycle(); err != nil {
return err
}
s = l.segments[len(l.segments)-1]
mark = 0
}
datas = datas[b.entries[i].size:]
}
if len(s.ebuf)-mark > 0 {
// 写入数据
if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
return err
}
l.lastIndex = b.entries[len(b.entries)-1].index
}
// 判断是否开启 sync
if !l.opts.NoSync {
if err := l.sfile.Sync(); err != nil {
return err
}
}
b.Clear()
return nil
}

Read

Read 方法根据 log index 从 WAL 读取对应的数据,该方法内部实现会先获取一把写锁,之后与 Write 方法类似,都会检查 WAL 状态。

如果 index 等于 0、index 小于 firstIndex、index 大于 lastIndex,这三个条件满足一者,那么该方法会直接 return,并返回 ErrNotFound 错误。

调用 loadSegment 方法,首先从 Log LRU 中找到对应的 segment。倘若未找到,那么调用 findSegment 方法根据 index 从 segments 中进行二分查找,找到后则从 disk 读取 segment 数据,数据会被读取到 segment 中的 ebuf 中,并维护 segment 中的 epos 索引,最后将 segment 添加到 Log LRU 中。

获取到 index 对应的 segment 后,根据 index 从 segment 的 epos 索引获取到该 entry 在 ebuf 中的 start position 与 end position,从 ebuf 拿到 log entry 数据后,如果采用 Json 序列化方式,那么使用 readJson 进行读取,binary 则不进行额外操作。

要是 WAL 开启了 NoCopy,return 的 data 则是 ebuf 的分片,这种设计虽然减少内存开辟和释放,但用户对该 ebuf 分片的数据进行修改,则会影响后续读取到的数据准确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (l *Log) Read(index uint64) (data []byte, err error) {
l.mu.RLock()
defer l.mu.RUnlock()
if l.corrupt {
return nil, ErrCorrupt
} else if l.closed {
return nil, ErrClosed
}
if index == 0 || index < l.firstIndex || index > l.lastIndex {
return nil, ErrNotFound
}
// 根据 index 加载 segment
s, err := l.loadSegment(index)
if err != nil {
return nil, err
}
// 根据 index 从 epos 获取该 entry 的 start 与 end
epos := s.epos[index-s.index]
edata := s.ebuf[epos.pos:epos.end]
// json 序列化
if l.opts.LogFormat == JSON {
return readJSON(edata)
}
// binary read
size, n := binary.Uvarint(edata)
if n <= 0 {
return nil, ErrCorrupt
}
if uint64(len(edata)-n) < size {
return nil, ErrCorrupt
}
if l.opts.NoCopy {
data = edata[n : uint64(n)+size]
} else {
data = make([]byte, size)
copy(data, edata[n:])
}
return data, nil
}

GC 操作

对 WAL 进行 GC 操作,只需要删除指定 index 之前的 log,并生成新的 segment,在 Golang WAL 中提供了两个截断操作。

  1. TruncateFront:删除 index 之前的 log,使得 index 处的 log 成为第一个 log
  2. TruncateBack:删除 index 之后的 log,使得 index 处的 log 成为最后一个 log

下面对 TruncateFront 实现逻辑进行分析,因为 TruncateBack 不太常用,在 WAL 中一般都是删除指定 index 之前的 log,从而来达到释放存储空间目的。

在 TruncateFront 中,首先获取获取锁,对 WAL 状态校验后,则调用内部的 truncateFront 方法。

1
2
3
4
5
6
7
8
9
10
func (l *Log) TruncateFront(index uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.corrupt {
return ErrCorrupt
} else if l.closed {
return ErrClosed
}
return l.truncateFront(index)
}

截断 index 之前的 log,首先需要获取到 index 对应的 segment后,创建一个 temp 文件来存储该 segment index 之后的 log,将该 temp 文件重命名为 .START 结尾文件,然后删除从 disk 删除所有被截断的 segment,如果删除期间发生错误,后续 open 操作时,可以根据 .START 标志找到 WAL 开始点,从而来舍弃之前的 segment。反之,删除成功后,则将 .START 重名为 20 个字符的 index 名。

之前是对 disk 进行更新,现在则对内存记录进行更新,该操作从 segments 删除 index 之前所有的 segment。最后在进行一些收尾工作,首先清除 Log 的 LRU Cache,然后更新 Log 中的 firstIndex 为传入的 index。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (l *Log) truncateFront(index uint64) (err error) {
if index == 0 || l.lastIndex == 0 ||
index < l.firstIndex || index > l.lastIndex {
return ErrOutOfRange
}
if index == l.firstIndex {
// nothing to truncate
return nil
}
segIdx := l.findSegment(index)
var s *segment
s, err = l.loadSegment(index)
if err != nil {
return err
}
epos := s.epos[index-s.index:]
ebuf := s.ebuf[epos[0].pos:]
// Create a temp file contains the truncated segment.
tempName := filepath.Join(l.path, "TEMP")
err = func() error {
f, err := os.OpenFile(tempName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
if err != nil {
return err
}
defer f.Close()
if _, err := f.Write(ebuf); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
return f.Close()
}()
// Rename the TEMP file to it's START file name.
startName := filepath.Join(l.path, segmentName(index)+".START")
if err = os.Rename(tempName, startName); err != nil {
return err
}
// The log was truncated but still needs some file cleanup. Any errors
// following this message will not cause an on-disk data ocorruption, but
// may cause an inconsistency with the current program, so we'll return
// ErrCorrupt so the the user can attempt a recover by calling Close()
// followed by Open().
defer func() {
if v := recover(); v != nil {
err = ErrCorrupt
l.corrupt = true
}
}()
if segIdx == len(l.segments)-1 {
// Close the tail segment file
if err = l.sfile.Close(); err != nil {
return err
}
}
// Delete truncated segment files
for i := 0; i <= segIdx; i++ {
if err = os.Remove(l.segments[i].path); err != nil {
return err
}
}
// Rename the START file to the final truncated segment name.
newName := filepath.Join(l.path, segmentName(index))
if err = os.Rename(startName, newName); err != nil {
return err
}
s.path = newName
s.index = index
if segIdx == len(l.segments)-1 {
// Reopen the tail segment file
if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms); err != nil {
return err
}
var n int64
if n, err = l.sfile.Seek(0, 2); err != nil {
return err
}
if n != int64(len(ebuf)) {
err = errors.New("invalid seek")
return err
}
// Load the last segment entries
if err = l.loadSegmentEntries(s); err != nil {
return err
}
}
l.segments = append([]*segment{}, l.segments[segIdx:]...)
l.firstIndex = index
l.clearCache()
return nil
}

对于垃圾回收实现可谓是百家争鸣,但作为通用的 WAL 库,golang-wal 垃圾回收是一个很好的设计。在 leveldb 中,该项目内部实现了自己的 WAL,每个 MemTable 对应了一个 WAL 文件,后续将 memtable 转成 immuttable,最后落盘成 SSTable,才将原先的 WAL 删除,从而实现垃圾回收。

参考文档

  1. Golang WAL
  2. goleveldb