diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index f2d1cffef436c..69c5a68fe5432 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1370,7 +1370,11 @@ func (local *Backend) executeJob(
 		job.lastRetryableErr = err
 		return nil
 	}
-	if job.stage == needRescan {
+	// If job.stage is successfully converted to "ingested", it means the data
+	// has been ingested into TiKV, so we continue to handle the remaining data.
+	// For any other stage, the job should be sent back to the caller to retry
+	// later.
+	if job.stage != ingested {
 		return nil
 	}

diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 8b1dfb3b4754f..09e714b8368c5 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -682,6 +682,7 @@ type mockImportClient struct {
 	sst.ImportSSTClient
 	store             *metapb.Store
 	resp              *sst.IngestResponse
+	onceResp          *atomic.Pointer[sst.IngestResponse]
 	err               error
 	retry             int
 	cnt               int
@@ -709,8 +710,17 @@ func (c *mockImportClient) MultiIngest(_ context.Context, req *sst.MultiIngestRe
 	if c.apiInvokeRecorder != nil {
 		c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId())
 	}
-	if c.cnt < c.retry && (c.err != nil || c.resp != nil) {
-		return c.resp, c.err
+	if c.cnt < c.retry {
+		if c.err != nil {
+			return c.resp, c.err
+		}
+		if c.onceResp != nil {
+			resp := c.onceResp.Swap(&sst.IngestResponse{})
+			return resp, nil
+		}
+		if c.resp != nil {
+			return c.resp, nil
+		}
 	}

 	if !c.multiIngestCheckFn(c.store) {
@@ -1452,6 +1462,123 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
 	require.Equal(t, []uint64{1}, apiInvokeRecorder["MultiIngest"])
 }

+func TestPartialWriteIngestBusy(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	apiInvokeRecorder := map[string][]uint64{}
+	serverIsBusyResp := &sst.IngestResponse{
+		Error: &errorpb.Error{
+			ServerIsBusy: &errorpb.ServerIsBusy{},
+		}}
+	onceResp := &atomic.Pointer[sst.IngestResponse]{}
+	onceResp.Store(serverIsBusyResp)
+
+	local := &Backend{
+		splitCli: initTestSplitClient3Replica([][]byte{{}, {'c'}}, nil),
+		importClientFactory: &mockImportClientFactory{
+			stores: []*metapb.Store{
+				{Id: 1}, {Id: 2}, {Id: 3},
+			},
+			createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
+				importCli := newMockImportClient()
+				importCli.store = store
+				importCli.apiInvokeRecorder = apiInvokeRecorder
+				if store.Id == 1 {
+					importCli.retry = 1
+					importCli.onceResp = onceResp
+				}
+				return importCli
+			},
+		},
+		logger:             log.L(),
+		writeLimiter:       noopStoreWriteLimiter{},
+		bufferPool:         membuf.NewPool(),
+		supportMultiIngest: true,
+		tikvCodec:          keyspace.CodecV1,
+	}
+
+	db, tmpPath := makePebbleDB(t, nil)
+	_, engineUUID := backend.MakeUUID("ww", 0)
+	engineCtx, cancel2 := context.WithCancel(context.Background())
+	f := &Engine{
+		db:           db,
+		UUID:         engineUUID,
+		sstDir:       tmpPath,
+		ctx:          engineCtx,
+		cancel:       cancel2,
+		sstMetasChan: make(chan metaOrFlush, 64),
+		keyAdapter:   noopKeyAdapter{},
+		logger:       log.L(),
+	}
+	err := f.db.Set([]byte("a"), []byte("a"), nil)
+	require.NoError(t, err)
+	err = f.db.Set([]byte("a2"), []byte("a2"), nil)
+	require.NoError(t, err)
+
+	jobCh := make(chan *regionJob, 10)
+
+	partialWriteJob := &regionJob{
+		keyRange: Range{start: []byte("a"), end: []byte("c")},
+		region: &split.RegionInfo{
+			Region: &metapb.Region{
+				Id: 1,
+				Peers: []*metapb.Peer{
+					{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
+				},
+				StartKey: []byte("a"),
+				EndKey:   []byte("c"),
+			},
+			Leader: &metapb.Peer{Id: 1, StoreId: 1},
+		},
+		stage:  regionScanned,
+		engine: f,
+		// use a small regionSplitSize to trigger a partial write
+		regionSplitSize: 1,
+	}
+	var jobWg sync.WaitGroup
+	jobWg.Add(1)
+	jobCh <- partialWriteJob
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	jobOutCh := make(chan *regionJob)
+	go func() {
+		defer wg.Done()
+		for {
+			job := <-jobOutCh
+			switch job.stage {
+			case wrote:
+				// mimic the caller retrying the job later
+				jobCh <- job
+			case ingested:
+				// a partial write changes the start key of the remaining range
+				require.Equal(t, []byte("a2"), job.keyRange.start)
+				require.Equal(t, []byte("c"), job.keyRange.end)
+				jobWg.Done()
+				return
+			default:
+				require.Fail(t, "job stage %s is not expected, job: %v", job.stage, job)
+			}
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
+		require.NoError(t, err)
+	}()
+
+	jobWg.Wait()
+	cancel()
+	wg.Wait()
+
+	require.Equal(t, int64(2), f.importedKVCount.Load())
+
+	require.Equal(t, []uint64{1, 2, 3, 1, 2, 3}, apiInvokeRecorder["Write"])
+	require.Equal(t, []uint64{1, 1, 1}, apiInvokeRecorder["MultiIngest"])
+}
+
 // mockGetSizeProperties mocks that 50MB * 20 SST file.
 func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
 	props := newSizeProperties()