Skip to content

Commit

Permalink
streamingest: output replication-frontier.txt file on demand
Browse files Browse the repository at this point in the history
This is the last commit in the series that uses the two pieces
of information persisted in the previous commits. Namely,

- The StreamIngestionPartitionSpec of each partition providing a mapping from
span to src and dest SQLInstanceID.

- The snapshot of the frontier tracking how far each span has been replicated
up to.

The file is generated everytime the destination job is requested for its execution
details. The file will be consumable from the `Advanced Debugging` tab on the
job details page and will be of the form:

```
Src Instance Dest Instance Span                    Frontier Timestamp                   Behind By
1            1             /Tenant/2{-/Table/106}  2023-08-16 19:22:44.940537 +0000 UTC 13s
1            1             /Tenant/{2/Table/106-3} 2023-08-16 19:22:44.739163 +0000 UTC 13.2s
```

Informs: #108374
Release note: None
  • Loading branch information
adityamaru committed Sep 27, 2023
1 parent f5f9cf4 commit 5ac0db3
Show file tree
Hide file tree
Showing 8 changed files with 614 additions and 40 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"ingest_span_configs.go",
"merged_subscription.go",
"metrics.go",
"replication_execution_details.go",
"stream_ingest_manager.go",
"stream_ingestion_dist.go",
"stream_ingestion_frontier_processor.go",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_library(
"//pkg/util/bulk",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
Expand All @@ -92,6 +94,7 @@ go_test(
"main_test.go",
"merged_subscription_test.go",
"rangekey_batcher_test.go",
"replication_execution_details_test.go",
"replication_random_client_test.go",
"replication_stream_e2e_test.go",
"stream_ingestion_dist_test.go",
Expand Down Expand Up @@ -134,6 +137,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
Expand All @@ -159,6 +163,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func TestDataDriven(t *testing.T) {

ctx := context.Background()
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
// Skip the test if it is a .txt file. This is to allow us to have non-test
// testdata in the same directory as the test files.
if strings.HasSuffix(path, ".txt") {
return
}
ds := newDatadrivenTestState()
defer ds.cleanup(t)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
Expand Down
189 changes: 189 additions & 0 deletions pkg/ccl/streamingccl/streamingest/replication_execution_details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"bytes"
"context"
"fmt"
"sort"
"text/tabwriter"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/redact"
)

type frontierExecutionDetails struct {
srcInstanceID base.SQLInstanceID
destInstanceID base.SQLInstanceID
span string
frontierTS string
behindBy redact.SafeString
}

// constructSpanFrontierExecutionDetails constructs the frontierExecutionDetails
// using the initial partition specs that map spans to the src and dest
// instances, and a snapshot of the current state of the frontier.
//
// The shape of the spans tracked by the frontier can be different from the
// initial partitioned set of spans. To account for this, for each span in the
// initial partition set we want to output all the intersecting sub-spans in the
// frontier along with their timestamps.
func constructSpanFrontierExecutionDetails(
partitionSpecs execinfrapb.StreamIngestionPartitionSpecs,
frontierSpans execinfrapb.FrontierEntries,
) ([]frontierExecutionDetails, error) {
f, err := span.MakeFrontier()
if err != nil {
return nil, err
}
for _, rs := range frontierSpans.ResolvedSpans {
if err := f.AddSpansAt(rs.Timestamp, rs.Span); err != nil {
return nil, err
}
}

now := timeutil.Now()
res := make([]frontierExecutionDetails, 0)
for _, spec := range partitionSpecs.Specs {
for _, sp := range spec.Spans {
f.SpanEntries(sp, func(r roachpb.Span, timestamp hlc.Timestamp) (done span.OpResult) {
res = append(res, frontierExecutionDetails{
srcInstanceID: spec.SrcInstanceID,
destInstanceID: spec.DestInstanceID,
span: r.String(),
frontierTS: timestamp.GoTime().String(),
behindBy: humanizeutil.Duration(now.Sub(timestamp.GoTime())),
})
return span.ContinueMatch
})
}

// Sort res on the basis of srcInstanceID, destInstanceID.
sort.Slice(res, func(i, j int) bool {
if res[i].srcInstanceID != res[j].srcInstanceID {
return res[i].srcInstanceID < res[j].srcInstanceID
}
if res[i].destInstanceID != res[j].destInstanceID {
return res[i].destInstanceID < res[j].destInstanceID
}
return res[i].span < res[j].span
})
}

return res, nil
}

// generateSpanFrontierExecutionDetailFile generates and writes a file to the
// job_info table that captures the mapping from:
//
// # Src Instance | Dest Instance | Span | Frontier Timestamp | Behind By
//
// This information is computed from information persisted by the
// stream ingestion resumer and frontier processor. Namely:
//
// - The StreamIngestionPartitionSpec of each partition providing a mapping from
// span to src and dest SQLInstanceID.
// - The snapshot of the frontier tracking how far each span has been replicated
// up to.
func generateSpanFrontierExecutionDetailFile(
ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool,
) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var sb bytes.Buffer
w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.TabIndent)

// Read the StreamIngestionPartitionSpecs to get a mapping from spans to
// their source and destination SQL instance IDs.
specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID)
if err != nil {
return err
}

var partitionSpecs execinfrapb.StreamIngestionPartitionSpecs
if err := protoutil.Unmarshal(specs, &partitionSpecs); err != nil {
return err
}

// Now, read the latest snapshot of the frontier that tells us what
// timestamp each span has been replicated up to.
frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID)
if err != nil {
return err
}

var frontierSpans execinfrapb.FrontierEntries
if err := protoutil.Unmarshal(frontierEntries, &frontierSpans); err != nil {
return err
}
executionDetails, err := constructSpanFrontierExecutionDetails(partitionSpecs, frontierSpans)
if err != nil {
return err
}

header := "Src Instance\tDest Instance\tSpan\tFrontier Timestamp\tBehind By"
if skipBehindBy {
header = "Src Instance\tDest Instance\tSpan\tFrontier Timestamp"
}
fmt.Fprintln(w, header)
for _, ed := range executionDetails {
if skipBehindBy {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n",
ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS)
} else {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
ed.srcInstanceID, ed.destInstanceID, ed.span, ed.frontierTS, ed.behindBy)
}
}

filename := fmt.Sprintf("replication-frontier.%s.txt", timeutil.Now().Format("20060102_150405.00"))
if err := w.Flush(); err != nil {
return err
}
return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID)
})
}

// persistStreamIngestionPartitionSpecs persists all
// StreamIngestionPartitionSpecs in a serialized form to the job_info table.
// This information is used when the Resumer is requested to construct a
// replication-frontier.txt file.
func persistStreamIngestionPartitionSpecs(
ctx context.Context,
execCfg *sql.ExecutorConfig,
ingestionJobID jobspb.JobID,
streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
) error {
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0)
partitionSpecs := execinfrapb.StreamIngestionPartitionSpecs{Specs: specs}
for _, d := range streamIngestionSpecs {
for _, partitionSpec := range d.PartitionSpecs {
partitionSpecs.Specs = append(partitionSpecs.Specs, &partitionSpec)
}
}
specBytes, err := protoutil.Marshal(&partitionSpecs)
if err != nil {
return err
}
return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID)
})
return err
}
Loading

0 comments on commit 5ac0db3

Please sign in to comment.