Skip to content

Commit

Permalink
local backend: fix sst skip ingest when multiIngest=false and exhaust…
Browse files Browse the repository at this point in the history
…ed retry count (pingcap#50282) (pingcap#43)

close pingcap#50198

Co-authored-by: D3Hunter <[email protected]>
  • Loading branch information
2 people authored and GitHub Enterprise committed Feb 5, 2024
1 parent 55b2eb2 commit 4d8dfde
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 11 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,6 @@ go_test(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
21 changes: 10 additions & 11 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,19 +1518,18 @@ loopWrite:
continue
}
}
if err != nil {
log.FromContext(ctx).Warn("batch ingest fail after retry, will retry import full range", log.ShortError(err),
logutil.Region(region.Region), zap.Reflect("meta", ingestMetas))
return errors.Trace(err)
}
}

if err != nil {
log.FromContext(ctx).Warn("write and ingest region, will retry import full range", log.ShortError(err),
logutil.Region(region.Region), logutil.Key("start", start),
logutil.Key("end", end))
} else {
engine.importedKVSize.Add(rangeStats.totalBytes)
engine.importedKVCount.Add(rangeStats.count)
engine.finishedRanges.add(finishedRange)
if local.metrics != nil {
local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
}
engine.importedKVSize.Add(rangeStats.totalBytes)
engine.importedKVCount.Add(rangeStats.count)
engine.finishedRanges.add(finishedRange)
if local.metrics != nil {
local.metrics.BytesCounter.WithLabelValues(metric.BytesStateImported).Add(float64(rangeStats.totalBytes))
}
return errors.Trace(err)
}
Expand Down
79 changes: 79 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/util/hack"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -872,6 +873,20 @@ func newMockImportClient() *mockImportClient {
}
}

func (c *mockImportClient) Ingest(context.Context, *sst.IngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) {
defer func() {
c.cnt++
}()
if c.apiInvokeRecorder != nil {
c.apiInvokeRecorder["Ingest"] = append(c.apiInvokeRecorder["Ingest"], c.store.GetId())
}
if c.cnt < c.retry && (c.err != nil || c.resp != nil) {
return c.resp, c.err
}

return &sst.IngestResponse{}, nil
}

func (c *mockImportClient) MultiIngest(context.Context, *sst.MultiIngestRequest, ...grpc.CallOption) (*sst.IngestResponse, error) {
defer func() {
c.cnt++
Expand Down Expand Up @@ -1422,3 +1437,67 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"])
require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"])
}

func TestIngestFailOnFirstBatch(t *testing.T) {
logger := log.Logger{Logger: zap.NewExample()}
ctx := log.NewContext(context.Background(), logger)

apiInvokeRecorder := map[string][]uint64{}
importCli := newMockImportClient()

local := &local{
importClientFactory: &mockImportClientFactory{
stores: []*metapb.Store{
{Id: 1}, {Id: 2}, {Id: 3},
{Id: 11}, {Id: 12}, {Id: 13},
},
createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
importCli.store = store
importCli.apiInvokeRecorder = apiInvokeRecorder
if store.Id == 1 {
importCli.retry = 5
importCli.err = status.Error(codes.Unknown, "Suspended { time_to_lease_expire: 1.204s }")
}
return importCli
},
},
logger: logger,
ingestConcurrency: worker.NewPool(ctx, 1, "ingest"),
writeLimiter: noopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: false,
shouldCheckWriteStall: false,
}

db, tmpPath := makePebbleDB(t, nil)
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
require.False(t, local.supportMultiIngest)
err = local.writeAndIngestPairs(ctx, f, &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
Peers: []*metapb.Peer{
{Id: 1, StoreId: 1},
},
},
Leader: &metapb.Peer{
Id: 1,
StoreId: 1,
},
}, []byte{}, []byte("b"), 0, 0)
require.ErrorContains(t, err, "Suspended { time_to_lease_expire: 1.204s }")
require.Equal(t, []uint64{1}, apiInvokeRecorder["Write"])
require.Equal(t, []uint64{1, 1, 1, 1, 1}, apiInvokeRecorder["Ingest"])
}

0 comments on commit 4d8dfde

Please sign in to comment.