
Commit

126456: crosscluster/logical: minor clean ups r=dt a=dt

See commits

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Jun 29, 2024
2 parents 00a4562 + 897974c commit d132535
Showing 6 changed files with 11 additions and 22 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/crosscluster/logical/BUILD.bazel
@@ -96,7 +96,6 @@ go_test(
         "//pkg/roachpb",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
-        "//pkg/security/username",
         "//pkg/server",
         "//pkg/sql",
         "//pkg/sql/catalog",
21 changes: 9 additions & 12 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
@@ -12,7 +12,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"net/url"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -22,7 +21,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
-	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -51,6 +49,7 @@ var (
 		"SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'",
 		"SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'",
 		"SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'",
+		"SET CLUSTER SETTING physical_replication.producer.timestamp_granularity = '0s'",

 		// TODO(ssd): Duplicate these over to logical_replication as well.
 		"SET CLUSTER SETTING physical_replication.producer.min_checkpoint_frequency='100ms'",
@@ -118,7 +117,9 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
 	defer server.Stopper().Stop(ctx)
 	s := server.Server(0).ApplicationLayer()

-	_, err := server.Conns[0].Exec("CREATE DATABASE a")
+	_, err := server.Conns[0].Exec("SET CLUSTER SETTING physical_replication.producer.timestamp_granularity = '0s'")
+	require.NoError(t, err)
+	_, err = server.Conns[0].Exec("CREATE DATABASE a")
 	require.NoError(t, err)
 	_, err = server.Conns[0].Exec("CREATE DATABASE B")
 	require.NoError(t, err)
@@ -143,12 +144,10 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
 	dbA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
 	dbB.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")

-	dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
-	dbAURL.Path = "a"
+	dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a"))
 	defer cleanup()
-	dbBURL, cleanupB := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
+	dbBURL, cleanupB := s.PGUrl(t, serverutils.DBName("b"))
 	defer cleanupB()
-	dbBURL.Path = "b"

 	var (
 		jobAID jobspb.JobID
@@ -280,7 +279,7 @@ family f2(other_payload, v2))

 	serverASQL.Exec(t, "INSERT INTO tab(pk, payload, other_payload) VALUES (1, 'hello', 'ruroh1')")

-	serverAURL, cleanup := sqlutils.PGUrl(t, serverA.Server(0).ApplicationLayer().SQLAddr(), t.Name(), url.User(username.RootUser))
+	serverAURL, cleanup := serverA.Server(0).ApplicationLayer().PGUrl(t)
 	defer cleanup()

 	var jobBID jobspb.JobID
@@ -356,8 +355,7 @@ func TestRandomTables(t *testing.T) {
 	runnerA.Exec(t, addCol)
 	runnerB.Exec(t, addCol)

-	dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
-	dbAURL.Path = "a"
+	dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a"))
 	defer cleanup()

 	streamStartStmt := fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %[1]s ON $1 INTO TABLE %[1]s", tableName)
@@ -418,8 +416,7 @@ func TestPreviouslyInterestingTables(t *testing.T) {
 	baseTableName := "rand_table"
 	rng, _ := randutil.NewPseudoRand()
 	numInserts := 20
-	dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
-	dbAURL.Path = "a"
+	dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a"))
 	defer cleanup()
 	for i, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
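The repeated change in this test file replaces the hand-rolled connection URL (sqlutils.PGUrl plus a mutation of url.Path to select the database) with the server's own PGUrl helper and a DBName option. Below is a minimal sketch of the new pattern, assuming the serverutils API only as it appears in the hunks above; the wrapper name pgURLForDB is ours, for illustration:

package logical

import (
	"net/url"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

// pgURLForDB returns a SQL connection URL scoped to db, plus a cleanup
// function, mirroring the calls in the diff above. The old pattern needed
// the SQL address, a test name, a user, and a manual url.Path assignment;
// here the application layer fills all of that in itself.
func pgURLForDB(
	t *testing.T, s serverutils.ApplicationLayerInterface, db string,
) (url.URL, func()) {
	// serverutils.DBName sets the database in the returned URL, replacing
	// the old `dbAURL.Path = "a"` mutation.
	return s.PGUrl(t, serverutils.DBName(db))
}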
3 changes: 0 additions & 3 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -534,7 +534,6 @@ func (p *replicationFlowPlanner) constructPlanGenerator(

 		streamIngestionSpecs, streamIngestionFrontierSpec, err := constructStreamIngestionPlanSpecs(
 			ctx,
-			crosscluster.StreamAddress(details.StreamAddress),
 			topology,
 			destNodeLocalities,
 			initialScanTimestamp,
@@ -757,7 +756,6 @@ func GetDestNodeLocalities(

 func constructStreamIngestionPlanSpecs(
 	ctx context.Context,
-	streamAddress crosscluster.StreamAddress,
 	topology streamclient.Topology,
 	destSQLInstances []sql.InstanceLocality,
 	initialScanTimestamp hlc.Timestamp,
@@ -780,7 +778,6 @@ func constructStreamIngestionPlanSpecs(
 			PreviousReplicatedTimestamp: previousReplicatedTimestamp,
 			InitialScanTimestamp:        initialScanTimestamp,
 			Checkpoint:                  checkpoint, // TODO: Only forward relevant checkpoint info
-			StreamAddress:               string(streamAddress),
 			PartitionSpecs:              make(map[string]execinfrapb.StreamIngestionPartitionSpec),
 			TenantRekey: execinfrapb.TenantRekey{
 				OldID: sourceTenantID,
3 changes: 0 additions & 3 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -16,7 +16,6 @@ import (
 	"testing"

 	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
 	"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -318,10 +317,8 @@ func TestSourceDestMatching(t *testing.T) {
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			fakeStreamAddress := crosscluster.StreamAddress("")
 			sipSpecs, _, err := constructStreamIngestionPlanSpecs(
 				ctx,
-				fakeStreamAddress,
 				fakeTopology(tc.srcNodes, keys.MakeTenantSpan(roachpb.TenantID{InternalValue: 2})),
 				tc.dstNodes,
 				hlc.Timestamp{},
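Taken together with the stream_ingestion_dist.go hunks above, the test change reflects a single refactor: streamAddress was threaded into constructStreamIngestionPlanSpecs only to be copied into a spec field nothing read, so both the parameter and the field go away. A toy Go sketch of the pattern under that assumption (names and types are ours, not the repo's; it assumes each partition in the topology already carries the URI the processor needs):

package main

import "fmt"

type ingestionSpec struct {
	// A top-level streamAddress used to live here; it was written but never
	// read, so the cleanup deletes the field and the argument that fed it.
	partitions map[string]string // partition name -> connection URI
}

// constructSpecs no longer takes an address argument: the per-partition
// URIs in the topology are the only addresses actually used.
func constructSpecs(topology map[string]string) ingestionSpec {
	return ingestionSpec{partitions: topology}
}

func main() {
	spec := constructSpecs(map[string]string{"p1": "postgres://source-node-1"})
	fmt.Println(spec.partitions["p1"])
}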
1 change: 0 additions & 1 deletion
@@ -713,7 +713,6 @@ func getStreamIngestionProcessor(
 	post := execinfrapb.PostProcessSpec{}

 	var spec execinfrapb.StreamIngestionDataSpec
-	spec.StreamAddress = "http://unused"
 	spec.TenantRekey = tenantRekey
 	spec.PartitionSpecs = make(map[string]execinfrapb.StreamIngestionPartitionSpec)
 	for _, pa := range partitions.Partitions {
4 changes: 2 additions & 2 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -242,8 +242,8 @@ message StreamIngestionDataSpec {
  // ingesting data from the PreviousReplicatedTimestamp described above.
  optional util.hlc.Timestamp initial_scan_timestamp = 11 [(gogoproto.nullable) = false];

-  // StreamAddress locate the stream so that a stream client can be initialized.
-  optional string stream_address = 3 [(gogoproto.nullable) = false];
+  reserved 3; // this was `string stream_address` in case we want to resurrect it.

  // JobID is the job ID of the stream ingestion job.
  optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

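Replacing the field with `reserved 3;` is the standard protobuf guard when deleting a field: the compiler rejects any future declaration that reuses tag 3, while old encoded specs that still carry a stream_address are simply skipped as an unknown field on decode. A minimal toy message illustrating the rule (ours, not from the repo):

syntax = "proto2";

message ExampleSpec {
  // Tag 3 belonged to the deleted stream_address field. Reserving it means
  // re-declaring any field with tag 3 in this message is a protoc error.
  reserved 3;

  optional int64 job_id = 4;
}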
