Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
71833: streamingccl: source cluster producer job in streaming replication r=gh-casper a=gh-casper

Introduce a producer job in the source cluster that tracks the liveness of a stream replication. The stream replication will be stopped if the job has been inactive for a period of time.

Release note: none

Co-authored-by: Casper <[email protected]>
  • Loading branch information
craig[bot] and gh-casper committed Nov 2, 2021
2 parents 09c5659 + 66ff69f commit f69253e
Show file tree
Hide file tree
Showing 11 changed files with 1,375 additions and 503 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"addresses.go",
"event.go",
"settings.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/streamingccl/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl

import "time"

// DefaultJobLivenessTrackingFrequency is how often, by default, the liveness
// of a streaming replication producer job is checked.
var DefaultJobLivenessTrackingFrequency = time.Minute

// TestingSetDefaultJobLivenessTrackingFrequency overrides
// DefaultJobLivenessTrackingFrequency with f for tests. The returned function
// puts back the value that was in effect when this function was called.
func TestingSetDefaultJobLivenessTrackingFrequency(f time.Duration) func() {
	previous := DefaultJobLivenessTrackingFrequency
	restore := func() { DefaultJobLivenessTrackingFrequency = previous }
	DefaultJobLivenessTrackingFrequency = f
	return restore
}
18 changes: 17 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "streamproducer",
srcs = ["replication_stream_planning.go"],
srcs = [
"producer_job.go",
"replication_stream_planning.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/streamingccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/hlc",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -28,29 +36,37 @@ go_test(
name = "streamproducer_test",
srcs = [
"main_test.go",
"producer_job_test.go",
"replication_stream_test.go",
],
embed = [":streamproducer"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/catalogkv",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_stretchr_testify//require",
],
Expand Down
101 changes: 101 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamproducer

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// makeProducerJobRecord builds the job record for a stream replication
// producer job covering the whole key span of the tenant identified by
// tenantID. The record's progress carries an expiration of now plus timeout.
// It returns the record together with a job ID obtained from registry.
func makeProducerJobRecord(
	registry *jobs.Registry, tenantID uint64, timeout time.Duration, username security.SQLUsername,
) (jobspb.JobID, jobs.Record) {
	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenantID))
	tenantSpan := &roachpb.Span{Key: tenantPrefix, EndKey: tenantPrefix.PrefixEnd()}

	var record jobs.Record
	record.Description = fmt.Sprintf("stream replication for tenant %d", tenantID)
	record.Username = username
	record.Details = jobspb.StreamReplicationDetails{Spans: []*roachpb.Span{tenantSpan}}
	record.Progress = jobspb.StreamReplicationProgress{Expiration: timeutil.Now().Add(timeout)}

	// The job ID is requested only after the record has been assembled.
	id := registry.MakeJobID()
	return id, record
}

// producerJobResumer is the resumer for stream replication producer jobs. It
// watches the expiration stored in the job's progress and fails the job once
// that expiration has passed.
type producerJobResumer struct {
// job is the job this resumer was constructed for.
job *jobs.Job

// timeSource supplies the current time that the job's expiration is
// compared against.
timeSource timeutil.TimeSource
// timer paces the periodic liveness checks performed by Resume.
timer timeutil.TimerI
}

// Resume is part of the jobs.Resumer interface.
//
// It returns an error as soon as the replication stream is found to be timed
// out, i.e. once the expiration recorded in the job's progress is before the
// current time of p.timeSource. The check runs once on entry against p.job and
// then again every streamingccl.DefaultJobLivenessTrackingFrequency, each time
// against a copy of the job freshly loaded from the job registry. Resume also
// returns when ctx is done (with ctx.Err()) or when the job cannot be loaded.
func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
	jobExec := execCtx.(sql.JobExecContext)
	execCfg := jobExec.ExecCfg()
	// isTimedOut must inspect the job it is handed rather than p.job: the
	// periodic check below passes a freshly loaded job whose progress may
	// differ from the copy held in p.job.
	isTimedOut := func(job *jobs.Job) bool {
		progress := job.Progress()
		return progress.GetStreamReplication().Expiration.Before(p.timeSource.Now())
	}
	if isTimedOut(p.job) {
		return errors.Errorf("replication stream %d timed out", p.job.ID())
	}

	p.timer.Reset(streamingccl.DefaultJobLivenessTrackingFrequency)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-p.timer.Ch():
			// Re-arm the timer before doing the check so the next tick is
			// scheduled regardless of how long the load takes.
			p.timer.MarkRead()
			p.timer.Reset(streamingccl.DefaultJobLivenessTrackingFrequency)
			j, err := execCfg.JobRegistry.LoadJob(ctx, p.job.ID())
			if err != nil {
				return err
			}
			if isTimedOut(j) {
				return errors.Errorf("replication stream %d timed out", p.job.ID())
			}
		}
	}
}

// OnFailOrCancel is part of the jobs.Resumer interface. It performs no work
// and always returns nil.
func (p *producerJobResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
	return nil
}

// init registers the constructor that builds a producerJobResumer for jobs of
// type jobspb.TypeStreamReplication. Each resumer gets the default time source
// and a timer created from it.
func init() {
	newResumer := func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
		clock := timeutil.DefaultTimeSource{}
		resumer := &producerJobResumer{job: job}
		resumer.timeSource = clock
		resumer.timer = clock.NewTimer()
		return resumer
	}
	jobs.RegisterConstructor(jobspb.TypeStreamReplication, newResumer)
}
179 changes: 179 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamproducer

import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// coordinatedTimer wraps a timeutil.TimerI and synchronizes it with a test
// through two channels: it sends on out after every Reset and receives from
// in before handing out the underlying timer's channel.
type coordinatedTimer struct {
	timer timeutil.TimerI
	in    <-chan struct{}
	out   chan<- struct{}
}

// Reset resets the underlying timer and then sends a signal on out.
func (ct *coordinatedTimer) Reset(duration time.Duration) {
	ct.timer.Reset(duration)
	var resetDone struct{}
	ct.out <- resetDone
}

// Stop stops the underlying timer and returns its result.
func (ct *coordinatedTimer) Stop() bool {
	stopped := ct.timer.Stop()
	return stopped
}

// Ch blocks until a signal arrives on in and only then returns the underlying
// timer's channel.
func (ct *coordinatedTimer) Ch() <-chan time.Time {
	<-ct.in
	events := ct.timer.Ch()
	return events
}

// MarkRead forwards to the underlying timer.
func (ct *coordinatedTimer) MarkRead() {
	ct.timer.MarkRead()
}

// makeCoordinatedTimer wraps i in a coordinatedTimer that receives from in
// and sends on out.
func makeCoordinatedTimer(
	i timeutil.TimerI, in <-chan struct{}, out chan<- struct{},
) timeutil.TimerI {
	wrapped := new(coordinatedTimer)
	wrapped.timer, wrapped.in, wrapped.out = i, in, out
	return wrapped
}

// coordinatedResumer wraps a jobs.Resumer and sends on revertingConfirmed
// each time the wrapped OnFailOrCancel has returned.
type coordinatedResumer struct {
	resumer            jobs.Resumer
	revertingConfirmed chan<- struct{}
}

// Resume delegates to the wrapped resumer and returns its result unchanged.
func (r coordinatedResumer) Resume(ctx context.Context, execCtx interface{}) error {
	resumeErr := r.resumer.Resume(ctx, execCtx)
	return resumeErr
}

// OnFailOrCancel runs the wrapped resumer's OnFailOrCancel, then notifies the
// watcher on revertingConfirmed, and finally returns the wrapped result.
func (r coordinatedResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
	revertErr := r.resumer.OnFailOrCancel(ctx, execCtx)
	var done struct{}
	r.revertingConfirmed <- done
	return revertErr
}

// TestStreamReplicationProducerJob checks the timeout handling of the stream
// replication producer job on a single-node test cluster: a job whose
// expiration is already in the past fails right away, and a job that is still
// live keeps running until the time source moves past its expiration.
func TestStreamReplicationProducerJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
},
}
tc := testcluster.StartTestCluster(t, 1, clusterArgs)
defer tc.Stopper().Stop(ctx)

source := tc.Server(0)
sql := sqlutils.MakeSQLRunner(tc.ServerConn(0))
registry := source.JobRegistry().(*jobs.Registry)

// registerConstructor replaces the resumer constructor for
// jobspb.TypeStreamReplication with one whose resumers use a manual time
// source starting at initialTime and a coordinated timer. It returns the
// manual time source followed by three functions that, in order, signal the
// timer, wait for the timer to be reset, and wait until OnFailOrCancel has
// run.
registerConstructor := func(initialTime time.Time) (*timeutil.ManualTime, func(), func(), func()) {
mt := timeutil.NewManualTime(initialTime)
waitUntilReverting := make(chan struct{})
in, out := make(chan struct{}, 1), make(chan struct{}, 1)
jobs.RegisterConstructor(jobspb.TypeStreamReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
r := &producerJobResumer{
job: job,
timeSource: mt,
timer: makeCoordinatedTimer(mt.NewTimer(), in, out),
}
return coordinatedResumer{
resumer: r,
revertingConfirmed: waitUntilReverting,
}
})
return mt,
func() {
in <- struct{}{} // Signals the timer that a new time is assigned to time source
},
func() {
<-out // Waits until caller starts waiting for a new timer event
},
func() {
<-waitUntilReverting // Waits until job reaches 'reverting' status
}
}

// startJob creates an adoptable job from the given record inside a
// transaction on the source server's DB.
startJob := func(jobID jobspb.JobID, jr jobs.Record) error {
return source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
_, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jobID, txn)
return err
})
}

timeout, username := 1*time.Second, security.MakeSQLUsernameFromPreNormalizedString("user")
// jobsQuery returns the SQL statement that reads the status of the given job.
jobsQuery := func(jobID jobspb.JobID) string {
return fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID)
}
// expirationTime extracts the expiration stored in the record's progress.
expirationTime := func(record jobs.Record) time.Time {
return record.Progress.(jobspb.StreamReplicationProgress).Expiration
}
t.Run("producer-job", func(t *testing.T) {
jobID, jr := makeProducerJobRecord(registry, 10, timeout, username)

// Case 1: Resumer wakes up and finds the job timed out.
// The manual time source starts just after the job's expiration.
_, _, _, waitUntilReverting := registerConstructor(expirationTime(jr).Add(1 * time.Millisecond))
require.NoError(t, startJob(jobID, jr))

waitUntilReverting()
sql.SucceedsSoonDuration = 1 * time.Second
sql.CheckQueryResultsRetry(t, jobsQuery(jobID), [][]string{{"failed"}})

// Shorten the tracking frequency to make the timer easy to trigger.
reset := streamingccl.TestingSetDefaultJobLivenessTrackingFrequency(1 * time.Millisecond)
defer reset()

// Case 2: Resumer wakes up and finds the job still active.
// The manual time source starts shortly before the job's expiration.
jobID, jr = makeProducerJobRecord(registry, 20, timeout, username)
mt, timeGiven, waitForTimeRequest, waitUntilReverting := registerConstructor(expirationTime(jr).Add(-5 * time.Millisecond))
require.NoError(t, startJob(jobID, jr))
waitForTimeRequest()
sql.CheckQueryResults(t, jobsQuery(jobID), [][]string{{"running"}})

// Reset the time to be after the timeout
mt.AdvanceTo(expirationTime(jr).Add(2 * time.Millisecond))
timeGiven()
waitUntilReverting()
// Only OnFailOrCancel completion is awaited here, so either status is
// accepted.
status := sql.QueryStr(t, jobsQuery(jobID))[0][0]
require.True(t, status == "reverting" || status == "failed")
})
}
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ proto_library(
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:any_proto",
"@com_google_protobuf//:timestamp_proto",
],
)

Expand Down
Loading

0 comments on commit f69253e

Please sign in to comment.