-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
importinto/lightning: do remote checksum via sql #44803
Conversation
br/pkg/lightning/importer/import.go
Outdated
manager, err := NewChecksumManager(ctx, rc, kvStore) | ||
var manager local.ChecksumManager | ||
if rc.cfg.TikvImporter.Backend == config.BackendLocal && rc.cfg.PostRestore.Checksum != config.OpLevelOff { | ||
manager = local.NewTiDBChecksumExecutor(rc.db) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about add a hidden switch for checksum via sql or tikv ?
@@ -73,6 +84,115 @@ func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error { | |||
return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger) | |||
} | |||
|
|||
// postProcess does the post-processing for the task. | |||
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add more unit tests for this function or checksum via sql
zap.Int("concurrency", distSQLScanConcurrency), zap.Int("retry", i)) | ||
if distSQLScanConcurrency > local.MinDistSQLScanConcurrency { | ||
distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, local.MinDistSQLScanConcurrency) | ||
se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert it back after execute? the session taken from resource pool
distSQLScanConcurrency = se.GetSessionVars().DistSQLScanConcurrency() | ||
|
||
for i := 0; i < maxErrorRetryCount; i++ { | ||
rs, execErr = storage.ExecSQL(ctx, se, sql) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put in explicit txn?
br/pkg/checksum/executor.go
Outdated
@@ -342,12 +342,14 @@ func (exec *Executor) Execute( | |||
// | |||
// It is useful in TiDB, however, it's a place holder in BR. | |||
killed := uint32(0) | |||
vars := kv.NewVariables(&killed) | |||
vars.BackOffWeight *= 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make it a configurable parameter ?
@@ -111,7 +113,11 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi | |||
// +---------+------------+---------------------+-----------+-------------+ | |||
|
|||
cs := RemoteChecksum{} | |||
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", | |||
exec := common.SQLWithRetry{DB: e.db, Logger: task.Logger} | |||
if err := exec.Exec(ctx, "increase tidb_backoff_weight", fmt.Sprintf("SET SESSION tidb_backoff_weight = '%d';", 3*tikvstore.DefBackOffWeight)); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make backoff weight a configurable parameter ?
if !ok { | ||
return errors.New("tidb_backoff_weight not set") | ||
} | ||
backoffWeight := mathutil.Max(backoffWeightBackup, strconv.Itoa(3*tikvstore.DefBackOffWeight)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make backoff weight a configurable parameter ?
/run-integration-br-test |
1 similar comment
/run-integration-br-test |
/run-integration-br-test |
/run-integration-br-test |
/run-cherry-picker |
In response to a cherrypick label: new pull request created to branch |
In response to a cherrypick label: new pull request created to branch |
Signed-off-by: ti-chi-bot <[email protected]>
In response to a cherrypick label: new pull request created to branch |
Signed-off-by: ti-chi-bot <[email protected]>
This reverts commit d0bb20d.
This reverts commit d19162e.
/run-cherry-picker |
/run-cherry-picker |
In response to a cherrypick label: new pull request created to branch |
Signed-off-by: ti-chi-bot <[email protected]>
This reverts commit 3a13785.
/cherry-pick release-7.1-20230706-v7.1.0 |
@D3Hunter: new pull request created to branch In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
Signed-off-by: ti-chi-bot <[email protected]>
What problem does this PR solve?
Issue Number: close #41941 , ref #44711
Problem Summary:
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.