Skip to content

Commit

Permalink
lightning: check hasDupe and tableID when resolve duplicate rows (#40696
Browse files Browse the repository at this point in the history
) (#40793)

close #40657
  • Loading branch information
ti-chi-bot authored Feb 19, 2023
1 parent adb9d90 commit 772c688
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 10 deletions.
22 changes: 20 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1709,13 +1710,18 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
return err
}

tableIDs := physicalTableIDs(tbl.Meta())
keyInTable := func(key []byte) bool {
return slices.Contains(tableIDs, tablecodec.DecodeTableID(key))
}

errLimiter := rate.NewLimiter(1, 1)
pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")
err = local.errorMgr.ResolveAllConflictKeys(
ctx, tableName, pool,
func(ctx context.Context, handleRows [][2][]byte) error {
for {
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder)
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
if err == nil {
return nil
}
Expand All @@ -1738,7 +1744,13 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
return errors.Trace(err)
}

func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, handleRows [][2][]byte, decoder *kv.TableKVDecoder) (err error) {
func (local *local) deleteDuplicateRows(
ctx context.Context,
logger *log.Task,
handleRows [][2][]byte,
decoder *kv.TableKVDecoder,
keyInTable func(key []byte) bool,
) (err error) {
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
if err != nil {
Expand All @@ -1763,6 +1775,12 @@ func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, h
// (if the number of duplicates is small this should fit entirely in memory)
// (Txn's MemBuf's bufferSizeLimit is currently infinity)
for _, handleRow := range handleRows {
// Skip the row key if it's not in the table.
// This can happen if the table has been recreated or truncated,
// and the duplicate key is from the old table.
if !keyInTable(handleRow[0]) {
continue
}
logger.Debug("[resolve-dupe] found row to resolve",
logutil.Key("handle", handleRow[0]),
logutil.Key("row", handleRow[1]))
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,11 @@ func (tr *TableRestore) postProcess(
}
hasDupe = hasDupe || hasRemoteDupe

if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil {
tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err))
return false, err
if hasDupe {
if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil {
tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err))
return false, err
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions br/tests/lightning_disable_scheduler_by_key_range/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ ready_for_import_engine

run_curl "https://${PD_ADDR}/pd/api/v1/config/cluster-version"

length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length')
length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length')
if [ "$length" != "1" ]; then
echo "region-label key-range rules should be 1, but got $length" >&2
echo "region-label key-range schedule rules should be 1, but got $length" >&2
exit 1
fi

wait "$shpid"

length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length')
length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length')
if [ -n "$length" ] && [ "$length" -ne 0 ]; then
echo "region-label key-range rules should be 0, but got $length" >&2
echo "region-label key-range schedule rules should be 0, but got $length" >&2
exit 1
fi
6 changes: 6 additions & 0 deletions br/tests/lightning_issue_40657/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tikv-importer]
backend = "local"
duplicate-resolution = "remove"

[mydumper.csv]
header = true
6 changes: 6 additions & 0 deletions br/tests/lightning_issue_40657/data1/test.t-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE `t` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `uni_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_issue_40657/data1/test.t.0.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,name
1,"aaa01"
2,"aaa02"
3,"aaa03"
4,"aaa04"
5,"aaa04"
6 changes: 6 additions & 0 deletions br/tests/lightning_issue_40657/data2/test.t-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE `t` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `uni_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_issue_40657/data2/test.t.0.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,name
1,"aaa01"
2,"aaa02"
3,"aaa03"
4,"aaa04"
5,"aaa05"
32 changes: 32 additions & 0 deletions br/tests/lightning_issue_40657/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

check_cluster_version 5 2 0 'duplicate detection' || exit 0

run_lightning -d "tests/$TEST_NAME/data1"
run_sql 'admin check table test.t'
run_sql 'select count(*) from test.t'
check_contains 'count(*): 3'
run_sql 'select count(*) from lightning_task_info.conflict_error_v1'
check_contains 'count(*): 2'

run_sql 'truncate table test.t'
run_lightning -d "tests/$TEST_NAME/data2"
run_sql 'admin check table test.t'
run_sql 'select count(*) from test.t'
check_contains 'count(*): 5'
2 changes: 1 addition & 1 deletion br/tests/lightning_reload_cert/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ shpid="$!"
sleep 15
ok=0
for _ in {0..60}; do
if grep -Fq "connection closed before server preface received" "$TEST_DIR"/lightning.log; then
if grep -Fq "connection error" "$TEST_DIR"/lightning.log; then
ok=1
break
fi
Expand Down

0 comments on commit 772c688

Please sign in to comment.