streamingccl: send heartbeat time to source cluster
Previously, the persisted replicated time was used in heartbeats
sent to the source cluster. When a cutover time had been set that
was lower than the persisted replicated time, the failback
timestamp could be left unprotected: the source cluster was free
to garbage collect data between the cutover time and the persisted
replicated time, so a later attempt to replicate data from the
cutover time onward could fail.

In this PR, we fix the issue by taking the minimum of the persisted
replicated time and the cutover time as the heartbeat time, and
sending that heartbeat time to the source cluster instead of the
persisted replicated time. This change ensures that data to be
replicated remains safe from garbage collection at and above the
cutover time.

Informs: #117984
Epic: none

Release note: none
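
To make the fix concrete: if, say, the replicated time persisted in the job
record has advanced to t=110 while cutover was requested at t=100, the
heartbeat now reports 100, so the source cluster retains everything at and
above the cutover time for failback. Below is a minimal standalone sketch of
the selection logic; it mirrors the frontier-processor change in this commit,
but the helper function itself is illustrative and not part of the patch.

package replicationdemo

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// heartbeatTime returns the timestamp to send in heartbeats to the source
// cluster: the minimum of the persisted replicated time and the cutover
// time, when one is set. The source may garbage collect data below the
// returned timestamp.
func heartbeatTime(persistedReplicatedTime, cutoverTime hlc.Timestamp) hlc.Timestamp {
	// No cutover pending, or the frontier still trails it: track the
	// persisted replicated time, as before this change.
	if cutoverTime.IsEmpty() || persistedReplicatedTime.Less(cutoverTime) {
		return persistedReplicatedTime
	}
	// The frontier has passed the cutover time: pin the heartbeat at the
	// cutover time so data at and above it survives for failback.
	return cutoverTime
}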
azhu-crl committed Jun 12, 2024
1 parent f00f1cc commit 8192a06
Showing 8 changed files with 66 additions and 19 deletions.
9 changes: 7 additions & 2 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -263,13 +263,16 @@ ORDER BY created DESC LIMIT 1`, c.Args.DestTenantName)
 // stop eventually. If the provided cutover time is the zero value, cutover to
 // the latest replicated time.
 func (c *TenantStreamingClusters) Cutover(
-	producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
-) {
+	ctx context.Context, producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
+) string {
 	// Cut over the ingestion job and the job will stop eventually.
 	var cutoverStr string
 	if cutoverTime.IsZero() {
 		c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
 			c.Args.DestTenantName).Scan(&cutoverStr)
+		cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
+		protectedTimestamp := replicationutils.TestingGetPTSFromReplicationJob(c.T, ctx, c.SrcSysSQL, c.SrcSysServer, producerJobID)
+		require.LessOrEqual(c.T, protectedTimestamp.GoTime(), cutoverOutput.GoTime())
 	} else {
 		c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
 			c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
@@ -281,6 +284,8 @@ func (c *TenantStreamingClusters) Cutover(
 		jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 		c.WaitForPostCutoverRetentionJob()
 	}
+
+	return cutoverStr
 }

 // StartStreamReplication producer job ID and ingestion job ID.
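As a usage sketch of the new signature (a hypothetical test snippet; c, ctx,
the job IDs, and targetReplicatedTime are assumed to be set up as in the
tests updated below), callers now pass a context and may capture the returned
cutover timestamp string:

	// Cut over to the latest replicated time and inspect the resolved timestamp.
	var emptyCutoverTime time.Time
	cutoverStr := c.Cutover(ctx, producerJobID, ingestionJobID, emptyCutoverTime, false /* async */)
	cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
	require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())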
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel
@@ -9,12 +9,16 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/isql",
"//pkg/storage",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
27 changes: 27 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/utils.go
@@ -17,12 +17,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -277,3 +281,26 @@ func TestingGetStreamIngestionStatsFromReplicationJob(
 	require.NoError(t, err)
 	return stats
 }
+
+// TestingGetPTSFromReplicationJob returns the timestamp of the protected
+// timestamp record held by the given replication producer job.
+func TestingGetPTSFromReplicationJob(
+	t *testing.T,
+	ctx context.Context,
+	sqlRunner *sqlutils.SQLRunner,
+	srv serverutils.ApplicationLayerInterface,
+	producerJobID int,
+) hlc.Timestamp {
+	payload := jobutils.GetJobPayload(t, sqlRunner, jobspb.JobID(producerJobID))
+	details := payload.GetStreamReplication()
+	ptsRecordID := details.ProtectedTimestampRecordID
+	ptsProvider := srv.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
+
+	var ptsRecord *ptpb.Record
+	err := srv.InternalDB().(descs.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		var err error
+		ptsRecord, err = ptsProvider.WithTxn(txn).GetRecord(ctx, ptsRecordID)
+		return err
+	})
+	require.NoError(t, err)
+
+	return ptsRecord.Timestamp
+}
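
For example (a hypothetical test snippet mirroring the assertion added to
Cutover above), the helper lets a test check that the producer job's
protected timestamp never passes a resolved cutover timestamp:

	pts := replicationutils.TestingGetPTSFromReplicationJob(t, ctx, c.SrcSysSQL, c.SrcSysServer, producerJobID)
	require.LessOrEqual(t, pts.GoTime(), cutoverOutput.GoTime())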
@@ -83,9 +83,8 @@ func TestAlterTenantCompleteToLatest(t *testing.T) {
 	targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now()
 	c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID))

-	var cutoverStr string
-	c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
-		args.DestTenantName).Scan(&cutoverStr)
+	var emptyCutoverTime time.Time
+	cutoverStr := c.Cutover(ctx, producerJobID, ingestionJobID, emptyCutoverTime, false)

 	cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
 	require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -198,7 +198,7 @@ func TestDataDriven(t *testing.T) {
 		}
 		timestamp, _, err := tree.ParseDTimestamp(nil, cutoverTime, time.Microsecond)
 		require.NoError(t, err)
-		ds.replicationClusters.Cutover(ds.producerJobID, ds.ingestionJobID, timestamp.Time, async)
+		ds.replicationClusters.Cutover(ctx, ds.producerJobID, ds.ingestionJobID, timestamp.Time, async)
 		return ""

 	case "exec-sql":
14 changes: 7 additions & 7 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -246,7 +246,7 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {

 	// Check dest has caught up the previous updates.
 	srcTime := c.SrcCluster.Server(0).Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)
 	c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())

 	// Ingestion happened one more time after resuming the ingestion job.
@@ -331,7 +331,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {

 	cutoverTime := c.DestSysServer.Clock().Now()
 	c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))
-	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
 	c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())

 	// Clients should never be started prior to a checkpointed timestamp
@@ -566,7 +566,7 @@ INSERT INTO d.t_for_import (i) VALUES (1);
 	c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

 	cutoverTime := c.SrcSysServer.Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
 	c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
 }

@@ -720,7 +720,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))

 	cutoverTime := c.DestSysServer.Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
 	counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
 	require.GreaterOrEqual(t, counts["physical_replication.cutover"], int32(1))
 	c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
@@ -1055,7 +1055,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 		c.DestSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", replicationJobID))
 		jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
 		var emptyCutoverTime time.Time
-		c.Cutover(producerJobID, replicationJobID, emptyCutoverTime, false)
+		c.Cutover(ctx, producerJobID, replicationJobID, emptyCutoverTime, false)
 		c.SrcSysSQL.Exec(t, fmt.Sprintf(`ALTER TENANT '%s' SET REPLICATION EXPIRATION WINDOW ='100ms'`, c.Args.SrcTenantName))
 	}

@@ -1404,7 +1404,7 @@ func TestStreamingMismatchedMRDatabase(t *testing.T) {

 	c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
 	srcTime := c.SrcCluster.Server(0).Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)

 	defer c.StartDestTenant(ctx, nil, 0)()

@@ -1479,7 +1479,7 @@ func TestStreamingZoneConfigsMismatchedRegions(t *testing.T) {

 	c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
 	srcTime := c.SrcCluster.Server(0).Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, ingestionJobID, srcTime.GoTime(), false)

 	defer c.StartDestTenant(ctx, nil, 0)()

@@ -70,6 +70,9 @@ type streamIngestionFrontier struct {
 	// persistedReplicatedTime stores the highwater mark of
 	// progress that is persisted in the job record.
 	persistedReplicatedTime hlc.Timestamp
+	// heartbeatTime is the earliest timestamp for which the
+	// source cluster can begin garbage collection.
+	heartbeatTime hlc.Timestamp

 	lastPartitionUpdate time.Time
 	lastFrontierDump    time.Time
@@ -204,10 +207,10 @@ func (sf *streamIngestionFrontier) Next() (
 		case <-sf.Ctx().Done():
 			sf.MoveToDrainingAndLogError(sf.Ctx().Err())
 			return nil, sf.DrainHelper()
-		// Send the latest persisted replicated time in the heartbeat to the source cluster
+		// Send the latest heartbeat time in the heartbeat to the source cluster
 		// as even with retries we will never request an earlier row than it, and
 		// the source cluster is free to clean up earlier data.
-		case sf.heartbeatSender.FrontierUpdates <- sf.persistedReplicatedTime:
+		case sf.heartbeatSender.FrontierUpdates <- sf.heartbeatTime:
 		// If heartbeatSender has error, it means remote has error, we want to
 		// stop the processor.
 		case <-sf.heartbeatSender.StoppedChan:
@@ -315,6 +318,7 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 	})

 	replicatedTime := f.Frontier()
+	var cutoverTime hlc.Timestamp

 	sf.lastPartitionUpdate = timeutil.Now()
 	log.VInfof(ctx, 2, "persisting replicated time of %s", replicatedTime)
@@ -329,6 +333,8 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 		streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest
 		streamProgress.Checkpoint.ResolvedSpans = frontierResolvedSpans

+		cutoverTime = streamProgress.CutoverTime
+
 		// Keep the recorded replicatedTime empty until some advancement has been made
 		if sf.replicatedTimeAtStart.Less(replicatedTime) {
 			streamProgress.ReplicatedTime = replicatedTime

// If we have a CutoverTime set, keep the protected
// timestamp at or below the cutover time.
if !streamProgress.CutoverTime.IsEmpty() && streamProgress.CutoverTime.Less(newProtectAbove) {
newProtectAbove = streamProgress.CutoverTime
if !cutoverTime.IsEmpty() && cutoverTime.Less(newProtectAbove) {
newProtectAbove = cutoverTime
}

if record.Timestamp.Less(newProtectAbove) {
@@ -382,6 +388,12 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 	}
 	sf.metrics.JobProgressUpdates.Inc(1)
 	sf.persistedReplicatedTime = f.Frontier()
+
+	if cutoverTime.IsEmpty() || sf.persistedReplicatedTime.Less(cutoverTime) {
+		sf.heartbeatTime = sf.persistedReplicatedTime
+	} else {
+		sf.heartbeatTime = cutoverTime
+	}
 	sf.metrics.ReplicatedTimeSeconds.Update(sf.persistedReplicatedTime.GoTime().Unix())
 	return nil
 }
@@ -386,7 +386,7 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
 	canContinue <- struct{}{}
 	srcTime = c.SrcCluster.Server(0).Clock().Now()
 	c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))
-	c.Cutover(producerJobID, replicationJobID, srcTime.GoTime(), false)
+	c.Cutover(ctx, producerJobID, replicationJobID, srcTime.GoTime(), false)
 	jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID))
 }

@@ -592,7 +592,7 @@ func TestCutoverCheckpointing(t *testing.T) {
 	// Ensure there are no remaining cutover spans before cutover begins.
 	require.Equal(t, len(getCutoverRemainingSpans()), 0)

-	c.Cutover(producerJobIDInt, replicationJobIDInt, cutoverTime.GoTime(), true)
+	c.Cutover(ctx, producerJobIDInt, replicationJobIDInt, cutoverTime.GoTime(), true)
 	<-progressUpdated

 	c.DestSysSQL.Exec(t, `PAUSE JOB $1`, &replicationJobID)