Skip to content

Commit

Permalink
Revert "add isReaderStale"
Browse files Browse the repository at this point in the history
This reverts commit c45b41d.
  • Loading branch information
guo-shaoge committed May 18, 2021
1 parent 923734b commit 0759556
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 44 deletions.
19 changes: 5 additions & 14 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ type ListInDisk struct {
checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer

// By using this flag, we don't need to allocate a new reader every time.
isReaderStale bool
r io.ReaderAt

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher
}
Expand All @@ -64,8 +60,7 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
l := &ListInDisk{
fieldTypes: fieldTypes,
// TODO(fengliyuan): set the quota of disk usage.
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
isReaderStale: true,
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
}
return l
}
Expand Down Expand Up @@ -155,7 +150,6 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) {
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
l.diskTracker.Consume(n)
l.numRowsInDisk += chk.NumRows()
l.isReaderStale = true
return
}

Expand All @@ -180,14 +174,11 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.isReaderStale {
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
l.r = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
l.isReaderStale = false
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
r := io.NewSectionReader(l.r, off, l.offWrite-off)
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand Down
30 changes: 0 additions & 30 deletions util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ func testReaderWithCache(c *check.C) {
chk := NewChunkWithCapacity(field, 1)
chk.AppendString(0, buf.String())
l := NewListInDisk(field)
c.Assert(l.isReaderStale, check.IsTrue)
err := l.Add(chk)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -328,35 +327,6 @@ func testReaderWithCache(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(readCnt, check.Equals, 10)
c.Assert(reflect.DeepEqual(data, buf.Bytes()[1002:1012]), check.IsTrue)

// Test l.isReaderStale works properly
// It means only new reader is alloced after writing.
oriReader := l.r
for i := 0; i < 100; i++ {
row, err = l.GetRow(RowPtr{0, 0})
c.Assert(err, check.IsNil)
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
}
// After write, reader is stale.
err = l.Add(chk)
c.Assert(err, check.IsNil)
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsTrue)

// New reader is generated when reading.
row, err = l.GetRow(RowPtr{1, 0})
c.Assert(err, check.IsNil)
c.Assert(oriReader != l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
oriReader = l.r

for i := 0; i < 100; i++ {
row, err = l.GetRow(RowPtr{0, 0})
c.Assert(err, check.IsNil)
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
}
}

// Here we test situations where size of data is small, so no data is flushed to disk.
Expand Down

0 comments on commit 0759556

Please sign in to comment.