Skip to content

Commit

Permalink
Merge branch 'm/fix-aggessivelocking-panic-in-analyze' of https://git…
Browse files Browse the repository at this point in the history
…hub.com/MyonKeminta/tidb into m/fix-aggessivelocking-panic-in-analyze
  • Loading branch information
MyonKeminta committed Feb 8, 2023
2 parents 9112608 + 61e4c1d commit 3e8741a
Show file tree
Hide file tree
Showing 19 changed files with 10,072 additions and 9,886 deletions.
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"@com_github_golang_protobuf//proto",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down Expand Up @@ -78,6 +79,7 @@ go_test(
"//tablecodec",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
return
case e, ok := <-ch:
if !ok {
log.Info("[log backup advancer] Task watcher exits due to stream ends.")
return
}
log.Info("meet task event", zap.Stringer("event", &e))
log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(ctx, e); err != nil {
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
}
log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err))
return
}
}
Expand Down
46 changes: 32 additions & 14 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta

func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) {
result := make([]TaskEvent, 0, len(resp.Events))
if err := resp.Err(); err != nil {
return nil, err
}
for _, event := range resp.Events {
te, err := t.toTaskEvent(ctx, event)
if err != nil {
Expand All @@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err))
ch <- errorEvent(err)
return false
}
Expand All @@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
}
return true
}
collectRemaining := func() {
log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c)))
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
}

go func() {
defer close(ch)
for {
select {
case resp, ok := <-c:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
// drain the remain event from channel.
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
return
}
}
collectRemaining()
ch <- errorEvent(ctx.Err())
return
}
}
}()
Expand Down
44 changes: 42 additions & 2 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"net/url"
"path"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) {
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
Expand All @@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
require.Equal(t, forth.Type, streamhelper.EventDel)
require.Equal(t, forth.Name, taskName2)
cancel()
_, ok := <-ch
require.False(t, ok)
fifth, ok := <-ch
require.True(t, ok)
require.Equal(t, fifth.Type, streamhelper.EventErr)
require.Error(t, fifth.Err, context.Canceled)
item, ok := <-ch
require.False(t, ok, "%v", item)
}

func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
taskName := "close_simple"
taskInfo := simpleTask(taskName, 4)

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
ch := make(chan streamhelper.TaskEvent, 1024)
require.NoError(t, metaCli.Begin(ctx, ch))
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
second := <-ch
require.Equal(t, second.Type, streamhelper.EventDel, "%s", second)
require.Equal(t, second.Name, taskName, "%s", second)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return"))
defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel")
// We need to make the channel file some events hence we can simulate the closed channel.
taskName2 := "close_simple2"
taskInfo2 := simpleTask(taskName2, 4)
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

third := <-ch
require.Equal(t, third.Type, streamhelper.EventErr)
require.Error(t, third.Err, io.EOF)
item, ok := <-ch
require.False(t, ok, "%#v", item)
}

func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,8 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement
placementSettings.FollowerConstraints = stringVal
case ast.PlacementOptionVoterConstraints:
placementSettings.VoterConstraints = stringVal
case ast.PlacementOptionSurvivalPreferences:
placementSettings.SurvivalPreferences = stringVal
default:
return errors.Trace(errors.New("unknown placement policy option"))
}
Expand Down
46 changes: 45 additions & 1 deletion ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/exp/slices"
"gopkg.in/yaml.v2"
)

// Refer to https://github.com/tikv/pd/issues/2701 .
Expand Down Expand Up @@ -123,7 +124,13 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
rules = append(rules, rule)
}
}

labels, err := newLocationLabelsFromSurvivalPreferences(options.SurvivalPreferences)
if err != nil {
return nil, err
}
for _, rule := range rules {
rule.LocationLabels = labels
}
return &Bundle{Rules: rules}, nil
}

Expand Down Expand Up @@ -155,9 +162,17 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error

var rules []*Rule

locationLabels, err := newLocationLabelsFromSurvivalPreferences(options.SurvivalPreferences)
if err != nil {
return nil, err
}

// in case empty primaryRegion and regions, just return an empty bundle
if primaryRegion == "" && len(regions) == 0 {
rules = append(rules, NewRule(Voter, followers+1, NewConstraintsDirect()))
for _, rule := range rules {
rule.LocationLabels = locationLabels
}
return &Bundle{Rules: rules}, nil
}

Expand Down Expand Up @@ -195,6 +210,11 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
}
}

// set location labels
for _, rule := range rules {
rule.LocationLabels = locationLabels
}

return &Bundle{Rules: rules}, nil
}

Expand Down Expand Up @@ -223,6 +243,19 @@ func newBundleFromOptions(options *model.PlacementSettings) (bundle *Bundle, err
return bundle, err
}

// newLocationLabelsFromSurvivalPreferences will parse the survival preferences into location labels.
func newLocationLabelsFromSurvivalPreferences(survivalPreferenceStr string) ([]string, error) {
if len(survivalPreferenceStr) > 0 {
labels := []string{}
err := yaml.UnmarshalStrict([]byte(survivalPreferenceStr), &labels)
if err != nil {
return nil, ErrInvalidSurvivalPreferenceFormat
}
return labels, nil
}
return nil, nil
}

// NewBundleFromOptions will transform options into the bundle.
func NewBundleFromOptions(options *model.PlacementSettings) (bundle *Bundle, err error) {
bundle, err = newBundleFromOptions(options)
Expand Down Expand Up @@ -257,6 +290,15 @@ func (b *Bundle) String() string {
func (b *Bundle) Tidy() error {
extraCnt := map[PeerRoleType]int{}
newRules := b.Rules[:0]

// One Bundle is from one PlacementSettings, rule share same location labels, so we can use the first rule's location labels.
var locationLabels []string
for _, rule := range b.Rules {
if len(rule.LocationLabels) > 0 {
locationLabels = rule.LocationLabels
break
}
}
for i, rule := range b.Rules {
// useless Rule
if rule.Count <= 0 {
Expand Down Expand Up @@ -300,6 +342,8 @@ func (b *Bundle) Tidy() error {
Key: EngineLabelKey,
Values: []string{EngineLabelTiFlash},
}},
// the merged rule should have the same location labels with the original rules.
LocationLabels: locationLabels,
})
}
b.Rules = newRules
Expand Down
4 changes: 4 additions & 0 deletions ddl/placement/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,9 @@ func TestTidy(t *testing.T) {
bundle.Rules = append(bundle.Rules, rules3...)
bundle.Rules = append(bundle.Rules, rules4...)

for _, r := range bundle.Rules {
r.LocationLabels = []string{"zone", "host"}
}
chkfunc := func() {
require.NoError(t, err)
require.Len(t, bundle.Rules, 3)
Expand All @@ -901,6 +904,7 @@ func TestTidy(t *testing.T) {
Values: []string{EngineLabelTiFlash},
},
}, bundle.Rules[2].Constraints)
require.Equal(t, []string{"zone", "host"}, bundle.Rules[2].LocationLabels)
}
err = bundle.Tidy()
chkfunc()
Expand Down
2 changes: 2 additions & 0 deletions ddl/placement/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
ErrInvalidConstraintsMapcnt = errors.New("label constraints in map syntax have invalid replicas")
// ErrInvalidConstraintsFormat is from rule.go.
ErrInvalidConstraintsFormat = errors.New("invalid label constraints format")
// ErrInvalidSurvivalPreferenceFormat is from rule.go.
ErrInvalidSurvivalPreferenceFormat = errors.New("survival preference format should be in format [xxx=yyy, ...]")
// ErrInvalidConstraintsRelicas is from rule.go.
ErrInvalidConstraintsRelicas = errors.New("label constraints with invalid REPLICAS")
// ErrInvalidBundleID is from bundle.go.
Expand Down
19 changes: 10 additions & 9 deletions ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type RuleGroupConfig struct {

// Rule is the core placement rule struct. Check https://github.com/tikv/pd/blob/master/server/schedule/placement/rule.go.
type Rule struct {
GroupID string `json:"group_id"`
ID string `json:"id"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
Role PeerRoleType `json:"role"`
Count int `json:"count"`
Constraints Constraints `json:"label_constraints,omitempty"`
GroupID string `json:"group_id"`
ID string `json:"id"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
Role PeerRoleType `json:"role"`
Count int `json:"count"`
Constraints Constraints `json:"label_constraints,omitempty"`
LocationLabels []string `json:"location_labels,omitempty"`
}

// TiFlashRule extends Rule with other necessary fields.
Expand Down
Loading

0 comments on commit 3e8741a

Please sign in to comment.