Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: no domain to run log command (#52127) #53846

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":conn"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//br/pkg/conn/util",
"//br/pkg/pdutil",
Expand Down
28 changes: 28 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -329,6 +330,33 @@
return regionSplitSize, regionSplitKeys
}

// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup.
func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) {
logbackupEnable := true
type logbackup struct {
Enable bool `json:"enable"`
}
type config struct {
LogBackup logbackup `json:"log-backup"`
}
err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

var c config
err = json.Unmarshal(respBytes, &c)
if err != nil {
log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err))

Check warning on line 351 in br/pkg/conn/conn.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/conn/conn.go#L345-L351

Added lines #L345 - L351 were not covered by tests
return err
}
logbackupEnable = logbackupEnable && c.LogBackup.Enable
return nil
})
return logbackupEnable, errors.Trace(err)
}

// GetConfigFromTiKV get configs from all alive tikv stores.
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
Expand Down
163 changes: 163 additions & 0 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
}
}

func TestIsLogBackupEnabled(t *testing.T) {
cases := []struct {
stores []*metapb.Store
content []string
enable bool
err bool
}{
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
},
content: []string{""},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"",
// Assuming the TiKV has failed due to some reason.
"",
},
enable: false,
err: true,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
}

pctx := context.Background()
for _, ca := range cases {
ctx, cancel := context.WithCancel(pctx)
pdCli := utils.FakePDClient{Stores: ca.stores}
require.Equal(t, len(ca.content), len(ca.stores))
count := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case "/config":
if len(ca.content[count]) == 0 {
cancel()
}
_, _ = fmt.Fprint(w, ca.content[count])
default:
http.NotFoundHandler().ServeHTTP(w, r)
}
count++
}))

for _, s := range ca.stores {
s.Address = mockServer.URL
s.StatusAddress = mockServer.URL
}

httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
enable, err := mgr.IsLogBackupEnabled(ctx, httpCli)
if ca.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, ca.enable, enable)
}
mockServer.Close()
}
}

func TestHandleTiKVAddress(t *testing.T) {
cases := []struct {
store *metapb.Store
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@
}
}

func (ctl *StatusController) Close() error {
if ctl.meta != nil {
if err := ctl.meta.Close(); err != nil {
return errors.Trace(err)
}
}
return nil

Check warning on line 353 in br/pkg/stream/stream_status.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/stream/stream_status.go#L353

Added line #L353 was not covered by tests
}

// fillTask queries and fills the extra information for a raw task.
func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) {
var err error
Expand Down
Loading
Loading