Skip to content

Commit

Permalink
Merge branch 'master' into optimize_topn_ref
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jun 16, 2022
2 parents 7ae2aac + f4b29f0 commit 35c0dbf
Show file tree
Hide file tree
Showing 70 changed files with 1,912 additions and 553 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolu
build:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --verbose_failures
test:ci --test_timeout=180
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2244,8 +2244,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:KhOkCnqpxh/B2gGZdXSUyKgNRZaPzYsCIWGjNdrFmOA=",
version = "v2.0.1-0.20220531081749-2807409d4968",
sum = "h1:N5ivsNkDQDgimY0ZVqMnWqXjEnxy5uFChoB4wPIKpPI=",
version = "v2.0.1-0.20220613112734-be31f33ba03b",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ func (importer *FileImporter) ImportKVFiles(
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey))

rs := utils.InitialRetryState(32, 100*time.Millisecond, 8*time.Second)
// This RetryState will retry 48 time, for 5 min - 6 min.
rs := utils.InitialRetryState(48, 100*time.Millisecond, 8*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *RegionInfo) RPCResult {
return importer.ImportKVFileForRegion(ctx, file, rule, startTS, restoreTS, r)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ func PaginateScanRegion(
var batch []*RegionInfo
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
return errors.Trace(err)
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
return err
}
regions = append(regions, batch...)
if len(batch) < limit {
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -499,6 +500,11 @@ func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetO
}

func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
failpoint.Inject("no-leader-error", func(_ failpoint.Value) {
logutil.CL(ctx).Debug("failpoint no-leader-error injected.")
failpoint.Return(nil, status.Error(codes.Unavailable, "not leader"))
})

regions, err := c.client.ScanRegions(ctx, key, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type TestClient struct {
injectInScatter func(*restore.RegionInfo) error
supportBatchScatter bool

scattered map[uint64]bool
InjectErr bool
scattered map[uint64]bool
InjectErr bool
InjectTimes int32
}

func NewTestClient(
Expand Down Expand Up @@ -216,8 +217,9 @@ func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.Ge
}

func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*restore.RegionInfo, error) {
if c.InjectErr {
return nil, errors.New("mock scan error")
if c.InjectErr && c.InjectTimes > 0 {
c.InjectTimes -= 1
return nil, status.Error(codes.Unavailable, "not leader")
}

infos := c.regionsInfo.ScanRange(key, endKey, limit)
Expand Down
12 changes: 8 additions & 4 deletions br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ func TestPaginateScanRegion(t *testing.T) {
require.Regexp(t, ".*scan region return empty result.*", err.Error())

regionMap, regions = makeRegions(1)
batch, err = restore.PaginateScanRegion(ctx, NewTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3)
tc := NewTestClient(stores, regionMap, 0)
tc.InjectErr = true
tc.InjectTimes = 10
batch, err = restore.PaginateScanRegion(ctx, tc, []byte{}, []byte{}, 3)
require.NoError(t, err)
require.Equal(t, regions, batch)

Expand Down Expand Up @@ -273,11 +276,12 @@ func TestPaginateScanRegion(t *testing.T) {
require.True(t, berrors.ErrRestoreInvalidRange.Equal(err))
require.Regexp(t, ".*startKey > endKey.*", err.Error())

tc := NewTestClient(stores, regionMap, 0)
tc = NewTestClient(stores, regionMap, 0)
tc.InjectErr = true
_, err = restore.PaginateScanRegion(ctx, tc, regions[1].Region.EndKey, regions[5].Region.EndKey, 3)
tc.InjectTimes = 65
_, err = restore.PaginateScanRegion(ctx, tc, []byte{}, []byte{}, 3)
require.Error(t, err)
require.Regexp(t, ".*mock scan error.*", err.Error())
require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))

// make the regionMap losing some region, this will cause scan region check fails
delete(regionMap, uint64(3))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {

func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = 32
cfg.Config.Concurrency = 16
}
}

Expand Down
1 change: 1 addition & 0 deletions br/tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ for ct in limit lz4 zstd; do
# restore full
echo "restore with $ct backup start..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/restore-storage-error=1*return(\"connection refused\");github.com/pingcap/tidb/br/pkg/restore/restore-gRPC-error=1*return(true)"
export GO_FAILPOINTS=$GO_FAILPOINTS";github.com/pingcap/tidb/br/pkg/restore/no-leader-error=3*return(true)"
run_br restore full -s "local://$TEST_DIR/$DB-$ct" --pd $PD_ADDR --ratelimit 1024
export GO_FAILPOINTS=""

Expand Down
8 changes: 6 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (d *ddl) close() {
d.schemaSyncer.Close()

for _, worker := range d.workers {
worker.close()
worker.Close()
}
// d.delRangeMgr using sessions from d.sessPool.
// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
Expand Down Expand Up @@ -812,7 +812,11 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}

func (d *ddl) callHookOnChanged(err error) error {
func (d *ddl) callHookOnChanged(job *model.Job, err error) error {
if job.State == model.JobStateNone {
// We don't call the hook if the job haven't run yet.
return err
}
d.mu.RLock()
defer d.mu.RUnlock()

Expand Down
Loading

0 comments on commit 35c0dbf

Please sign in to comment.