From ea054067dcf57e13db2bdaba99b703f166a0a97f Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Mon, 28 Nov 2022 14:59:52 -0800 Subject: [PATCH] sql: add SHOW TENANT name WITH REPLICATION STATUS Extend SHOW TENANT so that it can also show the replication status, which includes information such as the source tenant name, the source cluster URI, and the protected timestamp on the destination cluster. Command output right after the destination cluster is created: ``` root@127.0.0.1:26257/defaultdb> show tenant dest5 with replication status; id | name | status | source_tenant_name | source_cluster_uri | replication_job_id | replicated_time | retained_time -----+-------+--------+--------------------+--------------------+--------------------+-----------------+---------------- 7 | dest5 | ADD | NULL | NULL | 819890711267737601 | NULL | NULL (1 row) ``` A bit later, most of the stats are available: ``` root@127.0.0.1:26257/defaultdb> show tenant dest5 with replication status; id | name | status | source_tenant_name | source_cluster_uri | replication_job_id | replicated_time | retained_time -----+-------+--------+--------------------+-------------------------------------------------------+--------------------+-----------------+----------------------------- 7 | dest5 | ADD | src | postgresql://root@127.0.0.1:26257/defaultdb?ssl...crt | 819890711267737601 | NULL | 2022-12-05 23:00:04.516331 (1 row) ``` A moment later the replicated time (the `replicated_time` column) is populated as well. 
Informs: #91261 Epic: CRDB-18749 Release note: None --- docs/generated/sql/bnf/show_tenant_stmt.bnf | 1 + docs/generated/sql/bnf/stmt_block.bnf | 1 + pkg/ccl/streamingccl/streamclient/client.go | 3 + .../streamingccl/streamclient/client_test.go | 9 ++ .../stream_replication_e2e_test.go | 43 +++++++ pkg/sql/BUILD.bazel | 1 + pkg/sql/catalog/colinfo/result_columns.go | 14 +++ pkg/sql/faketreeeval/BUILD.bazel | 2 + pkg/sql/faketreeeval/evalctx.go | 8 ++ pkg/sql/logictest/testdata/logic_test/tenant | 3 + pkg/sql/parser/help_test.go | 1 + pkg/sql/parser/sql.y | 16 ++- pkg/sql/parser/testdata/show | 8 ++ pkg/sql/sem/eval/deps.go | 3 - pkg/sql/sem/tree/show.go | 9 +- pkg/sql/show_tenant.go | 116 ++++++++++++++++-- pkg/sql/tenant.go | 15 --- 17 files changed, 218 insertions(+), 35 deletions(-) diff --git a/docs/generated/sql/bnf/show_tenant_stmt.bnf b/docs/generated/sql/bnf/show_tenant_stmt.bnf index 3865df1085f5..4df2adccb6fa 100644 --- a/docs/generated/sql/bnf/show_tenant_stmt.bnf +++ b/docs/generated/sql/bnf/show_tenant_stmt.bnf @@ -1,2 +1,3 @@ show_tenant_stmt ::= 'SHOW' 'TENANT' name + | 'SHOW' 'TENANT' name 'WITH' 'REPLICATION' 'STATUS' diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 3ec697741f4c..ca757fe350fd 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -923,6 +923,7 @@ show_tables_stmt ::= show_tenant_stmt ::= 'SHOW' 'TENANT' name + | 'SHOW' 'TENANT' name 'WITH' 'REPLICATION' 'STATUS' show_trace_stmt ::= 'SHOW' opt_compact 'TRACE' 'FOR' 'SESSION' diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 923436ad54d9..6462ed731f07 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -172,6 +172,9 @@ func NewStreamClient( // GetFirstActiveClient iterates through each provided stream address // and returns the first client it's able to successfully Dial. 
func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) { + if len(streamAddresses) == 0 { + return nil, errors.Newf("failed to connect, no partition addresses") + } var combinedError error = nil for _, address := range streamAddresses { streamAddress := streamingccl.StreamAddress(address) diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index ebb1abf0ca8f..7f8a1c5db78c 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -122,6 +122,15 @@ func (t testStreamSubscription) Err() error { return nil } +func TestGetFirstActiveClientEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() + + var streamAddresses []string + activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses) + require.ErrorContains(t, err, "failed to connect, no partition addresses") + require.Nil(t, activeClient) +} + func TestGetFirstActiveClient(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 9438c3f2a77b..701ab42b2fcf 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -1244,3 +1245,45 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { }) }) } + +// TODO(lidor): consider rewriting this test as a data driven test when #92609 is merged. 
+func TestTenantStreamingShowTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + testStartTime := timeutil.Now() + producerJobID, ingestionJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + highWatermark := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID)) + + var ( + id int + dest string + status string + source string + sourceUri string + jobId int + maxReplTime time.Time + protectedTime time.Time + ) + row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName)) + row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime) + require.Equal(t, 2, id) + require.Equal(t, "destination", dest) + require.Equal(t, "ADD", status) + require.Equal(t, "source", source) + require.Equal(t, c.srcURL.String(), sourceUri) + require.Equal(t, ingestionJobID, jobId) + require.Less(t, maxReplTime, timeutil.Now()) + require.Less(t, protectedTime, timeutil.Now()) + require.Greater(t, maxReplTime, highWatermark.GoTime()) + require.Greater(t, protectedTime, testStartTime) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index be209bb0b72b..0e1643088ec9 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -304,6 +304,7 @@ go_library( "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", "//pkg/repstream", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/catalog/colinfo/result_columns.go b/pkg/sql/catalog/colinfo/result_columns.go index cf07937fb39c..d1b7e3e374df 100644 --- 
a/pkg/sql/catalog/colinfo/result_columns.go +++ b/pkg/sql/catalog/colinfo/result_columns.go @@ -268,3 +268,17 @@ var TenantColumns = ResultColumns{ {Name: "name", Typ: types.String}, {Name: "status", Typ: types.String}, } + +var TenantColumnsWithReplication = ResultColumns{ + {Name: "id", Typ: types.Int}, + {Name: "name", Typ: types.String}, + {Name: "status", Typ: types.String}, + {Name: "source_tenant_name", Typ: types.String}, + {Name: "source_cluster_uri", Typ: types.String}, + {Name: "replication_job_id", Typ: types.Int}, + // The latest fully replicated time. + {Name: "replicated_time", Typ: types.Timestamp}, + // The protected timestamp on the destination cluster, meaning we cannot + // cutover to before this time. + {Name: "retained_time", Typ: types.Timestamp}, +} diff --git a/pkg/sql/faketreeeval/BUILD.bazel b/pkg/sql/faketreeeval/BUILD.bazel index 56ecd1b2a178..81f115711eb7 100644 --- a/pkg/sql/faketreeeval/BUILD.bazel +++ b/pkg/sql/faketreeeval/BUILD.bazel @@ -8,6 +8,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/jobs/jobspb", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/sql/catalog/catpb", diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index 4e8eb973068c..59c456529403 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -16,6 +16,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" @@ -588,6 +590,12 @@ func (c *DummyTenantOperator) GetTenantInfo( return nil, errors.WithStack(errEvalTenant) } +func (p *DummyTenantOperator) GetTenantReplicationInfo( + ctx context.Context, replicationJobId jobspb.JobID, +) 
(*streampb.StreamIngestionStats, error) { + return nil, errors.WithStack(errEvalTenant) +} + // DestroyTenant is part of the tree.TenantOperator interface. func (c *DummyTenantOperator) DestroyTenant( ctx context.Context, tenantName roachpb.TenantName, synchronous bool, diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant index a8e2e5ca961c..4199ae10ec1b 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant +++ b/pkg/sql/logictest/testdata/logic_test/tenant @@ -60,6 +60,9 @@ id name status statement error tenant "seven" does not exist SHOW TENANT seven +statement error tenant two does not have replication info +SHOW TENANT two WITH REPLICATION STATUS + # Test creating a tenant with the same name as an existing tenant, but a unique # ID. statement error tenant with name "three" already exists diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 7f35ba4cb9ab..51d32d071040 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -411,6 +411,7 @@ func TestContextualHelp(t *testing.T) { {`SHOW TABLES FROM blah ??`, `SHOW TABLES`}, {`SHOW TENANT ??`, `SHOW TENANT`}, + {`SHOW TENANT ?? 
WITH REPLICATION STATUS`, `SHOW TENANT`}, {`SHOW TRANSACTION PRIORITY ??`, `SHOW TRANSACTION`}, {`SHOW TRANSACTION STATUS ??`, `SHOW TRANSACTION`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 523169b32c73..6d420d9e15a2 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -5420,11 +5420,21 @@ backup_kms: // %Help: SHOW TENANT - display tenant information // %Category: Misc -// %Text: SHOW TENANT +// %Text: SHOW TENANT [WITH REPLICATION STATUS] show_tenant_stmt: - SHOW TENANT name + SHOW TENANT d_expr { - $$.val = &tree.ShowTenant{Name: tree.Name($3)} + $$.val = &tree.ShowTenant{ + Name: $3.expr(), + WithReplication: false, + } + } +| SHOW TENANT d_expr WITH REPLICATION STATUS + { + $$.val = &tree.ShowTenant{ + Name: $3.expr(), + WithReplication: true, + } } | SHOW TENANT error // SHOW HELP: SHOW TENANT diff --git a/pkg/sql/parser/testdata/show b/pkg/sql/parser/testdata/show index 8690b3598f8b..5029345cb01b 100644 --- a/pkg/sql/parser/testdata/show +++ b/pkg/sql/parser/testdata/show @@ -1802,3 +1802,11 @@ SHOW TENANT foo SHOW TENANT foo -- fully parenthesized SHOW TENANT foo -- literals removed SHOW TENANT _ -- identifiers removed + +parse +SHOW TENANT foo WITH REPLICATION STATUS +---- +SHOW TENANT foo WITH REPLICATION STATUS +SHOW TENANT foo WITH REPLICATION STATUS -- fully parenthesized +SHOW TENANT foo WITH REPLICATION STATUS -- literals removed +SHOW TENANT _ WITH REPLICATION STATUS -- identifiers removed diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index a634c35ba655..6abe68f8e177 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -575,9 +575,6 @@ type TenantOperator interface { // the gc job will not wait for a GC ttl. DestroyTenant(ctx context.Context, tenantName roachpb.TenantName, synchronous bool) error - // GetTenantInfo returns information about the specified tenant. 
- GetTenantInfo(ctx context.Context, tenantName roachpb.TenantName) (*descpb.TenantInfo, error) - // GCTenant attempts to garbage collect a DROP tenant from the system. Upon // success it also removes the tenant record. // It returns an error if the tenant does not exist. diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index adc28680c1d8..89d4bf9e1aae 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -777,13 +777,18 @@ func (node *ShowTableStats) Format(ctx *FmtCtx) { // ShowTenant represents a SHOW TENANT statement. type ShowTenant struct { - Name Name + Name Expr + WithReplication bool } // Format implements the NodeFormatter interface. func (node *ShowTenant) Format(ctx *FmtCtx) { ctx.WriteString("SHOW TENANT ") - ctx.FormatNode(&node.Name) + ctx.FormatNode(node.Name) + + if node.WithReplication { + ctx.WriteString(" WITH REPLICATION STATUS") + } } // ShowHistogram represents a SHOW HISTOGRAM statement. diff --git a/pkg/sql/show_tenant.go b/pkg/sql/show_tenant.go index 7bf2c529e0e3..cbcdcf3f08d3 100644 --- a/pkg/sql/show_tenant.go +++ b/pkg/sql/show_tenant.go @@ -12,33 +12,87 @@ package sql import ( "context" + "time" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) type showTenantNode struct { - columns colinfo.ResultColumns - name roachpb.TenantName - info *descpb.TenantInfo - done bool + name tree.Expr + info *descpb.TenantInfo + withReplication bool + replicationInfo *streampb.StreamIngestionStats + protectedTimestamp hlc.Timestamp + columns colinfo.ResultColumns + done bool } func (p *planner) ShowTenant(_ 
context.Context, n *tree.ShowTenant) (planNode, error) { - return &showTenantNode{ - name: roachpb.TenantName(n.Name), - columns: colinfo.TenantColumns, - }, nil + node := &showTenantNode{ + name: n.Name, + withReplication: n.WithReplication, + } + if n.WithReplication { + node.columns = colinfo.TenantColumnsWithReplication + } else { + node.columns = colinfo.TenantColumns + } + return node, nil } func (n *showTenantNode) startExec(params runParams) error { - info, err := params.p.GetTenantInfo(params.ctx, n.name) + if err := params.p.RequireAdminRole(params.ctx, "show tenant"); err != nil { + return err + } + + if err := rejectIfCantCoordinateMultiTenancy(params.p.execCfg.Codec, "show"); err != nil { + return err + } + + info, err := GetTenantRecordByName(params.ctx, params.p.execCfg, params.p.Txn(), roachpb.TenantName(n.name.String())) if err != nil { return err } n.info = info + if n.withReplication { + if info.TenantReplicationJobID == 0 { + return errors.Newf("tenant %q does not have an active replication job", n.name) + } + mgr, err := params.p.EvalContext().StreamManagerFactory.GetStreamIngestManager(params.ctx) + if err != nil { + return err + } + stats, err := mgr.GetStreamIngestionStats(params.ctx, info.TenantReplicationJobID) + // An error means we don't have stats but we can still present some info, + // therefore we don't fail here. + // TODO(lidor): we need a better signal from GetStreamIngestionStats(), instead of + // ignoring all errors. + if err == nil { + n.replicationInfo = stats + if stats.IngestionDetails.ProtectedTimestampRecordID == nil { + // We don't have the protected timestamp record but we still want to show + // the info we do have about tenant replication status, logging an error + // and continuing. 
+ log.Warningf(params.ctx, "protected timestamp unavailable for tenant %q and job %d", + n.name, info.TenantReplicationJobID) + } else { + ptp := params.p.execCfg.ProtectedTimestampProvider + record, err := ptp.GetRecord(params.ctx, params.p.Txn(), *stats.IngestionDetails.ProtectedTimestampRecordID) + if err != nil { + return err + } + n.protectedTimestamp = record.Timestamp + } + } + } return nil } @@ -50,10 +104,48 @@ func (n *showTenantNode) Next(_ runParams) (bool, error) { return true, nil } func (n *showTenantNode) Values() tree.Datums { + tenantId := tree.NewDInt(tree.DInt(n.info.ID)) + tenantName := tree.NewDString(string(n.info.Name)) + tenantStatus := tree.NewDString(n.info.State.String()) + if !n.withReplication { + // This is a simple 'SHOW TENANT name'. + return tree.Datums{ + tenantId, + tenantName, + tenantStatus, + } + } + + // This is a 'SHOW TENANT name WITH REPLICATION STATUS' command. + sourceTenantName := tree.DNull + sourceClusterUri := tree.DNull + replicationJobId := tree.NewDInt(tree.DInt(n.info.TenantReplicationJobID)) + replicatedTimestamp := tree.DNull + retainedTimestamp := tree.DNull + + // New replication clusters don't have replicationInfo initially. + if n.replicationInfo != nil { + sourceTenantName = tree.NewDString(string(n.replicationInfo.IngestionDetails.SourceTenantName)) + sourceClusterUri = tree.NewDString(n.replicationInfo.IngestionDetails.StreamAddress) + if n.replicationInfo.ReplicationLagInfo != nil { + minIngested := n.replicationInfo.ReplicationLagInfo.MinIngestedTimestamp.WallTime + // The latest fully replicated time. + replicatedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, minIngested), time.Nanosecond) + } + // The protected timestamp on the destination cluster. 
+ retainedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, n.protectedTimestamp.WallTime), time.Nanosecond) + } + return tree.Datums{ - tree.NewDInt(tree.DInt(n.info.ID)), - tree.NewDString(string(n.info.Name)), - tree.NewDString(n.info.State.String()), + tenantId, + tenantName, + tenantStatus, + sourceTenantName, + sourceClusterUri, + replicationJobId, + replicatedTimestamp, + retainedTimestamp, } } + func (n *showTenantNode) Close(_ context.Context) {} diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index d31620590fc9..f9a6b4c1cd73 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -819,18 +819,3 @@ WHERE id = $1`, tenantID, tenantName); err != nil { return nil } - -// GetTenantInfo implements the tree.TenantOperator interface. -func (p *planner) GetTenantInfo( - ctx context.Context, tenantName roachpb.TenantName, -) (*descpb.TenantInfo, error) { - if err := p.RequireAdminRole(ctx, "show tenant"); err != nil { - return nil, err - } - - if err := rejectIfCantCoordinateMultiTenancy(p.execCfg.Codec, "show"); err != nil { - return nil, err - } - - return GetTenantRecordByName(ctx, p.execCfg, p.Txn(), tenantName) -}