diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 7b60c262131e..b511083a0f66 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "ingest_span_configs.go", "merged_subscription.go", "metrics.go", + "replication_execution_details.go", "stream_ingest_manager.go", "stream_ingestion_dist.go", "stream_ingestion_frontier_processor.go", @@ -67,6 +68,7 @@ go_library( "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", @@ -92,6 +94,7 @@ go_test( "main_test.go", "merged_subscription_test.go", "rangekey_batcher_test.go", + "replication_execution_details_test.go", "replication_random_client_test.go", "replication_stream_e2e_test.go", "stream_ingestion_dist_test.go", @@ -134,6 +137,7 @@ go_test( "//pkg/security/securitytest", "//pkg/security/username", "//pkg/server", + "//pkg/server/serverpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", @@ -159,6 +163,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", "//pkg/util/hlc", + "//pkg/util/httputil", "//pkg/util/leaktest", "//pkg/util/limit", "//pkg/util/log", diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go index f77b5160c3f4..2e7a841ecaa3 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go @@ -100,6 +100,11 @@ func TestDataDriven(t *testing.T) { ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { + // Skip the test if it is a .txt file. This is to allow us to have non-test + // testdata in the same directory as the test files. + if strings.HasSuffix(path, ".txt") { + return + } ds := newDatadrivenTestState() defer ds.cleanup(t) datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go new file mode 100644 index 000000000000..d28e3774a7f1 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go @@ -0,0 +1,189 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package streamingest
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"sort"
+	"text/tabwriter"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/span"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/redact"
+)
+
+type frontierExecutionDetails struct {
+	srcInstanceID  base.SQLInstanceID
+	destInstanceID base.SQLInstanceID
+	span           string
+	frontierTS     string
+	behindBy       redact.SafeString
+}
+
+// constructSpanFrontierExecutionDetails constructs the frontierExecutionDetails
+// using the initial partition specs that map spans to the src and dest
+// instances, and a snapshot of the current state of the frontier.
+//
+// The shape of the spans tracked by the frontier can differ from the initial
+// partitioned set of spans. To account for this, for each span in the initial
+// partition set we output all the intersecting sub-spans in the frontier along
+// with their timestamps.
+func constructSpanFrontierExecutionDetails(
+	partitionSpecs execinfrapb.StreamIngestionPartitionSpecs,
+	frontierSpans execinfrapb.FrontierEntries,
+) ([]frontierExecutionDetails, error) {
+	f, err := span.MakeFrontier()
+	if err != nil {
+		return nil, err
+	}
+	for _, rs := range frontierSpans.ResolvedSpans {
+		if err := f.AddSpansAt(rs.Timestamp, rs.Span); err != nil {
+			return nil, err
+		}
+	}
+
+	now := timeutil.Now()
+	res := make([]frontierExecutionDetails, 0)
+	for _, spec := range partitionSpecs.Specs {
+		for _, sp := range spec.Spans {
+			f.SpanEntries(sp, func(r roachpb.Span, timestamp hlc.Timestamp) (done span.OpResult) {
+				res = append(res, frontierExecutionDetails{
+					srcInstanceID:  spec.SrcInstanceID,
+					destInstanceID: spec.DestInstanceID,
+					span:           r.String(),
+					frontierTS:     timestamp.GoTime().String(),
+					behindBy:       humanizeutil.Duration(now.Sub(timestamp.GoTime())),
+				})
+				return span.ContinueMatch
+			})
+		}
+	}
+
+	// Sort res by srcInstanceID, destInstanceID, and span. Sorting once, after
+	// all the partition specs have been processed, gives the file a stable,
+	// readable order.
+	sort.Slice(res, func(i, j int) bool {
+		if res[i].srcInstanceID != res[j].srcInstanceID {
+			return res[i].srcInstanceID < res[j].srcInstanceID
+		}
+		if res[i].destInstanceID != res[j].destInstanceID {
+			return res[i].destInstanceID < res[j].destInstanceID
+		}
+		return res[i].span < res[j].span
+	})
+
+	return res, nil
+}
+
+// generateSpanFrontierExecutionDetailFile generates and writes a file to the
+// job_info table that captures the following mapping:
+//
+// # Src Instance | Dest Instance | Span | Frontier Timestamp | Behind By
+//
+// This information is computed from state persisted by the stream ingestion
+// resumer and frontier processor. Namely:
+//
+// - The StreamIngestionPartitionSpec of each partition, providing a mapping
+// from span to src and dest SQLInstanceID.
+// - The snapshot of the frontier tracking the timestamp up to which each span
+// has been replicated.
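+//
+// For example, the generated file might look as follows (the instance IDs,
+// spans, timestamps, and lag values here are illustrative only):
+//
+//	Src Instance  Dest Instance  Span   Frontier Timestamp                       Behind By
+//	1             2              {a-b}  1970-01-01 00:00:00.000000001 +0000 UTC  10s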
+func generateSpanFrontierExecutionDetailFile(
+	ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool,
+) error {
+	return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		var sb bytes.Buffer
+		w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.TabIndent)
+
+		// Read the StreamIngestionPartitionSpecs to get a mapping from spans to
+		// their source and destination SQL instance IDs.
+		specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID)
+		if err != nil {
+			return err
+		}
+
+		var partitionSpecs execinfrapb.StreamIngestionPartitionSpecs
+		if err := protoutil.Unmarshal(specs, &partitionSpecs); err != nil {
+			return err
+		}
+
+		// Now, read the latest snapshot of the frontier that tells us what
+		// timestamp each span has been replicated up to.
+		frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID)
+		if err != nil {
+			return err
+		}
+
+		var frontierSpans execinfrapb.FrontierEntries
+		if err := protoutil.Unmarshal(frontierEntries, &frontierSpans); err != nil {
+			return err
+		}
+		executionDetails, err := constructSpanFrontierExecutionDetails(partitionSpecs, frontierSpans)
+		if err != nil {
+			return err
+		}
+
+		header := "Src Instance\tDest Instance\tSpan\tFrontier Timestamp\tBehind By"
+		if skipBehindBy {
+			header = "Src Instance\tDest Instance\tSpan\tFrontier Timestamp"
+		}
+		fmt.Fprintln(w, header)
+		for _, ed := range executionDetails {
+			if skipBehindBy {
+				fmt.Fprintf(w, "%s\t%s\t%s\t%s\n",
+					ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS)
+			} else {
+				fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
+					ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS, ed.behindBy)
+			}
+		}
+
+		filename := fmt.Sprintf("replication-frontier.%s.txt", timeutil.Now().Format("20060102_150405.00"))
+		if err := w.Flush(); err != nil {
+			return err
+		}
+		return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID)
+	})
+}
+
+// persistStreamIngestionPartitionSpecs persists all
+// StreamIngestionPartitionSpecs in a serialized form to the job_info table.
+// This information is used when the Resumer is requested to construct a
+// replication-frontier.txt file.
+func persistStreamIngestionPartitionSpecs(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	ingestionJobID jobspb.JobID,
+	streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
+) error {
+	err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0)
+		partitionSpecs := execinfrapb.StreamIngestionPartitionSpecs{Specs: specs}
+		for _, d := range streamIngestionSpecs {
+			for _, partitionSpec := range d.PartitionSpecs {
+				// Copy the loop variable: &partitionSpec below must not alias
+				// the same variable across iterations.
+				partitionSpec := partitionSpec
+				partitionSpecs.Specs = append(partitionSpecs.Specs, &partitionSpec)
+			}
+		}
+		specBytes, err := protoutil.Marshal(&partitionSpecs)
+		if err != nil {
+			return err
+		}
+		return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID)
+	})
+	return err
+}
diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go
new file mode 100644
index 000000000000..4647485a6d32
--- /dev/null
+++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go
@@ -0,0 +1,384 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package streamingest
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sort"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/httputil"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+// TestConstructFrontierExecutionDetailFile is a unit test for
+// constructSpanFrontierExecutionDetails. Refer to the method header for
+// details.
+func TestConstructFrontierExecutionDetailFile(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	clearTimestamps := func(executionDetails []frontierExecutionDetails) []frontierExecutionDetails {
+		res := make([]frontierExecutionDetails, len(executionDetails))
+		for i, ed := range executionDetails {
+			ed.frontierTS = ""
+			ed.behindBy = ""
+			res[i] = ed
+		}
+		return res
+	}
+
+	for _, tc := range []struct {
+		name            string
+		partitionSpecs  execinfrapb.StreamIngestionPartitionSpecs
+		frontierEntries execinfrapb.FrontierEntries
+		expected        []frontierExecutionDetails
+	}{
+		{
+			name: "matching spans",
+			partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{
+				Specs: []*execinfrapb.StreamIngestionPartitionSpec{
+					{
+						SrcInstanceID:  1,
+						DestInstanceID: 2,
+						Spans: []roachpb.Span{
+							{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
+						},
+					},
+				}},
+			frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{
+				{
+					Span:      roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
+					Timestamp: hlc.Timestamp{WallTime: 1},
+				},
+			}},
+			expected: []frontierExecutionDetails{
+				{
+					srcInstanceID:  1,
+					destInstanceID: 2,
+					span:           "{a-b}",
+				}},
+		},
+		{
+			name: "multi-partition",
+			partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{
+				Specs: []*execinfrapb.StreamIngestionPartitionSpec{
+					{
+						SrcInstanceID:  1,
+						DestInstanceID: 2,
+						Spans: []roachpb.Span{
+							{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
+						},
+					},
+					{
+						SrcInstanceID:  1,
+						DestInstanceID: 3,
+						Spans: []roachpb.Span{
+							{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
+						},
+					},
+				}},
+			frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{
+				{
+					Span:      roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("a'")},
+					Timestamp: hlc.Timestamp{WallTime: 1},
+				},
+				{
+					Span:      roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
+					Timestamp: hlc.Timestamp{WallTime: 2},
+				},
+			}},
+			expected: []frontierExecutionDetails{
+				{
+					srcInstanceID:  1,
+					destInstanceID: 2,
+					span:           "a{-'}",
+				},
+				{
+					srcInstanceID:  1,
+					destInstanceID: 3,
+					span:           "{b-c}",
+				},
+			},
+		},
+		{
+			name: "merged frontier",
+			partitionSpecs: 
execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + { + SrcInstanceID: 1, + DestInstanceID: 3, + Spans: []roachpb.Span{ + {Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, + }, + }, + }}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{a-b}", + }, + { + srcInstanceID: 1, + destInstanceID: 3, + span: "{b-d}", + }}, + }, + { + name: "no matching spans", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + }, + }, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + }}, + expected: []frontierExecutionDetails{}, + }, + { + name: "split frontier", + partitionSpecs: execinfrapb.StreamIngestionPartitionSpecs{ + Specs: []*execinfrapb.StreamIngestionPartitionSpec{ + { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}}}}}, + frontierEntries: execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + }}, + expected: []frontierExecutionDetails{ + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{a-b}", + }, + { + srcInstanceID: 1, + destInstanceID: 2, + span: "{b-c}", + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + executionDetails, err := constructSpanFrontierExecutionDetails(tc.partitionSpecs, tc.frontierEntries) + require.NoError(t, err) + executionDetails = clearTimestamps(executionDetails) + require.Equal(t, tc.expected, executionDetails) + }) + } +} + +func listExecutionDetails( + t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, +) []string { + t.Helper() + + client, err := s.GetAdminHTTPClient() + require.NoError(t, err) + + url := s.AdminURL().String() + fmt.Sprintf("/_status/list_job_profiler_execution_details/%d", jobID) + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + edResp := serverpb.ListJobProfilerExecutionDetailsResponse{} + require.NoError(t, protoutil.Unmarshal(body, &edResp)) + sort.Slice(edResp.Files, func(i, j int) bool { + return edResp.Files[i] < edResp.Files[j] + }) + return edResp.Files +} + +func checkExecutionDetails( + t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string, +) ([]byte, error) { + t.Helper() + + client, err := s.GetAdminHTTPClient() + if err != nil { + return nil, err + } + + url := s.AdminURL().String() + 
fmt.Sprintf("/_status/job_profiler_execution_details/%d?%s", jobID, filename) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", httputil.ProtoContentType) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + require.Equal(t, http.StatusOK, resp.StatusCode) + + edResp := serverpb.GetJobProfilerExecutionDetailResponse{} + if err := protoutil.Unmarshal(body, &edResp); err != nil { + return nil, err + } + + r := bytes.NewReader(edResp.Data) + data, err := io.ReadAll(r) + if err != nil { + return data, err + } + if len(data) == 0 { + return data, errors.New("no data returned") + } + return data, nil +} + +func TestEndToEndFrontierExecutionDetailFile(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + // First, let's persist some partitions specs. + streamIngestionsSpecs := map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec{ + 1: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 2, + DestInstanceID: 1, + Spans: []roachpb.Span{ + {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + }, + }, + }, + }, + 2: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 1, + DestInstanceID: 2, + Spans: []roachpb.Span{ + {Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + }, + }, + }, + }, + 3: { + PartitionSpecs: map[string]execinfrapb.StreamIngestionPartitionSpec{ + "1": { + SrcInstanceID: 3, + DestInstanceID: 3, + Spans: []roachpb.Span{ + {Key: roachpb.Key("d"), EndKey: roachpb.Key("e")}, + }, + }, + }, + }, + } + + srv := serverutils.StartServerOnly(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + ts := srv.ApplicationLayer() + execCfg := ts.ExecutorConfig().(sql.ExecutorConfig) + + ingestionJobID := jobspb.JobID(123) + require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg, + ingestionJobID, streamIngestionsSpecs)) + + // Now, let's persist some frontier entries. 
+ frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ + { + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + Timestamp: hlc.Timestamp{WallTime: 1}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("d'")}, + Timestamp: hlc.Timestamp{WallTime: 2}, + }, + { + Span: roachpb.Span{Key: roachpb.Key("d'"), EndKey: roachpb.Key("e")}, + Timestamp: hlc.Timestamp{WallTime: 0}, + }, + }} + + frontierBytes, err := protoutil.Marshal(&frontierEntries) + require.NoError(t, err) + require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID) + })) + require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */)) + files := listExecutionDetails(t, srv, ingestionJobID) + require.Len(t, files, 1) + data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0]) + require.NoError(t, err) + require.NotEmpty(t, data) + + expectedData, err := os.ReadFile(datapathutils.TestDataPath(t, "expected_replication_frontier.txt")) + require.NoError(t, err) + require.Equal(t, expectedData, data) +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index d28dad98e17b..5c79614db213 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" @@ -56,6 +55,10 @@ var replanFrequency = settings.RegisterDurationSetting( settings.PositiveDuration, ) +// replicationPartitionInfoFilename is the filename at which the replication job +// resumer writes its partition specs. 
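+// generateSpanFrontierExecutionDetailFile reads this file back when
+// constructing the replication-frontier.txt execution details file.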
+const replicationPartitionInfoFilename = "~replication-partition-specs.binpb" + func startDistIngestion( ctx context.Context, execCtx sql.JobExecContext, resumer *streamIngestionResumer, ) error { @@ -173,7 +176,6 @@ func startDistIngestion( } return ingestor.ingestSpanConfigs(ctx, details.SourceTenantName) } - execInitialPlan := func(ctx context.Context) error { defer func() { stopReplanner() @@ -267,31 +269,6 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) { return p.srcTenantID, nil } -func persistStreamIngestionPartitionSpecs( - ctx context.Context, - execCtx sql.JobExecContext, - ingestionJobID jobspb.JobID, - streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec, -) error { - replicationPartitionInfoKey := "~replication-partition-specs.binpb" - err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - jobInfoStorage := jobs.InfoStorageForJob(txn, ingestionJobID) - specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0) - partitionSpecs := execinfrapb.StreamIngestionPartitionSpecs{Specs: specs} - for _, d := range streamIngestionSpecs { - for _, partitionSpec := range d.PartitionSpecs { - partitionSpecs.Specs = append(partitionSpecs.Specs, &partitionSpec) - } - } - specBytes, err := protoutil.Marshal(&partitionSpecs) - if err != nil { - return err - } - return jobInfoStorage.Write(ctx, replicationPartitionInfoKey, specBytes) - }) - return err -} - func (p *replicationFlowPlanner) constructPlanGenerator( execCtx sql.JobExecContext, ingestionJobID jobspb.JobID, @@ -342,7 +319,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator( if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil { knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec) } - if err := persistStreamIngestionPartitionSpecs(ctx, execCtx, ingestionJobID, streamIngestionSpecs); err != nil { + if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil { return nil, nil, err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 1240b3d7ca08..d532a9ec4d36 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -61,6 +61,10 @@ var DumpFrontierEntries = settings.RegisterDurationSetting( const streamIngestionFrontierProcName = `ingestfntr` +// frontierEntriesFilename is the name of the file at which the stream ingestion +// frontier periodically dumps its state. +const frontierEntriesFilename = "~replication-frontier-entries.binpb" + type streamIngestionFrontier struct { execinfra.ProcessorBase @@ -515,12 +519,11 @@ func (sf *streamIngestionFrontier) updateLagMetric() { // we always persist the entries to the same info key and so we never have more // than one row describing the state of the frontier at a given point in time. 
func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { - ctx := sf.Ctx() - log.Info(ctx, "updating the frontier entries") - dumpFreq := DumpFrontierEntries.Get(&sf.flowCtx.Cfg.Settings.SV) + dumpFreq := DumpFrontierEntries.Get(&sf.FlowCtx.Cfg.Settings.SV) if dumpFreq == 0 || timeutil.Since(sf.lastFrontierDump) < dumpFreq { return nil } + ctx := sf.Ctx() f := sf.frontier jobID := jobspb.JobID(sf.spec.JobID) @@ -530,18 +533,14 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { return span.ContinueMatch }) - log.Infof(ctx, "updating the frontier with %v", frontierEntries) frontierBytes, err := protoutil.Marshal(frontierEntries) if err != nil { return err } - err = sf.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) - frontierEntriesInfoKey := "~replication-frontier-entries.binpb" - return infoStorage.Write(ctx, frontierEntriesInfoKey, frontierBytes) - }) - if err != nil { + if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID) + }); err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 725155267292..51f8a7c1f1c0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -536,8 +536,18 @@ func (s *streamIngestionResumer) CollectProfile(ctx context.Context, execCtx int defer s.mu.Unlock() aggStatsCopy = s.mu.perNodeAggregatorStats.DeepCopy() }() - return bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(), - p.ExecCfg().InternalDB, aggStatsCopy) + + var combinedErr error + if err := bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(), + p.ExecCfg().InternalDB, aggStatsCopy); err != nil { + combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to flush aggregator stats")) + } + if err := generateSpanFrontierExecutionDetailFile(ctx, p.ExecCfg(), + s.job.ID(), false /* skipBehindBy */); err != nil { + combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to generate span frontier execution details")) + } + + return combinedErr } func closeAndLog(ctx context.Context, d streamclient.Dialer) { diff --git a/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt b/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt new file mode 100644 index 000000000000..ce2cb4529165 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt @@ -0,0 +1,5 @@ +Src Instance Dest Instance Span Frontier Timestamp +1 2 {b-c} 1970-01-01 00:00:00.000000002 +0000 UTC +2 1 {a-b} 1970-01-01 00:00:00.000000001 +0000 UTC +3 3 d{-'} 1970-01-01 00:00:00.000000002 +0000 UTC +3 3 {d'-e} 1970-01-01 00:00:00 +0000 UTC