Skip to content

Commit

Permalink
sql: implement SHOW BR JOBS <n:int> and CANCEL BR JOB <n:int> (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored May 5, 2023
1 parent 88428bc commit 6488c35
Show file tree
Hide file tree
Showing 19 changed files with 517 additions and 45 deletions.
2 changes: 2 additions & 0 deletions br/cmd/br/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"cmd.go",
"debug.go",
"main.go",
"operator.go",
"restore.go",
"stream.go",
],
Expand All @@ -26,6 +27,7 @@ go_library(
"//br/pkg/streamhelper/config",
"//br/pkg/summary",
"//br/pkg/task",
"//br/pkg/task/operator",
"//br/pkg/trace",
"//br/pkg/utils",
"//br/pkg/version/build",
Expand Down
1 change: 1 addition & 0 deletions br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func main() {
NewBackupCommand(),
NewRestoreCommand(),
NewStreamCommand(),
newOpeartorCommand(),
)
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)
Expand Down
49 changes: 49 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package main

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/task/operator"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/spf13/cobra"
)

func newOpeartorCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "operator <subcommand>",
Short: "utilities for operators like tidb-operator.",
PersistentPreRunE: func(c *cobra.Command, args []string) error {
if err := Init(c); err != nil {
return errors.Trace(err)
}
build.LogInfo(build.BR)
utils.LogEnvVariables()
task.LogArguments(c)
return nil
},
Hidden: true,
}
cmd.AddCommand(newPauseGcCommand())
return cmd
}

func newPauseGcCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "pause-gc",
Short: "pause gc to the ts until the program exits.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.PauseGcConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.PauseGC(ctx, &cfg)
},
}
operator.DefineFlagsForPauseGcConfig(cmd.Flags())
return cmd
}
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func NewPdController(
}
if failure != nil {
return nil, errors.Annotatef(berrors.ErrPDUpdateFailed,
"pd address (%s) not available, please check network", pdAddrs)
"pd address (%s) not available, error is %s, please check network", pdAddrs, failure)
}

version := parseVersion(versionBytes)
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -603,19 +604,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
ctx, cmdName, int64(len(ranges)), !cfg.LogProgress)
}

progressCount := 0
progressCount := uint64(0)
progressCallBack := func(callBackUnit backup.ProgressUnit) {
if unit == callBackUnit {
updateCh.Inc()
progressCount++
atomic.AddUint64(&progressCount, 1)
failpoint.Inject("progress-call-back", func(v failpoint.Value) {
log.Info("failpoint progress-call-back injected")
if fileName, ok := v.(string); ok {
f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if osErr != nil {
log.Warn("failed to create file", zap.Error(osErr))
}
msg := []byte(fmt.Sprintf("%s:%d\n", unit, progressCount))
msg := []byte(fmt.Sprintf("%s:%d\n", unit, atomic.LoadUint64(&progressCount)))
_, err = f.Write(msg)
if err != nil {
log.Warn("failed to write data to file", zap.Error(err))
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) {
return tlsConfig, nil
}

// Convert the TLS config to the PD security option.
func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption {
securityOption := pd.SecurityOption{}
securityOption.CAPath = tls.CA
securityOption.CertPath = tls.Cert
securityOption.KeyPath = tls.Key
return securityOption
}

// ParseFromFlags parses the TLS config from the flag set.
func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
tls.CA, tls.Cert, tls.Key, err = ParseTLSTripleFromFlags(flags)
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "operator",
srcs = [
"cmd.go",
"config.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/pdutil",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@org_uber_go_zap//:zap",
],
)
57 changes: 57 additions & 0 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package operator

import (
"context"
"crypto/tls"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
)

func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
pdAddrs := strings.Join(cfg.PD, ",")
var tc *tls.Config
if cfg.TLS.IsEnabled() {
var err error
tc, err = cfg.TLS.ToTLSConfig()
if err != nil {
return nil, err
}
}
mgr, err := pdutil.NewPdController(ctx, pdAddrs, tc, cfg.TLS.ToPDSecurityOption())
if err != nil {
return nil, err
}
return mgr, nil
}

// PauseGC blocks the current goroutine and pause the GC safepoint by the config.
func PauseGC(ctx context.Context, cfg *PauseGcConfig) error {
mgr, err := dialPD(ctx, &cfg.Config)
if err != nil {
return err
}
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(cfg.TTL.Seconds()),
BackupTS: cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := mgr.GetMinResolvedTS(ctx)
if err != nil {
return err
}
log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
if err != nil {
return err
}
<-ctx.Done()
return nil
}
41 changes: 41 additions & 0 deletions br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package operator

import (
"time"

"github.com/pingcap/tidb/br/pkg/task"
"github.com/spf13/pflag"
)

type PauseGcConfig struct {
task.Config

SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
TTL time.Duration `json:"ttl" yaml:"ttl"`
}

func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) {
_ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err := cfg.Config.ParseFromFlags(flags); err != nil {
return err
}

var err error
cfg.SafePoint, err = flags.GetUint64("safepoint")
if err != nil {
return err
}
cfg.TTL, err = flags.GetDuration("ttl")
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,7 @@ const (
ErrLoadDataInvalidOperation = 8171
ErrLoadDataLocalUnsupportedOption = 8172
ErrLoadDataPreCheckFailed = 8173
ErrBRJobNotFound = 8174

// Error codes used by TiDB ddl package
ErrUnsupportedDDLOperation = 8200
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil),
ErrBRIEImportFailed: mysql.Message("Import failed: %s", nil),
ErrBRIEExportFailed: mysql.Message("Export failed: %s", nil),
ErrBRJobNotFound: mysql.Message("BRIE Job %d not found", nil),

ErrInvalidTableSample: mysql.Message("Invalid TABLESAMPLE: %s", nil),

Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,11 @@ error = '''
PreCheck failed: %s
'''

["executor:8174"]
error = '''
BRIE Job %d not found
'''

["executor:8212"]
error = '''
Failed to split region ranges: %s
Expand Down
Loading

0 comments on commit 6488c35

Please sign in to comment.