This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

cherry-pick master to release 4.0 #250

Merged
merged 11 commits into from
Apr 26, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@ backupmeta
coverage.txt
docker/data/
docker/logs/
*.swp
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# BR

[![Build Status](https://internal.pingcap.net/idc-jenkins/job/build_br_master/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/build_br_master/)
[![Build Status](https://internal.pingcap.net/idc-jenkins/job/build_br_multi_branch/job/master/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/build_br_multi_branch/job/master/)
[![codecov](https://codecov.io/gh/pingcap/br/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/br)
[![LICENSE](https://img.shields.io/github/license/pingcap/br.svg)](https://github.com/pingcap/br/blob/master/LICENSE)
[![Language](https://img.shields.io/badge/Language-Go-blue.svg)](https://golang.org/)
2 changes: 1 addition & 1 deletion cmd/cmd.go
@@ -47,7 +47,7 @@ const (
)

func timestampLogFileName() string {
return filepath.Join(os.TempDir(), "br.log."+time.Now().Format(time.RFC3339))
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

// AddFlags adds flags to the given cmd.
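Note on the cmd/cmd.go change: the old name used time.RFC3339, whose colons are not valid in Windows file names — likely the motivation here — and the new layout spells the same timestamp with dots. Go's Format substitutes only the chunks of the layout that match the reference time "Mon Jan 2 15:04:05 MST 2006", so the "br.log." prefix passes through as literal text. A minimal, self-contained sketch:

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// Mirrors the patched helper: the literal "br.log." prefix survives Format
// unchanged, while "2006-01-02T15.04.05Z0700" is replaced by the current time.
func timestampLogFileName() string {
	return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

func main() {
	// Prints something like /tmp/br.log.2020-04-26T11.22.33+0800 — no colons.
	fmt.Println(timestampLogFileName())
}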
12 changes: 5 additions & 7 deletions go.mod
@@ -8,26 +8,24 @@ require (
github.com/cheggaaa/pb/v3 v3.0.1
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/klauspost/cpuid v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/montanaflynn/stats v0.5.0 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904
github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200326020624-68d423641be5
github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3
github.com/pingcap/tidb v0.0.0-20200401141416-959eca8f3a39
github.com/pingcap/parser v0.0.0-20200425031156-fb338edcaac2
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
github.com/pingcap/tidb v1.1.0-beta.0.20200424160056-7267747ae0ec
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible
github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
74 changes: 47 additions & 27 deletions go.sum

Large diffs are not rendered by default.

103 changes: 82 additions & 21 deletions pkg/backup/client.go
@@ -48,6 +48,13 @@ type ClientMgr interface {
Close()
}

// Checksum is the checksum of some backup files calculated by CollectChecksums.
type Checksum struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
@@ -748,8 +755,59 @@ func SendBackup(
return nil
}

// FastChecksum check data integrity by xor all(sst_checksum) per table
func (bc *Client) FastChecksum() (bool, error) {
// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse db info.")
return false, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse table info.")
return false, err
}
if localChecksum.Crc64Xor != schema.Crc64Xor ||
localChecksum.TotalBytes != schema.TotalBytes ||
localChecksum.TotalKvs != schema.TotalKvs {
log.Error("failed in fast checksum",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tblInfo.Name),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", localChecksum.Crc64Xor),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", localChecksum.TotalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes),
)
return false, nil
}
log.Info("fast checksum success",
zap.String("database", dbInfo.Name.L),
zap.String("table", tblInfo.Name.L))
}
return true, nil
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums checks data integrity by XOR-ing all sst_checksums per table.
// It returns the checksums of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
@@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) {

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return false, err
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return false, err
return nil, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return false, err
return nil, err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())

@@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) {

summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
} else {
log.Error("failed in fast checksum",
zap.String("database", dbInfo.Name.String()),
zap.String("table", tblInfo.Name.String()),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", checksum),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", totalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", totalBytes),
)
return false, nil
log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
localChecksum := Checksum{
Crc64Xor: checksum,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
checksums = append(checksums, localChecksum)
}

return true, nil
return checksums, nil
}

// CompleteMeta waits for the admin checksum response from TiDB to complete the backup meta
@@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from the pending backupSchemas, without calculating checksums.
// Use this when the user skips checksum generation.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}
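Note on the pkg/backup/client.go changes as a whole: the old FastChecksum both computed and compared in one pass; it is now split into CollectChecksums (compute per-table checksums from the backup files) and ChecksumMatches (compare them against the admin-checksum results stored in the backup meta), with CollectFileInfo and CopyMetaFrom serving the skip-checksum path. A sketch of how a caller might wire these together — the function, its signature, and the skipChecksum flag are assumptions for illustration, not code from this PR:

// Sketch only — assumes the pkg/backup package from this PR and
// "github.com/pingcap/errors". bc is an already-initialized *Client.
func verifyOrSkip(bc *Client, backupSchemas *Schemas, skipChecksum bool) error {
	if skipChecksum {
		// No admin-checksum round trip: copy the schema metadata as-is
		// and collect only the cheap per-file summaries.
		bc.CopyMetaFrom(backupSchemas)
		bc.CollectFileInfo()
		return nil
	}
	// Wait for the admin checksum results, then compare the locally
	// calculated checksums against them.
	if err := bc.CompleteMeta(backupSchemas); err != nil {
		return errors.Trace(err)
	}
	checksums, err := bc.CollectChecksums()
	if err != nil {
		return errors.Trace(err)
	}
	matches, err := bc.ChecksumMatches(checksums)
	if err != nil {
		return errors.Trace(err)
	}
	if !matches {
		return errors.New("backup checksum mismatch between local files and TiKV")
	}
	return nil
}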
12 changes: 0 additions & 12 deletions pkg/backup/schema.go
@@ -36,7 +36,6 @@ type Schemas struct {
backupSchemaCh chan backup.Schema
errCh chan error
wg *sync.WaitGroup
skipChecksum bool
}

func newBackupSchemas() *Schemas {
@@ -57,11 +56,6 @@ func (pending *Schemas) pushPending(
pending.schemas[name] = schema
}

// SetSkipChecksum sets whether it should skip checksum
func (pending *Schemas) SetSkipChecksum(skip bool) {
pending.skipChecksum = skip
}

// Start starts backing up schemas
func (pending *Schemas) Start(
ctx context.Context,
@@ -81,12 +75,6 @@ func (pending *Schemas) Start(
workerPool.Apply(func() {
defer pending.wg.Done()

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh.Inc()
return
}

start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
8 changes: 8 additions & 0 deletions pkg/restore/client.go
@@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum(
workers.Apply(func() {
defer wg.Done()

if table.NoChecksum() {
log.Info("table doesn't have checksum, skipping checksum",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name))
updateCh.Inc()
return
}

startTS, err := rc.GetTS(ctx)
if err != nil {
errCh <- errors.Trace(err)
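Note on the pkg/restore/client.go hunk: the new guard calls table.NoChecksum(), whose definition is not part of this diff. A plausible reading — an assumption, not code from this PR — is that a table carries no checksum when all three recorded fields are zero, i.e. when the backup was taken with checksumming skipped:

// Hypothetical sketch — the real helper is defined on the restore-side
// table type elsewhere in the repository, outside this diff.
func (tbl *Table) NoChecksum() bool {
	return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
}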
22 changes: 21 additions & 1 deletion pkg/restore/db.go
@@ -45,7 +45,27 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
if ddlJob.BinlogInfo.TableInfo != nil {
tableInfo := ddlJob.BinlogInfo.TableInfo
dbInfo := ddlJob.BinlogInfo.DBInfo
switch ddlJob.Type {
case model.ActionCreateSchema:
err = db.se.CreateDatabase(ctx, dbInfo)
if err != nil {
log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err))
}
return errors.Trace(err)
case model.ActionCreateTable:
err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo)
if err != nil {
log.Error("create table failed",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tableInfo.Name),
zap.Error(err))
}
return errors.Trace(err)
}

if tableInfo != nil {
switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDbSQL)
if err != nil {
75 changes: 66 additions & 9 deletions pkg/storage/parse.go
@@ -4,6 +4,8 @@ package storage

import (
"net/url"
"reflect"
"strconv"
"strings"

"github.com/pingcap/errors"
@@ -46,19 +48,23 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken
}
prefix := strings.Trim(u.Path, "/")
s3 := &backup.S3{Bucket: u.Host, Prefix: prefix}
if options != nil {
if err := options.S3.apply(s3); err != nil {
return nil, err
}
if options == nil {
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.S3)
if err := options.S3.apply(s3); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil

case "gcs":
case "gs", "gcs":
gcs := &backup.GCS{Bucket: u.Host, Prefix: u.Path[1:]}
if options != nil {
if err := options.GCS.apply(gcs); err != nil {
return nil, err
}
if options == nil {
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.GCS)
if err := options.GCS.apply(gcs); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_Gcs{Gcs: gcs}}, nil

@@ -67,6 +73,57 @@
}
}

// ExtractQueryParameters moves the query parameters of the URL into the options
// using reflection.
//
// The options must be a pointer to a struct which contains only string or bool
// fields (more types will be supported in the future), and tagged for JSON
// serialization.
//
// All of the URL's query parameters will be removed after calling this method.
func ExtractQueryParameters(u *url.URL, options interface{}) {
type field struct {
index int
kind reflect.Kind
}

// First, find all JSON fields in the options struct type.
o := reflect.Indirect(reflect.ValueOf(options))
ty := o.Type()
numFields := ty.NumField()
tagToField := make(map[string]field, numFields)
for i := 0; i < numFields; i++ {
f := ty.Field(i)
tag := f.Tag.Get("json")
tagToField[tag] = field{index: i, kind: f.Type.Kind()}
}

// Then, read content from the URL into the options.
for key, params := range u.Query() {
if len(params) == 0 {
continue
}
param := params[0]
normalizedKey := strings.ToLower(strings.ReplaceAll(key, "_", "-"))
if f, ok := tagToField[normalizedKey]; ok {
field := o.Field(f.index)
switch f.kind {
case reflect.Bool:
if v, e := strconv.ParseBool(param); e == nil {
field.SetBool(v)
}
case reflect.String:
field.SetString(param)
default:
panic("BackendOption introduced an unsupported kind, please handle it! " + f.kind.String())
}
}
}

// Clean up the URL finally.
u.RawQuery = ""
}

// FormatBackendURL obtains the raw URL which can be used to reconstruct the
// backend. The returned URL does not contain options for further configuring
// the backend. This is to avoid exposing secret tokens.
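Note on the pkg/storage/parse.go hunk: it does two things — it accepts gs:// as an alias of gcs://, and it routes URL query parameters into the backend options via ExtractQueryParameters. A quick illustration of the latter; the option struct below is a hypothetical stand-in for BR's real BackendOptions, chosen only to show the key normalization (lower-cased, with `_` folded to `-`) and the json-tag matching:

package storage

import (
	"fmt"
	"net/url"
)

// Hypothetical option struct shaped the way ExtractQueryParameters expects:
// only string and bool fields, tagged for JSON.
type s3Options struct {
	Endpoint       string `json:"endpoint"`
	ForcePathStyle bool   `json:"force-path-style"`
}

func exampleExtract() {
	u, _ := url.Parse("s3://bucket/prefix?endpoint=http://127.0.0.1:9000&force_path_style=true")
	opts := s3Options{}
	ExtractQueryParameters(u, &opts)
	// "force_path_style" is normalized to "force-path-style" and parsed as a
	// bool; afterwards u.RawQuery == "", so later logging of u cannot leak
	// credential-bearing parameters.
	fmt.Println(opts.Endpoint, opts.ForcePathStyle, u.String())
}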