diff --git a/util/checksum/checksum.go b/util/checksum/checksum.go index 843440547100c..ef90f44e7cb00 100644 --- a/util/checksum/checksum.go +++ b/util/checksum/checksum.go @@ -42,11 +42,12 @@ var checksumReaderBufPool = sync.Pool{ // | -- 4B -- | -- 1020B -- || -- 4B -- | -- 1020B -- || -- 4B -- | -- 60B -- | // | -- checksum -- | -- payload -- || -- checksum -- | -- payload -- || -- checksum -- | -- payload -- | type Writer struct { - err error - w io.WriteCloser - buf []byte - payload []byte - payloadUsed int + err error + w io.WriteCloser + buf []byte + payload []byte + payloadUsed int + flushedUserDataCnt int64 } // NewWriter returns a new Writer which calculates and stores a CRC-32 checksum for the payload before @@ -104,10 +105,21 @@ func (w *Writer) Flush() error { w.err = err return err } + w.flushedUserDataCnt += int64(w.payloadUsed) w.payloadUsed = 0 return nil } +// GetCache returns the byte slice that holds the data not flushed to disk. +func (w *Writer) GetCache() []byte { + return w.payload[:w.payloadUsed] +} + +// GetCacheDataOffset return the user data offset in cache. +func (w *Writer) GetCacheDataOffset() int64 { + return w.flushedUserDataCnt +} + // Close implements the io.Closer interface. func (w *Writer) Close() (err error) { err = w.Flush() diff --git a/util/checksum/checksum_test.go b/util/checksum/checksum_test.go index b0de5b90586c9..1473903fbe080 100644 --- a/util/checksum/checksum_test.go +++ b/util/checksum/checksum_test.go @@ -651,3 +651,75 @@ func (s *testChecksumSuite) testTiCase3651and3652(c *check.C, encrypt bool) { assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f1) assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f2) } + +var checkFlushedData = func(c *check.C, f io.ReaderAt, off int64, readBufLen int, assertN int, assertErr interface{}, assertRes []byte) { + readBuf := make([]byte, readBufLen) + r := NewReader(f) + n, err := r.ReadAt(readBuf, off) + c.Assert(err, check.Equals, assertErr) + c.Assert(n, check.Equals, assertN) + c.Assert(bytes.Compare(readBuf, assertRes), check.Equals, 0) +} + +func (s *testChecksumSuite) TestChecksumWriter(c *check.C) { + path := "checksum_TestChecksumWriter" + f, err := os.Create(path) + c.Assert(err, check.IsNil) + defer func() { + err = f.Close() + c.Assert(err, check.IsNil) + err = os.Remove(path) + c.Assert(err, check.IsNil) + }() + + buf := bytes.NewBuffer(nil) + testData := "0123456789" + for i := 0; i < 100; i++ { + buf.WriteString(testData) + } + + // Write 1000 bytes and flush. + w := NewWriter(f) + n, err := w.Write(buf.Bytes()) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, 1000) + + err = w.Flush() + c.Assert(err, check.IsNil) + checkFlushedData(c, f, 0, 1000, 1000, nil, buf.Bytes()) + + // All data flushed, so no data in cache. + cacheOff := w.GetCacheDataOffset() + c.Assert(cacheOff, check.Equals, int64(1000)) +} + +func (s *testChecksumSuite) TestChecksumWriterAutoFlush(c *check.C) { + path := "checksum_TestChecksumWriterAutoFlush" + f, err := os.Create(path) + c.Assert(err, check.IsNil) + defer func() { + err = f.Close() + c.Assert(err, check.IsNil) + err = os.Remove(path) + c.Assert(err, check.IsNil) + }() + + w := NewWriter(f) + + buf := bytes.NewBuffer(nil) + testData := "0123456789" + for i := 0; i < 102; i++ { + buf.WriteString(testData) + } + n, err := w.Write(buf.Bytes()) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, len(buf.Bytes())) + + // This write will trigger flush. + n, err = w.Write([]byte("0")) + c.Assert(err, check.IsNil) + c.Assert(n, check.Equals, 1) + checkFlushedData(c, f, 0, 1020, 1020, nil, buf.Bytes()) + cacheOff := w.GetCacheDataOffset() + c.Assert(cacheOff, check.Equals, int64(len(buf.Bytes()))) +} diff --git a/util/chunk/disk.go b/util/chunk/disk.go index c7962c9aa9e9d..ef269213e9d0d 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -46,6 +46,9 @@ type ListInDisk struct { diskTracker *disk.Tracker // track disk usage. numRowsInDisk int + checksumWriter *checksum.Writer + cipherWriter *encrypt.Writer + // ctrCipher stores the key and nonce using by aes encrypt io layer ctrCipher *encrypt.CtrCipher } @@ -78,9 +81,11 @@ func (l *ListInDisk) initDiskFile() (err error) { if err != nil { return } - underlying = encrypt.NewWriter(l.disk, l.ctrCipher) + l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) + underlying = l.cipherWriter } - l.w = checksum.NewWriter(underlying) + l.checksumWriter = checksum.NewWriter(underlying) + l.w = l.checksumWriter l.bufFlushMutex = sync.RWMutex{} return } @@ -164,16 +169,16 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { // GetRow gets a Row from the ListInDisk by RowPtr. func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { - err = l.flush() if err != nil { return } off := l.offsets[ptr.ChkIdx][ptr.RowIdx] var underlying io.ReaderAt = l.disk if l.ctrCipher != nil { - underlying = encrypt.NewReader(l.disk, l.ctrCipher) + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) } - r := io.NewSectionReader(checksum.NewReader(underlying), off, l.offWrite-off) + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + r := io.NewSectionReader(checksumReader, off, l.offWrite-off) format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { @@ -367,3 +372,51 @@ func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow { } return MutRow{c: chk} } + +// ReaderWithCache helps to read data that has not be flushed to underlying layer. +// By using ReaderWithCache, user can still write data into ListInDisk even after reading. +type ReaderWithCache struct { + r io.ReaderAt + cacheOff int64 + cache []byte +} + +// NewReaderWithCache returns a ReaderWithCache. +func NewReaderWithCache(r io.ReaderAt, cache []byte, cacheOff int64) *ReaderWithCache { + return &ReaderWithCache{ + r: r, + cacheOff: cacheOff, + cache: cache, + } +} + +// ReadAt implements the ReadAt interface. +func (r *ReaderWithCache) ReadAt(p []byte, off int64) (readCnt int, err error) { + readCnt, err = r.r.ReadAt(p, off) + if err != io.EOF { + return readCnt, err + } + + if len(p) == readCnt { + return readCnt, err + } else if len(p) < readCnt { + return readCnt, errors2.Trace(errors2.Errorf("cannot read more data than user requested"+ + "(readCnt: %v, len(p): %v", readCnt, len(p))) + } + + // When got here, user input is not filled fully, so we need read data from cache. + err = nil + p = p[readCnt:] + beg := off - r.cacheOff + if beg < 0 { + // This happens when only partial data of user requested resides in r.cache. + beg = 0 + } + end := int(beg) + len(p) + if end > len(r.cache) { + err = io.EOF + end = len(r.cache) + } + readCnt += copy(p, r.cache[beg:end]) + return readCnt, err +} diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index 86461de5659c7..36750aa898244 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -14,12 +14,14 @@ package chunk import ( + "bytes" "fmt" "io" "io/ioutil" "math/rand" "os" "path/filepath" + "reflect" "strconv" "strings" "testing" @@ -30,6 +32,8 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" + "github.com/pingcap/tidb/util/checksum" + "github.com/pingcap/tidb/util/encrypt" ) func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) { @@ -219,6 +223,8 @@ func (s *testChunkSuite) TestListInDiskWithChecksum(c *check.C) { }) testListInDisk(c) + testReaderWithCache(c) + testReaderWithCacheNoFlush(c) } func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) { @@ -227,4 +233,129 @@ func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) { conf.Security.SpilledFileEncryptionMethod = config.SpilledFileEncryptionMethodAES128CTR }) testListInDisk(c) + + testReaderWithCache(c) + testReaderWithCacheNoFlush(c) +} + +// Following diagram describes the testdata we use to test: +// 4 B: checksum of this segment. +// 8 B: all columns' length, in the following example, we will only have one column. +// 1012 B: data in file. because max length of each segment is 1024, so we only have 1020B for user payload. +// +// Data in File Data in mem cache +// +------+------------------------------------------+ +-----------------------------+ +// | | 1020B payload | | | +// |4Bytes| +---------+----------------------------+ | | | +// |checksum|8B collen| 1012B user data | | | 12B remained user data | +// | | +---------+----------------------------+ | | | +// | | | | | +// +------+------------------------------------------+ +-----------------------------+ +func testReaderWithCache(c *check.C) { + testData := "0123456789" + buf := bytes.NewBuffer(nil) + for i := 0; i < 102; i++ { + buf.WriteString(testData) + } + buf.WriteString("0123") + + field := []*types.FieldType{types.NewFieldType(mysql.TypeString)} + chk := NewChunkWithCapacity(field, 1) + chk.AppendString(0, buf.String()) + l := NewListInDisk(field) + err := l.Add(chk) + c.Assert(err, check.IsNil) + + // Basic test for GetRow(). + row, err := l.GetRow(RowPtr{0, 0}) + c.Assert(err, check.IsNil) + c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field)) + + var underlying io.ReaderAt = l.disk + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + + // Read all data. + data := make([]byte, 1024) + // Offset is 8, because we want to ignore col length. + readCnt, err := checksumReader.ReadAt(data, 8) + c.Assert(err, check.IsNil) + c.Assert(readCnt, check.Equals, 1024) + c.Assert(reflect.DeepEqual(data, buf.Bytes()), check.IsTrue) + + // Only read data of mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1020) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 12) + c.Assert(reflect.DeepEqual(data[:12], buf.Bytes()[1012:]), check.IsTrue) + + // Read partial data of mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1025) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 7) + c.Assert(reflect.DeepEqual(data[:7], buf.Bytes()[1017:]), check.IsTrue) + + // Read partial data from both file and mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1010) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 22) + c.Assert(reflect.DeepEqual(data[:22], buf.Bytes()[1002:]), check.IsTrue) + + // Offset is too large, so no data is read. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1032) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 0) + c.Assert(reflect.DeepEqual(data, make([]byte, 1024)), check.IsTrue) + + // Only read 1 byte from mem cache. + data = make([]byte, 1024) + readCnt, err = checksumReader.ReadAt(data, 1031) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, 1) + c.Assert(reflect.DeepEqual(data[:1], buf.Bytes()[1023:]), check.IsTrue) + + // Test user requested data is small. + // Only request 10 bytes. + data = make([]byte, 10) + readCnt, err = checksumReader.ReadAt(data, 1010) + c.Assert(err, check.IsNil) + c.Assert(readCnt, check.Equals, 10) + c.Assert(reflect.DeepEqual(data, buf.Bytes()[1002:1012]), check.IsTrue) +} + +// Here we test situations where size of data is small, so no data is flushed to disk. +func testReaderWithCacheNoFlush(c *check.C) { + testData := "0123456789" + + field := []*types.FieldType{types.NewFieldType(mysql.TypeString)} + chk := NewChunkWithCapacity(field, 1) + chk.AppendString(0, testData) + l := NewListInDisk(field) + err := l.Add(chk) + c.Assert(err, check.IsNil) + + // Basic test for GetRow(). + row, err := l.GetRow(RowPtr{0, 0}) + c.Assert(err, check.IsNil) + c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field)) + + var underlying io.ReaderAt = l.disk + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + + // Read all data. + data := make([]byte, 1024) + // Offset is 8, because we want to ignore col length. + readCnt, err := checksumReader.ReadAt(data, 8) + c.Assert(err, check.Equals, io.EOF) + c.Assert(readCnt, check.Equals, len(testData)) + c.Assert(reflect.DeepEqual(data[:10], []byte(testData)), check.IsTrue) } diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index feed2290f38b6..a39346e34ff80 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -113,6 +113,28 @@ func (r *rowContainerTestSuite) TestSpillAction(c *check.C) { rc.actionSpill.WaitForTest() c.Assert(err, check.IsNil) c.Assert(rc.AlreadySpilledSafeForTest(), check.Equals, true) + + // Read + resChk, err := rc.GetChunk(0) + c.Assert(err, check.IsNil) + c.Assert(resChk.NumRows(), check.Equals, chk.NumRows()) + for rowIdx := 0; rowIdx < resChk.NumRows(); rowIdx++ { + c.Assert(resChk.GetRow(rowIdx).GetDatumRow(fields), check.DeepEquals, chk.GetRow(rowIdx).GetDatumRow(fields)) + } + // Write again + err = rc.Add(chk) + rc.actionSpill.WaitForTest() + c.Assert(err, check.IsNil) + c.Assert(rc.AlreadySpilledSafeForTest(), check.Equals, true) + + // Read + resChk, err = rc.GetChunk(2) + c.Assert(err, check.IsNil) + c.Assert(resChk.NumRows(), check.Equals, chk.NumRows()) + for rowIdx := 0; rowIdx < resChk.NumRows(); rowIdx++ { + c.Assert(resChk.GetRow(rowIdx).GetDatumRow(fields), check.DeepEquals, chk.GetRow(rowIdx).GetDatumRow(fields)) + } + err = rc.Reset() c.Assert(err, check.IsNil) } diff --git a/util/encrypt/ase_layer.go b/util/encrypt/aes_layer.go similarity index 91% rename from util/encrypt/ase_layer.go rename to util/encrypt/aes_layer.go index 2bcea4373073f..a27d23da90fa6 100644 --- a/util/encrypt/ase_layer.go +++ b/util/encrypt/aes_layer.go @@ -71,11 +71,12 @@ func (ctr *CtrCipher) stream(counter uint64) cipher.Stream { // Writer implements an io.WriteCloser, it encrypt data using AES before writing to the underlying object. type Writer struct { - err error - w io.WriteCloser - n int - buf []byte - cipherStream cipher.Stream + err error + w io.WriteCloser + n int + buf []byte + cipherStream cipher.Stream + flushedUserDataCnt int64 } // NewWriter returns a new Writer which encrypt data using AES before writing to the underlying object. @@ -123,6 +124,7 @@ func (w *Writer) Flush() error { } w.cipherStream.XORKeyStream(w.buf[:w.n], w.buf[:w.n]) n, err := w.w.Write(w.buf[:w.n]) + w.flushedUserDataCnt += int64(n) if n < w.n && err == nil { err = io.ErrShortWrite } @@ -134,6 +136,16 @@ func (w *Writer) Flush() error { return nil } +// GetCache returns the byte slice that holds the data not flushed to disk. +func (w *Writer) GetCache() []byte { + return w.buf[:w.n] +} + +// GetCacheDataOffset return the user data offset in cache. +func (w *Writer) GetCacheDataOffset() int64 { + return w.flushedUserDataCnt +} + // Close implements the io.Closer interface. func (w *Writer) Close() (err error) { err = w.Flush()