diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index b33868f6fe9a..b72155e7783f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -1249,26 +1249,30 @@ func TestTenantStreamingShowTenant(t *testing.T) { c, cleanup := createTenantStreamingClusters(ctx, t, args) defer cleanup() - testStartTime := timeutil.Now() producerJobID, ingestionJobID := c.startStreamReplication() jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) highWatermark := c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID)) + destRegistry := c.destCluster.Server(0).JobRegistry().(*jobs.Registry) + details, err := destRegistry.LoadJob(ctx, jobspb.JobID(ingestionJobID)) + require.NoError(t, err) + replicationDetails := details.Details().(jobspb.StreamIngestionDetails) var ( - id int - dest string - status string - source string - sourceUri string - jobId int - maxReplTime time.Time - protectedTime time.Time + id int + dest string + status string + source string + sourceUri string + jobId int + maxReplTime time.Time + protectedTime time.Time + replicationStartTime time.Time ) row := c.destSysSQL.QueryRow(t, fmt.Sprintf("SHOW TENANT %s WITH REPLICATION STATUS", args.destTenantName)) - row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime) + row.Scan(&id, &dest, &status, &source, &sourceUri, &jobId, &maxReplTime, &protectedTime, &replicationStartTime) require.Equal(t, 2, id) require.Equal(t, "destination", dest) require.Equal(t, "ADD", status) @@ -1278,6 +1282,7 @@ func TestTenantStreamingShowTenant(t *testing.T) { require.Less(t, maxReplTime, timeutil.Now()) require.Less(t, protectedTime, timeutil.Now()) require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime()) - // TODO(lidor): replace this start time with the actual replication start time when we have it. - require.GreaterOrEqual(t, protectedTime, testStartTime) + require.GreaterOrEqual(t, protectedTime, replicationDetails.ReplicationStartTime.GoTime()) + require.Equal(t, replicationStartTime.UnixMicro(), + timeutil.Unix(0, replicationDetails.ReplicationStartTime.WallTime).UnixMicro()) } diff --git a/pkg/sql/catalog/colinfo/result_columns.go b/pkg/sql/catalog/colinfo/result_columns.go index 43e563a2dfee..92ac197bbee0 100644 --- a/pkg/sql/catalog/colinfo/result_columns.go +++ b/pkg/sql/catalog/colinfo/result_columns.go @@ -291,4 +291,5 @@ var TenantColumnsWithReplication = ResultColumns{ // The protected timestamp on the destination cluster, meaning we cannot // cutover to before this time. {Name: "retained_time", Typ: types.Timestamp}, + {Name: "replication_start_time", Typ: types.Timestamp}, } diff --git a/pkg/sql/show_tenant.go b/pkg/sql/show_tenant.go index b0bf671b92b1..3f668d760ceb 100644 --- a/pkg/sql/show_tenant.go +++ b/pkg/sql/show_tenant.go @@ -156,6 +156,7 @@ func (n *showTenantNode) Values() tree.Datums { replicationJobId := tree.NewDInt(tree.DInt(n.tenantInfo.TenantReplicationJobID)) replicatedTimestamp := tree.DNull retainedTimestamp := tree.DNull + replicationStartTimestamp := tree.DNull if n.replicationInfo != nil { sourceTenantName = tree.NewDString(string(n.replicationInfo.IngestionDetails.SourceTenantName)) @@ -167,6 +168,8 @@ func (n *showTenantNode) Values() tree.Datums { } // The protected timestamp on the destination cluster. retainedTimestamp, _ = tree.MakeDTimestamp(timeutil.Unix(0, n.protectedTimestamp.WallTime), time.Nanosecond) + replicationStartTimestamp, _ = tree.MakeDTimestamp( + timeutil.Unix(0, n.replicationInfo.IngestionDetails.ReplicationStartTime.WallTime), time.Nanosecond) } return tree.Datums{ @@ -178,6 +181,7 @@ func (n *showTenantNode) Values() tree.Datums { replicationJobId, replicatedTimestamp, retainedTimestamp, + replicationStartTimestamp, } }