Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add SHOW TENANT name WITH REPLICATION STATUS #92628

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/generated/sql/bnf/show_tenant_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
show_tenant_stmt ::=
'SHOW' 'TENANT' name
'SHOW' 'TENANT' d_expr
| 'SHOW' 'TENANT' d_expr 'WITH' 'REPLICATION' 'STATUS'
3 changes: 2 additions & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ show_tables_stmt ::=
| 'SHOW' 'TABLES' with_comment

show_tenant_stmt ::=
'SHOW' 'TENANT' name
'SHOW' 'TENANT' d_expr
| 'SHOW' 'TENANT' d_expr 'WITH' 'REPLICATION' 'STATUS'

show_trace_stmt ::=
'SHOW' opt_compact 'TRACE' 'FOR' 'SESSION'
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func NewStreamClient(
// GetFirstActiveClient iterates through each provided stream address
// and returns the first client it's able to successfully Dial.
func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) {
if len(streamAddresses) == 0 {
return nil, errors.Newf("failed to connect, no partition addresses")
}
var combinedError error = nil
for _, address := range streamAddresses {
streamAddress := streamingccl.StreamAddress(address)
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func (t testStreamSubscription) Err() error {
return nil
}

func TestGetFirstActiveClientEmpty(t *testing.T) {
defer leaktest.AfterTest(t)()

var streamAddresses []string
activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses)
require.ErrorContains(t, err, "failed to connect, no partition addresses")
require.Nil(t, activeClient)
}

func TestGetFirstActiveClient(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
44 changes: 44 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1244,3 +1245,46 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
})
})
}

// TODO(lidor): consider rewriting this test as a data driven test when #92609 is merged.
func TestTenantStreamingShowTenant(t *testing.T) {
adityamaru marked this conversation as resolved.
Show resolved Hide resolved
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
testStartTime := timeutil.Now()
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
highWatermark := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID))

var (
id int
dest string
status string
source string
sourceUri string
jobId int
maxReplTime time.Time
protectedTime time.Time
)
row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName))
row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime)
require.Equal(t, 2, id)
require.Equal(t, "destination", dest)
require.Equal(t, "ADD", status)
require.Equal(t, "source", source)
require.Equal(t, c.srcURL.String(), sourceUri)
require.Equal(t, ingestionJobID, jobId)
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime())
// TODO(lidor): replace this start time with the actual replication start time when we have it.
require.GreaterOrEqual(t, protectedTime, testStartTime)
adityamaru marked this conversation as resolved.
Show resolved Hide resolved
}
adityamaru marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ go_library(
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,17 @@ var TenantColumns = ResultColumns{
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
}

var TenantColumnsWithReplication = ResultColumns{
{Name: "id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
{Name: "source_tenant_name", Typ: types.String},
{Name: "source_cluster_uri", Typ: types.String},
{Name: "replication_job_id", Typ: types.Int},
// The latest fully replicated time.
{Name: "replicated_time", Typ: types.Timestamp},
adityamaru marked this conversation as resolved.
Show resolved Hide resolved
// The protected timestamp on the destination cluster, meaning we cannot
// cutover to before this time.
{Name: "retained_time", Typ: types.Timestamp},
}
2 changes: 2 additions & 0 deletions pkg/sql/faketreeeval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql/catalog/catpb",
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
Expand Down Expand Up @@ -594,6 +596,12 @@ func (c *DummyTenantOperator) GetTenantInfo(
return nil, errors.WithStack(errEvalTenant)
}

func (p *DummyTenantOperator) GetTenantReplicationInfo(
ctx context.Context, replicationJobId jobspb.JobID,
) (*streampb.StreamIngestionStats, error) {
return nil, errors.WithStack(errEvalTenant)
}

// DestroyTenant is part of the tree.TenantOperator interface.
func (c *DummyTenantOperator) DestroyTenant(
ctx context.Context, tenantName roachpb.TenantName, synchronous bool,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/tenant
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ id name status
statement error tenant "seven" does not exist
SHOW TENANT seven

statement error tenant "tenant-one" does not have an active replication job
SHOW TENANT "tenant-one" WITH REPLICATION STATUS

statement error tenant "two" does not have an active replication job
SHOW TENANT two WITH REPLICATION STATUS

# Test creating a tenant with the same name as an existing tenant, but a unique
# ID.
statement error tenant with name "three" already exists
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parser/help_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func TestContextualHelp(t *testing.T) {
{`SHOW TABLES FROM blah ??`, `SHOW TABLES`},

{`SHOW TENANT ??`, `SHOW TENANT`},
{`SHOW TENANT ?? WITH REPLICATION STATUS`, `SHOW TENANT`},

{`SHOW TRANSACTION PRIORITY ??`, `SHOW TRANSACTION`},
{`SHOW TRANSACTION STATUS ??`, `SHOW TRANSACTION`},
Expand Down
16 changes: 13 additions & 3 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -5420,11 +5420,21 @@ backup_kms:

// %Help: SHOW TENANT - display tenant information
// %Category: Misc
// %Text: SHOW TENANT
// %Text: SHOW TENANT <tenant_name> [WITH REPLICATION STATUS]
show_tenant_stmt:
SHOW TENANT name
SHOW TENANT d_expr
{
$$.val = &tree.ShowTenant{Name: tree.Name($3)}
$$.val = &tree.ShowTenant{
Name: $3.expr(),
WithReplication: false,
}
}
| SHOW TENANT d_expr WITH REPLICATION STATUS
{
$$.val = &tree.ShowTenant{
Name: $3.expr(),
WithReplication: true,
}
}
| SHOW TENANT error // SHOW HELP: SHOW TENANT

Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/parser/testdata/show
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,14 @@ parse
SHOW TENANT foo
----
SHOW TENANT foo
SHOW TENANT foo -- fully parenthesized
SHOW TENANT (foo) -- fully parenthesized
SHOW TENANT foo -- literals removed
SHOW TENANT _ -- identifiers removed

parse
SHOW TENANT foo WITH REPLICATION STATUS
----
SHOW TENANT foo WITH REPLICATION STATUS
SHOW TENANT (foo) WITH REPLICATION STATUS -- fully parenthesized
SHOW TENANT foo WITH REPLICATION STATUS -- literals removed
SHOW TENANT _ WITH REPLICATION STATUS -- identifiers removed
3 changes: 0 additions & 3 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,6 @@ type TenantOperator interface {
// the gc job will not wait for a GC ttl.
DestroyTenant(ctx context.Context, tenantName roachpb.TenantName, synchronous bool) error

// GetTenantInfo returns information about the specified tenant.
GetTenantInfo(ctx context.Context, tenantName roachpb.TenantName) (*descpb.TenantInfo, error)

// GCTenant attempts to garbage collect a DROP tenant from the system. Upon
// success it also removes the tenant record.
// It returns an error if the tenant does not exist.
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/sem/tree/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,13 +777,18 @@ func (node *ShowTableStats) Format(ctx *FmtCtx) {

// ShowTenant represents a SHOW TENANT statement.
type ShowTenant struct {
Name Name
Name Expr
WithReplication bool
}

// Format implements the NodeFormatter interface.
func (node *ShowTenant) Format(ctx *FmtCtx) {
ctx.WriteString("SHOW TENANT ")
ctx.FormatNode(&node.Name)
ctx.FormatNode(node.Name)

if node.WithReplication {
ctx.WriteString(" WITH REPLICATION STATUS")
}
}

// ShowHistogram represents a SHOW HISTOGRAM statement.
Expand Down
Loading