diff --git a/br/pkg/storage/memstore.go b/br/pkg/storage/memstore.go index 2f080c0ce2052..fecba170b4e6b 100644 --- a/br/pkg/storage/memstore.go +++ b/br/pkg/storage/memstore.go @@ -252,9 +252,6 @@ func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) ( } s.rwm.Lock() defer s.rwm.Unlock() - if _, ok := s.dataStore[name]; ok { - return nil, errors.Errorf("the file already exists: %s", name) - } theFile := new(memFile) s.dataStore[name] = theFile return &memFileWriter{ diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 41edeb1a3bf50..042d4a1f4d715 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -127,7 +127,9 @@ type ExternalStorage interface { // URI returns the base path as a URI URI() string - // Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption + // Create opens a file writer by path. path is relative path to storage base + // path. The old file under same path will be overwritten. Currently only s3 + // implemented WriterOption. Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error diff --git a/pkg/lightning/backend/external/writer.go b/pkg/lightning/backend/external/writer.go index 5c6b57efc71bf..7798553348ae6 100644 --- a/pkg/lightning/backend/external/writer.go +++ b/pkg/lightning/backend/external/writer.go @@ -357,15 +357,15 @@ type Writer struct { } // WriteRow implements ingest.Writer. -func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error { +func (w *Writer) WriteRow(ctx context.Context, key, val []byte, handle tidbkv.Handle) error { keyAdapter := w.keyAdapter var rowID []byte if handle != nil { rowID = handle.Encoded() } - encodedKeyLen := keyAdapter.EncodedLen(idxKey, rowID) - length := encodedKeyLen + len(idxVal) + lengthBytes*2 + encodedKeyLen := keyAdapter.EncodedLen(key, rowID) + length := encodedKeyLen + len(val) + lengthBytes*2 dataBuf, loc := w.kvBuffer.AllocBytesWithSliceLocation(length) if dataBuf == nil { if err := w.flushKVs(ctx, false); err != nil { @@ -378,12 +378,12 @@ func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tid } } binary.BigEndian.AppendUint64(dataBuf[:0], uint64(encodedKeyLen)) - binary.BigEndian.AppendUint64(dataBuf[:lengthBytes], uint64(len(idxVal))) - keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], idxKey, rowID) - copy(dataBuf[2*lengthBytes+encodedKeyLen:], idxVal) + binary.BigEndian.AppendUint64(dataBuf[:lengthBytes], uint64(len(val))) + keyAdapter.Encode(dataBuf[2*lengthBytes:2*lengthBytes:2*lengthBytes+encodedKeyLen], key, rowID) + copy(dataBuf[2*lengthBytes+encodedKeyLen:], val) w.kvLocations = append(w.kvLocations, loc) - w.kvSize += int64(encodedKeyLen + len(idxVal)) + w.kvSize += int64(encodedKeyLen + len(val)) w.batchSize += uint64(length) w.totalCnt += 1 return nil @@ -510,7 +510,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { w.kvLocations = w.kvLocations[:0] w.kvSize = 0 w.kvBuffer.Reset() - w.rc.reset() w.batchSize = 0 w.currentSeq++ return nil @@ -536,6 +535,7 @@ func (w *Writer) flushSortedKVs(ctx context.Context) (string, string, error) { _ = statWriter.Close(ctx) } }() + w.rc.reset() kvStore, err := NewKeyValueStore(ctx, dataWriter, w.rc) if err != nil { return "", "", err diff --git a/pkg/lightning/backend/external/writer_test.go b/pkg/lightning/backend/external/writer_test.go index 73cfba7accf6c..85c61ef56e8b6 100644 --- a/pkg/lightning/backend/external/writer_test.go +++ b/pkg/lightning/backend/external/writer_test.go @@ -519,15 +519,81 @@ func TestWriterSort(t *testing.T) { } return false }) - println("thread quick sort", time.Since(ts).String()) + t.Log("thread quick sort", time.Since(ts).String()) ts = time.Now() slices.SortFunc(kvs2, func(i, j common.KvPair) int { return bytes.Compare(i.Key, j.Key) }) - println("quick sort", time.Since(ts).String()) + t.Log("quick sort", time.Since(ts).String()) for i := 0; i < 1000000; i++ { require.True(t, bytes.Compare(kvs[i].Key, kvs2[i].Key) == 0) } } + +type writerFirstCloseFailStorage struct { + storage.ExternalStorage + shouldFail bool +} + +func (s *writerFirstCloseFailStorage) Create( + ctx context.Context, + path string, + option *storage.WriterOption, +) (storage.ExternalFileWriter, error) { + w, err := s.ExternalStorage.Create(ctx, path, option) + if err != nil { + return nil, err + } + if strings.Contains(path, statSuffix) { + return &firstCloseFailWriter{ExternalFileWriter: w, shouldFail: &s.shouldFail}, nil + } + return w, nil +} + +type firstCloseFailWriter struct { + storage.ExternalFileWriter + shouldFail *bool +} + +func (w *firstCloseFailWriter) Close(ctx context.Context) error { + if *w.shouldFail { + *w.shouldFail = false + return fmt.Errorf("first close fail") + } + return w.ExternalFileWriter.Close(ctx) +} + +func TestFlushKVsRetry(t *testing.T) { + ctx := context.Background() + store := &writerFirstCloseFailStorage{ExternalStorage: storage.NewMemStorage(), shouldFail: true} + + writer := NewWriterBuilder(). + SetPropKeysDistance(4). + SetMemorySizeLimit(100). + SetBlockSize(100). // 2 KV pair will trigger flush + Build(store, "/test", "0") + err := writer.WriteRow(ctx, []byte("key1"), []byte("val1"), nil) + require.NoError(t, err) + err = writer.WriteRow(ctx, []byte("key3"), []byte("val3"), nil) + require.NoError(t, err) + err = writer.WriteRow(ctx, []byte("key2"), []byte("val2"), nil) + require.NoError(t, err) + // manually test flushKVs + err = writer.flushKVs(ctx, false) + require.NoError(t, err) + + require.False(t, store.shouldFail) + + r, err := newStatsReader(ctx, store, "/test/0_stat/0", 100) + require.NoError(t, err) + p, err := r.nextProp() + lastKey := []byte{} + for err != io.EOF { + require.NoError(t, err) + require.True(t, bytes.Compare(lastKey, p.firstKey) < 0) + lastKey = append(lastKey[:0], p.firstKey...) + p, err = r.nextProp() + } +}