diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index 84e3fa4ccf55..8881b6a20ab6 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -96,7 +96,6 @@ go_test( "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", - "//pkg/security/username", "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index 9904edb846eb..cdd0dcb085b8 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -12,7 +12,6 @@ import ( "bytes" "context" "fmt" - "net/url" "strings" "sync/atomic" "testing" @@ -22,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -51,6 +49,7 @@ var ( "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'", "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'", "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'", + "SET CLUSTER SETTING physical_replication.producer.timestamp_granularity = '0s'", // TODO(ssd): Duplicate these over to logical_replication as well. "SET CLUSTER SETTING physical_replication.producer.min_checkpoint_frequency='100ms'", @@ -118,7 +117,9 @@ func TestLogicalStreamIngestionJob(t *testing.T) { defer server.Stopper().Stop(ctx) s := server.Server(0).ApplicationLayer() - _, err := server.Conns[0].Exec("CREATE DATABASE a") + _, err := server.Conns[0].Exec("SET CLUSTER SETTING physical_replication.producer.timestamp_granularity = '0s'") + require.NoError(t, err) + _, err = server.Conns[0].Exec("CREATE DATABASE a") require.NoError(t, err) _, err = server.Conns[0].Exec("CREATE DATABASE B") require.NoError(t, err) @@ -143,12 +144,10 @@ func TestLogicalStreamIngestionJob(t *testing.T) { dbA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')") dbB.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')") - dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser)) - dbAURL.Path = "a" + dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a")) defer cleanup() - dbBURL, cleanupB := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser)) + dbBURL, cleanupB := s.PGUrl(t, serverutils.DBName("b")) defer cleanupB() - dbBURL.Path = "b" var ( jobAID jobspb.JobID @@ -280,7 +279,7 @@ family f2(other_payload, v2)) serverASQL.Exec(t, "INSERT INTO tab(pk, payload, other_payload) VALUES (1, 'hello', 'ruroh1')") - serverAURL, cleanup := sqlutils.PGUrl(t, serverA.Server(0).ApplicationLayer().SQLAddr(), t.Name(), url.User(username.RootUser)) + serverAURL, cleanup := serverA.Server(0).ApplicationLayer().PGUrl(t) defer cleanup() var jobBID jobspb.JobID @@ -356,8 +355,7 @@ func TestRandomTables(t *testing.T) { runnerA.Exec(t, addCol) runnerB.Exec(t, addCol) - dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser)) - dbAURL.Path = "a" + dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a")) defer cleanup() streamStartStmt := fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE %[1]s ON $1 INTO TABLE %[1]s", tableName) @@ -418,8 +416,7 @@ func TestPreviouslyInterestingTables(t *testing.T) { baseTableName := "rand_table" rng, _ := randutil.NewPseudoRand() numInserts := 20 - dbAURL, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser)) - dbAURL.Path = "a" + dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a")) defer cleanup() for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go index 2b7009564408..533459c24b47 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go @@ -534,7 +534,6 @@ func (p *replicationFlowPlanner) constructPlanGenerator( streamIngestionSpecs, streamIngestionFrontierSpec, err := constructStreamIngestionPlanSpecs( ctx, - crosscluster.StreamAddress(details.StreamAddress), topology, destNodeLocalities, initialScanTimestamp, @@ -757,7 +756,6 @@ func GetDestNodeLocalities( func constructStreamIngestionPlanSpecs( ctx context.Context, - streamAddress crosscluster.StreamAddress, topology streamclient.Topology, destSQLInstances []sql.InstanceLocality, initialScanTimestamp hlc.Timestamp, @@ -780,7 +778,6 @@ func constructStreamIngestionPlanSpecs( PreviousReplicatedTimestamp: previousReplicatedTimestamp, InitialScanTimestamp: initialScanTimestamp, Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info - StreamAddress: string(streamAddress), PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), TenantRekey: execinfrapb.TenantRekey{ OldID: sourceTenantID, diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go index ee59feeb8818..3fc9edfdce2b 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go @@ -16,7 +16,6 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -318,10 +317,8 @@ func TestSourceDestMatching(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - fakeStreamAddress := crosscluster.StreamAddress("") sipSpecs, _, err := constructStreamIngestionPlanSpecs( ctx, - fakeStreamAddress, fakeTopology(tc.srcNodes, keys.MakeTenantSpan(roachpb.TenantID{InternalValue: 2})), tc.dstNodes, hlc.Timestamp{}, diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go index aef756760a31..cbe6bab601dd 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go @@ -713,7 +713,6 @@ func getStreamIngestionProcessor( post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - spec.StreamAddress = "http://unused" spec.TenantRekey = tenantRekey spec.PartitionSpecs = make(map[string]execinfrapb.StreamIngestionPartitionSpec) for _, pa := range partitions.Partitions { diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 18e299a98dd8..c5d693d213c6 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -242,8 +242,8 @@ message StreamIngestionDataSpec { // ingesting data from the PreviousReplicatedTimestamp described above. optional util.hlc.Timestamp initial_scan_timestamp = 11 [(gogoproto.nullable) = false]; - // StreamAddress locate the stream so that a stream client can be initialized. - optional string stream_address = 3 [(gogoproto.nullable) = false]; + reserved 3; // this was `string stream_address` in case we want to resurrect it. + // JobID is the job ID of the stream ingestion job. optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];