Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
98348: sqlinstance: refactor sql instance reader r=JeffSwenson a=JeffSwenson

This change is intended to support the migration of sql_instances to a regional by row compatible index structure. There are two main goals of the change:

1. Remove the rowCodec from the instance reader so that it does not need to be involved in the migration protocol.
2. Encapsulate the feed into a sub object so that the implementation can be easily swapped when the index changes.

Part of cockroachdb#94843

Release note: None

Co-authored-by: Jeff <[email protected]>
  • Loading branch information
craig[bot] and jeffswenson committed Mar 11, 2023
2 parents e417cdd + 311c452 commit 76e3eb2
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 149 deletions.
5 changes: 2 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
)

cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, cfg.clock, cfg.rangeFeedFactory)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)
cfg.stopper)

// We can't use the nodeDailer as the podNodeDailer unless we
// are serving the system tenant despite the fact that we've
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/sqlinstance/instancestorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "instancestorage",
srcs = [
"instancecache.go",
"instancereader.go",
"instancestorage.go",
"row_codec.go",
Expand Down Expand Up @@ -50,6 +51,7 @@ go_test(
name = "instancestorage_test",
srcs = [
"helpers_test.go",
"instancecache_test.go",
"instancereader_test.go",
"instancestorage_internal_test.go",
"instancestorage_test.go",
Expand All @@ -60,14 +62,14 @@ go_test(
embed = [":instancestorage"],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
Expand All @@ -82,6 +84,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
206 changes: 206 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/instancecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package instancestorage

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// instanceCache represents a cache over the contents of sql_instances table.
type instanceCache interface {
// getInstance returns a single instance by ID. If the instance is not
// found, ok is set to false.
getInstance(instanceID base.SQLInstanceID) (i instancerow, ok bool)

// listInstances returns a list containing all of the cached instance rows.
// listInstances returns all cached rows, including rows that are unclaimed
// or owned by an inactive session.
listInstances() []instancerow

// Close stops updates to the cache. getInstance and listInstances continue
// to work after Close, but will return stale results.
Close()
}

// emptyInstanceCache is used during initialization. It implements an instance
// feed with no instances.
type emptyInstanceCache struct {
}

// Close implements instanceCache
func (*emptyInstanceCache) Close() {
// no-op
}

// getInstance implements instanceCache
func (*emptyInstanceCache) getInstance(instanceID base.SQLInstanceID) (instancerow, bool) {
return instancerow{}, false
}

// listInstances implements instanceCache
func (*emptyInstanceCache) listInstances() []instancerow {
return nil
}

var _ instanceCache = &emptyInstanceCache{}

// singletonInstanceFeed is used during system start up. It only contains
// server's own sql instance.
type singletonInstanceFeed struct {
instance instancerow
}

var _ instanceCache = &singletonInstanceFeed{}

func (s *singletonInstanceFeed) getInstance(instanceID base.SQLInstanceID) (instancerow, bool) {
if instanceID == s.instance.instanceID {
return s.instance, true
}
return instancerow{}, false
}

func (s *singletonInstanceFeed) listInstances() []instancerow {
return []instancerow{s.instance}
}

func (s *singletonInstanceFeed) Close() {}

type rangeFeedCache struct {
feed *rangefeed.RangeFeed
mu struct {
syncutil.Mutex
instances map[base.SQLInstanceID]instancerow
}
}

var _ instanceCache = &rangeFeedCache{}

// newRangeFeedCache constructs an instanceCache backed by a range feed over the
// sql_instances table. newRangeFeedCache will block until the initial scan is
// complete.
func newRangeFeedCache(
ctx context.Context, rowCodec rowCodec, clock *hlc.Clock, f *rangefeed.Factory,
) (resultFeed instanceCache, err error) {
done := make(chan error, 1)

feed := &rangeFeedCache{}
feed.mu.instances = map[base.SQLInstanceID]instancerow{}

updateCacheFn := func(
ctx context.Context, keyVal *kvpb.RangeFeedValue,
) {
instance, err := rowCodec.decodeRow(keyVal.Key, &keyVal.Value)
if err != nil {
log.Ops.Warningf(ctx, "failed to decode settings row %v: %v", keyVal.Key, err)
return
}
feed.updateInstanceMap(instance, !keyVal.Value.IsPresent())
}
initialScanDoneFn := func(_ context.Context) {
select {
case done <- nil:
// success reported to the caller
default:
// something is already in the done channel
}
}
initialScanErrFn := func(_ context.Context, err error) (shouldFail bool) {
if grpcutil.IsAuthError(err) ||
// This is a hack around the fact that we do not get properly structured
// errors out of gRPC. See #56208.
strings.Contains(err.Error(), "rpc error: code = Unauthenticated") {
shouldFail = true
select {
case done <- err:
// err reported to the caller
default:
// something is already in the done channel
}
}
return shouldFail
}

instancesTablePrefix := rowCodec.makeIndexPrefix()
instancesTableSpan := roachpb.Span{
Key: instancesTablePrefix,
EndKey: instancesTablePrefix.PrefixEnd(),
}
feed.feed, err = f.RangeFeed(ctx,
"sql_instances",
[]roachpb.Span{instancesTableSpan},
clock.Now(),
updateCacheFn,
rangefeed.WithSystemTablePriority(),
rangefeed.WithInitialScan(initialScanDoneFn),
rangefeed.WithOnInitialScanError(initialScanErrFn),
rangefeed.WithRowTimestampInInitialScan(true),
)
if err != nil {
return nil, err
}
defer func() {
// Ensure the feed is cleaned up if there is an error
if resultFeed == nil {
feed.Close()
}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-done:
if err != nil {
return nil, err
}
return feed, nil
}
}

func (s *rangeFeedCache) getInstance(instanceID base.SQLInstanceID) (instancerow, bool) {
s.mu.Lock()
defer s.mu.Unlock()
row, ok := s.mu.instances[instanceID]
return row, ok
}

func (s *rangeFeedCache) listInstances() []instancerow {
s.mu.Lock()
defer s.mu.Unlock()
result := make([]instancerow, 0, len(s.mu.instances))
for _, row := range s.mu.instances {
result = append(result, row)
}
return result
}

func (r *rangeFeedCache) updateInstanceMap(instance instancerow, deletionEvent bool) {
r.mu.Lock()
defer r.mu.Unlock()
if deletionEvent {
delete(r.mu.instances, instance.instanceID)
return
}
r.mu.instances[instance.instanceID] = instance
}

func (s *rangeFeedCache) Close() {
s.feed.Close()
}
120 changes: 120 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/instancecache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package instancestorage

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestEmptyInstanceFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var feed instanceCache = &emptyInstanceCache{}
require.Empty(t, feed.listInstances())

_, ok := feed.getInstance(base.SQLInstanceID(0))
require.False(t, ok)
}

func TestSingletonFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

instance := instancerow{
instanceID: base.SQLInstanceID(10),
sqlAddr: "something",
}
var feed instanceCache = &singletonInstanceFeed{instance: instance}

got, ok := feed.getInstance(10)
require.True(t, ok)
require.Equal(t, instance, got)

got, ok = feed.getInstance(11)
require.False(t, ok)
require.NotEqual(t, instance, got)

require.Equal(t, feed.listInstances(), []instancerow{instance})
}

func TestRangeFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
host, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer host.Stopper().Stop(ctx)

var _ = kvtenantccl.Connector{}
tenant, tenantSQL := serverutils.StartTenant(t, host, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
})
tDB := sqlutils.MakeSQLRunner(tenantSQL)

newStorage := func(t *testing.T, codec keys.SQLCodec) *Storage {
tDB.Exec(t, `CREATE DATABASE "`+t.Name()+`"`)
tDB.Exec(t, GetTableSQLForDatabase(t.Name()))
tableDesc := desctestutils.TestingGetTableDescriptor(tenant.DB(), tenant.Codec(), t.Name(), "public", "sql_instances")
slStorage := slstorage.NewFakeStorage()
return NewTestingStorage(tenant.DB(), codec, tableDesc, slStorage,
tenant.ClusterSettings(), tenant.Clock(), tenant.RangeFeedFactory().(*rangefeed.Factory))
}

t.Run("success", func(t *testing.T) {
storage := newStorage(t, tenant.Codec())

require.NoError(t, storage.generateAvailableInstanceRows(ctx, [][]byte{enum.One}, tenant.Clock().Now().Add(int64(time.Minute), 0)))

feed, err := storage.newInstanceCache(ctx)
require.NoError(t, err)
require.NotNil(t, feed)
defer feed.Close()

// Check the entries in the feed to make sure it is constructed after
// the complete scan.
instances := feed.listInstances()
require.Len(t, instances, int(PreallocatedCount.Get(&tenant.ClusterSettings().SV)))
})

t.Run("auth_error", func(t *testing.T) {
storage := newStorage(t, keys.SystemSQLCodec)
_, err := storage.newInstanceCache(ctx)
require.True(t, grpcutil.IsAuthError(err), "expected %v to be an auth error", err)
})

t.Run("context_cancelled", func(t *testing.T) {
storage := newStorage(t, tenant.Codec())

ctx, cancel := context.WithCancel(ctx)
cancel()

_, err := storage.newInstanceCache(ctx)
require.Error(t, err)
require.ErrorIs(t, err, ctx.Err())
})
}
Loading

0 comments on commit 76e3eb2

Please sign in to comment.