84751: sql,externalconn: introduce DROP EXTERNAL CONNECTION r=benbardin a=adityamaru

This change introduces `DROP EXTERNAL CONNECTION`, which can be
used to drop an existing External Connection object from the
`system.external_connections` table.

Fixes: cockroachdb#84226

Release note (sql change): `DROP EXTERNAL CONNECTION` can be
used to drop a previously created External Connection object.
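
As a quick illustration, a round trip might look like the following (the
connection name and nodelocal URI are hypothetical, mirroring the test data
below; `CREATE EXTERNAL CONNECTION` is the pre-existing counterpart statement):

-- Create an External Connection, then drop it again. The drop deletes the
-- corresponding row from system.external_connections.
CREATE EXTERNAL CONNECTION foo AS 'nodelocal://1/foo/bar';
DROP EXTERNAL CONNECTION foo;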

84872: importer: clean up importResumer.dropTables r=adityamaru a=msbutler

Previously, importResumer.dropTables() assumed that IMPORT INTO could act
upon multiple tables, which is not actually the case, leading to overly
complex code. This PR is a simple refactor in preparation for further import
rollback work in cockroachdb#76722 and cockroachdb#70428.

Release note: none
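
For context, `IMPORT INTO` targets a single existing table per statement,
which is what makes the single-table rollback path in the diff below
sufficient. A hypothetical example (table, columns, and file URI invented):

-- IMPORT INTO acts on exactly one existing table.
IMPORT INTO t (a, b) CSV DATA ('nodelocal://1/data.csv');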

Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
3 people committed Jul 22, 2022
3 parents (3adb070 + c92fd96 + fbaf116); commit 4ce560f
Showing 19 changed files with 260 additions and 36 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/BUILD.bazel
@@ -101,6 +101,7 @@ FILES = [
"drop_constraint",
"drop_database",
"drop_ddl_stmt",
"drop_external_connection_stmt",
"drop_index",
"drop_owned_by_stmt",
"drop_role_stmt",
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/drop_external_connection_stmt.bnf
@@ -0,0 +1,2 @@
drop_external_connection_stmt ::=
'DROP' 'EXTERNAL' 'CONNECTION' string_or_placeholder
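
Per this production, the connection is referenced by a string or bound via a
placeholder; for example (hypothetical name):

DROP EXTERNAL CONNECTION 'backup_bucket'; -- string literal
DROP EXTERNAL CONNECTION $1;              -- placeholder in a prepared statement
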
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/drop_stmt.bnf
@@ -8,3 +8,4 @@ drop_stmt ::=
| drop_type_stmt
| drop_role_stmt
| drop_schedule_stmt
| drop_external_connection_stmt
4 changes: 4 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -187,6 +187,7 @@ drop_stmt ::=
drop_ddl_stmt
| drop_role_stmt
| drop_schedule_stmt
| drop_external_connection_stmt

explain_stmt ::=
'EXPLAIN' explainable_stmt
@@ -605,6 +606,9 @@ drop_schedule_stmt ::=
'DROP' 'SCHEDULE' a_expr
| 'DROP' 'SCHEDULES' select_stmt

drop_external_connection_stmt ::=
'DROP' 'EXTERNAL' 'CONNECTION' string_or_placeholder

explainable_stmt ::=
preparable_stmt
| execute_stmt
@@ -24,4 +24,24 @@ inspect-system-table
bar123 STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/baz"}}}
foo STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/foo/bar"}}}

# Drop an External Connection that does not exist.
exec-sql
DROP EXTERNAL CONNECTION baz;
----

exec-sql
DROP EXTERNAL CONNECTION bar123;
----

inspect-system-table
----
foo STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/foo/bar"}}}

exec-sql
DROP EXTERNAL CONNECTION foo;
----

inspect-system-table
----

subtest end
@@ -27,4 +27,24 @@ inspect-system-table
bar123 STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/baz"}}}
foo STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/foo/bar"}}}

# Drop an External Connection that does not exist.
exec-sql
DROP EXTERNAL CONNECTION baz;
----

exec-sql
DROP EXTERNAL CONNECTION bar123;
----

inspect-system-table
----
foo STORAGE {"nodelocal": {"cfg": {"nodeId": 1, "path": "/foo/bar"}}}

exec-sql
DROP EXTERNAL CONNECTION foo;
----

inspect-system-table
----

subtest end
1 change: 1 addition & 0 deletions pkg/gen/bnf.bzl
@@ -101,6 +101,7 @@ BNF_SRCS = [
"//docs/generated/sql/bnf:drop_constraint.bnf",
"//docs/generated/sql/bnf:drop_database.bnf",
"//docs/generated/sql/bnf:drop_ddl_stmt.bnf",
"//docs/generated/sql/bnf:drop_external_connection_stmt.bnf",
"//docs/generated/sql/bnf:drop_index.bnf",
"//docs/generated/sql/bnf:drop_owned_by_stmt.bnf",
"//docs/generated/sql/bnf:drop_role_stmt.bnf",
1 change: 1 addition & 0 deletions pkg/gen/diagrams.bzl
@@ -101,6 +101,7 @@ DIAGRAMS_SRCS = [
"//docs/generated/sql/bnf:drop_constraint.html",
"//docs/generated/sql/bnf:drop_database.html",
"//docs/generated/sql/bnf:drop_ddl.html",
"//docs/generated/sql/bnf:drop_external_connection.html",
"//docs/generated/sql/bnf:drop_index.html",
"//docs/generated/sql/bnf:drop_owned_by.html",
"//docs/generated/sql/bnf:drop_role.html",
1 change: 1 addition & 0 deletions pkg/gen/docs.bzl
@@ -113,6 +113,7 @@ DOCS_SRCS = [
"//docs/generated/sql/bnf:drop_constraint.bnf",
"//docs/generated/sql/bnf:drop_database.bnf",
"//docs/generated/sql/bnf:drop_ddl_stmt.bnf",
"//docs/generated/sql/bnf:drop_external_connection_stmt.bnf",
"//docs/generated/sql/bnf:drop_index.bnf",
"//docs/generated/sql/bnf:drop_owned_by_stmt.bnf",
"//docs/generated/sql/bnf:drop_role_stmt.bnf",
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -85,6 +85,7 @@ go_library(
"doc.go",
"drop_cascade.go",
"drop_database.go",
"drop_external_connection.go",
"drop_index.go",
"drop_owned_by.go",
"drop_role.go",
97 changes: 97 additions & 0 deletions pkg/sql/drop_external_connection.go
@@ -0,0 +1,97 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/errors"
)

const dropExternalConnectionOp = "DROP EXTERNAL CONNECTION"

type dropExternalConnectionNode struct {
n *tree.DropExternalConnection
}

// DropExternalConnection constructs the plan node for a DROP EXTERNAL
// CONNECTION statement.
func (p *planner) DropExternalConnection(
_ context.Context, n *tree.DropExternalConnection,
) (planNode, error) {
return &dropExternalConnectionNode{n: n}, nil
}

func (c *dropExternalConnectionNode) startExec(params runParams) error {
return params.p.dropExternalConnection(params, c.n)
}

type dropExternalConnectionEval struct {
externalConnectionName func() (string, error)
}

func (p *planner) makeDropExternalConnectionEval(
ctx context.Context, n *tree.DropExternalConnection,
) (*dropExternalConnectionEval, error) {
var err error
eval := &dropExternalConnectionEval{}
eval.externalConnectionName, err = p.TypeAsString(ctx, n.ConnectionLabel, dropExternalConnectionOp)
if err != nil {
return nil, err
}
return eval, err
}

func (p *planner) dropExternalConnection(params runParams, n *tree.DropExternalConnection) error {
// TODO(adityamaru): Check that the user has `DROP` privileges on the External
// Connection once we add support for it. Remove the admin-only check.
hasAdmin, err := params.p.HasAdminRole(params.ctx)
if err != nil {
return err
}
if !hasAdmin {
return pgerror.New(
pgcode.InsufficientPrivilege,
"only users with the admin role are allowed to DROP EXTERNAL CONNECTION")
}

// TODO(adityamaru): Add some metrics to track DROP EXTERNAL CONNECTION
// usage.

eval, err := p.makeDropExternalConnectionEval(params.ctx, n)
if err != nil {
return err
}

name, err := eval.externalConnectionName()
if err != nil {
return errors.Wrap(err, "failed to resolve External Connection name")
}

if _ /* rows */, err = params.extendedEvalCtx.ExecCfg.InternalExecutor.ExecEx(
params.ctx,
dropExternalConnectionOp,
params.p.Txn(),
sessiondata.InternalExecutorOverride{User: params.p.User()},
`DELETE FROM system.external_connections WHERE connection_name = $1`, name,
); err != nil {
return errors.Wrap(err, "failed to delete external connection")
}

return nil
}

func (c *dropExternalConnectionNode) Next(_ runParams) (bool, error) { return false, nil }
func (c *dropExternalConnectionNode) Values() tree.Datums { return nil }
func (c *dropExternalConnectionNode) Close(_ context.Context) {}
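
Given the admin gate above, a sketch of what a non-admin session would see
(user and connection name hypothetical; the error text comes from the check
in dropExternalConnection):

-- Issued by a user without the admin role:
DROP EXTERNAL CONNECTION foo;
-- ERROR: only users with the admin role are allowed to DROP EXTERNAL CONNECTION
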
89 changes: 53 additions & 36 deletions pkg/sql/importer/import_job.go
@@ -1453,7 +1453,6 @@ func (r *importResumer) dropTables(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *sql.ExecutorConfig,
) error {
details := r.job.Details().(jobspb.ImportDetails)
dropTime := int64(1)

// If the prepare step of the import job was not completed then the
// descriptors do not need to be rolled back as the txn updating them never
@@ -1462,31 +1461,33 @@
return nil
}

var revert []catalog.TableDescriptor
var empty []catalog.TableDescriptor
var tableWasEmpty bool
var intoTable catalog.TableDescriptor
for _, tbl := range details.Tables {
if !tbl.IsNew {
desc, err := descsCol.GetMutableTableVersionByID(ctx, tbl.Desc.ID, txn)
if err != nil {
return err
}
imm := desc.ImmutableCopy().(catalog.TableDescriptor)
if tbl.WasEmpty {
empty = append(empty, imm)
} else {
revert = append(revert, imm)
}
intoTable = desc.ImmutableCopy().(catalog.TableDescriptor)
tableWasEmpty = tbl.WasEmpty
break
}
}

if intoTable == nil {
// Rolling back an IMPORT (i.e. not IMPORT INTO), where tbl.IsNew == true
// for all tables.
return r.dropNewTables(ctx, txn, descsCol, execCfg)
}
// Clear table data for a rolled-back IMPORT INTO command.
//
// The walltime can be 0 if there is a failure between publishing the tables
// as OFFLINE and choosing an ingestion timestamp. This might happen, for
// example, while waiting for the descriptor version to propagate across the
// cluster.
//
// In this case, we don't want to roll back the data, since data ingestion
// has not yet begun (we have not yet chosen a timestamp at which to ingest).
if details.Walltime != 0 && len(revert) > 0 {
if details.Walltime != 0 && !tableWasEmpty {
// NB: if a revert fails it will abort the rest of this failure txn, which is
// also what brings tables back online. We _could_ change the error handling
// or just move the revert into Resume()'s error return path, however it isn't
Expand All @@ -1502,23 +1503,44 @@ func (r *importResumer) dropTables(
// writes, so even if GC has run it would not have GC'ed any keys to which
// we need to revert, so we can safely ignore the target-time GC check.
const ignoreGC = true
if err := sql.RevertTables(ctx, txn.DB(), execCfg, revert, ts, ignoreGC, sql.RevertTableDefaultBatchSize); err != nil {
if err := sql.RevertTables(ctx, txn.DB(), execCfg, []catalog.TableDescriptor{intoTable}, ts, ignoreGC,
sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT")
}
}

for i := range empty {
} else if tableWasEmpty {
// Set a DropTime on the table descriptor to differentiate it from an
// older-format (v1.1) descriptor. This enables ClearTableData to use a
// RangeClear for faster data removal, rather than removing by chunks.
empty[i].TableDesc().DropTime = dropTime
intoTable.TableDesc().DropTime = int64(1)
if err := gcjob.ClearTableData(
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, empty[i],
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, intoTable,
); err != nil {
return errors.Wrapf(err, "clearing data for table %d", empty[i].GetID())
return errors.Wrapf(err, "clearing data for table %d", intoTable.GetID())
}
}

// Bring the IMPORT INTO table back online
b := txn.NewBatch()
intoDesc, err := descsCol.GetMutableTableVersionByID(ctx, intoTable.GetID(), txn)
if err != nil {
return err
}
intoDesc.SetPublic()
const kvTrace = false
if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
return err
}
return errors.Wrap(txn.Run(ctx, b), "putting IMPORT INTO table back online")
}

// dropNewTables drops the tables that were created as part of an IMPORT and
// queues a GC job to clean up the dropped descriptors.
func (r *importResumer) dropNewTables(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *sql.ExecutorConfig,
) error {
details := r.job.Details().(jobspb.ImportDetails)
dropTime := int64(1)

b := txn.NewBatch()
tablesToGC := make([]descpb.ID, 0, len(details.Tables))
toWrite := make([]*tabledesc.Mutable, 0, len(details.Tables))
@@ -1527,22 +1549,18 @@
if err != nil {
return err
}
if tbl.IsNew {
newTableDesc.SetDropped()
// If the DropTime is set, a table uses RangeClear for fast data removal. This
// operation starts at DropTime + the GC TTL. If we used now() here, it would
// not clean up data until the TTL from the time of the error. Instead, use 1
// (that is, 1ns past the epoch) to allow this to be cleaned up as soon as
// possible. This is safe since the table data was never visible to users,
// and so we don't need to preserve MVCC semantics.
newTableDesc.DropTime = dropTime
b.Del(catalogkeys.EncodeNameKey(execCfg.Codec, newTableDesc))
tablesToGC = append(tablesToGC, newTableDesc.ID)
descsCol.AddDeletedDescriptor(newTableDesc.GetID())
} else {
// IMPORT did not create this table, so we should not drop it.
newTableDesc.SetPublic()
}
newTableDesc.SetDropped()
// If the DropTime is set, a table uses RangeClear for fast data removal. This
// operation starts at DropTime + the GC TTL. If we used now() here, it would
// not clean up data until the TTL from the time of the error. Instead, use 1
// (that is, 1ns past the epoch) to allow this to be cleaned up as soon as
// possible. This is safe since the table data was never visible to users,
// and so we don't need to preserve MVCC semantics.
newTableDesc.DropTime = dropTime
b.Del(catalogkeys.EncodeNameKey(execCfg.Codec, newTableDesc))
tablesToGC = append(tablesToGC, newTableDesc.ID)
descsCol.AddDeletedDescriptor(newTableDesc.GetID())

// Accumulate the changes before adding them to the batch to avoid
// making any table invalid before having read it.
toWrite = append(toWrite, newTableDesc)
@@ -1574,8 +1592,7 @@
ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil {
return err
}

return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
return errors.Wrap(txn.Run(ctx, b), "rolling back IMPORT tables")
}

func (r *importResumer) dropSchemas(
3 changes: 3 additions & 0 deletions pkg/sql/opaque.go
@@ -156,6 +156,8 @@ func planOpaque(ctx context.Context, p *planner, stmt tree.Statement) (planNode,
return p.CreateExtension(ctx, n)
case *tree.CreateExternalConnection:
return p.CreateExternalConnection(ctx, n)
case *tree.DropExternalConnection:
return p.DropExternalConnection(ctx, n)
case *tree.Deallocate:
return p.Deallocate(ctx, n)
case *tree.DeclareCursor:
@@ -297,6 +299,7 @@ func init() {
&tree.DeclareCursor{},
&tree.Discard{},
&tree.DropDatabase{},
&tree.DropExternalConnection{},
&tree.DropIndex{},
&tree.DropOwnedBy{},
&tree.DropRole{},
2 changes: 2 additions & 0 deletions pkg/sql/parser/help_test.go
@@ -184,6 +184,8 @@ func TestContextualHelp(t *testing.T) {
{`DROP INDEX blah, ??`, `DROP INDEX`},
{`DROP INDEX blah@blih ??`, `DROP INDEX`},

{`DROP EXTERNAL CONNECTION blah ??`, `DROP EXTERNAL CONNECTION`},

{`DROP USER ??`, `DROP ROLE`},
{`DROP USER IF ??`, `DROP ROLE`},
{`DROP USER IF EXISTS bluh ??`, `DROP ROLE`},