
Commit

87027: streamingccl: reduce server count in multinode tests r=samiskin a=samiskin

While these tests would pass under stress locally, they would fail under
CI stress, possibly because we were starting more server processes than
ever before: 4 source nodes, 4 source tenant pods, and 4 destination
nodes.

This PR reduces the node count to 3 (any fewer and scatter does not
distribute ranges correctly) and starts only a single tenant pod for the
source cluster.
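
For illustration, a minimal sketch of the reduced topology, using the standard
`testcluster`/`serverutils` helpers rather than this file's
`createTenantStreamingClusters` wrapper (the test name, package, and tenant ID
below are hypothetical; the actual setup is in the diff further down):

```go
package streamingest_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestReducedTopologySketch is a condensed illustration of the new shape:
// a 3-node cluster with a single tenant pod, instead of 4 nodes with a
// tenant pod on each.
func TestReducedTopologySketch(t *testing.T) {
	ctx := context.Background()

	// 3 nodes is the smallest count at which SCATTER still places leases
	// on distinct nodes in these tests.
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// One tenant pod, on node 0 only.
	_, tenantDB := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
		TenantID: roachpb.MakeTenantID(10),
	})
	defer tenantDB.Close()
}
```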

Release justification: test-only change
Release note: None

87412: cli,server: fix --sql-advertise-addr when --sql-addr is not specified r=a-robinson,ajwerner a=knz

Fixes #87040.
Informs #52266.

cc `@a-robinson` 

Release justification: bug fix

Release note (bug fix): The flag `--sql-advertise-addr` now works
properly even when the SQL and RPC ports are shared (that is, when
`--sql-addr` was not specified). Note that this port sharing is a
deprecated feature in v22.2.
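
As a simplified sketch of the corrected behavior (not the actual code in
`pkg/cli/flags.go`, which is shown in the diff below): the host and port parts
of the advertise flags are now considered separately, so an explicitly given
SQL advertise address wins even when the SQL and RPC listeners share a port.
The helper name here is hypothetical.

```go
package main

import (
	"fmt"
	"net"
)

// deriveSQLAdvertiseAddr is a simplified sketch of the resolution order:
// an explicit SQL advertise host/port wins; otherwise fall back to the
// general advertise host/port if one was given; otherwise fall back to the
// SQL listen address (which equals the RPC address when ports are shared).
func deriveSQLAdvertiseAddr(
	sqlAdvHost, sqlAdvPort string, // from the SQL advertise flag, if given
	advHost, advPort string, // from --advertise-addr / --advertise-port, if given
	sqlHost, sqlPort string, // the SQL listen address
) string {
	host, port := sqlAdvHost, sqlAdvPort
	if host == "" {
		if advHost != "" {
			host = advHost
		} else {
			host = sqlHost
		}
	}
	if port == "" {
		if advPort != "" {
			port = advPort
		} else {
			port = sqlPort
		}
	}
	return net.JoinHostPort(host, port)
}

func main() {
	// Shared SQL/RPC port, explicit SQL advertise address:
	// the result is 192.168.0.111:12345, no longer overridden by the RPC address.
	fmt.Println(deriveSQLAdvertiseAddr("192.168.0.111", "12345", "", "", "127.0.0.1", "54321"))
}
```

For example, `--listen-addr=127.0.0.1 --port=54321
--advertise-sql-addr=192.168.0.111:12345` with no separate `--sql-addr` now
yields a SQL advertise address of `192.168.0.111:12345`, matching the new test
cases added to `pkg/cli/flags_test.go` below.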

87440: ui: update txn contention insights to use waiting txns as event r=ericharmeling a=ericharmeling

This commit updates the transaction workload insights pages to use the waiting
contended transaction as the primary contention event, rather than the blocking
transaction.

Fixes #87284.

https://www.loom.com/share/383fec4297a74ec79d90e46f11def792

Release justification: bug fixes and low-risk updates to new functionality
Release note: None

87462: upgrade/upgrades: allow CreatedAtNanos to be set when validating migration r=ajwerner a=ajwerner

Schema change upgrade migrations to system tables are made idempotent by checking whether the descriptor has already reached the expected state. To compare against that expected state reliably, some volatile fields need to be masked first. We forgot to mask CreatedAtNanos. We had also lost the testing that came with the helper functions we use here.
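
As a rough illustration of the masking idea (the type and helper below are hypothetical stand-ins, not the actual descriptor-validation code):

```go
package main

import (
	"fmt"
	"reflect"
)

// indexDescriptor is a stand-in type for illustration only; the real code
// operates on catalog table descriptors.
type indexDescriptor struct {
	Name           string
	ID             int
	CreatedAtNanos int64 // volatile: depends on when the index was created
}

// equalIgnoringVolatile zeroes fields whose values legitimately differ
// between an existing descriptor and the migration's expected one (such as
// CreatedAtNanos) before comparing them.
func equalIgnoringVolatile(existing, expected indexDescriptor) bool {
	existing.CreatedAtNanos = 0
	expected.CreatedAtNanos = 0
	return reflect.DeepEqual(existing, expected)
}

func main() {
	existing := indexDescriptor{Name: "idx", ID: 2, CreatedAtNanos: 1662600000000000000}
	expected := indexDescriptor{Name: "idx", ID: 2} // the migration's expected state
	// Prints true: the migration is detected as already applied.
	fmt.Println(equalIgnoringVolatile(existing, expected))
}
```

Masking the volatile field makes the comparison stable, so a migration that already ran is detected as a no-op instead of retrying.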

The vast majority of this PR is reviving testing from #66889.

Fixes #85228.

Release justification: Important bug fix for backport

Release note (bug fix): Some upgrade migrations perform schema changes on system tables. Migrations that added indexes could, in some cases, get stuck retrying because a populated volatile field prevented them from detecting that the migration had already been applied. When that happened, finalization of the new version could hang indefinitely and require manual intervention. This bug has been fixed.

Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
5 people committed Sep 8, 2022
5 parents 05b4853 + 9994122 + 9971819 + 653f052 + 52eda07 commit 3ff179c
Showing 15 changed files with 709 additions and 111 deletions.
39 changes: 12 additions & 27 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -215,26 +215,15 @@ func createTenantStreamingClusters(
// Start the source cluster.
srcCluster, srcURL, srcCleanup := startTestCluster(ctx, t, serverArgs, args.srcNumNodes)

// Start the src cluster tenant with tenant pods on every node in the cluster,
// ensuring they're all active before proceeding.
tenantArgs := base.TestTenantArgs{
TenantID: args.srcTenantID,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
AllowSplitAndScatter: true,
}},
}
tenantConns := make([]*gosql.DB, 0)
srcTenantServer, srcTenantConn := serverutils.StartTenant(t, srcCluster.Server(0), tenantArgs)
tenantConns = append(tenantConns, srcTenantConn)
for i := 1; i < args.srcNumNodes; i++ {
tenantPodArgs := tenantArgs
tenantPodArgs.DisableCreateTenant = true
tenantPodArgs.SkipTenantCheck = true
_, srcTenantPodConn := serverutils.StartTenant(t, srcCluster.Server(i), tenantPodArgs)
tenantConns = append(tenantConns, srcTenantPodConn)
}
waitForTenantPodsActive(t, srcTenantServer, args.srcNumNodes)
waitForTenantPodsActive(t, srcTenantServer, 1)

// Start the destination cluster.
destCluster, _, destCleanup := startTestCluster(ctx, t, serverArgs, args.destNumNodes)
@@ -266,11 +255,7 @@ func createTenantStreamingClusters(
// Enable stream replication on dest by default.
tsc.destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`)
return tsc, func() {
for _, tenantConn := range tenantConns {
if tenantConn != nil {
require.NoError(t, tenantConn.Close())
}
}
require.NoError(t, srcTenantConn.Close())
destCleanup()
srcCleanup()
}
@@ -280,7 +265,7 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) {
exec(c.t, c.srcSysSQL, c.srcTenantSQL)
}

func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
func createScatteredTable(t *testing.T, c *tenantStreamingClusters, numNodes int) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
rowsPerRange := 20
@@ -290,7 +275,7 @@ func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
ALTER TABLE d.scattered SCATTER;
`, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}})
c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{fmt.Sprint(numNodes)}})
}

var defaultSrcClusterSetting = map[string]string{
@@ -772,16 +757,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long with multiple nodes")
skip.WithIssue(t, 86287)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
args.srcNumNodes = 4
args.destNumNodes = 4

args.srcNumNodes = 3
args.destNumNodes = 3

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

createScatteredTable(t, c)
createScatteredTable(t, c, 3)
srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.startStreamReplication()
@@ -961,12 +947,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long with multiple nodes")
skip.WithIssue(t, 86206)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
args.srcNumNodes = 4
args.destNumNodes = 4
args.srcNumNodes = 3
args.destNumNodes = 3

// Track the number of unique addresses that were connected to
clientAddresses := make(map[string]struct{})
@@ -982,7 +967,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

createScatteredTable(t, c)
createScatteredTable(t, c, 3)

producerJobID, ingestionJobID := c.startStreamReplication()
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
10 changes: 6 additions & 4 deletions pkg/cli/flags.go
@@ -1056,17 +1056,19 @@ func extraServerFlagInit(cmd *cobra.Command) error {
serverCfg.SplitListenSQL = changed(fs, cliflags.ListenSQLAddr.Name)

// Fill in the defaults for --advertise-sql-addr, if the flag exists on `cmd`.
advSpecified := changed(fs, cliflags.AdvertiseAddr.Name) ||
advHostSpecified := changed(fs, cliflags.AdvertiseAddr.Name) ||
changed(fs, cliflags.AdvertiseHost.Name)
advPortSpecified := changed(fs, cliflags.AdvertiseAddr.Name) ||
changed(fs, cliflags.AdvertisePort.Name)
if serverSQLAdvertiseAddr == "" {
if advSpecified {
if advHostSpecified {
serverSQLAdvertiseAddr = serverAdvertiseAddr
} else {
serverSQLAdvertiseAddr = serverSQLAddr
}
}
if serverSQLAdvertisePort == "" {
if advSpecified && !serverCfg.SplitListenSQL {
if advPortSpecified && !serverCfg.SplitListenSQL {
serverSQLAdvertisePort = serverAdvertisePort
} else {
serverSQLAdvertisePort = serverSQLPort
@@ -1104,7 +1106,7 @@ func extraServerFlagInit(cmd *cobra.Command) error {
serverCfg.HTTPAddr = net.JoinHostPort(serverHTTPAddr, serverHTTPPort)

if serverHTTPAdvertiseAddr == "" {
if advSpecified {
if advHostSpecified || advPortSpecified {
serverHTTPAdvertiseAddr = serverAdvertiseAddr
} else {
serverHTTPAdvertiseAddr = serverHTTPAddr
10 changes: 10 additions & 0 deletions pkg/cli/flags_test.go
@@ -714,6 +714,16 @@ func TestServerConnSettings(t *testing.T) {
":54321", "192.168.0.111:12345",
":54321", "192.168.0.111:12345",
},
{[]string{"start", "--listen-addr", "127.0.0.1", "--advertise-sql-addr", "192.168.0.111", "--port", "54321", "--advertise-port", "12345"},
"127.0.0.1:54321", "127.0.0.1:12345",
"127.0.0.1:54321", "192.168.0.111:12345",
"127.0.0.1:54321", "192.168.0.111:12345",
},
{[]string{"start", "--listen-addr", "127.0.0.1", "--advertise-sql-addr", "192.168.0.111:12345", "--port", "54321"},
"127.0.0.1:54321", "127.0.0.1:54321",
"127.0.0.1:54321", "192.168.0.111:12345",
"127.0.0.1:54321", "192.168.0.111:12345",
},
}

for i, td := range testData {
12 changes: 8 additions & 4 deletions pkg/server/start_listen.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)

// startListenRPCAndSQL starts the RPC and SQL listeners.
@@ -89,11 +90,14 @@ func startListenRPCAndSQL(
pgL = m.Match(func(r io.Reader) bool {
return pgwire.Match(r)
})
// Also if the pg port is not split, the actual listen
// and advertise addresses for SQL become equal to that
// of RPC, regardless of what was configured.
// Also if the pg port is not split, the actual listen address for
// SQL becomes equal to that of RPC.
cfg.SQLAddr = cfg.Addr
cfg.SQLAdvertiseAddr = cfg.AdvertiseAddr
// Then we update the advertised addr with the right port, if
// the port had been auto-allocated.
if err := UpdateAddrs(ctx, &cfg.SQLAddr, &cfg.SQLAdvertiseAddr, ln.Addr()); err != nil {
return nil, nil, errors.Wrapf(err, "internal error")
}
}

anyL := m.Match(cmux.Any())
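
For context on the hunk above: when the shared SQL/RPC listen address uses an auto-allocated port (port 0), the real port is known only after the listener is created, which is why the SQL advertise address has to be rewritten from `ln.Addr()`. A minimal standalone sketch of that idea, using only the `net` package rather than cockroach's `UpdateAddrs`:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Port 0 asks the OS to pick a free port; only the listener knows which one.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	configured := "127.0.0.1:0"
	actual := ln.Addr().String() // e.g. 127.0.0.1:49523

	// Keep the configured (advertised) host but adopt the real port, which is
	// the essence of what UpdateAddrs does for cfg.SQLAdvertiseAddr above.
	host, _, err := net.SplitHostPort(configured)
	if err != nil {
		panic(err)
	}
	_, port, err := net.SplitHostPort(actual)
	if err != nil {
		panic(err)
	}
	fmt.Println("advertise:", net.JoinHostPort(host, port))
}
```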