-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathwal_iterator.go
112 lines (90 loc) · 2.29 KB
/
wal_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package bitcask
import (
"encoding/binary"
)
type WalIterator struct {
wal *Wal
fileOff int
bufOff int
bufSize int
buf []byte
err error
}
func NewWalIterator(wal *Wal) *WalIterator {
wal.Ref()
return &WalIterator{
wal: wal,
fileOff: int(wal.offset),
bufOff: 0,
bufSize: 0,
buf: make([]byte, BlockSize),
}
}
func (i *WalIterator) Close() {
i.wal.Unref()
}
func (i *WalIterator) fd() int {
return int(i.wal.fp.Fd())
}
// try to read a block unless there is less than one block left
// the Next method will return the start offset of data in wal file, and the data itself
func (i *WalIterator) Next() (uint64, []byte, error) {
var off uint64
var record []byte
for i.err == nil {
if i.bufOff+RecordHeaderSize > i.bufSize {
i.fileOff += i.bufSize
// skip the padding
i.bufSize = min(BlockSize, int(i.wal.Size())-i.fileOff)
if i.bufSize == 0 {
i.err = ErrWalIteratorEOF
return 0, nil, i.err
}
if i.err = PreadFull(i.fd(), i.buf[:i.bufSize], int64(i.fileOff)); i.err != nil {
return 0, nil, i.err
}
i.bufOff = 0
}
header := i.buf[i.bufOff : i.bufOff+RecordHeaderSize]
i.bufOff += RecordHeaderSize
crc := binary.LittleEndian.Uint32(header[0:])
length := int(binary.LittleEndian.Uint16(header[4:]))
recordType := header[6]
// record the file offset
if len(record) == 0 {
off = uint64(i.fileOff + i.bufOff)
}
// avoid the corrupted data
length = min(length, i.bufSize-i.bufOff)
data := i.buf[i.bufOff : i.bufOff+length]
i.bufOff += length
if ComputeCRC32(data) != crc {
i.err = ErrWalMismatchCRC
return 0, nil, i.err
}
switch recordType {
case RecordFull:
// reference the backing store of slice
return off, data, nil
case RecordFirst, RecordMiddle:
// Continue reading next chunk
record = append(record, data...)
case RecordLast:
record = append(record, data...)
return off, record, nil
default:
i.err = ErrWalUnknownRecordType
}
}
return 0, nil, i.err
}
// the functionality is same to Next, except that the offset does not contain wal header
// it's useful for the hint generation
func (i *WalIterator) NextWithoutHeaderOffset() (uint64, []byte, error) {
off, data, err := i.Next()
if err != nil {
return 0, nil, err
}
off -= RecordHeaderSize
return off, data, nil
}