103808: changefeedccl: freeze the hashing algorithm r=miretskiy a=HonoreDB

In almost every place where we need consistent hashing, we specify the algorithm explicitly. In the Kafka sink, however, we just used Sarama's default, which is subject to change between library versions. This PR specifies the algorithm explicitly there too, and adds a test with hardcoded key-to-partition values so that future updates can't cause inconsistent partitioning in a running changefeed.

Fixes cockroachdb#101779
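
As a rough illustration (not the changefeed code itself), pinning Sarama's hash partitioner to FNV-1a looks like the sketch below; the topic name, key, and partition count are placeholders chosen for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"

	"github.com/Shopify/sarama"
)

func main() {
	// NewCustomHashPartitioner names the hash function explicitly, so a
	// Sarama upgrade cannot silently remap keys to different partitions.
	partitioner := sarama.NewCustomHashPartitioner(fnv.New32a)("topic1")

	partition, err := partitioner.Partition(
		&sarama.ProducerMessage{Key: sarama.StringEncoder("some-key")},
		32, // partition count; arbitrary for this sketch
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("key maps to partition", partition)
}
```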

103835: streamingccl: store replicated time in details r=msbutler a=stevendanna

The cutover process uses the progress field to record how many ranges need to be reverted. This, however, wipes out the high watermark that was previously stored in that progress field.

Here, we move the canonical copy of the high watermark to the progress details.

For clarity, we rename "high watermark" to "replicated time".

Further, we delete a long-skipped test that isn't providing much value.

Fixes cockroachdb#103534

Epic: none

Release note: None
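
To make the motivation concrete, here is a self-contained toy (hypothetical types, not the actual jobspb definitions) showing why a timestamp kept only in the shared progress slot is lost once cutover repurposes that slot, while a copy in the job-specific details survives:

```go
package main

import "fmt"

// progress loosely mimics a job progress record: the generic slot holds
// either a high-water timestamp or a fraction completed, never both, while
// details carries job-specific state.
type progress struct {
	highWater         *int64   // generic slot, variant 1
	fractionCompleted *float64 // generic slot, variant 2 (used during cutover)
	details           struct{ replicatedTime int64 }
}

func main() {
	ts, frac := int64(1700000000), 0.25
	var p progress

	// Old scheme: the replicated time lives only in the generic slot...
	p.highWater = &ts
	// ...so when cutover starts tracking revert progress, it is wiped out.
	p.highWater, p.fractionCompleted = nil, &frac
	fmt.Println("old scheme, high water after cutover:", p.highWater) // <nil>

	// New scheme: the canonical copy lives in details and is unaffected.
	p.details.replicatedTime = ts
	p.fractionCompleted = &frac
	fmt.Println("new scheme, replicated time:", p.details.replicatedTime)
}
```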

103891: kv: allow commit triggers for manually-ended transactions r=nvanbenschoten a=miraradeva

Previously, commit triggers were invoked only on an explicit transaction Commit(). However, some transactions are committed by sending a separate EndTxnRequest instead, and in those cases the commit triggers were never invoked. This patch fixes the issue by invoking commit triggers as part of Send().

Part of: cockroachdb#82538

Release note: None
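
The shape of the fix, in a stripped-down sketch (hypothetical types, not the kv package's actual API): any batch that ends the transaction with commit=true fires the triggers inside Send, so an explicit Commit() is no longer the only path that runs them.

```go
package main

import "fmt"

// endTxnRequest stands in for a request that ends a transaction.
type endTxnRequest struct{ commit bool }

type txn struct {
	commitTriggers []func()
}

// Send inspects the batch; if it commits the transaction, the commit
// triggers fire here rather than only inside Commit().
func (t *txn) Send(reqs ...any) {
	for _, r := range reqs {
		if et, ok := r.(endTxnRequest); ok && et.commit {
			for _, trigger := range t.commitTriggers {
				trigger()
			}
		}
	}
}

// Commit is now just one caller of Send among others.
func (t *txn) Commit() { t.Send(endTxnRequest{commit: true}) }

func main() {
	tx := &txn{commitTriggers: []func(){
		func() { fmt.Println("commit trigger fired") },
	}}

	// A manually-ended transaction: the caller sends EndTxn itself instead
	// of calling Commit(), and the trigger still fires.
	tx.Send(endTxnRequest{commit: true})
}
```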

Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
4 people committed May 25, 2023
4 parents df21930 + 361c142 + 327947f + 2674279 commit 0ab47ef
Showing 29 changed files with 429 additions and 656 deletions.
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -15,6 +15,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"hash/fnv"
"math"
"net/url"
"strings"
@@ -711,9 +712,7 @@ var _ sarama.Partitioner = &changefeedPartitioner{}
var _ sarama.PartitionerConstructor = newChangefeedPartitioner

func newChangefeedPartitioner(topic string) sarama.Partitioner {
return &changefeedPartitioner{
hash: sarama.NewHashPartitioner(topic),
}
return sarama.NewCustomHashPartitioner(fnv.New32a)(topic)
}

func (p *changefeedPartitioner) RequiresConsistency() bool { return true }
32 changes: 32 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
@@ -871,3 +871,35 @@ func TestSinkConfigParsing(t *testing.T) {
require.ErrorContains(t, err, "invalid character 's' looking for beginning of value")
})
}

func TestChangefeedConsistentPartitioning(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We test that these arbitrary strings get mapped to these
// arbitrary partitions to ensure that if an upgrade occurs
// while a changefeed is running, partitioning remains the same
// and therefore ordering guarantees are preserved. Changing
// these values is a breaking change.
referencePartitions := map[string]int32{
"0": 1003,
"01": 351,
"10": 940,
"a": 292,
"\x00": 732,
"\xff \xff": 164,
}
longString1 := strings.Repeat("a", 2048)
referencePartitions[longString1] = 755
longString2 := strings.Repeat("a", 2047) + "A"
referencePartitions[longString2] = 592

partitioner := newChangefeedPartitioner("topic1")

for key, expected := range referencePartitions {
actual, err := partitioner.Partition(&sarama.ProducerMessage{Key: sarama.ByteEncoder(key)}, 1031)
require.NoError(t, err)
require.Equal(t, expected, actual)
}

}
66 changes: 33 additions & 33 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -150,23 +150,18 @@ func FingerprintTenantAtTimestampNoHistory(
return db.QueryStr(t, fingerprintQuery, tenantID)[0][0]
}

// WaitUntilHighWatermark waits for the ingestion job high watermark to reach the given high watermark.
func (c *TenantStreamingClusters) WaitUntilHighWatermark(
highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID,
// WaitUntilReplicatedTime waits for the ingestion job high watermark
// to reach the given target time.
func (c *TenantStreamingClusters) WaitUntilReplicatedTime(
targetTime hlc.Timestamp, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(c.T, func() error {
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
highWatermark.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(highWatermark) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), highWatermark.String())
}
return nil
})
WaitUntilReplicatedTime(c.T, targetTime, c.DestSysSQL, ingestionJobID)
}

// WaitUntilStartTimeReached waits for the ingestion replicated time
// to reach the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

// Cutover sets the cutover timestamp on the replication job causing the job to
@@ -333,12 +328,6 @@ func (c *TenantStreamingClusters) SrcExec(exec srcInitExecFunc) {
exec(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}

// WaitUntilStartTimeReached waits for the ingestion job high watermark to reach
// the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID) {
timeout := 45 * time.Second
if skip.NightlyStress() {
@@ -358,20 +347,31 @@ func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJo
return errors.New("ingestion start time not yet recorded")
}

progress := jobutils.GetJobProgress(t, db, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
startTime.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(startTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), startTime.String())
}
return nil
return requireReplicatedTime(startTime, jobutils.GetJobProgress(t, db, ingestionJobID))
}, timeout)
}

func WaitUntilReplicatedTime(
t *testing.T, targetTime hlc.Timestamp, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(t, func() error {
return requireReplicatedTime(targetTime, jobutils.GetJobProgress(t, db, ingestionJobID))
})
}

func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress) error {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress)
if replicatedTime.IsEmpty() {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
targetTime)
}
if replicatedTime.Less(targetTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
replicatedTime, targetTime)
}
return nil
}

func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
12 changes: 9 additions & 3 deletions pkg/ccl/streamingccl/replicationutils/utils.go
@@ -136,9 +136,11 @@ func GetStreamIngestionStatsNoHeartbeat(
IngestionDetails: &streamIngestionDetails,
IngestionProgress: jobProgress.GetStreamIngest(),
}
if highwater := jobProgress.GetHighWater(); highwater != nil && !highwater.IsEmpty() {

replicatedTime := ReplicatedTimeFromProgress(&jobProgress)
if !replicatedTime.IsEmpty() {
lagInfo := &streampb.StreamIngestionStats_ReplicationLagInfo{
MinIngestedTimestamp: *highwater,
MinIngestedTimestamp: replicatedTime,
}
lagInfo.EarliestCheckpointedTimestamp = hlc.MaxTimestamp
lagInfo.LatestCheckpointedTimestamp = hlc.MinTimestamp
@@ -154,12 +156,16 @@
}
lagInfo.SlowestFastestIngestionLag = lagInfo.LatestCheckpointedTimestamp.GoTime().
Sub(lagInfo.EarliestCheckpointedTimestamp.GoTime())
lagInfo.ReplicationLag = timeutil.Since(highwater.GoTime())
lagInfo.ReplicationLag = timeutil.Since(replicatedTime.GoTime())
stats.ReplicationLagInfo = lagInfo
}
return stats, nil
}

func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
return p.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime
}

func GetStreamIngestionStats(
ctx context.Context,
streamIngestionDetails jobspb.StreamIngestionDetails,
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
@@ -77,7 +77,7 @@ type Client interface {
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken,
initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error)
initialScanTime hlc.Timestamp, previousReplicatedTime hlc.Timestamp) (Subscription, error)

// Close releases all the resources used by this client.
Close(ctx context.Context) error
@@ -202,7 +202,7 @@ func (p *partitionedStreamClient) Subscribe(
streamID streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
previousHighWater hlc.Timestamp,
previousReplicatedTime hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
@@ -212,7 +212,7 @@
return nil, err
}
sps.InitialScanTimestamp = initialScanTime
sps.PreviousHighWaterTimestamp = previousHighWater
sps.PreviousReplicatedTimestamp = previousReplicatedTime

specBytes, err := protoutil.Marshal(&sps)
if err != nil {
2 changes: 0 additions & 2 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -88,7 +88,6 @@ go_test(
"rangekey_batcher_test.go",
"replication_random_client_test.go",
"replication_stream_e2e_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
@@ -143,7 +142,6 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade/upgradebase",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -234,13 +234,14 @@ func alterTenantJobCutover(
return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}
progress := job.Progress()

if alterTenantStmt.Cutover.Latest {
ts := progress.GetHighWater()
if ts == nil || ts.IsEmpty() {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
if replicatedTime.IsEmpty() {
return hlc.Timestamp{},
errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
}
cutoverTime = *ts
cutoverTime = replicatedTime
}

// TODO(ssd): We could use the replication manager here, but
32 changes: 16 additions & 16 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
@@ -41,7 +41,7 @@ func TestAlterTenantCompleteToTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
@@ -68,14 +68,14 @@ func TestAlterTenantCompleteToLatest(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID))

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
args.DestTenantName).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime())
require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())
require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}
@@ -94,7 +94,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

// Pause the replication job.
c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName)
@@ -104,7 +104,7 @@
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

@@ -176,12 +176,12 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
replicatedTimeTarget := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(replicatedTimeTarget, jobspb.JobID(ingestionJobID))

// First cutover to a future time.
var cutoverStr string
cutoverTime := highWater.Add(time.Hour.Nanoseconds(), 0)
cutoverTime := replicatedTimeTarget.Add(time.Hour.Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
@@ -190,7 +190,7 @@
require.Equal(t, cutoverOutput, getCutoverTime())

// And cutover to an even further time.
cutoverTime = highWater.Add((time.Hour * 2).Nanoseconds(), 0)
cutoverTime = replicatedTimeTarget.Add((time.Hour * 2).Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput = replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
@@ -242,7 +242,7 @@ func TestAlterTenantFailUpdatingCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -337,7 +337,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -351,7 +351,7 @@
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
unblockResumerStart()
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -422,7 +422,7 @@ func TestTenantStatusWithLatestCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

Expand Down Expand Up @@ -459,7 +459,7 @@ func TestTenantReplicationStatus(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

registry := c.DestSysServer.JobRegistry().(*jobs.Registry)

@@ -496,7 +496,7 @@ func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName)
}