type Log struct {
	mu         sync.RWMutex
	path       string      // absolute path to log directory
	opts       Options     // log options
	closed     bool        // log is closed
	corrupt    bool        // log may be corrupt
	segments   []*segment  // all known log segments
	firstIndex uint64      // index of the first entry in log
	lastIndex  uint64      // index of the last entry in log
	sfile      *os.File    // tail segment file handle
	wbatch     Batch       // reusable write batch
	scache     tinylru.LRU // segment entries cache
}
type segment struct {
	path  string // path of segment file
	index uint64 // first index of segment
	ebuf  []byte // cached entries buffer
	epos  []bpos // cached entries positions in buffer
}
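The epos slice records where each cached entry sits inside ebuf. Its element type bpos is a small offset pair; the type name comes from the struct above, while the field layout below is my reconstruction and may differ slightly from the library:

type bpos struct {
	pos int // byte position of the entry within ebuf
	end int // one byte past the end of the entry
}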
type Options struct {
	// NoSync disables fsync after writes. This is less durable and puts the
	// log at risk of data loss when there's a server crash.
	NoSync bool

	// SegmentSize of each segment. This is just a target value, actual size
	// may differ. Default is 20 MB.
	SegmentSize int

	// LogFormat is the format of the log files. Default is Binary.
	LogFormat LogFormat

	// SegmentCacheSize is the maximum number of segments that will be held in
	// memory for caching. Increasing this value may enhance performance for
	// concurrent read operations. Default is 1.
	SegmentCacheSize int

	// NoCopy allows for the Read() operation to return the raw underlying data
	// slice. This is an optimization to help minimize allocations. When this
	// option is set, do not modify the returned data because it may affect
	// other Read calls. Default false.
	NoCopy bool

	// Perms represents the datafiles modes and permission bits.
	DirPerms  os.FileMode
	FilePerms os.FileMode
}
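Before digging into the implementation, here is a minimal usage sketch of the public API these types back, assuming the library under discussion is github.com/tidwall/wal; the path and index values are made up:

package main

import (
	"log"

	"github.com/tidwall/wal"
)

func main() {
	// Open (or create) a log directory; nil selects the default Options.
	l, err := wal.Open("/tmp/mylog", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()

	// Indexes are contiguous and start at 1.
	if err := l.Write(1, []byte("first entry")); err != nil {
		log.Fatal(err)
	}
	data, err := l.Read(1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read: %s", data)
}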
Source Code Walkthrough
Open
Open takes a path string and an Options parameter and returns a Log struct. It first validates the provided Options; any setting that does not meet the requirements is replaced with its default value.
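A sketch of what this looks like, reconstructed from the description above; the helper names (DefaultOptions, abs, load) and the cache-sizing call are assumptions about the library's internals:

func Open(path string, opts *Options) (*Log, error) {
	if opts == nil {
		opts = DefaultOptions
	}
	// Replace any unset or out-of-range settings with their defaults.
	if opts.SegmentCacheSize <= 0 {
		opts.SegmentCacheSize = DefaultOptions.SegmentCacheSize
	}
	if opts.SegmentSize <= 0 {
		opts.SegmentSize = DefaultOptions.SegmentSize
	}
	if opts.DirPerms == 0 {
		opts.DirPerms = DefaultOptions.DirPerms
	}
	if opts.FilePerms == 0 {
		opts.FilePerms = DefaultOptions.FilePerms
	}
	// Resolve the absolute path, create the directory, and size the segment cache.
	path, err := abs(path)
	if err != nil {
		return nil, err
	}
	l := &Log{path: path, opts: *opts}
	l.scache.Resize(l.opts.SegmentCacheSize)
	if err := os.MkdirAll(path, opts.DirPerms); err != nil {
		return nil, err
	}
	// load scans the directory and rebuilds the in-memory segment list.
	if err := l.load(); err != nil {
		return nil, err
	}
	return l, nil
}

With the log open, every Write and WriteBatch call eventually funnels into the unexported writeBatch method below.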
func (l *Log) writeBatch(b *Batch) error {
	// check that all indexes in batch are sane
	for i := 0; i < len(b.entries); i++ {
		if b.entries[i].index != l.lastIndex+uint64(i+1) {
			return ErrOutOfOrder
		}
	}

	// load the tail segment
	s := l.segments[len(l.segments)-1]
	if len(s.ebuf) > l.opts.SegmentSize {
		// tail segment has reached capacity. Close it and create a new one.
		if err := l.cycle(); err != nil {
			return err
		}
		s = l.segments[len(l.segments)-1]
	}
	mark := len(s.ebuf)
	datas := b.datas
	// iterate over the entries and append their data to ebuf
	for i := 0; i < len(b.entries); i++ {
		data := datas[:b.entries[i].size]
		var epos bpos
		s.ebuf, epos = l.appendEntry(s.ebuf, b.entries[i].index, data)
		s.epos = append(s.epos, epos)
		if len(s.ebuf) >= l.opts.SegmentSize {
			// segment has reached capacity, cycle now
			if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
				return err
			}
			l.lastIndex = b.entries[i].index
			if err := l.cycle(); err != nil {
				return err
			}
			s = l.segments[len(l.segments)-1]
			mark = 0
		}
		datas = datas[b.entries[i].size:]
	}
	if len(s.ebuf)-mark > 0 {
		// flush the remaining buffered entries to the tail segment file
		if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
			return err
		}
		l.lastIndex = b.entries[len(b.entries)-1].index
	}
	// fsync unless NoSync is enabled
	if !l.opts.NoSync {
		if err := l.sfile.Sync(); err != nil {
			return err
		}
	}
	b.Clear()
	return nil
}
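The exported WriteBatch essentially takes the write lock and calls this method. Continuing the earlier example (the indexes are illustrative):

// Append three entries atomically as one batch.
b := new(wal.Batch)
b.Write(2, []byte("second"))
b.Write(3, []byte("third"))
b.Write(4, []byte("fourth"))
if err := l.WriteBatch(b); err != nil {
	log.Fatal(err)
}

Truncating entries from the front of the log is handled by the unexported truncateFront below; the exported TruncateFront wraps it with the same write lock.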
func (l *Log) truncateFront(index uint64) (err error) {
	if index == 0 || l.lastIndex == 0 ||
		index < l.firstIndex || index > l.lastIndex {
		return ErrOutOfRange
	}
	if index == l.firstIndex {
		// nothing to truncate
		return nil
	}
	segIdx := l.findSegment(index)
	var s *segment
	s, err = l.loadSegment(index)
	if err != nil {
		return err
	}
	epos := s.epos[index-s.index:]
	ebuf := s.ebuf[epos[0].pos:]
	// Create a temp file containing the truncated segment.
	tempName := filepath.Join(l.path, "TEMP")
	err = func() error {
		f, err := os.OpenFile(tempName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, l.opts.FilePerms)
		if err != nil {
			return err
		}
		defer f.Close()
		if _, err := f.Write(ebuf); err != nil {
			return err
		}
		if err := f.Sync(); err != nil {
			return err
		}
		return f.Close()
	}()
	if err != nil {
		return err
	}
	// Rename the TEMP file to its START file name.
	startName := filepath.Join(l.path, segmentName(index)+".START")
	if err = os.Rename(tempName, startName); err != nil {
		return err
	}
	// The log was truncated but still needs some file cleanup. Any errors
	// following this message will not cause an on-disk data corruption, but
	// may cause an inconsistency with the current program, so we'll return
	// ErrCorrupt so that the user can attempt a recovery by calling Close()
	// followed by Open().
	defer func() {
		if v := recover(); v != nil {
			err = ErrCorrupt
			l.corrupt = true
		}
	}()
	if segIdx == len(l.segments)-1 {
		// Close the tail segment file
		if err = l.sfile.Close(); err != nil {
			return err
		}
	}
	// Delete truncated segment files
	for i := 0; i <= segIdx; i++ {
		if err = os.Remove(l.segments[i].path); err != nil {
			return err
		}
	}
	// Rename the START file to the final truncated segment name.
	newName := filepath.Join(l.path, segmentName(index))
	if err = os.Rename(startName, newName); err != nil {
		return err
	}
	s.path = newName
	s.index = index
	if segIdx == len(l.segments)-1 {
		// Reopen the tail segment file
		if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, l.opts.FilePerms); err != nil {
			return err
		}
		var n int64
		// Seek to the end of the file (whence 2 == io.SeekEnd).
		if n, err = l.sfile.Seek(0, 2); err != nil {
			return err
		}
		if n != int64(len(ebuf)) {
			err = errors.New("invalid seek")
			return err
		}
		// Load the last segment entries
		if err = l.loadSegmentEntries(s); err != nil {
			return err
		}
	}
	l.segments = append([]*segment{}, l.segments[segIdx:]...)
	l.firstIndex = index
	l.clearCache()
	return nil
}
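A usage sketch of the exported wrapper; after the call, firstIndex has moved up to the requested index and older entries are no longer readable:

// Drop every entry before index 3; entries 3 and 4 remain readable.
if err := l.TruncateFront(3); err != nil {
	log.Fatal(err)
}
if _, err := l.Read(2); err != nil {
	log.Println("entry 2 has been truncated:", err)
}

The TEMP-then-.START rename sequence is what keeps the operation crash-safe: the truncated copy of the segment is fully written and synced under a temporary name first, and if the process dies during the cleanup that follows, the leftover .START file gives the next Open enough information to finish the truncation instead of trusting the stale segment files.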