Skip to content

Commit

Permalink
spanconfig: replace roachpb.SpanConfigEntry in package spanconfig
Browse files Browse the repository at this point in the history
This patch removes the usage of roachpb.SpanConfigEntry from the
spanconfig package. Going forward, we'll only use
roachpb.SpanConfigEntry in RPCs. We want to decouple types that we
use inside the package with RPCs in preperation for system span
configurations.

We instead introduce a new spanconfig.Record type to tie together
a spanconfig.Target and a spanconfig.Record. For now, we only allow
targeting spans. Once we introduce system span configurations, we'll
make be able to make room for system targets as well. System targets
will allow tenants to target their entire keyspace and the host tenant
to target particular secondary tenants/the entire cluster.

Crucially, spanconfig.Target contains encoding and decoding methods
which we make use of when writing to and reading from
system.span_configurations.

Ripping out roachpb.SpanConfigEntry ended up touching most components
in the package.

Release note: None
  • Loading branch information
arulajmani committed Feb 9, 2022
1 parent c9168d1 commit d988d3b
Show file tree
Hide file tree
Showing 28 changed files with 833 additions and 532 deletions.
34 changes: 23 additions & 11 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,35 +421,47 @@ func (c *Connector) TokenBucket(
return nil, ctx.Err()
}

// GetSpanConfigEntriesFor implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigEntriesFor(
ctx context.Context, spans []roachpb.Span,
) (entries []roachpb.SpanConfigEntry, _ error) {
// GetSpanConfigRecords implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigRecords(
ctx context.Context, targets []spanconfig.Target,
) (records []spanconfig.Record, _ error) {
if err := c.withClient(ctx, func(ctx context.Context, c *client) error {
spans := make([]roachpb.Span, 0, len(targets))
for _, target := range targets {
spans = append(spans, *target.GetSpan())
}
resp, err := c.GetSpanConfigs(ctx, &roachpb.GetSpanConfigsRequest{
Spans: spans,
})
if err != nil {
return err
}

entries = resp.SpanConfigEntries
records = spanconfig.EntriesToRecords(resp.SpanConfigEntries)
return nil
}); err != nil {
return nil, err
}
return entries, nil
return records, nil
}

// UpdateSpanConfigEntries implements the spanconfig.KVAccessor
// UpdateSpanConfigRecords implements the spanconfig.KVAccessor
// interface.
func (c *Connector) UpdateSpanConfigEntries(
ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry,
func (c *Connector) UpdateSpanConfigRecords(
ctx context.Context, toDelete []spanconfig.Target, toUpsert []spanconfig.Record,
) error {
spansToDelete := make([]roachpb.Span, 0, len(toDelete))
for _, toDel := range toDelete {
spansToDelete = append(spansToDelete, roachpb.Span(toDel))
}

entriesToUpsert := spanconfig.RecordsToSpanConfigEntries(toUpsert)

return c.withClient(ctx, func(ctx context.Context, c *client) error {

_, err := c.UpdateSpanConfigs(ctx, &roachpb.UpdateSpanConfigsRequest{
ToDelete: toDelete,
ToUpsert: toUpsert,
ToDelete: spansToDelete,
ToUpsert: entriesToUpsert,
})
return err
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ func TestPreSeedSpanConfigsWrittenWhenActive(t *testing.T) {
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}

{
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(tenantSpan),
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}

Expand Down Expand Up @@ -106,7 +106,9 @@ func TestSeedTenantSpanConfigs(t *testing.T) {

tenantID := roachpb.MakeTenantID(10)
tenantPrefix := keys.MakeTenantPrefix(tenantID)
tenantSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
tenantTarget := spanconfig.MakeSpanTarget(
roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
)
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}
{
_, err := ts.StartTenant(ctx, base.TestTenantArgs{
Expand All @@ -123,31 +125,31 @@ func TestSeedTenantSpanConfigs(t *testing.T) {
require.NoError(t, err)
}

{ // Ensure that no span config entries are to be found
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that no span config records are to be found
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Empty(t, entries)
require.Empty(t, records)
}

tdb.Exec(t,
"SET CLUSTER SETTING version = $1",
clusterversion.ByKey(clusterversion.SeedTenantSpanConfigs).String(),
)

{ // Ensure that the tenant now has a span config entry.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant now has a span config record.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}

// TestSeedTenantSpanConfigsWithExistingEntry tests that the migration ignores
// tenants with existing span config entries.
// tenants with existing span config records.
func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -173,7 +175,9 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {

tenantID := roachpb.MakeTenantID(10)
tenantPrefix := keys.MakeTenantPrefix(tenantID)
tenantSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}
tenantTarget := spanconfig.MakeSpanTarget(
roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()},
)
tenantSeedSpan := roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.Next()}
{
_, err := ts.StartTenant(ctx, base.TestTenantArgs{
Expand All @@ -190,13 +194,13 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
require.NoError(t, err)
}

{ // Ensure that the tenant already has a span config entry.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant already has a span config record.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}

// Ensure the cluster version bump goes through successfully.
Expand All @@ -205,12 +209,12 @@ func TestSeedTenantSpanConfigsWithExistingEntry(t *testing.T) {
clusterversion.ByKey(clusterversion.SeedTenantSpanConfigs).String(),
)

{ // Ensure that the tenant's span config entry stay as it was.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
tenantSpan,
{ // Ensure that the tenant's span config record stay as it was.
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
tenantTarget,
})
require.NoError(t, err)
require.Len(t, entries, 1)
require.Equal(t, entries[0].Span, tenantSeedSpan)
require.Len(t, records, 1)
require.Equal(t, *records[0].Target.GetSpan(), tenantSeedSpan)
}
}
16 changes: 9 additions & 7 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,18 @@ func TestDataDriven(t *testing.T) {
}
return nil
})
entries, err := kvAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{keys.EverythingSpan})
records, err := kvAccessor.GetSpanConfigRecords(
ctx, []spanconfig.Target{spanconfig.MakeSpanTarget(keys.EverythingSpan)},
)
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})

lines := make([]string, len(entries))
for i, entry := range entries {
lines[i] = fmt.Sprintf("%-42s %s", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config))
lines := make([]string, len(records))
for i, record := range records {
lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan().String(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
}
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", lines)

Expand Down
24 changes: 12 additions & 12 deletions pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,31 +159,31 @@ func TestDataDriven(t *testing.T) {
}

sqlTranslator := tenant.SpanConfigSQLTranslator().(spanconfig.SQLTranslator)
entries, _, err := sqlTranslator.Translate(ctx, descpb.IDs{objID})
records, _, err := sqlTranslator.Translate(ctx, descpb.IDs{objID})
require.NoError(t, err)
sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})

var output strings.Builder
for _, entry := range entries {
output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config)))
for _, record := range records {
output.WriteString(fmt.Sprintf("%-42s %s\n", *record.Target.GetSpan(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config)))
}
return output.String()

case "full-translate":
sqlTranslator := tenant.SpanConfigSQLTranslator().(spanconfig.SQLTranslator)
entries, _, err := spanconfig.FullTranslate(ctx, sqlTranslator)
records, _, err := spanconfig.FullTranslate(ctx, sqlTranslator)
require.NoError(t, err)

sort.Slice(entries, func(i, j int) bool {
return entries[i].Span.Key.Compare(entries[j].Span.Key) < 0
sort.Slice(records, func(i, j int) bool {
return records[i].Target.Less(records[j].Target)
})
var output strings.Builder
for _, entry := range entries {
output.WriteString(fmt.Sprintf("%-42s %s\n", entry.Span,
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(entry.Config)))
for _, record := range records {
output.WriteString(fmt.Sprintf("%-42s %s\n", *record.Target.GetSpan(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config)))
}
return output.String()

Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
span := repl.Desc().RSpan().AsRawSpanWithNoLocals()
conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3}

deleted, added := spanConfigStore.Apply(ctx, false /* dryrun */, spanconfig.Addition(span, conf))
deleted, added := spanConfigStore.Apply(
ctx,
false, /* dryrun */
spanconfig.Addition(spanconfig.MakeSpanTarget(span), conf),
)
require.Empty(t, deleted)
require.Len(t, added, 1)
require.True(t, added[0].Span.Equal(span))
require.True(t, added[0].Target.GetSpan().Equal(span))
require.True(t, added[0].Config.Equal(conf))

require.NotNil(t, mockSubscriber.callback)
Expand Down
1 change: 1 addition & 0 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/roachpb",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand Down
31 changes: 15 additions & 16 deletions pkg/migration/migrations/migrate_span_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -69,11 +68,11 @@ func TestEnsureSpanConfigReconciliation(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '100ms'`)

{ // Ensure that no span config entries are found.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
keys.EverythingSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(keys.EverythingSpan),
})
require.NoError(t, err)
require.Empty(t, entries)
require.Empty(t, records)
}

// Ensure that upgrade attempts without having reconciled simply fail.
Expand All @@ -91,11 +90,11 @@ func TestEnsureSpanConfigReconciliation(t *testing.T) {
require.False(t, scReconciler.Checkpoint().IsEmpty())

{ // Ensure that the host tenant's span configs are installed.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
keys.EverythingSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(keys.EverythingSpan),
})
require.NoError(t, err)
require.NotEmpty(t, entries)
require.NotEmpty(t, records)
}
}

Expand Down Expand Up @@ -154,11 +153,11 @@ func TestEnsureSpanConfigReconciliationMultiNode(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '100ms'`)

{ // Ensure that no span config entries are to be found.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
keys.EverythingSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(keys.EverythingSpan),
})
require.NoError(t, err)
require.Empty(t, entries)
require.Empty(t, records)
}

// Ensure that upgrade attempts without having reconciled simply fail.
Expand All @@ -176,11 +175,11 @@ func TestEnsureSpanConfigReconciliationMultiNode(t *testing.T) {
require.False(t, scReconciler.Checkpoint().IsEmpty())

{ // Ensure that the host tenant's span configs are installed.
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
keys.EverythingSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(keys.EverythingSpan),
})
require.NoError(t, err)
require.NotEmpty(t, entries)
require.NotEmpty(t, records)
}
}

Expand Down Expand Up @@ -220,11 +219,11 @@ func TestEnsureSpanConfigSubscription(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING spanconfig.reconciliation_job.enabled = true`)

testutils.SucceedsSoon(t, func() error {
entries, err := scKVAccessor.GetSpanConfigEntriesFor(ctx, []roachpb.Span{
keys.EverythingSpan,
records, err := scKVAccessor.GetSpanConfigRecords(ctx, []spanconfig.Target{
spanconfig.MakeSpanTarget(keys.EverythingSpan),
})
require.NoError(t, err)
if len(entries) == 0 {
if len(records) == 0 {
return fmt.Errorf("empty global span configuration state")
}
return nil
Expand Down
Loading

0 comments on commit d988d3b

Please sign in to comment.