Skip to content

Commit

Permalink
kv,sql,importccl,backupccl: only SetSystemConfigTrigger as the system…
Browse files Browse the repository at this point in the history
… tenant

We need to call SetSystemConfigTrigger when writing to the system config
of the system tenant. The system tenant's system config is gossipped to KV
nodes for the purpose of determining split points and zone configuration.

When tables or zone configurations are modified in secondary tenants, the
system zone config will not be modified and there is no reason to anchor the
transaction to the tenant-scoped system config. Worse yet, the
`SetSystemConfigTrigger` call is totally unaware of the existence of tenants so
such calls would ultimately violate the access of the tenant's KV client.

To deal with this, we add a boolean parameter to calls to `SetSystemConfigSpan`
indicating whether the current client is the system tenant.

Fixes #48184

Release note: None
  • Loading branch information
ajwerner committed Jul 20, 2020
1 parent febcd4f commit 9547502
Show file tree
Hide file tree
Showing 18 changed files with 38 additions and 29 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,7 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
}

// Needed to trigger the schema change manager.
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(r.execCfg.Codec.ForSystemTenant()); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func prepareExistingTableDescForIngestion(
importing.OfflineReason = "importing"
// TODO(dt): de-validate all the FKs.

if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(execCfg.Codec.ForSystemTenant()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1212,7 +1212,7 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor

// Needed to trigger the schema change manager.
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(execCfg.Codec.ForSystemTenant()); err != nil {
return err
}
b := txn.NewBatch()
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func (r *importResumer) dropTables(
details := r.job.Details().(jobspb.ImportDetails)

// Needed to trigger the schema change manager.
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(execCfg.Codec.ForSystemTenant()); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestGetDescriptorFromDB(t *testing.T) {
bobDesc := sqlbase.NewInitialDatabaseDescriptor(9999, "bob")

err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
batch := txn.NewBatch()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
junkDescriptorKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, junkDescriptorID)
junkDescriptor := sqlbase.NewInitialDatabaseDescriptor(junkDescriptorID, "junk")
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return txn.Put(ctx, junkDescriptorKey, junkDescriptor.DescriptorProto())
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestStoreRangeSplitAtTablePrefix(t *testing.T) {

// Update SystemConfig to trigger gossip.
if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
// We don't care about the values, just the keys.
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func TestStoreRangeSystemSplits(t *testing.T) {
// - the write triggers a SystemConfig update and gossip
// We should end up with splits at each user table prefix.
if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
descTablePrefix := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)
Expand Down Expand Up @@ -1379,7 +1379,7 @@ func TestStoreRangeSystemSplits(t *testing.T) {
userTableMax += 3
exceptions = map[int]struct{}{userTableMax - 1: {}, userTableMax - 2: {}}
if err := store.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
// This time, only write the last table descriptor. Splits only occur for
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *te
txA := db.NewTxn(ctx, "a")
txB := db.NewTxn(ctx, "b")

require.NoError(t, txA.SetSystemConfigTrigger())
require.NoError(t, txA.SetSystemConfigTrigger(true /* forSystemTenant */))
db1000 := sqlbase.NewInitialDatabaseDescriptor(1000, "1000")
require.NoError(t, txA.Put(ctx,
keys.SystemSQLCodec.DescMetadataKey(1000),
db1000.DescriptorProto()))

require.NoError(t, txB.SetSystemConfigTrigger())
require.NoError(t, txB.SetSystemConfigTrigger(true /* forSystemTenant */))
db2000 := sqlbase.NewInitialDatabaseDescriptor(2000, "2000")
require.NoError(t, txB.Put(ctx,
keys.SystemSQLCodec.DescMetadataKey(2000),
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,19 @@ func (txn *Txn) ProvisionalCommitTimestamp() hlc.Timestamp {
}

// SetSystemConfigTrigger sets the system db trigger to true on this transaction.
// This will impact the EndTxnRequest.
func (txn *Txn) SetSystemConfigTrigger() error {
// This will impact the EndTxnRequest. Note that this method takes a boolean
// argument indicating whether this transaction is intended for the system
// tenant. Only transactions for the system tenant need to set the system config
// trigger which is used to gossip updates to the system config to KV servers.
// The KV servers need access to an up-to-date system config in order to
// determine split points and zone configurations.
func (txn *Txn) SetSystemConfigTrigger(forSystemTenant bool) error {
if txn.typ != RootTxn {
return errors.AssertionFailedf("SetSystemConfigTrigger() called on leaf txn")
}
if !forSystemTenant {
return nil
}

txn.mu.Lock()
defer txn.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,6 @@ func TestAnchoringErrorNoTrigger(t *testing.T) {
}),
clock)
txn := NewTxn(ctx, db, 0 /* gatewayNodeID */)
require.EqualError(t, txn.SetSystemConfigTrigger(), "unimplemented")
require.EqualError(t, txn.SetSystemConfigTrigger(true /* forSystemTenant */), "unimplemented")
require.False(t, txn.systemConfigTrigger)
}
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func TestSystemConfigGossip(t *testing.T) {

// Write a system key with the transaction marked as having a Gossip trigger.
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return txn.Put(ctx, key, valAt(2))
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ func (m *Manager) PublishMultiple(
versions[id] = descsToUpdate[id].GetVersion()
}

// This is to write the updated descriptors.
if err := txn.SetSystemConfigTrigger(); err != nil {
// This is to write the updated descriptors if we're the system tenant.
if err := txn.SetSystemConfigTrigger(m.storage.codec.ForSystemTenant()); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ CREATE TABLE t.test (k INT);

// Write the modified descriptor.
if err := kvDB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return txn.Put(ctx, sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.ID), tableDesc.DescriptorProto())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func writeTableDesc(
ctx context.Context, db *kv.DB, tableDesc *sqlbase.MutableTableDescriptor,
) error {
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
tableDesc.ModificationTime = txn.CommitTimestamp()
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/gcjob/descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func dropTableDesc(
) error {
log.Infof(ctx, "removing table descriptor for table %d", tableDesc.ID)
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(codec.ForSystemTenant()); err != nil {
return err
}
b := &kv.Batch{}
Expand Down Expand Up @@ -90,7 +90,7 @@ func deleteDatabaseZoneConfig(
return nil
}
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
b := &kv.Batch{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {
// `BEGIN; INSERT INTO ...; CREATE TABLE IF NOT EXISTS ...; COMMIT;`
// where the table already exists. This will generate some false schema
// cache refreshes, but that's expected to be quite rare in practice.
if err := b.evalCtx.Txn.SetSystemConfigTrigger(); err != nil {
if err := b.evalCtx.Txn.SetSystemConfigTrigger(b.evalCtx.Codec.ForSystemTenant()); err != nil {
return execPlan{}, errors.WithSecondaryError(
unimplemented.NewWithIssuef(26508,
"schema change statement cannot follow a statement that has written in the same transaction"),
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ func (p *planner) maybeSetSystemConfig(id sqlbase.ID) error {
return nil
}
// Mark transaction as operating on the system DB.
return p.txn.SetSystemConfigTrigger()
// Only the system tenant marks the SystemConfigTrigger.
return p.txn.SetSystemConfigTrigger(p.execCfg.Codec.ForSystemTenant())
}

// planFlags is used throughout the planning code to keep track of various
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/schema_change_migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func migrateJobToOldFormat(

// Write the table descriptor back.
return kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return kvDB.Put(ctx, sqlbase.MakeDescMetadataKey(
Expand Down Expand Up @@ -443,7 +443,7 @@ func migrateGCJobToOldFormat(

// Write the table descriptor back.
return kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return kvDB.Put(ctx, sqlbase.MakeDescMetadataKey(
Expand Down Expand Up @@ -882,7 +882,7 @@ func TestGCJobCreated(t *testing.T) {
tableDesc.Version++
tableDesc.DropTime = 1
if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
if err := sqlbase.RemoveObjectNamespaceEntry(
Expand Down Expand Up @@ -965,7 +965,7 @@ func TestMissingMutation(t *testing.T) {
tableDesc.Mutations = nil
require.NoError(
t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return kvDB.Put(ctx, sqlbase.MakeDescMetadataKey(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/zone_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func forceNewConfig(t testing.TB, s *server.TestServer) *config.SystemConfig {

// This needs to be done in a transaction with the system trigger set.
if err := s.DB().Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(true /* forSystemTenant */); err != nil {
return err
}
return txn.Put(ctx, configDescKey, configDesc)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sqlmigrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,7 +1488,7 @@ func createSystemTable(ctx context.Context, r runner, desc sqlbase.TableDescript
tKey := sqlbase.MakePublicTableNameKey(ctx, r.settings, desc.GetParentID(), desc.GetName())
b.CPut(tKey.Key(r.codec), desc.GetID(), nil)
b.CPut(sqlbase.MakeDescMetadataKey(r.codec, desc.GetID()), desc.DescriptorProto(), nil)
if err := txn.SetSystemConfigTrigger(); err != nil {
if err := txn.SetSystemConfigTrigger(r.codec.ForSystemTenant()); err != nil {
return err
}
return txn.Run(ctx, b)
Expand Down

0 comments on commit 9547502

Please sign in to comment.