Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,kvserver: stop gossiping the system config #76279

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-64 set the active cluster version in the format '<major>.<minor>'
version version 21.2-66 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-66</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -115,3 +116,6 @@ func (t *testServerShim) Engines() []storage.Engine { panic(unsup
func (t *testServerShim) MetricsRecorder() *status.MetricsRecorder { panic(unsupportedShimMethod) }
func (t *testServerShim) CollectionFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigKVSubscriber() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SystemConfigProvider() config.SystemConfigProvider {
panic(unsupportedShimMethod)
}
17 changes: 11 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Connector struct {
client *client
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
systemConfigChannels map[chan<- struct{}]struct{}
}

settingsMu struct {
Expand Down Expand Up @@ -140,6 +140,7 @@ func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
}

c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
c.mu.systemConfigChannels = make(map[chan<- struct{}]struct{})
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
return c
Expand Down Expand Up @@ -250,7 +251,7 @@ var gossipSubsHandlers = map[string]func(*Connector, context.Context, string, ro
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Connector).updateNodeAddress,
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Connector).updateSystemConfig,
gossip.KeyDeprecatedSystemConfig: (*Connector).updateSystemConfig,
}

var gossipSubsPatterns = func() []string {
Expand Down Expand Up @@ -322,7 +323,7 @@ func (c *Connector) updateSystemConfig(ctx context.Context, key string, content
c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfig = cfg
for _, c := range c.mu.systemConfigChannels {
for c := range c.mu.systemConfigChannels {
select {
case c <- struct{}{}:
default:
Expand All @@ -342,20 +343,24 @@ func (c *Connector) GetSystemConfig() *config.SystemConfig {

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface.
func (c *Connector) RegisterSystemConfigChannel() <-chan struct{} {
func (c *Connector) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) {
// Create channel that receives new system config notifications. The channel
// has a size of 1 to prevent connector from having to block on it.
ch := make(chan struct{}, 1)

c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfigChannels = append(c.mu.systemConfigChannels, ch)
c.mu.systemConfigChannels[ch] = struct{}{}

// Notify the channel right away if we have a config.
if c.mu.systemConfig != nil {
ch <- struct{}{}
}
return ch
return ch, func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.mu.systemConfigChannels, ch)
}
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.Gossip
panic(err)
}
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeySystemConfig,
Key: gossip.KeyDeprecatedSystemConfig,
Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}),
PatternMatched: gossip.KeySystemConfig,
PatternMatched: gossip.KeyDeprecatedSystemConfig,
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
// Test config.SystemConfigProvider impl. Should not have a SystemConfig yet.
sysCfg := c.GetSystemConfig()
require.Nil(t, sysCfg)
sysCfgC := c.RegisterSystemConfigChannel()
sysCfgC, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC, 0)

// Return first SystemConfig response.
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values)

// A newly registered SystemConfig channel will be immediately notified.
sysCfgC2 := c.RegisterSystemConfigChannel()
sysCfgC2, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC2, 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,7 @@ ALTER TABLE t65064 INJECT STATISTICS '[
}
]';

query T
query T retry
SELECT * FROM [EXPLAIN SELECT * FROM t65064 WHERE username = 'kharris'] OFFSET 2
----
·
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ go_test(
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/systemconfigwatcher",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigtestutils",
"//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster",
Expand Down
33 changes: 19 additions & 14 deletions pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster"
Expand Down Expand Up @@ -106,14 +106,15 @@ func TestDataDriven(t *testing.T) {
{
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
defer spanConfigTestCluster.Cleanup()

kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
underlyingGossip := tc.Server(0).GossipI().(*gossip.Gossip)
systemConfig := tc.Server(0).SystemConfigProvider().(*systemconfigwatcher.Cache)

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
Expand Down Expand Up @@ -161,15 +162,19 @@ func TestDataDriven(t *testing.T) {
// (i) reconciliation processes;
// (ii) tenant initializations (where seed span configs are
// installed).
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := kvSubscriber.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("kvsubscriber last updated timestamp (%s) lagging barrier timestamp (%s)",
lastUpdated.GoTime(), now.GoTime())
}
return nil
})
checkLastUpdated := func(t *testing.T, n string, c interface{ LastUpdated() hlc.Timestamp }) {
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := c.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("%s last updated timestamp (%s) lagging barrier timestamp (%s)",
n, lastUpdated.GoTime(), now.GoTime())
}
return nil
})
}
checkLastUpdated(t, "kvsubscriber", kvSubscriber)
checkLastUpdated(t, "systemconfigwatcher", systemConfig)

// As for the gossiped system config span, because we're using a
// single node cluster there's no additional timestamp
Expand Down Expand Up @@ -221,7 +226,7 @@ func TestDataDriven(t *testing.T) {

var reader spanconfig.StoreReader
if version == "legacy" {
reader = underlyingGossip.GetSystemConfig()
reader = systemConfig.GetSystemConfig()
} else {
reader = kvSubscriber
}
Expand All @@ -230,7 +235,7 @@ func TestDataDriven(t *testing.T) {
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", data)

case "diff":
var before, after spanconfig.StoreReader = underlyingGossip.GetSystemConfig(), kvSubscriber
var before, after spanconfig.StoreReader = systemConfig.GetSystemConfig(), kvSubscriber
diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
A: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, before).String()),
B: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, after).String()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) {
return "", errors.Wrapf(err, "failed to parse value for key %q", key)
}
output = append(output, fmt.Sprintf("%q: %v", key, clusterID))
} else if key == gossip.KeySystemConfig {
} else if key == gossip.KeyDeprecatedSystemConfig {
if debugCtx.printSystemConfig {
var config config.SystemConfigEntries
if err := protoutil.Unmarshal(bytes, &config); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ const (
// engine running at the required format major version, as do all other nodes
// in the cluster.
EnablePebbleFormatVersionBlockProperties

// DisableSystemConfigGossipTrigger is a follow-up to EnableSpanConfigStore
// to disable the data propagation mechanism it and the entire spanconfig
// infrastructure obviates.
DisableSystemConfigGossipTrigger
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -443,6 +446,10 @@ var versionsSingleton = keyedVersions{
Key: EnablePebbleFormatVersionBlockProperties,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64},
},
{
Key: DisableSystemConfigGossipTrigger,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 66},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions pkg/config/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,32 @@ type SystemConfigProvider interface {
// the system config. It is notified after registration (if a system config
// is already set), and whenever a new system config is successfully
// unmarshaled.
RegisterSystemConfigChannel() <-chan struct{}
RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func())
}

// EmptySystemConfigProvider is an implementation of SystemConfigProvider that
// never provides a system config.
type EmptySystemConfigProvider struct{}
// ConstantSystemConfigProvider is an implementation of SystemConfigProvider which
// always returns the same value.
type ConstantSystemConfigProvider struct {
cfg *SystemConfig
}

// NewConstantSystemConfigProvider constructs a SystemConfigProvider which
// always returns the same value.
func NewConstantSystemConfigProvider(cfg *SystemConfig) *ConstantSystemConfigProvider {
p := &ConstantSystemConfigProvider{cfg: cfg}
return p
}

// GetSystemConfig implements the SystemConfigProvider interface.
func (EmptySystemConfigProvider) GetSystemConfig() *SystemConfig {
return nil
func (c *ConstantSystemConfigProvider) GetSystemConfig() *SystemConfig {
return c.cfg
}

// RegisterSystemConfigChannel implements the SystemConfigProvider interface.
func (EmptySystemConfigProvider) RegisterSystemConfigChannel() <-chan struct{} {
func (c *ConstantSystemConfigProvider) RegisterSystemConfigChannel() (
_ <-chan struct{},
unregister func(),
) {
// The system config will never be updated, so return a nil channel.
return nil
return nil, func() {}
}
16 changes: 10 additions & 6 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func New(

g.mu.Lock()
// Add ourselves as a SystemConfig watcher.
g.mu.is.registerCallback(KeySystemConfig, g.updateSystemConfig)
g.mu.is.registerCallback(KeyDeprecatedSystemConfig, g.updateSystemConfig)
// Add ourselves as a node descriptor watcher.
g.mu.is.registerCallback(MakePrefixPattern(KeyNodeIDPrefix), g.updateNodeAddress)
g.mu.is.registerCallback(MakePrefixPattern(KeyStorePrefix), g.updateStoreMap)
Expand Down Expand Up @@ -1145,18 +1145,22 @@ func (g *Gossip) RegisterCallback(pattern string, method Callback, opts ...Callb
}
}

// GetSystemConfig returns the local unmarshaled version of the system config.
// DeprecatedGetSystemConfig returns the local unmarshaled version of the system config.
// Returns nil if the system config hasn't been set yet.
func (g *Gossip) GetSystemConfig() *config.SystemConfig {
//
// TODO(ajwerner): Remove this in 22.2.
func (g *Gossip) DeprecatedGetSystemConfig() *config.SystemConfig {
g.systemConfigMu.RLock()
defer g.systemConfigMu.RUnlock()
return g.systemConfig
}

// RegisterSystemConfigChannel registers a channel to signify updates for the
// DeprecatedRegisterSystemConfigChannel registers a channel to signify updates for the
// system config. It is notified after registration (if a system config is
// already set), and whenever a new system config is successfully unmarshaled.
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
//
// TODO(ajwerner): Remove this in 22.2.
func (g *Gossip) DeprecatedRegisterSystemConfigChannel() <-chan struct{} {
// Create channel that receives new system config notifications.
// The channel has a size of 1 to prevent gossip from having to block on it.
c := make(chan struct{}, 1)
Expand All @@ -1177,7 +1181,7 @@ func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
// channel.
func (g *Gossip) updateSystemConfig(key string, content roachpb.Value) {
ctx := g.AnnotateCtx(context.TODO())
if key != KeySystemConfig {
if key != KeyDeprecatedSystemConfig {
log.Fatalf(ctx, "wrong key received on SystemConfig callback: %s", key)
}
cfg := config.NewSystemConfig(g.defaultZoneConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/infostore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestInfoStoreMostDistant(t *testing.T) {
scInfo := is.newInfo(nil, time.Second)
scInfo.Hops = 100
scInfo.NodeID = nodes[0]
if err := is.addInfo(KeySystemConfig, scInfo); err != nil {
if err := is.addInfo(KeyDeprecatedSystemConfig, scInfo); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@ const (
// bi-level key addressing scheme. The value is a roachpb.RangeDescriptor.
KeyFirstRangeDescriptor = "first-range"

// KeySystemConfig is the gossip key for the system DB span.
// KeyDeprecatedSystemConfig is the gossip key for the system DB span.
// The value if a config.SystemConfig which holds all key/value
// pairs in the system DB span.
KeySystemConfig = "system-db"
//
// This key is used in the 21.2<->22.1 mixed version state. It is not used
// in 22.1. However, it was written without a TTL, so there no guarantee
// that it will actually be removed from the gossip network.
//
// TODO(ajwerner): Write a migration to remove the data, or release a
// a version which drops the key entirely, and then, in a subsequent
// release, delete this key.
KeyDeprecatedSystemConfig = "system-db"

// KeyDistSQLNodeVersionKeyPrefix is key prefix for each node's DistSQL
// version.
Expand Down
Loading