Skip to content

Commit

Permalink
Merge #60495 #60543
Browse files Browse the repository at this point in the history
60495: sql: ensure type schema change cleanup job is resilient to retries r=lucy-zhang,otan a=arulajmani

Previously if the type schema changer ran into a non-permanent error,
it wouldn't retry transparently. Instead, manual cleanup would be
required. This patch fixes this behavior.

This patch also adds a testing knob, `RunAfterOnFailOrCancel` to test
the afformentioned bug.

Fixes #60489

Release note (bug fix): Previosly, retryable errors in the cleanup
phase of the type schema changer wouldn't be retried automatically
in the background. This is now fixed.

60543: builtins: change complete_stream builtin to take a timestamp r=pbardea,miretskiy a=adityamaru

This change adds a ts parameter to
crdb_internal.complete_stream_ingestion_job builtin. This ts will be the
ts as of which the cluster being ingested into will be considered in a
consistent state.

Release note: None

Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
3 people committed Feb 16, 2021
3 parents e1911bc + 755fa25 + 8494822 commit 0c62e88
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 556 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2274,7 +2274,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the latest resolved timestamp and leave the cluster in a consistent state. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td></tr></tbody>
</table>

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/streamingccl/streamingutils",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//pkg/kv",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/streamingutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand All @@ -23,7 +24,9 @@ func init() {
streaming.CompleteIngestionHook = doCompleteIngestion
}

func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) error {
func doCompleteIngestion(
evalCtx *tree.EvalContext, txn *kv.Txn, jobID int, cutoverTimestamp hlc.Timestamp,
) error {
// Get the job payload for job_id.
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.InternalExecutor.QueryRow(evalCtx.Context,
Expand All @@ -49,7 +52,7 @@ func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) erro

// Update the sentinel being polled by the stream ingestion job to
// check if a complete has been signaled.
sp.StreamIngest.MarkedForCompletion = true
sp.StreamIngest.CutoverTime = cutoverTimestamp
progress.ModifiedMicros = timeutil.ToUnixMicros(txn.ReadTimestamp().GoTime())
progressBytes, err := protoutil.Marshal(progress)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/streamingccl/streamingutils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ func TestCutoverBuiltin(t *testing.T) {
progress := job.Progress()
sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
require.False(t, sp.StreamIngest.MarkedForCompletion)
require.True(t, sp.StreamIngest.CutoverTime.IsEmpty())

cutoverTime := timeutil.Now()
var jobID int
err = db.QueryRowContext(
ctx,
`SELECT crdb_internal.complete_stream_ingestion_job($1)`,
*job.ID()).Scan(&jobID)
`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
*job.ID(), cutoverTime).Scan(&jobID)
require.NoError(t, err)
require.Equal(t, *job.ID(), int64(jobID))

Expand All @@ -73,5 +74,8 @@ func TestCutoverBuiltin(t *testing.T) {
progress = sj.Progress()
sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
require.True(t, sp.StreamIngest.MarkedForCompletion)
// The builtin only offers microsecond precision and so we must account for
// that when comparing against our chosen time.
cutoverTime = cutoverTime.Round(time.Microsecond)
require.Equal(t, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}, sp.StreamIngest.CutoverTime)
}
Loading

0 comments on commit 0c62e88

Please sign in to comment.