builtins: change complete_stream builtin to take a timestamp
This change adds a ts parameter to the
crdb_internal.complete_stream_ingestion_job builtin. This ts is the
timestamp as of which the cluster being ingested into will be
considered to be in a consistent state.

Release note: None
adityamaru committed Feb 12, 2021
1 parent ea1ef0b commit ae32109
Showing 10 changed files with 572 additions and 548 deletions.
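
For context, a rough sketch (not part of this commit) of how a client might drive the updated builtin from Go. It assumes a database/sql connection to the destination cluster and an already-known stream ingestion job ID; the helper name and the use of SHOW JOB WHEN COMPLETE to block on the job are illustrative, not taken from the change itself.

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// signalCutover signals a running stream ingestion job to complete as of
// cutoverTime and then waits for the job to reach a terminal state. The
// builtin itself returns as soon as the signal has been recorded, so the
// follow-up SHOW statement is what actually blocks.
func signalCutover(ctx context.Context, db *sql.DB, jobID int64, cutoverTime time.Time) error {
	var returnedID int64
	if err := db.QueryRowContext(ctx,
		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
		jobID, cutoverTime,
	).Scan(&returnedID); err != nil {
		return err
	}
	// Block until the signaled job has finished reverting to the cutover time.
	_, err := db.ExecContext(ctx, fmt.Sprintf(`SHOW JOB WHEN COMPLETE %d`, jobID))
	return err
}
```

Passing the cutover time as a time.Time argument lets the driver send it as a timestamptz, matching the new cutover_ts parameter documented in the functions.md diff below.
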
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
@@ -2274,7 +2274,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
- <tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the latest resolved timestamp and leave the cluster in a consistent state. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
+ <tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the latest resolved timestamp and leave the cluster in a consistent state. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td></tr></tbody>
</table>

1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingutils/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
"//pkg/kv",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/streamingutils/utils.go
@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -23,7 +24,9 @@ func init() {
streaming.CompleteIngestionHook = doCompleteIngestion
}

- func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) error {
+ func doCompleteIngestion(
+ 	evalCtx *tree.EvalContext, txn *kv.Txn, jobID int, cutoverTimestamp hlc.Timestamp,
+ ) error {
// Get the job payload for job_id.
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.InternalExecutor.QueryRow(evalCtx.Context,
@@ -49,7 +52,7 @@ func doCompleteIngestion(evalCtx *tree.EvalContext, txn *kv.Txn, jobID int) erro

// Update the sentinel being polled by the stream ingestion job to
// check if a complete has been signaled.
- sp.StreamIngest.MarkedForCompletion = true
+ sp.StreamIngest.CutoverTime = cutoverTimestamp
progress.ModifiedMicros = timeutil.ToUnixMicros(txn.ReadTimestamp().GoTime())
progressBytes, err := protoutil.Marshal(progress)
if err != nil {
12 changes: 8 additions & 4 deletions pkg/ccl/streamingccl/streamingutils/utils_test.go
@@ -57,13 +57,14 @@ func TestCutoverBuiltin(t *testing.T) {
progress := job.Progress()
sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
- require.False(t, sp.StreamIngest.MarkedForCompletion)
+ require.True(t, sp.StreamIngest.CutoverTime.IsEmpty())

+ cutoverTime := time.Now()
var jobID int
err = db.QueryRowContext(
ctx,
- `SELECT crdb_internal.complete_stream_ingestion_job($1)`,
- *job.ID()).Scan(&jobID)
+ `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
+ *job.ID(), cutoverTime).Scan(&jobID)
require.NoError(t, err)
require.Equal(t, *job.ID(), int64(jobID))

@@ -73,5 +74,8 @@ func TestCutoverBuiltin(t *testing.T) {
progress = sj.Progress()
sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest)
require.True(t, ok)
- require.True(t, sp.StreamIngest.MarkedForCompletion)
+ // The builtin only offers microsecond precision and so we must account for
+ // that when comparing against our chosen time.
+ cutoverTime = cutoverTime.Round(time.Microsecond)
+ require.Equal(t, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}, sp.StreamIngest.CutoverTime)
}
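
A side note on the precision handling in the test above: a timestamptz only carries microsecond precision, while Go's time.Time and an hlc.Timestamp wall time are in nanoseconds, so the original value has to be rounded before it can be compared. A standalone illustration of that round trip (the helper below is hypothetical, not part of the change):

```go
package example

import (
	"fmt"
	"time"
)

// expectedWallTime mimics what happens to a cutover time sent through the
// builtin: the timestamptz argument keeps only microsecond precision, so the
// wall time that ends up in the job progress is the rounded value.
func expectedWallTime(t time.Time) int64 {
	return t.Round(time.Microsecond).UnixNano()
}

func demo() {
	now := time.Now()
	// Almost always prints false: the sub-microsecond digits were dropped.
	fmt.Println(expectedWallTime(now) == now.UnixNano())
}
```
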
1,073 changes: 540 additions & 533 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions pkg/jobs/jobspb/jobs.proto
@@ -94,11 +94,11 @@ message StreamIngestionDetails {
}

message StreamIngestionProgress {
- // MarkedForCompletion is used to signal to the stream ingestion job to
- // complete its ingestion. This involves stopping any subsequent ingestion,
- // and rolling back to the latest resolved ts to bring the ingested cluster to
- // a consistent state.
- bool markedForCompletion = 1;
+ // CutoverTime is set to signal to the stream ingestion job to complete its
+ // ingestion. This involves stopping any subsequent ingestion, and rolling
+ // back any additional ingested data, to bring the ingested cluster to a
+ // consistent state as of the CutoverTime.
+ util.hlc.Timestamp cutover_time = 1 [(gogoproto.nullable) = false];
}

message BackupDetails {
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
@@ -68,6 +68,7 @@ go_library(
"//pkg/util/errorutil",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/fuzzystrmatch",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/ipaddr",
"//pkg/util/json",
10 changes: 8 additions & 2 deletions pkg/sql/sem/builtins/builtins.go
@@ -61,6 +61,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/fuzzystrmatch"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/ipaddr"
"github.com/cockroachdb/cockroach/pkg/util/json"
@@ -4775,11 +4776,16 @@ may increase either contention or retry errors, or both.`,
Category: categoryStreamIngestion,
},
tree.Overload{
- Types: tree.ArgTypes{{"job_id", types.Int}},
+ Types: tree.ArgTypes{
+ 	{"job_id", types.Int},
+ 	{"cutover_ts", types.TimestampTZ},
+ },
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
jobID := int(*args[0].(*tree.DInt))
- err := streaming.CompleteIngestionHook(evalCtx, evalCtx.Txn, jobID)
+ cutoverTime := args[1].(*tree.DTimestampTZ).Time
+ cutoverTimestamp := hlc.Timestamp{WallTime: cutoverTime.UnixNano()}
+ err := streaming.CompleteIngestionHook(evalCtx, evalCtx.Txn, jobID, cutoverTimestamp)
return tree.NewDInt(tree.DInt(jobID)), err
},
Info: "This function can be used to signal a running stream ingestion job to complete. " +
1 change: 1 addition & 0 deletions pkg/streaming/BUILD.bazel
@@ -8,5 +8,6 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
],
)
3 changes: 2 additions & 1 deletion pkg/streaming/utils.go
@@ -13,11 +13,12 @@ package streaming
import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// CompleteIngestionHook is the hook run by the
// crdb_internal.complete_stream_ingestion_job builtin.
// It is used to signal to a running stream ingestion job to stop ingesting data
// and eventually move to a consistent state as of the latest resolved
// timestamp.
- var CompleteIngestionHook func(*tree.EvalContext, *kv.Txn, int) error
+ var CompleteIngestionHook func(*tree.EvalContext, *kv.Txn, int, hlc.Timestamp) error
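
The signature change above is the tail end of the hook-variable pattern used here: the OSS-licensed builtin calls a package-level function variable, and the CCL streamingutils package installs the real implementation from its init function (shown earlier in this commit). A stripped-down sketch of that pattern, with illustrative names only:

```go
package example

import "errors"

// completeIngestionHook stays nil until an enterprise (CCL) package installs
// an implementation, mirroring how streaming.CompleteIngestionHook is wired up.
var completeIngestionHook func(jobID int, cutoverWallTime int64) error

// registerHook is what the CCL package's init function would do.
func registerHook(impl func(jobID int, cutoverWallTime int64) error) {
	completeIngestionHook = impl
}

// completeIngestion is what a builtin-style caller could do: fail with a clear
// error if no implementation was ever registered (e.g. a non-CCL build).
func completeIngestion(jobID int, cutoverWallTime int64) error {
	if completeIngestionHook == nil {
		return errors.New("stream ingestion requires a CCL build")
	}
	return completeIngestionHook(jobID, cutoverWallTime)
}
```

Because the hook's signature lives in the OSS package, the new hlc.Timestamp parameter has to be added both here and at the CCL implementation.
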
