Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52127
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Jun 6, 2024
1 parent 409465c commit bdcafcd
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 105 deletions.
74 changes: 74 additions & 0 deletions br/pkg/config/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
package config

import (
"encoding/json"

"github.com/docker/go-units"
)

type ConfigTerm[T uint | uint64] struct {
Value T
Modified bool
}

type KVConfig struct {
ImportGoroutines ConfigTerm[uint]
MergeRegionSize ConfigTerm[uint64]
MergeRegionKeyCount ConfigTerm[uint64]
}

func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
type importer struct {
Threads uint `json:"num-threads"`
}

type config struct {
Import importer `json:"import"`
}
var c config
e := json.Unmarshal(resp, &c)
if e != nil {
return 0, e
}

return c.Import.Threads, nil
}

func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
type coprocessor struct {
RegionSplitSize string `json:"region-split-size"`
RegionSplitKeys uint64 `json:"region-split-keys"`
}

type config struct {
Cop coprocessor `json:"coprocessor"`
}
var c config
e := json.Unmarshal(resp, &c)
if e != nil {
return 0, 0, e
}
rs, e := units.RAMInBytes(c.Cop.RegionSplitSize)
if e != nil {
return 0, 0, e
}
urs := uint64(rs)
return urs, c.Cop.RegionSplitKeys, nil
}

func ParseLogBackupEnableFromConfig(resp []byte) (bool, error) {
type logbackup struct {
Enable bool `json:"enable"`
}

type config struct {
LogBackup logbackup `json:"log-backup"`
}
var c config
e := json.Unmarshal(resp, &c)
if e != nil {
return false, e
}
return c.LogBackup.Enable, nil
}
2 changes: 1 addition & 1 deletion br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":conn"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//br/pkg/conn/util",
"//br/pkg/pdutil",
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,25 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli
return regionSplitSize, regionSplitKeys
}

// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup.
func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) {
logbackupEnable := true
err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
enable, err := kvconfig.ParseLogBackupEnableFromConfig(respBytes)
if err != nil {
log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err))
return err
}
logbackupEnable = logbackupEnable && enable
return nil
})
return logbackupEnable, errors.Trace(err)
}

// GetConfigFromTiKV get configs from all alive tikv stores.
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
Expand Down
163 changes: 163 additions & 0 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
}
}

func TestIsLogBackupEnabled(t *testing.T) {
cases := []struct {
stores []*metapb.Store
content []string
enable bool
err bool
}{
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
},
content: []string{""},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"",
// Assuming the TiKV has failed due to some reason.
"",
},
enable: false,
err: true,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
}

pctx := context.Background()
for _, ca := range cases {
ctx, cancel := context.WithCancel(pctx)
pdCli := utils.FakePDClient{Stores: ca.stores}
require.Equal(t, len(ca.content), len(ca.stores))
count := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case "/config":
if len(ca.content[count]) == 0 {
cancel()
}
_, _ = fmt.Fprint(w, ca.content[count])
default:
http.NotFoundHandler().ServeHTTP(w, r)
}
count++
}))

for _, s := range ca.stores {
s.Address = mockServer.URL
s.StatusAddress = mockServer.URL
}

httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
enable, err := mgr.IsLogBackupEnabled(ctx, httpCli)
if ca.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, ca.enable, enable)
}
mockServer.Close()
}
}

func TestHandleTiKVAddress(t *testing.T) {
cases := []struct {
store *metapb.Store
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrin
}
}

func (ctl *StatusController) Close() error {
if ctl.meta != nil {
if err := ctl.meta.Close(); err != nil {
return errors.Trace(err)
}
}
return nil
}

// fillTask queries and fills the extra information for a raw task.
func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) {
var err error
Expand Down
Loading

0 comments on commit bdcafcd

Please sign in to comment.