Skip to content

Commit

Permalink
Merge branch 'master' into testtypo-issue29669
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored Nov 15, 2021
2 parents d32bb97 + bb37653 commit 1fe8101
Show file tree
Hide file tree
Showing 182 changed files with 7,433 additions and 4,997 deletions.
23 changes: 21 additions & 2 deletions .github/workflows/compile_br.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ on:
- '!br/docs/**'
- '!br/tests/**'
- '!br/docker/**'
#change trigger policy
pull_request:
types:
- labeled # <--
branches:
- master
- 'release-[0-9].[0-9]*'
Expand All @@ -35,8 +38,25 @@ concurrency:
cancel-in-progress: true

jobs:
compile-windows:
if: github.event_name == 'push' || github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for Windows job
runs-on: windows-latest
steps:
- uses: actions/[email protected]

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16

- name: Run build
run: make build_tools

compile:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for ${{ matrix.os }} / ${{ matrix.target}}

runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand All @@ -47,8 +67,6 @@ jobs:
- os: ubuntu-latest
target: aarch64-unknown-linux-gnu

- os: windows-latest
target: x86_64-pc-windows-msvc
steps:
- uses: actions/[email protected]

Expand All @@ -61,6 +79,7 @@ jobs:
run: make build_tools

compile-freebsd:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for FreeBSD job
runs-on: ubuntu-latest
steps:
Expand Down
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ explaintest: server_check
ddltest:
@cd cmd/ddltest && $(GO) test -o ../../bin/ddltest -c

upload-coverage: SHELL:=/bin/bash
upload-coverage:
ifeq ("$(TRAVIS_COVERAGE)", "1")
mv overalls.coverprofile coverage.txt
bash <(curl -s https://codecov.io/bash)
ifneq ($(CODECOV_TOKEN), "")
curl -LO ${FILE_SERVER_URL}/download/cicd/ci-tools/codecov
chmod +x codecov
./codecov -t ${CODECOV_TOKEN}
endif

devgotest: failpoint-enable
Expand All @@ -129,7 +129,7 @@ devgotest: failpoint-enable
gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -coverprofile=coverage.txt -check.p true > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

race: failpoint-enable
Expand Down
5 changes: 2 additions & 3 deletions bindinfo/capture_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,8 @@ func TestCapturedBindingCharset(t *testing.T) {
require.Len(t, rows, 1)
require.Equal(t, "update `test` . `t` set `name` = ? where `name` <= ?", rows[0][0])
require.Equal(t, "UPDATE /*+ use_index(@`upd_1` `test`.`t` `idx`)*/ `test`.`t` SET `name`='hello' WHERE `name` <= 'abc'", rows[0][1])
// Charset and Collation are empty now, they are not used currently.
require.Equal(t, "", rows[0][6])
require.Equal(t, "", rows[0][7])
require.Equal(t, "utf8mb4", rows[0][6])
require.Equal(t, "utf8mb4_bin", rows[0][7])
}

func TestConcurrentCapture(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ func (ss *Schemas) BackupSchemas(
metaWriter.StartWriteMetasAsync(ctx, op)
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}

workerPool.ApplyOnErrorGroup(errg, func() error {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
Expand Down
42 changes: 42 additions & 0 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

"github.com/golang/protobuf/proto"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand Down Expand Up @@ -260,3 +263,42 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *
c.Assert(schemas2[0].Info, DeepEquals, schemas[0].Info)
c.Assert(schemas2[0].DB, DeepEquals, schemas[0].DB)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)
err = metaWriter2.FlushBackupMeta(ctx)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}
5 changes: 1 addition & 4 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,10 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) {
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)

ctx := context.Background()
logger := log.L()
Expand Down
52 changes: 52 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,48 @@ func (d *Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration)), nil
}

// Charset defines character set
type Charset int

const (
Binary Charset = iota
UTF8MB4
GB18030
GBK
)

// String return the string value of charset
func (c Charset) String() string {
switch c {
case Binary:
return "binary"
case UTF8MB4:
return "utf8mb4"
case GB18030:
return "gb18030"
case GBK:
return "gbk"
default:
return "unknown_charset"
}
}

// ParseCharset parser character set for string
func ParseCharset(dataCharacterSet string) (Charset, error) {
switch strings.ToLower(dataCharacterSet) {
case "", "binary":
return Binary, nil
case "utf8mb4":
return UTF8MB4, nil
case "gb18030":
return GB18030, nil
case "gbk":
return GBK, nil
default:
return Binary, errors.Errorf("found unsupported data-character-set: %s", dataCharacterSet)
}
}

func NewConfig() *Config {
return &Config{
App: Lightning{
Expand Down Expand Up @@ -786,6 +828,16 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if len(cfg.Mydumper.DataCharacterSet) == 0 {
cfg.Mydumper.DataCharacterSet = defaultCSVDataCharacterSet
}
charset, err1 := ParseCharset(cfg.Mydumper.DataCharacterSet)
if err1 != nil {
return err1
}
if charset == GBK || charset == GB18030 {
log.L().Warn(
"incompatible strings may be encountered during the transcoding process and will be replaced, please be aware of the risk of not being able to retain the original information",
zap.String("source-character-set", charset.String()),
zap.ByteString("invalid-char-replacement", []byte(cfg.Mydumper.DataInvalidCharReplace)))
}

if cfg.TikvImporter.Backend == "" {
return errors.New("tikv-importer.backend must not be empty!")
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,23 @@ func (s *configTestSuite) TestCheckpointKeepStrategy(c *C) {
c.Assert(res, DeepEquals, []byte(value))
}
}

func (s configTestSuite) TestLoadCharsetFromConfig(c *C) {
cases := map[string]config.Charset{
"binary": config.Binary,
"BINARY": config.Binary,
"GBK": config.GBK,
"gbk": config.GBK,
"Gbk": config.GBK,
"gB18030": config.GB18030,
"GB18030": config.GB18030,
}
for k, v := range cases {
charset, err := config.ParseCharset(k)
c.Assert(err, IsNil)
c.Assert(charset, Equals, v)
}

_, err := config.ParseCharset("Unknown")
c.Assert(err, ErrorMatches, "found unsupported data-character-set: Unknown")
}
46 changes: 34 additions & 12 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errormanager

import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -94,6 +109,7 @@ type ErrorManager struct {
taskID int64
schemaEscaped string
remainingError config.MaxError
dupResolution config.DuplicateResolutionAlgorithm
}

// New creates a new error manager.
Expand All @@ -111,7 +127,7 @@ func New(db *sql.DB, cfg *config.Config) *ErrorManager {

// Init creates the schemas and tables to store the task information.
func (em *ErrorManager) Init(ctx context.Context) error {
if em.db == nil {
if em.db == nil || (em.remainingError.Type.Load() == 0 && em.dupResolution == config.DupeResAlgNone) {
return nil
}

Expand All @@ -120,15 +136,21 @@ func (em *ErrorManager) Init(ctx context.Context) error {
Logger: log.L(),
}

sqls := [][2]string{
{"create task info schema", createSchema},
{"create syntax error table", createSyntaxErrorTable},
{"create type error table", createTypeErrorTable},
{"create conflict error table", createConflictErrorTable},
sqls := make([][2]string, 0)
sqls = append(sqls, [2]string{"create task info schema", createSchema})
if em.remainingError.Syntax.Load() > 0 {
sqls = append(sqls, [2]string{"create syntax error table", createSyntaxErrorTable})
}
if em.remainingError.Type.Load() > 0 {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.dupResolution != config.DupeResAlgNone && em.remainingError.Conflict.Load() > 0 {
sqls = append(sqls, [2]string{"create conflict error table", createConflictErrorTable})
}

for _, sql := range sqls {
err := exec.Exec(ctx, sql[0], fmt.Sprintf(sql[1], em.schemaEscaped))
// trim spaces for unit test pattern matching
err := exec.Exec(ctx, sql[0], strings.TrimSpace(fmt.Sprintf(sql[1], em.schemaEscaped)))
if err != nil {
return err
}
Expand All @@ -148,6 +170,11 @@ func (em *ErrorManager) RecordTypeError(
rowText string,
encodeErr error,
) error {
// elide the encode error if needed.
if em.remainingError.Type.Dec() < 0 {
return encodeErr
}

if em.db != nil {
errMsg := encodeErr.Error()
logger = logger.With(
Expand All @@ -173,11 +200,6 @@ func (em *ErrorManager) RecordTypeError(
return multierr.Append(encodeErr, err)
}
}

// elide the encode error if needed.
if em.remainingError.Type.Dec() < 0 {
return encodeErr
}
return nil
}

Expand Down
Loading

0 comments on commit 1fe8101

Please sign in to comment.