Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15.0.5]: Backport upstream 13856 - reshard state check fix #322

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ require (
github.com/hashicorp/go-version v1.6.0
github.com/planetscale/log v0.0.0-20221118170849-fb599bc35c50
github.com/slok/noglog v0.2.0
go.uber.org/goleak v1.2.1
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
93 changes: 93 additions & 0 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"context"
"testing"
"time"

"go.uber.org/goleak"
)

// LeakCheckContext returns a Context that will be automatically cancelled at the end
// of this test. If the test has finished successfully, it will be checked for goroutine
// leaks after context cancellation.
func LeakCheckContext(t testing.TB) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
EnsureNoLeaks(t)
})
return ctx
}

// LeakCheckContextTimeout behaves like LeakCheckContext but the returned Context will
// be cancelled after `timeout`, or after the test finishes, whichever happens first.
func LeakCheckContextTimeout(t testing.TB, timeout time.Duration) context.Context {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(func() {
cancel()
EnsureNoLeaks(t)
})
return ctx
}

// EnsureNoLeaks checks for goroutine and socket leaks and fails the test if any are found.
func EnsureNoLeaks(t testing.TB) {
if t.Failed() {
return
}
if err := ensureNoLeaks(); err != nil {
t.Fatal(err)
}
}

// GetLeaks checks for goroutine and socket leaks and returns an error if any are found.
// One use case is in TestMain()s to ensure that all tests are cleaned up.
func GetLeaks() error {
return ensureNoLeaks()
}

func ensureNoLeaks() error {
if err := ensureNoGoroutines(); err != nil {
return err
}
return nil
}

func ensureNoGoroutines() error {
var ignored = []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("testing.tRunner.func1"),
}

var err error
for i := 0; i < 5; i++ {
err = goleak.Find(ignored...)
if err == nil {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return err
}
29 changes: 20 additions & 9 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -65,7 +66,7 @@ type KeyspaceEvent struct {

type ShardEvent struct {
Tablet *topodatapb.TabletAlias
Target *query.Target
Target *querypb.Target
Serving bool
}

Expand Down Expand Up @@ -130,11 +131,21 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
return false
}

// for all the known shards, try to find a primary shard besides the one we're trying to access
// and which is currently healthy. if there are other healthy primaries in the keyspace, it means
// we're in the middle of a resharding operation
// If there are unequal and overlapping shards in the keyspace and any of them are
// currently serving then we assume that we are in the middle of a Reshard.
_, ckr, err := topo.ValidateShardName(currentShard)
if err != nil || ckr == nil { // Assume not and avoid potential panic
return false
}
for shard, sstate := range kss.shards {
if shard != currentShard && sstate.serving {
if !sstate.serving || shard == currentShard {
continue
}
_, skr, err := topo.ValidateShardName(shard)
if err != nil || skr == nil { // Assume not and avoid potential panic
return false
}
if key.KeyRangesIntersect(ckr, skr) {
return true
}
}
Expand All @@ -143,7 +154,7 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
}

type shardState struct {
target *query.Target
target *querypb.Target
serving bool
externallyReparented int64
currentPrimary *topodatapb.TabletAlias
Expand Down Expand Up @@ -426,7 +437,7 @@ func (kew *KeyspaceEventWatcher) getKeyspaceStatus(keyspace string) *keyspaceSta
// This is not a fully accurate heuristic, but it's good enough that we'd want to buffer the
// request for the given target under the assumption that the reason why it cannot be completed
// right now is transitory.
func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bool {
func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *querypb.Target) bool {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return false
}
Expand All @@ -446,7 +457,7 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bo
// The shard state keeps track of the current primary and the last externally reparented time, which we can use
// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary
// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) bool {
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *querypb.Target) bool {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return false
}
Expand Down
Loading
Loading