Skip to content

Commit

Permalink
sql: add SHOW TENANT name WITH REPLICATION STATUS
Browse files Browse the repository at this point in the history
Extending SHOW TENANT to also allow showing replication status which
contains info such as the protected timestamp on the destination cluster
and the source cluster name.

Command output right after the destination cluster is created:

```
[email protected]:26257/defaultdb> show tenant dest5 with replication status;
  id | name  | status | source_tenant_name | source_cluster_uri | replication_job_id | replicated_time | retained_time
-----+-------+--------+--------------------+--------------------+--------------------+-----------------+----------------
   7 | dest5 | ADD    | NULL               | NULL               | 819890711267737601 | NULL            | NULL
(1 row)
```

A bit later we have most stats:
```
[email protected]:26257/defaultdb> show tenant dest5 with replication status;
  id | name  | status | source_tenant_name | source_cluster_uri                                    | replication_job_id | replicated_time |       retained_time
-----+-------+--------+--------------------+-------------------------------------------------------+--------------------+-----------------+-----------------------------
   7 | dest5 | ADD    | src                | postgresql://[email protected]:26257/defaultdb?ssl...crt | 819890711267737601 | NULL            | 2022-12-05 23:00:04.516331
(1 row)
```

And a moment later the replication time is populated.

Informs: #91261

Epic: CRDB-18749

Release note: None
  • Loading branch information
lidorcarmel committed Dec 6, 2022
1 parent 1a6e9f8 commit ea05406
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/show_tenant_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
show_tenant_stmt ::=
'SHOW' 'TENANT' name
| 'SHOW' 'TENANT' name 'WITH' 'REPLICATION' 'STATUS'
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ show_tables_stmt ::=

show_tenant_stmt ::=
'SHOW' 'TENANT' name
| 'SHOW' 'TENANT' name 'WITH' 'REPLICATION' 'STATUS'

show_trace_stmt ::=
'SHOW' opt_compact 'TRACE' 'FOR' 'SESSION'
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func NewStreamClient(
// GetFirstActiveClient iterates through each provided stream address
// and returns the first client it's able to successfully Dial.
func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) {
if len(streamAddresses) == 0 {
return nil, errors.Newf("failed to connect, no partition addresses")
}
var combinedError error = nil
for _, address := range streamAddresses {
streamAddress := streamingccl.StreamAddress(address)
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func (t testStreamSubscription) Err() error {
return nil
}

func TestGetFirstActiveClientEmpty(t *testing.T) {
defer leaktest.AfterTest(t)()

var streamAddresses []string
activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses)
require.ErrorContains(t, err, "failed to connect, no partition addresses")
require.Nil(t, activeClient)
}

func TestGetFirstActiveClient(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1244,3 +1245,45 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
})
})
}

// TODO(lidor): consider rewriting this test as a data driven test when #92609 is merged.
func TestTenantStreamingShowTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
testStartTime := timeutil.Now()
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
highWatermark := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID))

var (
id int
dest string
status string
source string
sourceUri string
jobId int
maxReplTime time.Time
protectedTime time.Time
)
row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName))
row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime)
require.Equal(t, 2, id)
require.Equal(t, "destination", dest)
require.Equal(t, "ADD", status)
require.Equal(t, "source", source)
require.Equal(t, c.srcURL.String(), sourceUri)
require.Equal(t, ingestionJobID, jobId)
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
require.Greater(t, maxReplTime, highWatermark.GoTime())
require.Greater(t, protectedTime, testStartTime)
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ go_library(
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,17 @@ var TenantColumns = ResultColumns{
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
}

var TenantColumnsWithReplication = ResultColumns{
{Name: "id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
{Name: "source_tenant_name", Typ: types.String},
{Name: "source_cluster_uri", Typ: types.String},
{Name: "replication_job_id", Typ: types.Int},
// The latest fully replicated time.
{Name: "replicated_time", Typ: types.Timestamp},
// The protected timestamp on the destination cluster, meaning we cannot
// cutover to before this time.
{Name: "retained_time", Typ: types.Timestamp},
}
2 changes: 2 additions & 0 deletions pkg/sql/faketreeeval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql/catalog/catpb",
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
Expand Down Expand Up @@ -588,6 +590,12 @@ func (c *DummyTenantOperator) GetTenantInfo(
return nil, errors.WithStack(errEvalTenant)
}

func (p *DummyTenantOperator) GetTenantReplicationInfo(
ctx context.Context, replicationJobId jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return nil, errors.WithStack(errEvalTenant)
}

// DestroyTenant is part of the tree.TenantOperator interface.
func (c *DummyTenantOperator) DestroyTenant(
ctx context.Context, tenantName roachpb.TenantName, synchronous bool,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/tenant
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ id name status
statement error tenant "seven" does not exist
SHOW TENANT seven

statement error tenant two does not have replication info
SHOW TENANT two WITH REPLICATION STATUS

# Test creating a tenant with the same name as an existing tenant, but a unique
# ID.
statement error tenant with name "three" already exists
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parser/help_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func TestContextualHelp(t *testing.T) {
{`SHOW TABLES FROM blah ??`, `SHOW TABLES`},

{`SHOW TENANT ??`, `SHOW TENANT`},
{`SHOW TENANT ?? WITH REPLICATION STATUS`, `SHOW TENANT`},

{`SHOW TRANSACTION PRIORITY ??`, `SHOW TRANSACTION`},
{`SHOW TRANSACTION STATUS ??`, `SHOW TRANSACTION`},
Expand Down
16 changes: 13 additions & 3 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -5420,11 +5420,21 @@ backup_kms:

// %Help: SHOW TENANT - display tenant information
// %Category: Misc
// %Text: SHOW TENANT
// %Text: SHOW TENANT <tenant_name> [WITH REPLICATION STATUS]
show_tenant_stmt:
SHOW TENANT name
SHOW TENANT d_expr
{
$$.val = &tree.ShowTenant{Name: tree.Name($3)}
$$.val = &tree.ShowTenant{
Name: $3.expr(),
WithReplication: false,
}
}
| SHOW TENANT d_expr WITH REPLICATION STATUS
{
$$.val = &tree.ShowTenant{
Name: $3.expr(),
WithReplication: true,
}
}
| SHOW TENANT error // SHOW HELP: SHOW TENANT

Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/parser/testdata/show
Original file line number Diff line number Diff line change
Expand Up @@ -1802,3 +1802,11 @@ SHOW TENANT foo
SHOW TENANT foo -- fully parenthesized
SHOW TENANT foo -- literals removed
SHOW TENANT _ -- identifiers removed

parse
SHOW TENANT foo WITH REPLICATION STATUS
----
SHOW TENANT foo WITH REPLICATION STATUS
SHOW TENANT foo WITH REPLICATION STATUS -- fully parenthesized
SHOW TENANT foo WITH REPLICATION STATUS -- literals removed
SHOW TENANT _ WITH REPLICATION STATUS -- identifiers removed
3 changes: 0 additions & 3 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,6 @@ type TenantOperator interface {
// the gc job will not wait for a GC ttl.
DestroyTenant(ctx context.Context, tenantName roachpb.TenantName, synchronous bool) error

// GetTenantInfo returns information about the specified tenant.
GetTenantInfo(ctx context.Context, tenantName roachpb.TenantName) (*descpb.TenantInfo, error)

// GCTenant attempts to garbage collect a DROP tenant from the system. Upon
// success it also removes the tenant record.
// It returns an error if the tenant does not exist.
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/sem/tree/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,13 +777,18 @@ func (node *ShowTableStats) Format(ctx *FmtCtx) {

// ShowTenant represents a SHOW TENANT statement.
type ShowTenant struct {
Name Name
Name Expr
WithReplication bool
}

// Format implements the NodeFormatter interface.
func (node *ShowTenant) Format(ctx *FmtCtx) {
ctx.WriteString("SHOW TENANT ")
ctx.FormatNode(&node.Name)
ctx.FormatNode(node.Name)

if node.WithReplication {
ctx.WriteString(" WITH REPLICATION STATUS")
}
}

// ShowHistogram represents a SHOW HISTOGRAM statement.
Expand Down
116 changes: 104 additions & 12 deletions pkg/sql/show_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,87 @@ package sql

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

type showTenantNode struct {
columns colinfo.ResultColumns
name roachpb.TenantName
info *descpb.TenantInfo
done bool
name tree.Expr
info *descpb.TenantInfo
withReplication bool
replicationInfo *streampb.StreamIngestionStats
protectedTimestamp hlc.Timestamp
columns colinfo.ResultColumns
done bool
}

func (p *planner) ShowTenant(_ context.Context, n *tree.ShowTenant) (planNode, error) {
return &showTenantNode{
name: roachpb.TenantName(n.Name),
columns: colinfo.TenantColumns,
}, nil
node := &showTenantNode{
name: n.Name,
withReplication: n.WithReplication,
}
if n.WithReplication {
node.columns = colinfo.TenantColumnsWithReplication
} else {
node.columns = colinfo.TenantColumns
}
return node, nil
}

func (n *showTenantNode) startExec(params runParams) error {
info, err := params.p.GetTenantInfo(params.ctx, n.name)
if err := params.p.RequireAdminRole(params.ctx, "show tenant"); err != nil {
return err
}

if err := rejectIfCantCoordinateMultiTenancy(params.p.execCfg.Codec, "show"); err != nil {
return err
}

info, err := GetTenantRecordByName(params.ctx, params.p.execCfg, params.p.Txn(), roachpb.TenantName(n.name.String()))
if err != nil {
return err
}
n.info = info
if n.withReplication {
if info.TenantReplicationJobID == 0 {
return errors.Newf("tenant %q does not have an active replication job", n.name)
}
mgr, err := params.p.EvalContext().StreamManagerFactory.GetStreamIngestManager(params.ctx)
if err != nil {
return err
}
stats, err := mgr.GetStreamIngestionStats(params.ctx, info.TenantReplicationJobID)
// An error means we don't have stats but we can still present some info,
// therefore we don't fail here.
// TODO(lidor): we need a better signal from GetStreamIngestionStats(), instead of
// ignoring all errors.
if err == nil {
n.replicationInfo = stats
if stats.IngestionDetails.ProtectedTimestampRecordID == nil {
// We don't have the protected timestamp record but we still want to show
// the info we do have about tenant replication status, logging an error
// and continuing.
log.Warningf(params.ctx, "protected timestamp unavailable for tenant %q and job %d",
n.name, info.TenantReplicationJobID)
} else {
ptp := params.p.execCfg.ProtectedTimestampProvider
record, err := ptp.GetRecord(params.ctx, params.p.Txn(), *stats.IngestionDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
n.protectedTimestamp = record.Timestamp
}
}
}
return nil
}

Expand All @@ -50,10 +104,48 @@ func (n *showTenantNode) Next(_ runParams) (bool, error) {
return true, nil
}
func (n *showTenantNode) Values() tree.Datums {
tenantId := tree.NewDInt(tree.DInt(n.info.ID))
tenantName := tree.NewDString(string(n.info.Name))
tenantStatus := tree.NewDString(n.info.State.String())
if !n.withReplication {
// This is a simple 'SHOW TENANT name'.
return tree.Datums{
tenantId,
tenantName,
tenantStatus,
}
}

// This is a 'SHOW TENANT name WITH REPLICATION STATUS' command.
sourceTenantName := tree.DNull
sourceClusterUri := tree.DNull
replicationJobId := tree.NewDInt(tree.DInt(n.info.TenantReplicationJobID))
replicatedTimestamp := tree.DNull
retainedTimestamp := tree.DNull

// New replication clusters don't have replicationInfo initially.
if n.replicationInfo != nil {
sourceTenantName = tree.NewDString(string(n.replicationInfo.IngestionDetails.SourceTenantName))
sourceClusterUri = tree.NewDString(n.replicationInfo.IngestionDetails.StreamAddress)
if n.replicationInfo.ReplicationLagInfo != nil {
minIngested := n.replicationInfo.ReplicationLagInfo.MinIngestedTimestamp.WallTime
// The latest fully replicated time.
replicatedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, minIngested), time.Nanosecond)
}
// The protected timestamp on the destination cluster.
retainedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, n.protectedTimestamp.WallTime), time.Nanosecond)
}

return tree.Datums{
tree.NewDInt(tree.DInt(n.info.ID)),
tree.NewDString(string(n.info.Name)),
tree.NewDString(n.info.State.String()),
tenantId,
tenantName,
tenantStatus,
sourceTenantName,
sourceClusterUri,
replicationJobId,
replicatedTimestamp,
retainedTimestamp,
}
}

func (n *showTenantNode) Close(_ context.Context) {}
Loading

0 comments on commit ea05406

Please sign in to comment.