Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
86436: kvserver: incorporate remote tracing spans from snapshots r=AlexTalks a=AlexTalks

This adds collected tracing spans into a `SnapshotResponse` object in
order to incorporate remote traces from the receiver side of a snapshot
into the client's (i.e. the sender's) context.

Release justification: Low-risk observability change.
Release note: None

86676: clusterversion, kvserver: remove SpanConfig related version gates r=celiala a=arulajmani

Remove EnsureSPanConfigReconciliation, EnsureSpanConfigSubscription,
and EnableSpanConfigStore.

References cockroachdb#80663
Subsumes cockroachdb#85848

Release justification: cleanup
Release note: None

Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
3 people committed Aug 25, 2022
3 parents 44e50c1 + 09a1d46 + 0900ea9 commit 9a7ec4b
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 245 deletions.
2 changes: 0 additions & 2 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ go_test(
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
Expand Down Expand Up @@ -91,7 +90,6 @@ go_test(
"//pkg/util/stop",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
117 changes: 0 additions & 117 deletions pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -367,115 +362,3 @@ func TestTenantUpgradeFailure(t *testing.T) {
tenantInfo.v2onMigrationStopper.Stop(ctx)
})
}

// TestTenantSystemConfigUpgrade ensures that the tenant GC job uses the
// appropriate view of the GC TTL.
func TestTenantSystemConfigUpgrade(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
false, // initializeVersion
)
// Initialize the version to the BinaryMinSupportedVersion.
require.NoError(t, clusterversion.Initialize(ctx,
clusterversion.TestingBinaryMinSupportedVersion, &settings.SV))
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
// Test is designed to run within a tenant. No need
// for the test tenant here.
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion,
},
},
},
})
hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
defer tc.Stopper().Stop(ctx)
connectToTenant := func(t *testing.T, addr string) (_ *gosql.DB, cleanup func()) {
pgURL, cleanupPGUrl := sqlutils.PGUrl(t, addr, "Tenant", url.User(username.RootUser))
tenantDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
return tenantDB, func() {
tenantDB.Close()
cleanupPGUrl()
}
}
mkTenant := func(t *testing.T, id uint64) (
tenant serverutils.TestTenantInterface,
) {
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
false, // initializeVersion
)
// Initialize the version to the minimum it could be.
require.NoError(t, clusterversion.Initialize(ctx,
clusterversion.TestingBinaryMinSupportedVersion, &settings.SV))
tenantArgs := base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(id),
TestingKnobs: base.TestingKnobs{},
Settings: settings,
}
tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs)
require.NoError(t, err)
return tenant
}
const tenantID = 10
codec := keys.MakeSQLCodec(roachpb.MakeTenantID(tenantID))
tenant := mkTenant(t, tenantID)
tenantSQL, cleanup := connectToTenant(t, tenant.SQLAddr())
defer cleanup()
tenantDB := sqlutils.MakeSQLRunner(tenantSQL)
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{"21.2"}})
tenantDB.Exec(t, "CREATE TABLE foo ()")
fooID := sqlutils.QueryTableID(t, tenantSQL, "defaultdb", "public", "foo")
tenantP := tenant.SystemConfigProvider()
ch, _ := tenantP.RegisterSystemConfigChannel()

hostDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()")
hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 111")
hostDB.Exec(t,
"ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true;",
tenantID)
tenantDB.CheckQueryResultsRetry(
t, "SHOW CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled",
[][]string{{"true"}},
)
tenantVersion := func() clusterversion.ClusterVersion {
return tenant.ClusterSettings().Version.ActiveVersionOrEmpty(ctx)
}
checkConfigEqual := func(t *testing.T, exp int32) {
testutils.SucceedsSoon(t, func() error {
cfg := tenantP.GetSystemConfig()
if cfg == nil {
return errors.New("no config")
}
conf, err := tenantP.GetSystemConfig().GetZoneConfigForObject(codec, tenantVersion(), config.ObjectID(fooID))
if err != nil {
return err
}
if conf.GC.TTLSeconds != exp {
return errors.Errorf("got %d, expected %d", conf.GC.TTLSeconds, exp)
}
return nil
})
}
checkConfigEqual(t, 111)
<-ch
hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 112")
<-ch
checkConfigEqual(t, 112)
tenantDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()")
tenantDB.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 113")
<-ch
checkConfigEqual(t, 113)
}
15 changes: 0 additions & 15 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ const (
// This version must be active before any ProbeRequest is issued on the
// cluster.
ProbeRequest
// EnsureSpanConfigReconciliation ensures that the host tenant has run its
// reconciliation process at least once.
EnsureSpanConfigReconciliation
// EnsureSpanConfigSubscription ensures that all KV nodes are subscribed to
// the global span configuration state, observing the entries installed as
// in EnsureSpanConfigReconciliation.
EnsureSpanConfigSubscription
// EnableSpanConfigStore enables the use of the span configs infrastructure
// in KV.
EnableSpanConfigStore
Expand Down Expand Up @@ -342,14 +335,6 @@ var versionsSingleton = keyedVersions{
Key: ProbeRequest,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 26},
},
{
Key: EnsureSpanConfigReconciliation,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 36},
},
{
Key: EnsureSpanConfigSubscription,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 38},
},
{
Key: EnableSpanConfigStore,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 40},
Expand Down
74 changes: 36 additions & 38 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/config",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/roachpb",
Expand Down
19 changes: 1 addition & 18 deletions pkg/config/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -402,26 +401,10 @@ func isPseudoTableID(id uint32) bool {
// NOTE: any subzones from the zone placeholder will be automatically merged
// into the cached zone so the caller doesn't need special-case handling code.
func (s *SystemConfig) GetZoneConfigForObject(
codec keys.SQLCodec, version clusterversion.ClusterVersion, id ObjectID,
codec keys.SQLCodec, id ObjectID,
) (*zonepb.ZoneConfig, error) {
var entry zoneEntry
var err error
// In the case that we've not yet ensured reconciliation of the span
// configurations, use the host-provided view of the RANGE tenants
// configuration.
//
// TODO(ajwerner,arulajmani): If the reconciliation protocol is not active,
// and this is a secondary tenant object we're trying to look up, we're in a
// bit of a pickle. This assumes that if we're in the appropriate version,
// then so too is the system tenant and things are reconciled. Is it possible
// that neither of these object IDs represent reality? It seems like after
// the host cluster has been upgraded but the tenants have not, that we're
// in a weird intermediate state whereby the system tenant's config is no
// longer respected, but neither is the secondary tenant's.
if !codec.ForSystemTenant() &&
(id == 0 || !version.IsActive(clusterversion.EnableSpanConfigStore)) {
codec, id = keys.SystemSQLCodec, keys.TenantsRangesID
}
entry, err = s.getZoneEntry(codec, id)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ message SnapshotResponse {
Status status = 1;
string message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/storage_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import "kv/kvserver/api.proto";

service MultiRaft {
rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {}
// RaftSnapshot asks the server to accept and apply a range snapshot.
// The client is expected to initially send a message consisting solely of
// a Header, upon which the server will respond with a message with status
// ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the
// client will send multiple messages with KVBatch data followed by a
// terminal message with the final flag set to true. Once finalized,
// the server will ultimately send a message back with status APPLIED, or
// ERROR, including any collected traces from processing.
rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {}
// DelegateRaftSnapshot asks the server to send a range snapshot to a target
// (so the client delegates the sending of the snapshot to the server). The
Expand Down
26 changes: 1 addition & 25 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2184,29 +2184,6 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro
return nil, errSysCfgUnavailable
}

// We need a version gate here before switching over to the span configs
// infrastructure. In a mixed-version cluster we need to wait for
// the host tenant to have fully populated `system.span_configurations`
// (read: reconciled) at least once before using it as a view for all
// split/config decisions.
_ = clusterversion.EnsureSpanConfigReconciliation
//
// We also want to ensure that the KVSubscriber on each store is at least as
// up-to-date as some full reconciliation timestamp.
_ = clusterversion.EnsureSpanConfigSubscription
//
// Without a version gate, it would be possible for a replica on a
// new-binary-server to apply the static fallback config (assuming no
// entries in `system.span_configurations`), in violation of explicit
// configs directly set by the user. Though unlikely, it's also possible for
// us to merge all ranges into a single one -- with no entries in
// system.span_configurations, the infrastructure can erroneously conclude
// that there are zero split points.
//
// We achieve all this through a three-step migration process, culminating
// in the following cluster version gate:
_ = clusterversion.EnableSpanConfigStore

if s.cfg.SpanConfigsDisabled ||
!spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) ||
s.TestingKnobs().UseSystemConfigSpanForQueues {
Expand Down Expand Up @@ -2395,8 +2372,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
}

if s.cfg.SpanConfigsDisabled ||
!spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) ||
!s.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) {
!spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
repl.SetSpanConfig(conf)
}

Expand Down
Loading

0 comments on commit 9a7ec4b

Please sign in to comment.