From 08f7acd5d52596d7f3d7055d4dbae835fba1df28 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 1 Nov 2023 21:47:06 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #48185 Signed-off-by: ti-chi-bot --- br/pkg/lightning/backend/local/engine.go | 3 + br/pkg/lightning/backend/local/local.go | 85 +++++++++- br/pkg/lightning/backend/local/local_test.go | 157 ++++++++++++++++++ .../backend/local/localhelper_test.go | 10 ++ 4 files changed, 251 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 399939fcd8e15..7dc5b083ab6ce 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -952,6 +952,9 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by LowerBound: lowerBound, UpperBound: upperBound, } + failpoint.Inject("mockGetFirstAndLastKey", func() { + failpoint.Return(lowerBound, upperBound, nil) + }) iter := e.newKVIter(context.Background(), opt) //nolint: errcheck diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 1c6eb1e7cb8d6..befb52e42e46a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1121,10 +1121,63 @@ func (local *Backend) generateAndSendJob( ) error { logger := log.FromContext(ctx) +<<<<<<< HEAD // when use dynamic region feature, the region may be very big, we need // to split to smaller ranges to increase the concurrency. if regionSplitSize > 2*int64(config.SplitRegionSize) { sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) +======= + logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) + + eg, egCtx := errgroup.WithContext(ctx) + + dataAndRangeCh := make(chan common.DataAndRange) + for i := 0; i < local.WorkerConcurrency; i++ { + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return nil + case p, ok := <-dataAndRangeCh: + if !ok { + return nil + } + + failpoint.Inject("beforeGenerateJob", nil) + failpoint.Inject("sendDummyJob", func(_ failpoint.Value) { + // this is used to trigger worker failure, used together + // with WriteToTiKVNotEnoughDiskSpace + jobToWorkerCh <- ®ionJob{} + time.Sleep(5 * time.Second) + }) + jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys) + if err != nil { + if common.IsContextCanceledError(err) { + return nil + } + return err + } + for _, job := range jobs { + job.ref(jobWg) + select { + case <-egCtx.Done(): + // this job is not put into jobToWorkerCh + job.done(jobWg) + // if the context is canceled, it means worker has error, the first error can be + // found by worker's error group LATER. if this function returns an error it will + // seize the "first error". 
+ return nil + case jobToWorkerCh <- job: + } + } + } + } + }) + } + + eg.Go(func() error { + err := engine.LoadIngestData(egCtx, jobRanges, dataAndRangeCh) +>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) if err != nil { return errors.Trace(err) } @@ -1548,6 +1601,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges }) } +<<<<<<< HEAD err := local.prepareAndSendJob( workerCtx, engine, @@ -1563,11 +1617,34 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges _ = workGroup.Wait() return firstErr.Get() } +======= + failpoint.Label("afterStartWorker") + + workGroup.Go(func() error { + err := local.prepareAndSendJob( + workerCtx, + engine, + regionRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + &jobWg, + ) + if err != nil { + return err + } +>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) - jobWg.Wait() - workerCancel() - firstErr.Set(workGroup.Wait()) - firstErr.Set(ctx.Err()) + jobWg.Wait() + workerCancel() + return nil + }) + if err := workGroup.Wait(); err != nil { + if !common.IsContextCanceledError(err) { + log.FromContext(ctx).Error("do import meets error", zap.Error(err)) + } + firstErr.Set(err) + } return firstErr.Get() } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 902dd906fa040..08d78096f2d83 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2085,3 +2085,160 @@ func TestCtxCancelIsIgnored(t *testing.T) { err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) require.ErrorContains(t, err, "the remaining storage capacity of TiKV") } +<<<<<<< HEAD +======= + +func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { + backup := maxRetryBackoffSecond + maxRetryBackoffSecond = 1 + t.Cleanup(func() { + maxRetryBackoffSecond = backup + }) + + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") + }) + + initRanges := []common.Range{ + {Start: []byte{'c'}, End: []byte{'d'}}, + } + + ctx := context.Background() + l := &Backend{ + BackendConfig: BackendConfig{ + WorkerConcurrency: 1, + }, + splitCli: initTestSplitClient( + [][]byte{{1}, {11}}, + panicSplitRegionClient{}, + ), + } + e := &Engine{} + err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "the remaining storage capacity of TiKV") +} + +func TestExternalEngine(t *testing.T) { + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = 
failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables") + }) + ctx := context.Background() + dir := t.TempDir() + storageURI := "file://" + filepath.ToSlash(dir) + storeBackend, err := storage.ParseBackend(storageURI, nil) + require.NoError(t, err) + extStorage, err := storage.New(ctx, storeBackend, nil) + require.NoError(t, err) + keys := make([][]byte, 100) + values := make([][]byte, 100) + for i := range keys { + keys[i] = []byte(fmt.Sprintf("key%06d", i)) + values[i] = []byte(fmt.Sprintf("value%06d", i)) + } + // simple append 0x00 + endKey := make([]byte, len(keys[99])+1) + copy(endKey, keys[99]) + + dataFiles, statFiles, err := external.MockExternalEngine(extStorage, keys, values) + require.NoError(t, err) + + externalCfg := &backend.ExternalEngineConfig{ + StorageURI: storageURI, + DataFiles: dataFiles, + StatFiles: statFiles, + StartKey: keys[0], + EndKey: endKey, + SplitKeys: [][]byte{keys[30], keys[60], keys[90]}, + TotalFileSize: int64(config.SplitRegionSize) + 1, + TotalKVCount: int64(config.SplitRegionKeys) + 1, + } + engineUUID := uuid.New() + pdCtl := &pdutil.PdController{} + pdCtl.SetPDClient(&mockPdClient{}) + local := &Backend{ + BackendConfig: BackendConfig{ + WorkerConcurrency: 2, + }, + splitCli: initTestSplitClient([][]byte{ + keys[0], keys[50], endKey, + }, nil), + pdCtl: pdCtl, + externalEngine: map[uuid.UUID]common.Engine{}, + keyAdapter: common.NoopKeyAdapter{}, + } + jobs := make([]*regionJob, 0, 5) + + jobToWorkerCh := make(chan *regionJob, 10) + testJobToWorkerCh = jobToWorkerCh + + done := make(chan struct{}) + go func() { + for i := 0; i < 5; i++ { + jobs = append(jobs, <-jobToWorkerCh) + testJobWg.Done() + } + }() + go func() { + err2 := local.CloseEngine( + ctx, + &backend.EngineConfig{External: externalCfg}, + engineUUID, + ) + require.NoError(t, err2) + err2 = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.NoError(t, err2) + close(done) + }() + + <-done + + // no jobs left in the channel + require.Len(t, jobToWorkerCh, 0) + + sort.Slice(jobs, func(i, j int) bool { + return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0 + }) + expectedKeyRanges := []common.Range{ + {Start: keys[0], End: keys[30]}, + {Start: keys[30], End: keys[50]}, + {Start: keys[50], End: keys[60]}, + {Start: keys[60], End: keys[90]}, + {Start: keys[90], End: endKey}, + } + kvIdx := 0 + for i, job := range jobs { + require.Equal(t, expectedKeyRanges[i], job.keyRange) + iter := job.ingestData.NewIter(ctx, job.keyRange.Start, job.keyRange.End) + for iter.First(); iter.Valid(); iter.Next() { + require.Equal(t, keys[kvIdx], iter.Key()) + require.Equal(t, values[kvIdx], iter.Value()) + kvIdx++ + } + require.NoError(t, iter.Error()) + require.NoError(t, iter.Close()) + } + require.Equal(t, 100, kvIdx) +} + +func TestGetExternalEngineKVStatistics(t *testing.T) { + b := Backend{ + externalEngine: map[uuid.UUID]common.Engine{}, + } + // non existent uuid + size, count := b.GetExternalEngineKVStatistics(uuid.New()) + require.Zero(t, size) + 
require.Zero(t, count) +} +>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index d677e9c1dc7ba..881935887c0c2 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -249,6 +249,16 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd } func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { +<<<<<<< HEAD +======= + c.mu.Lock() + defer c.mu.Unlock() + + if err := ctx.Err(); err != nil { + return nil, err + } + +>>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) if c.hook != nil { key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) } From 2c7c770fc6e856309297f123f48448c7fc8a72a9 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 2 Nov 2023 13:02:43 +0800 Subject: [PATCH 2/3] fix conflict --- br/pkg/lightning/backend/local/local.go | 79 +----------- br/pkg/lightning/backend/local/local_test.go | 124 +------------------ 2 files changed, 8 insertions(+), 195 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index befb52e42e46a..df2e20310b43b 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1121,63 +1121,10 @@ func (local *Backend) generateAndSendJob( ) error { logger := log.FromContext(ctx) -<<<<<<< HEAD // when use dynamic region feature, the region may be very big, we need // to split to smaller ranges to increase the concurrency. if regionSplitSize > 2*int64(config.SplitRegionSize) { sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) -======= - logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) - - eg, egCtx := errgroup.WithContext(ctx) - - dataAndRangeCh := make(chan common.DataAndRange) - for i := 0; i < local.WorkerConcurrency; i++ { - eg.Go(func() error { - for { - select { - case <-egCtx.Done(): - return nil - case p, ok := <-dataAndRangeCh: - if !ok { - return nil - } - - failpoint.Inject("beforeGenerateJob", nil) - failpoint.Inject("sendDummyJob", func(_ failpoint.Value) { - // this is used to trigger worker failure, used together - // with WriteToTiKVNotEnoughDiskSpace - jobToWorkerCh <- ®ionJob{} - time.Sleep(5 * time.Second) - }) - jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys) - if err != nil { - if common.IsContextCanceledError(err) { - return nil - } - return err - } - for _, job := range jobs { - job.ref(jobWg) - select { - case <-egCtx.Done(): - // this job is not put into jobToWorkerCh - job.done(jobWg) - // if the context is canceled, it means worker has error, the first error can be - // found by worker's error group LATER. if this function returns an error it will - // seize the "first error". 
- return nil - case jobToWorkerCh <- job: - } - } - } - } - }) - } - - eg.Go(func() error { - err := engine.LoadIngestData(egCtx, jobRanges, dataAndRangeCh) ->>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) if err != nil { return errors.Trace(err) } @@ -1200,6 +1147,12 @@ func (local *Backend) generateAndSendJob( } failpoint.Inject("beforeGenerateJob", nil) + failpoint.Inject("sendDummyJob", func(_ failpoint.Value) { + // this is used to trigger worker failure, used together + // with WriteToTiKVNotEnoughDiskSpace + jobToWorkerCh <- ®ionJob{} + time.Sleep(5 * time.Second) + }) jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys) if err != nil { if common.IsContextCanceledError(err) { @@ -1601,25 +1554,6 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges }) } -<<<<<<< HEAD - err := local.prepareAndSendJob( - workerCtx, - engine, - regionRanges, - regionSplitSize, - regionSplitKeys, - jobToWorkerCh, - &jobWg, - ) - if err != nil { - firstErr.Set(err) - workerCancel() - _ = workGroup.Wait() - return firstErr.Get() - } -======= - failpoint.Label("afterStartWorker") - workGroup.Go(func() error { err := local.prepareAndSendJob( workerCtx, @@ -1633,7 +1567,6 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges if err != nil { return err } ->>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) jobWg.Wait() workerCancel() diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 08d78096f2d83..c54ceac27ef99 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2085,8 +2085,6 @@ func TestCtxCancelIsIgnored(t *testing.T) { err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) require.ErrorContains(t, err, "the remaining storage capacity of TiKV") } -<<<<<<< HEAD -======= func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { backup := maxRetryBackoffSecond @@ -2106,8 +2104,8 @@ func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") }) - initRanges := []common.Range{ - {Start: []byte{'c'}, End: []byte{'d'}}, + initRanges := []Range{ + {start: []byte{'c'}, end: []byte{'d'}}, } ctx := context.Background() @@ -2124,121 +2122,3 @@ func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) require.ErrorContains(t, err, "the remaining storage capacity of TiKV") } - -func TestExternalEngine(t *testing.T) { - _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") - _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()") - _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables", "return()") - t.Cleanup(func() { - _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") - _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker") - _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/injectVariables") - }) - ctx := context.Background() - dir := t.TempDir() - storageURI := "file://" + filepath.ToSlash(dir) - storeBackend, err 
:= storage.ParseBackend(storageURI, nil) - require.NoError(t, err) - extStorage, err := storage.New(ctx, storeBackend, nil) - require.NoError(t, err) - keys := make([][]byte, 100) - values := make([][]byte, 100) - for i := range keys { - keys[i] = []byte(fmt.Sprintf("key%06d", i)) - values[i] = []byte(fmt.Sprintf("value%06d", i)) - } - // simple append 0x00 - endKey := make([]byte, len(keys[99])+1) - copy(endKey, keys[99]) - - dataFiles, statFiles, err := external.MockExternalEngine(extStorage, keys, values) - require.NoError(t, err) - - externalCfg := &backend.ExternalEngineConfig{ - StorageURI: storageURI, - DataFiles: dataFiles, - StatFiles: statFiles, - StartKey: keys[0], - EndKey: endKey, - SplitKeys: [][]byte{keys[30], keys[60], keys[90]}, - TotalFileSize: int64(config.SplitRegionSize) + 1, - TotalKVCount: int64(config.SplitRegionKeys) + 1, - } - engineUUID := uuid.New() - pdCtl := &pdutil.PdController{} - pdCtl.SetPDClient(&mockPdClient{}) - local := &Backend{ - BackendConfig: BackendConfig{ - WorkerConcurrency: 2, - }, - splitCli: initTestSplitClient([][]byte{ - keys[0], keys[50], endKey, - }, nil), - pdCtl: pdCtl, - externalEngine: map[uuid.UUID]common.Engine{}, - keyAdapter: common.NoopKeyAdapter{}, - } - jobs := make([]*regionJob, 0, 5) - - jobToWorkerCh := make(chan *regionJob, 10) - testJobToWorkerCh = jobToWorkerCh - - done := make(chan struct{}) - go func() { - for i := 0; i < 5; i++ { - jobs = append(jobs, <-jobToWorkerCh) - testJobWg.Done() - } - }() - go func() { - err2 := local.CloseEngine( - ctx, - &backend.EngineConfig{External: externalCfg}, - engineUUID, - ) - require.NoError(t, err2) - err2 = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) - require.NoError(t, err2) - close(done) - }() - - <-done - - // no jobs left in the channel - require.Len(t, jobToWorkerCh, 0) - - sort.Slice(jobs, func(i, j int) bool { - return bytes.Compare(jobs[i].keyRange.Start, jobs[j].keyRange.Start) < 0 - }) - expectedKeyRanges := []common.Range{ - {Start: keys[0], End: keys[30]}, - {Start: keys[30], End: keys[50]}, - {Start: keys[50], End: keys[60]}, - {Start: keys[60], End: keys[90]}, - {Start: keys[90], End: endKey}, - } - kvIdx := 0 - for i, job := range jobs { - require.Equal(t, expectedKeyRanges[i], job.keyRange) - iter := job.ingestData.NewIter(ctx, job.keyRange.Start, job.keyRange.End) - for iter.First(); iter.Valid(); iter.Next() { - require.Equal(t, keys[kvIdx], iter.Key()) - require.Equal(t, values[kvIdx], iter.Value()) - kvIdx++ - } - require.NoError(t, iter.Error()) - require.NoError(t, iter.Close()) - } - require.Equal(t, 100, kvIdx) -} - -func TestGetExternalEngineKVStatistics(t *testing.T) { - b := Backend{ - externalEngine: map[uuid.UUID]common.Engine{}, - } - // non existent uuid - size, count := b.GetExternalEngineKVStatistics(uuid.New()) - require.Zero(t, size) - require.Zero(t, count) -} ->>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) From 56e3173c0e32a84c178d68c9a6559f11921e6d49 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 2 Nov 2023 13:14:15 +0800 Subject: [PATCH 3/3] fix conflict --- br/pkg/lightning/backend/local/localhelper_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index 881935887c0c2..cc57565ded319 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -249,16 +249,10 @@ 
func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd } func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { -<<<<<<< HEAD -======= - c.mu.Lock() - defer c.mu.Unlock() - if err := ctx.Err(); err != nil { return nil, err } ->>>>>>> c652a92df89 (local backend: fix worker err overriden by job generation err (#48185)) if c.hook != nil { key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) }
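
A minimal, self-contained sketch of the error-propagation pattern the fix adopts (illustrative only — not Lightning code): the job generator runs inside the same errgroup as the workers, so eg.Wait() surfaces the worker's error first and a later context-cancellation error from job generation cannot override it. It assumes golang.org/x/sync/errgroup; names such as jobCh, the worker, and the generator are hypothetical.

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, egCtx := errgroup.WithContext(context.Background())
	jobCh := make(chan int)

	// Worker: fails on its first job; the errgroup then cancels egCtx for
	// every other goroutine in the group.
	eg.Go(func() error {
		<-jobCh
		return errors.New("mock: the remaining storage capacity of TiKV is not enough")
	})

	// Generator: runs in the SAME errgroup. When egCtx is canceled because a
	// worker failed, it returns nil so it cannot seize the "first error" slot.
	eg.Go(func() error {
		defer close(jobCh)
		for i := 0; i < 100; i++ {
			select {
			case <-egCtx.Done():
				return nil // the worker's error will be reported by eg.Wait()
			case jobCh <- i:
			}
		}
		return nil
	})

	// Wait() returns the worker's error, not a generic context.Canceled.
	fmt.Println(eg.Wait())
}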