From 583b7f58d313ab6d7858df885c0b4c41ca3b8341 Mon Sep 17 00:00:00 2001 From: Tanjin Xu <109303790+tanjinx@users.noreply.github.com> Date: Mon, 6 May 2024 08:23:46 -0700 Subject: [PATCH] [v14.0.5]: Backport upstream 13856 - reshard state check fix (#321) * Backport upstream 13856 - reshard state check fix * revert comment change * fix null check --- go.mod | 1 + go.sum | 3 +- go/test/utils/noleak.go | 93 +++++++ go/vt/discovery/keyspace_events.go | 31 ++- go/vt/discovery/keyspace_events_test.go | 312 ++++++++++++++++++++++++ go/vt/topo/topoproto/srvkeyspace.go | 3 + 6 files changed, 432 insertions(+), 11 deletions(-) create mode 100644 go/test/utils/noleak.go create mode 100644 go/vt/discovery/keyspace_events_test.go diff --git a/go.mod b/go.mod index 04ede0f07b..0637ea9f2b 100644 --- a/go.mod +++ b/go.mod @@ -122,6 +122,7 @@ require ( require ( github.com/bndr/gotabulate v1.1.2 + go.uber.org/goleak v1.2.1 modernc.org/sqlite v1.20.3 ) diff --git a/go.sum b/go.sum index cc300ba389..576bb1b73d 100644 --- a/go.sum +++ b/go.sum @@ -785,7 +785,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= diff --git a/go/test/utils/noleak.go b/go/test/utils/noleak.go new file mode 100644 index 0000000000..f27adf9e2e --- /dev/null +++ 
b/go/test/utils/noleak.go @@ -0,0 +1,93 @@ +/* +Copyright 2023 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "testing" + "time" + + "go.uber.org/goleak" +) + +// LeakCheckContext returns a Context that will be automatically cancelled at the end +// of this test. If the test has finished successfully, it will be checked for goroutine +// leaks after context cancellation. +func LeakCheckContext(t testing.TB) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(func() { + cancel() + EnsureNoLeaks(t) + }) + return ctx +} + +// LeakCheckContextTimeout behaves like LeakCheckContext but the returned Context will +// be cancelled after `timeout`, or after the test finishes, whichever happens first. +func LeakCheckContextTimeout(t testing.TB, timeout time.Duration) context.Context { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(func() { + cancel() + EnsureNoLeaks(t) + }) + return ctx +} + +// EnsureNoLeaks checks for goroutine leaks and fails the test if any are found. +func EnsureNoLeaks(t testing.TB) { + if t.Failed() { + return + } + if err := ensureNoLeaks(); err != nil { + t.Fatal(err) + } +} + +// GetLeaks checks for goroutine leaks and returns an error if any are found. +// One use case is in TestMain()s to ensure that all tests are cleaned up.
+func GetLeaks() error { + return ensureNoLeaks() +} + +func ensureNoLeaks() error { + if err := ensureNoGoroutines(); err != nil { + return err + } + return nil +} + +func ensureNoGoroutines() error { + var ignored = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), + goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"), + goleak.IgnoreTopFunction("testing.tRunner.func1"), + } + + var err error + for i := 0; i < 5; i++ { + err = goleak.Find(ignored...) 
+ if err == nil { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return err +} diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 0b3fa7e9ef..f02a898a29 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -23,8 +23,9 @@ import ( "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/query" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" @@ -65,7 +66,7 @@ type KeyspaceEvent struct { type ShardEvent struct { Tablet *topodatapb.TabletAlias - Target *query.Target + Target *querypb.Target Serving bool } @@ -125,16 +126,26 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { defer kss.mu.Unlock() // if the keyspace is gone, or if it has no known availability events, the keyspace - // cannot be in the middle of a resharding operation + // cannot be in the middle of a resharding operation. if kss.deleted || kss.consistent { return false } - // for all the known shards, try to find a primary shard besides the one we're trying to access - // and which is currently healthy. if there are other healthy primaries in the keyspace, it means - // we're in the middle of a resharding operation + // If there are unequal and overlapping shards in the keyspace and any of them are + // currently serving then we assume that we are in the middle of a Reshard. 
+ _, ckr, err := topo.ValidateShardName(currentShard) + if err != nil || ckr == nil { // Assume not and avoid potential panic + return false + } for shard, sstate := range kss.shards { - if shard != currentShard && sstate.serving { + if !sstate.serving || shard == currentShard { + continue + } + _, skr, err := topo.ValidateShardName(shard) + if err != nil || skr == nil { // Assume not and avoid potential panic + return false + } + if key.KeyRangesIntersect(ckr, skr) { return true } } @@ -143,7 +154,7 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { } type shardState struct { - target *query.Target + target *querypb.Target serving bool externallyReparented int64 currentPrimary *topodatapb.TabletAlias @@ -426,7 +437,7 @@ func (kew *KeyspaceEventWatcher) getKeyspaceStatus(keyspace string) *keyspaceSta // This is not a fully accurate heuristic, but it's good enough that we'd want to buffer the // request for the given target under the assumption that the reason why it cannot be completed // right now is transitory. -func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bool { +func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *querypb.Target) bool { if target.TabletType != topodatapb.TabletType_PRIMARY { return false } @@ -446,7 +457,7 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bo // The shard state keeps track of the current primary and the last externally reparented time, which we can use // to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary // RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed. 
-func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) bool { +func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *querypb.Target) bool { if target.TabletType != topodatapb.TabletType_PRIMARY { return false } diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go new file mode 100644 index 0000000000..e08d3a2317 --- /dev/null +++ b/go/vt/discovery/keyspace_events_test.go @@ -0,0 +1,312 @@ +/* +Copyright 2023 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package discovery + +import ( + "context" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/faketopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" +) + +func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) { + ctx := utils.LeakCheckContext(t) + cell := "cell" + keyspace := "testks" + factory := faketopo.NewFakeTopoFactory() + factory.AddCell(cell) + ts := faketopo.NewFakeTopoServer(factory) + ts2 := &fakeTopoServer{} + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "") + defer hc.Close() + kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) + kss := &keyspaceState{ + kew: kew, + keyspace: keyspace, + shards: make(map[string]*shardState), + } + kss.lastKeyspace = &topodatapb.SrvKeyspace{} + require.True(t, kss.onSrvKeyspace(nil, nil)) +} + +// TestKeyspaceEventTypes confirms that the keyspace event watcher determines +// that the unavailability event is caused by the correct scenario. We should +// consider it to be caused by a resharding operation when the following +// conditions are present: +// 1. The keyspace is inconsistent (in the middle of an availability event) +// 2. The target tablet is a primary +// 3. The keyspace has overlapping shards +// 4. The overlapping shard's tablet is serving +// And we should consider the cause to be a primary not serving when the +// following conditions exist: +// 1. The keyspace is inconsistent (in the middle of an availability event) +// 2. The target tablet is a primary +// 3. The target tablet is not serving +// 4. The shard's externallyReparented time is not 0 +// 5. The shard's currentPrimary state is not nil +// We should never consider both as a possible cause given the same +// keyspace state. 
+func TestKeyspaceEventTypes(t *testing.T) { + utils.EnsureNoLeaks(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cell := "cell" + keyspace := "testks" + factory := faketopo.NewFakeTopoFactory() + factory.AddCell(cell) + ts := faketopo.NewFakeTopoServer(factory) + ts2 := &fakeTopoServer{} + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "") + defer hc.Close() + kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) + + type testCase struct { + name string + kss *keyspaceState + shardToCheck string + expectResharding bool + expectPrimaryNotServing bool + } + + testCases := []testCase{ + { + name: "one to two resharding in progress", + kss: &keyspaceState{ + kew: kew, + keyspace: keyspace, + shards: map[string]*shardState{ + "-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + }, + "-80": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-80", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: true, + }, + "80-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "80-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + }, + }, + consistent: false, + }, + shardToCheck: "-", + expectResharding: true, + expectPrimaryNotServing: false, + }, + { + name: "two to four resharding in progress", + kss: &keyspaceState{ + kew: kew, + keyspace: keyspace, + shards: map[string]*shardState{ + "-80": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-80", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + }, + "80-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "80-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: true, + }, + "-40": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-40", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: true, + }, + "40-80": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: 
"40-80", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: true, + }, + "80-c0": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "80-c0", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + }, + "c0-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "c0-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + }, + }, + consistent: false, + }, + shardToCheck: "-80", + expectResharding: true, + expectPrimaryNotServing: false, + }, + { + name: "unsharded primary not serving", + kss: &keyspaceState{ + kew: kew, + keyspace: keyspace, + shards: map[string]*shardState{ + "-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + externallyReparented: time.Now().UnixNano(), + currentPrimary: &topodatapb.TabletAlias{ + Cell: cell, + Uid: 100, + }, + }, + }, + consistent: false, + }, + shardToCheck: "-", + expectResharding: false, + expectPrimaryNotServing: true, + }, + { + name: "sharded primary not serving", + kss: &keyspaceState{ + kew: kew, + keyspace: keyspace, + shards: map[string]*shardState{ + "-80": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "-80", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: false, + externallyReparented: time.Now().UnixNano(), + currentPrimary: &topodatapb.TabletAlias{ + Cell: cell, + Uid: 100, + }, + }, + "80-": { + target: &querypb.Target{ + Keyspace: keyspace, + Shard: "80-", + TabletType: topodatapb.TabletType_PRIMARY, + }, + serving: true, + }, + }, + consistent: false, + }, + shardToCheck: "-80", + expectResharding: false, + expectPrimaryNotServing: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kew.mu.Lock() + kew.keyspaces[keyspace] = tc.kss + kew.mu.Unlock() + + require.NotNil(t, tc.kss.shards[tc.shardToCheck], "the specified shardToCheck of %q does not exist in the shardState", tc.shardToCheck) + + resharding := 
kew.TargetIsBeingResharded(tc.kss.shards[tc.shardToCheck].target) + require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding) + + primaryDown := kew.PrimaryIsNotServing(tc.kss.shards[tc.shardToCheck].target) + require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing) + }) + } +} + +type fakeTopoServer struct { +} + +// GetTopoServer returns the full topo.Server instance. +func (f *fakeTopoServer) GetTopoServer() (*topo.Server, error) { + return nil, nil +} + +// GetSrvKeyspaceNames returns the list of keyspaces served in +// the provided cell. +func (f *fakeTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) { + return []string{"ks1"}, nil +} + +// GetSrvKeyspace returns the SrvKeyspace for a cell/keyspace. +func (f *fakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { + zeroHexBytes, _ := hex.DecodeString("") + eightyHexBytes, _ := hex.DecodeString("80") + ks := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: []*topodatapb.ShardReference{ + {Name: "-80", KeyRange: &topodatapb.KeyRange{Start: zeroHexBytes, End: eightyHexBytes}}, + {Name: "80-", KeyRange: &topodatapb.KeyRange{Start: eightyHexBytes, End: zeroHexBytes}}, + }, + }, + }, + } + return ks, nil +} + +func (f *fakeTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) { + ks, err := f.GetSrvKeyspace(ctx, cell, keyspace) + callback(ks, err) +} + +// WatchSrvVSchema starts watching the SrvVSchema object for +// the provided cell. It will call the callback when +// a new value or an error occurs. 
+func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) { + +} diff --git a/go/vt/topo/topoproto/srvkeyspace.go b/go/vt/topo/topoproto/srvkeyspace.go index 24618233fb..cdd0ea20d2 100644 --- a/go/vt/topo/topoproto/srvkeyspace.go +++ b/go/vt/topo/topoproto/srvkeyspace.go @@ -51,6 +51,9 @@ func (sra ShardReferenceArray) Sort() { sort.Sort(sra) } // SrvKeyspaceGetPartition returns a Partition for the given tablet type, // or nil if it's not there. func SrvKeyspaceGetPartition(sk *topodatapb.SrvKeyspace, tabletType topodatapb.TabletType) *topodatapb.SrvKeyspace_KeyspacePartition { + if sk == nil { + return nil + } for _, p := range sk.Partitions { if p.ServedType == tabletType { return p