diff --git a/docs/generated/sql/bnf/show_tenant_stmt.bnf b/docs/generated/sql/bnf/show_tenant_stmt.bnf index 3865df1085f5..39a85d8defde 100644 --- a/docs/generated/sql/bnf/show_tenant_stmt.bnf +++ b/docs/generated/sql/bnf/show_tenant_stmt.bnf @@ -1,2 +1,3 @@ show_tenant_stmt ::= - 'SHOW' 'TENANT' name + 'SHOW' 'TENANT' d_expr + | 'SHOW' 'TENANT' d_expr 'WITH' 'REPLICATION' 'STATUS' diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 8b51f63ca219..b981ae3e5259 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -922,7 +922,8 @@ show_tables_stmt ::= | 'SHOW' 'TABLES' with_comment show_tenant_stmt ::= - 'SHOW' 'TENANT' name + 'SHOW' 'TENANT' d_expr + | 'SHOW' 'TENANT' d_expr 'WITH' 'REPLICATION' 'STATUS' show_trace_stmt ::= 'SHOW' opt_compact 'TRACE' 'FOR' 'SESSION' diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 923436ad54d9..6462ed731f07 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -172,6 +172,9 @@ func NewStreamClient( // GetFirstActiveClient iterates through each provided stream address // and returns the first client it's able to successfully Dial. func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) { + if len(streamAddresses) == 0 { + return nil, errors.Newf("failed to connect, no partition addresses") + } var combinedError error = nil for _, address := range streamAddresses { streamAddress := streamingccl.StreamAddress(address) diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index ebb1abf0ca8f..7f8a1c5db78c 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -122,6 +122,15 @@ func (t testStreamSubscription) Err() error { return nil } +func TestGetFirstActiveClientEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() + + var streamAddresses []string + activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses) + require.ErrorContains(t, err, "failed to connect, no partition addresses") + require.Nil(t, activeClient) +} + func TestGetFirstActiveClient(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 9438c3f2a77b..9026f7927363 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -1244,3 +1245,46 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { }) }) } + +// TODO(lidor): consider rewriting this test as a data driven test when #92609 is merged. +func TestTenantStreamingShowTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + testStartTime := timeutil.Now() + producerJobID, ingestionJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + highWatermark := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID)) + + var ( + id int + dest string + status string + source string + sourceUri string + jobId int + maxReplTime time.Time + protectedTime time.Time + ) + row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName)) + row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime) + require.Equal(t, 2, id) + require.Equal(t, "destination", dest) + require.Equal(t, "ADD", status) + require.Equal(t, "source", source) + require.Equal(t, c.srcURL.String(), sourceUri) + require.Equal(t, ingestionJobID, jobId) + require.Less(t, maxReplTime, timeutil.Now()) + require.Less(t, protectedTime, timeutil.Now()) + require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime()) + // TODO(lidor): replace this start time with the actual replication start time when we have it. + require.GreaterOrEqual(t, protectedTime, testStartTime) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 0a812636d458..b65429c58773 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -307,6 +307,7 @@ go_library( "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", "//pkg/repstream", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/catalog/colinfo/result_columns.go b/pkg/sql/catalog/colinfo/result_columns.go index 4d8dfe768624..43e563a2dfee 100644 --- a/pkg/sql/catalog/colinfo/result_columns.go +++ b/pkg/sql/catalog/colinfo/result_columns.go @@ -278,3 +278,17 @@ var TenantColumns = ResultColumns{ {Name: "name", Typ: types.String}, {Name: "status", Typ: types.String}, } + +var TenantColumnsWithReplication = ResultColumns{ + {Name: "id", Typ: types.Int}, + {Name: "name", Typ: types.String}, + {Name: "status", Typ: types.String}, + {Name: "source_tenant_name", Typ: types.String}, + {Name: "source_cluster_uri", Typ: types.String}, + {Name: "replication_job_id", Typ: types.Int}, + // The latest fully replicated time. + {Name: "replicated_time", Typ: types.Timestamp}, + // The protected timestamp on the destination cluster, meaning we cannot + // cutover to before this time. + {Name: "retained_time", Typ: types.Timestamp}, +} diff --git a/pkg/sql/faketreeeval/BUILD.bazel b/pkg/sql/faketreeeval/BUILD.bazel index 56ecd1b2a178..81f115711eb7 100644 --- a/pkg/sql/faketreeeval/BUILD.bazel +++ b/pkg/sql/faketreeeval/BUILD.bazel @@ -8,6 +8,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/jobs/jobspb", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/sql/catalog/catpb", diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index f70d1bc41cf2..d47e8d4ce800 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -16,6 +16,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" @@ -594,6 +596,12 @@ func (c *DummyTenantOperator) GetTenantInfo( return nil, errors.WithStack(errEvalTenant) } +func (p *DummyTenantOperator) GetTenantReplicationInfo( + ctx context.Context, replicationJobId jobspb.JobID, +) (*streampb.StreamIngestionStats, error) { + return nil, errors.WithStack(errEvalTenant) +} + // DestroyTenant is part of the tree.TenantOperator interface. func (c *DummyTenantOperator) DestroyTenant( ctx context.Context, tenantName roachpb.TenantName, synchronous bool, diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant index a8e2e5ca961c..de923cc459de 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant +++ b/pkg/sql/logictest/testdata/logic_test/tenant @@ -60,6 +60,12 @@ id name status statement error tenant "seven" does not exist SHOW TENANT seven +statement error tenant "tenant-one" does not have an active replication job +SHOW TENANT "tenant-one" WITH REPLICATION STATUS + +statement error tenant "two" does not have an active replication job +SHOW TENANT two WITH REPLICATION STATUS + # Test creating a tenant with the same name as an existing tenant, but a unique # ID. statement error tenant with name "three" already exists diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 7f35ba4cb9ab..51d32d071040 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -411,6 +411,7 @@ func TestContextualHelp(t *testing.T) { {`SHOW TABLES FROM blah ??`, `SHOW TABLES`}, {`SHOW TENANT ??`, `SHOW TENANT`}, + {`SHOW TENANT ?? WITH REPLICATION STATUS`, `SHOW TENANT`}, {`SHOW TRANSACTION PRIORITY ??`, `SHOW TRANSACTION`}, {`SHOW TRANSACTION STATUS ??`, `SHOW TRANSACTION`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 272e1a303fdf..c130733a02f9 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -5420,11 +5420,21 @@ backup_kms: // %Help: SHOW TENANT - display tenant information // %Category: Misc -// %Text: SHOW TENANT +// %Text: SHOW TENANT [WITH REPLICATION STATUS] show_tenant_stmt: - SHOW TENANT name + SHOW TENANT d_expr { - $$.val = &tree.ShowTenant{Name: tree.Name($3)} + $$.val = &tree.ShowTenant{ + Name: $3.expr(), + WithReplication: false, + } + } +| SHOW TENANT d_expr WITH REPLICATION STATUS + { + $$.val = &tree.ShowTenant{ + Name: $3.expr(), + WithReplication: true, + } } | SHOW TENANT error // SHOW HELP: SHOW TENANT diff --git a/pkg/sql/parser/testdata/show b/pkg/sql/parser/testdata/show index 8690b3598f8b..47ac07e0a6fa 100644 --- a/pkg/sql/parser/testdata/show +++ b/pkg/sql/parser/testdata/show @@ -1799,6 +1799,14 @@ parse SHOW TENANT foo ---- SHOW TENANT foo -SHOW TENANT foo -- fully parenthesized +SHOW TENANT (foo) -- fully parenthesized SHOW TENANT foo -- literals removed SHOW TENANT _ -- identifiers removed + +parse +SHOW TENANT foo WITH REPLICATION STATUS +---- +SHOW TENANT foo WITH REPLICATION STATUS +SHOW TENANT (foo) WITH REPLICATION STATUS -- fully parenthesized +SHOW TENANT foo WITH REPLICATION STATUS -- literals removed +SHOW TENANT _ WITH REPLICATION STATUS -- identifiers removed diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index 02c78e20882b..c3b8208c69d1 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -573,9 +573,6 @@ type TenantOperator interface { // the gc job will not wait for a GC ttl. DestroyTenant(ctx context.Context, tenantName roachpb.TenantName, synchronous bool) error - // GetTenantInfo returns information about the specified tenant. - GetTenantInfo(ctx context.Context, tenantName roachpb.TenantName) (*descpb.TenantInfo, error) - // GCTenant attempts to garbage collect a DROP tenant from the system. Upon // success it also removes the tenant record. // It returns an error if the tenant does not exist. diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index adc28680c1d8..89d4bf9e1aae 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -777,13 +777,18 @@ func (node *ShowTableStats) Format(ctx *FmtCtx) { // ShowTenant represents a SHOW TENANT statement. type ShowTenant struct { - Name Name + Name Expr + WithReplication bool } // Format implements the NodeFormatter interface. func (node *ShowTenant) Format(ctx *FmtCtx) { ctx.WriteString("SHOW TENANT ") - ctx.FormatNode(&node.Name) + ctx.FormatNode(node.Name) + + if node.WithReplication { + ctx.WriteString(" WITH REPLICATION STATUS") + } } // ShowHistogram represents a SHOW HISTOGRAM statement. diff --git a/pkg/sql/show_tenant.go b/pkg/sql/show_tenant.go index 7bf2c529e0e3..b0bf671b92b1 100644 --- a/pkg/sql/show_tenant.go +++ b/pkg/sql/show_tenant.go @@ -12,33 +12,120 @@ package sql import ( "context" + "time" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/paramparse" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) type showTenantNode struct { - columns colinfo.ResultColumns - name roachpb.TenantName - info *descpb.TenantInfo - done bool + name tree.TypedExpr + tenantInfo *descpb.TenantInfo + withReplication bool + replicationInfo *streampb.StreamIngestionStats + protectedTimestamp hlc.Timestamp + columns colinfo.ResultColumns + done bool } -func (p *planner) ShowTenant(_ context.Context, n *tree.ShowTenant) (planNode, error) { - return &showTenantNode{ - name: roachpb.TenantName(n.Name), - columns: colinfo.TenantColumns, - }, nil +func (p *planner) ShowTenant(ctx context.Context, n *tree.ShowTenant) (planNode, error) { + if err := p.RequireAdminRole(ctx, "show tenant"); err != nil { + return nil, err + } + + if err := rejectIfCantCoordinateMultiTenancy(p.execCfg.Codec, "show"); err != nil { + return nil, err + } + + var dummyHelper tree.IndexedVarHelper + strName := paramparse.UnresolvedNameToStrVal(n.Name) + typedName, err := p.analyzeExpr( + ctx, strName, nil, dummyHelper, types.String, + true, "SHOW TENANT ... WITH REPLICATION STATUS") + if err != nil { + return nil, err + } + + node := &showTenantNode{ + name: typedName, + withReplication: n.WithReplication, + } + if n.WithReplication { + node.columns = colinfo.TenantColumnsWithReplication + } else { + node.columns = colinfo.TenantColumns + } + + return node, nil +} + +func (n *showTenantNode) getTenantName(params runParams) (roachpb.TenantName, error) { + dName, err := eval.Expr(params.ctx, params.p.EvalContext(), n.name) + if err != nil { + return "", err + } + name, ok := dName.(*tree.DString) + if !ok || name == nil { + return "", errors.Newf("expected a string, got %T", dName) + } + return roachpb.TenantName(*name), nil } func (n *showTenantNode) startExec(params runParams) error { - info, err := params.p.GetTenantInfo(params.ctx, n.name) + tenantName, err := n.getTenantName(params) + if err != nil { + return err + } + tenantRecord, err := GetTenantRecordByName(params.ctx, params.p.execCfg, params.p.Txn(), tenantName) if err != nil { return err } - n.info = info + + n.tenantInfo = tenantRecord + if n.withReplication { + if n.tenantInfo.TenantReplicationJobID == 0 { + return errors.Newf("tenant %q does not have an active replication job", tenantName) + } + mgr, err := params.p.EvalContext().StreamManagerFactory.GetStreamIngestManager(params.ctx) + if err != nil { + return err + } + stats, err := mgr.GetStreamIngestionStats(params.ctx, n.tenantInfo.TenantReplicationJobID) + if err != nil { + // An error means we don't have stats but we can still present some info, + // therefore we don't fail here. + // TODO(lidor): we need a better signal from GetStreamIngestionStats(), instead of + // ignoring all errors. + log.Infof(params.ctx, "stream ingestion stats unavailable for tenant %q and job %d", + tenantName, n.tenantInfo.TenantReplicationJobID) + } else { + n.replicationInfo = stats + if stats.IngestionDetails.ProtectedTimestampRecordID == nil { + // We don't have the protected timestamp record but we still want to show + // the info we do have about tenant replication status, logging an error + // and continuing. + log.Warningf(params.ctx, "protected timestamp unavailable for tenant %q and job %d", + tenantName, n.tenantInfo.TenantReplicationJobID) + } else { + ptp := params.p.execCfg.ProtectedTimestampProvider + record, err := ptp.GetRecord(params.ctx, params.p.Txn(), *stats.IngestionDetails.ProtectedTimestampRecordID) + if err != nil { + return err + } + n.protectedTimestamp = record.Timestamp + } + } + } return nil } @@ -49,11 +136,49 @@ func (n *showTenantNode) Next(_ runParams) (bool, error) { n.done = true return true, nil } + func (n *showTenantNode) Values() tree.Datums { + tenantId := tree.NewDInt(tree.DInt(n.tenantInfo.ID)) + tenantName := tree.NewDString(string(n.tenantInfo.Name)) + tenantStatus := tree.NewDString(n.tenantInfo.State.String()) + if !n.withReplication { + // This is a simple 'SHOW TENANT name'. + return tree.Datums{ + tenantId, + tenantName, + tenantStatus, + } + } + + // This is a 'SHOW TENANT name WITH REPLICATION STATUS' command. + sourceTenantName := tree.DNull + sourceClusterUri := tree.DNull + replicationJobId := tree.NewDInt(tree.DInt(n.tenantInfo.TenantReplicationJobID)) + replicatedTimestamp := tree.DNull + retainedTimestamp := tree.DNull + + if n.replicationInfo != nil { + sourceTenantName = tree.NewDString(string(n.replicationInfo.IngestionDetails.SourceTenantName)) + sourceClusterUri = tree.NewDString(n.replicationInfo.IngestionDetails.StreamAddress) + if n.replicationInfo.ReplicationLagInfo != nil { + minIngested := n.replicationInfo.ReplicationLagInfo.MinIngestedTimestamp.WallTime + // The latest fully replicated time. + replicatedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, minIngested), time.Nanosecond) + } + // The protected timestamp on the destination cluster. + retainedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, n.protectedTimestamp.WallTime), time.Nanosecond) + } + return tree.Datums{ - tree.NewDInt(tree.DInt(n.info.ID)), - tree.NewDString(string(n.info.Name)), - tree.NewDString(n.info.State.String()), + tenantId, + tenantName, + tenantStatus, + sourceTenantName, + sourceClusterUri, + replicationJobId, + replicatedTimestamp, + retainedTimestamp, } } + func (n *showTenantNode) Close(_ context.Context) {} diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index a7f2286f216b..06f482c5468e 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -819,18 +819,3 @@ WHERE id = $1`, tenantID, tenantName); err != nil { return nil } - -// GetTenantInfo implements the tree.TenantOperator interface. -func (p *planner) GetTenantInfo( - ctx context.Context, tenantName roachpb.TenantName, -) (*descpb.TenantInfo, error) { - if err := p.RequireAdminRole(ctx, "show tenant"); err != nil { - return nil, err - } - - if err := rejectIfCantCoordinateMultiTenancy(p.execCfg.Codec, "show"); err != nil { - return nil, err - } - - return GetTenantRecordByName(ctx, p.execCfg, p.Txn(), tenantName) -}