Skip to content

Commit

Permalink
lightning: check peers write stall when switch-mode is disabled (#40228)
Browse files Browse the repository at this point in the history
close #40163
  • Loading branch information
lance6716 authored Dec 30, 2022
1 parent f7de8be commit 47080d9
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 71 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ go_test(
"//br/pkg/lightning/glue",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/worker",
"//br/pkg/membuf",
"//br/pkg/mock",
"//br/pkg/pdutil",
Expand Down
17 changes: 11 additions & 6 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ import (
"github.com/stretchr/testify/require"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)
return db, tmpPath
}

func TestIngestSSTWithClosedEngine(t *testing.T) {
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
Expand All @@ -41,11 +50,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
DisableWAL: true,
ReadOnly: false,
}
db, err := pebble.Open(filepath.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)
db, tmpPath := makePebbleDB(t, opt)

_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
gRPCBackOffMaxDelay = 10 * time.Minute
writeStallSleepTime = 10 * time.Second

// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096
Expand Down Expand Up @@ -381,6 +382,12 @@ type local struct {

encBuilder backend.EncodingBuilder
targetInfoGetter backend.TargetInfoGetter

// When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
// To avoid this, we should check write stall before ingesting SSTs. Note that, we
// must check both leader node and followers in client side, because followers will
// not check write stall as long as ingest command is accepted by leader.
shouldCheckWriteStall bool
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
Expand Down Expand Up @@ -503,6 +510,7 @@ func NewLocalBackend(
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
}
if m, ok := metric.FromContext(ctx); ok {
local.metrics = m
Expand Down Expand Up @@ -1146,6 +1154,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
return resp, errors.Trace(err)
}

if local.shouldCheckWriteStall {
for {
maybeWriteStall, err := local.checkWriteStall(ctx, region)
if err != nil {
return nil, err
}
if !maybeWriteStall {
break
}
log.FromContext(ctx).Warn("ingest maybe cause write stall, sleep and retry",
zap.Duration("duration", writeStallSleepTime))
select {
case <-time.After(writeStallSleepTime):
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
}
}
}

req := &sst.MultiIngestRequest{
Context: reqCtx,
Ssts: metas,
Expand All @@ -1154,6 +1181,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
return resp, errors.Trace(err)
}

func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
if err != nil {
return false, errors.Trace(err)
}
resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
return false, errors.Trace(err)
}
if resp.Error != nil && resp.Error.ServerIsBusy != nil {
return true, nil
}
}
return false, nil
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
curSize := uint64(0)
Expand Down
Loading

0 comments on commit 47080d9

Please sign in to comment.