Skip to content

Commit

Permalink
sql: add replication start time to SHOW TENANT WITH REPLICATION STATUS
Browse files Browse the repository at this point in the history
The start time is an important and useful piece of information
to expose to the end user. It is the lower bound for the data we have
replicated.

Release note: None
  • Loading branch information
adityamaru committed Dec 15, 2022
1 parent 9af72fb commit 3833eed
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
29 changes: 17 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,26 +1249,30 @@ func TestTenantStreamingShowTenant(t *testing.T) {

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
testStartTime := timeutil.Now()
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
highWatermark := c.srcCluster.Server(0).Clock().Now()
c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID))
destRegistry := c.destCluster.Server(0).JobRegistry().(*jobs.Registry)
details, err := destRegistry.LoadJob(ctx, jobspb.JobID(ingestionJobID))
require.NoError(t, err)
replicationDetails := details.Details().(jobspb.StreamIngestionDetails)

var (
id int
dest string
status string
source string
sourceUri string
jobId int
maxReplTime time.Time
protectedTime time.Time
id int
dest string
status string
source string
sourceUri string
jobId int
maxReplTime time.Time
protectedTime time.Time
replicationStartTime time.Time
)
row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName))
row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime)
row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime, &replicationStartTime)
require.Equal(t, 2, id)
require.Equal(t, "destination", dest)
require.Equal(t, "ADD", status)
Expand All @@ -1278,6 +1282,7 @@ func TestTenantStreamingShowTenant(t *testing.T) {
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime())
// TODO(lidor): replace this start time with the actual replication start time when we have it.
require.GreaterOrEqual(t, protectedTime, testStartTime)
require.GreaterOrEqual(t, protectedTime, replicationDetails.ReplicationStartTime.GoTime())
require.Equal(t, replicationStartTime.UnixMicro(),
timeutil.Unix(0, replicationDetails.ReplicationStartTime.WallTime).UnixMicro())
}
1 change: 1 addition & 0 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,5 @@ var TenantColumnsWithReplication = ResultColumns{
// The protected timestamp on the destination cluster, meaning we cannot
// cutover to before this time.
{Name: "retained_time", Typ: types.Timestamp},
{Name: "replication_start_time", Typ: types.Timestamp},
}
4 changes: 4 additions & 0 deletions pkg/sql/show_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (n *showTenantNode) Values() tree.Datums {
replicationJobId := tree.NewDInt(tree.DInt(n.tenantInfo.TenantReplicationJobID))
replicatedTimestamp := tree.DNull
retainedTimestamp := tree.DNull
replicationStartTimestamp := tree.DNull

if n.replicationInfo != nil {
sourceTenantName = tree.NewDString(string(n.replicationInfo.IngestionDetails.SourceTenantName))
Expand All @@ -167,6 +168,8 @@ func (n *showTenantNode) Values() tree.Datums {
}
// The protected timestamp on the destination cluster.
retainedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, n.protectedTimestamp.WallTime), time.Nanosecond)
replicationStartTimestamp, _ = tree.MakeDTimestamp(
timeutil.Unix(0, n.replicationInfo.IngestionDetails.ReplicationStartTime.WallTime), time.Nanosecond)
}

return tree.Datums{
Expand All @@ -178,6 +181,7 @@ func (n *showTenantNode) Values() tree.Datums {
replicationJobId,
replicatedTimestamp,
retainedTimestamp,
replicationStartTimestamp,
}
}

Expand Down

0 comments on commit 3833eed

Please sign in to comment.