…133690 #133693

133234: workload: tpcc consistency check added flag as-of. r=srosenberg,nameisbhaskar,vidit-bhat a=shailendra-patel

While running the consistency checker on the tpcc database with an active tpcc workload, the consistency check fails with a retryable error such as `restart transaction: TransactionRetryWithProtoRefreshError: ReadWithinUncertaintyIntervalError`.
To fix this, this change adds a new `as-of` flag that allows the consistency check to run with `AS OF SYSTEM TIME`.
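
A minimal sketch of the idea, using hypothetical names rather than the actual workload plumbing: when the flag is set, the check query is pinned to a historical timestamp so it no longer races with concurrent TPC-C writes.

```
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// buildCheckQuery appends an AS OF SYSTEM TIME clause when asOf is non-empty.
// asOf stands in for a hypothetical value of the new --as-of flag, e.g. "'-30s'".
func buildCheckQuery(base, asOf string) string {
	if asOf == "" {
		return base
	}
	return fmt.Sprintf("%s AS OF SYSTEM TIME %s", base, asOf)
}

// runCheck runs one illustrative consistency query at the historical time.
func runCheck(ctx context.Context, db *sql.DB, asOf string) error {
	var n int
	q := buildCheckQuery("SELECT count(*) FROM warehouse", asOf)
	return db.QueryRowContext(ctx, q).Scan(&n)
}

func main() {} // placeholder so the sketch compiles standalone
```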

Epic: none
Release note: None

133347: crosscluster/logical: add settings/stats to ldr ingest chunking r=dt a=dt



133607: sql: check object type when revoking privilege r=rafiss a=rafiss

fixes #131157
Release note (bug fix): Fix an unhandled error that could occur when using `REVOKE ... ON SEQUENCE ... FROM user` on an object that is not a sequence.
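
A hypothetical reproduction sketch (object and user names are illustrative, not taken from the linked issue):

```
package main

import (
	"context"
	gosql "database/sql"
)

// repro issues REVOKE ... ON SEQUENCE against an object that is actually a
// table; before the fix this path could surface an unhandled internal error
// rather than a clean "not a sequence" style error.
func repro(ctx context.Context, db *gosql.DB) error {
	stmts := []string{
		`CREATE TABLE IF NOT EXISTS not_a_sequence (k INT PRIMARY KEY)`,
		`CREATE USER IF NOT EXISTS app`,
		`REVOKE USAGE ON SEQUENCE not_a_sequence FROM app`,
	}
	for _, stmt := range stmts {
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}

func main() {}
```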

133608: schemachanger: force prod values in expensive test r=rafiss a=rafiss

fixes #133437
Release note: None

133616: roachtest: validate token return in perturbation/* tests r=kvoli a=andrewbaptist

This commit adds validation that all RAC tokens are returned on all stable nodes at the end of the test.

Fixes: #133410

Release note: None

133681: roachtest: minor fixes in rebalance/by-load test r=arulajmani a=kvoli

`%` was not escaped in the assertion's format string, so it was interpreted
as a formatting verb and consumed values that were meant for later verbs.

e.g., from:

```
node 0 has core count normalized CPU utilization ts datapoint not in [0%!,(float64=1.4920845083839689)100{[{{%!](string=cr.node.sys.cpu.combined.percent-normalized) %!]
...
```

to:

```
node idx 0 has core count normalized CPU utilization ts datapoint not in [0%,100%]
...
```
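
The root cause is Go's fmt-style formatting: a literal `%` must be escaped (conventionally as `%%`), otherwise it starts a verb and consumes the next argument. A standalone illustration, not the roachtest code itself:

```
package main

import "fmt"

func main() {
	// Unescaped: "%," and "%]" are parsed as (invalid) verbs, so the float
	// is consumed in the wrong place and %! error markers are emitted.
	fmt.Println(fmt.Sprintf("not in [0%,100%]: %f", 1.49))

	// Escaped with %%: the literal percent signs survive and %f receives
	// the intended argument.
	fmt.Println(fmt.Sprintf("not in [0%%,100%%]: %f", 1.49))
}
```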

---

The `rebalance/by-load/*` roachtests compare the CPU of nodes and assert
that the distribution of node CPU is bounded within ±20%. The previous metric:

```
sys.cpu.combined.percent_normalized
```

would occasionally over-report the CPU as greater than 100% (>1.0),
which is impossible. Use the host CPU metric instead, which reports the
machine's CPU utilization rather than only the cockroach process's:

```
sys.cpu.host.combined.percent_normalized
```

Part of: #133004
Part of: #133054
Part of: #132019
Part of: #133223
Part of: #132633
Release note: None

133683: license: don't hit EnvOrDefaultInt64 in hot path r=fqazi,mgartner a=tbg

Saves 0.3% CPU on sysbench.
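
A rough sketch of the general pattern, with hypothetical names rather than the actual license-enforcer code: resolve the environment-derived value once and reuse it, instead of re-reading the environment on every call in the hot path.

```
package main

import (
	"fmt"
	"os"
	"strconv"
	"sync"
)

var (
	exampleOnce sync.Once
	exampleVal  int64
)

// exampleSetting caches an env-derived value. COCKROACH_EXAMPLE_SETTING is a
// hypothetical variable; the point is that os.Getenv and parsing happen once,
// not on every request.
func exampleSetting() int64 {
	exampleOnce.Do(func() {
		exampleVal = 64 // default
		if v, err := strconv.ParseInt(os.Getenv("COCKROACH_EXAMPLE_SETTING"), 10, 64); err == nil {
			exampleVal = v
		}
	})
	return exampleVal
}

func main() {
	fmt.Println(exampleSetting())
}
```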

Fixes #133088.

Release note: None
Epic: None


133686: rac2: order testingRCRange.mu before RaftMu in tests r=sumeerbhola a=kvoli

`testingRCRange.mu` was acquired and held before acquiring `RaftMu` in `testingRCRange.admit()`, which conflicted with the reverse lock ordering used elsewhere. This was a test-only issue with `TestRangeController`.

Order `testingRCRange.mu` before `RaftMu` in `admit()`.
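
As a generic illustration of the hazard (hypothetical types, not the rac2 code): two paths that take the same pair of mutexes in opposite orders can each end up holding one lock while waiting forever for the other, so a single consistent order has to be used everywhere.

```
package main

import "sync"

type rangeState struct {
	mu     sync.Mutex // test-only state
	raftMu sync.Mutex // outer raft processing lock
}

// pathA locks mu, then raftMu.
func (r *rangeState) pathA() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
}

// pathB locks in the reverse order; combined with pathA this is deadlock-prone.
func (r *rangeState) pathB() {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()
}

func main() {
	r := &rangeState{}
	go r.pathA()
	r.pathB()
}
```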

Fixes: #133650
Release note: None

133690: roachtest: always pass a Context to queries r=kvoli a=andrewbaptist

Queries can hang if no context is passed to them. In roachtests, the test context is cancelled on events such as VM preemption, so a query issued without it can block indefinitely. It is always better to use the test context and avoid this risk. This change updates the perturbation/* tests to always pass a context.
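
A small sketch of the difference, with illustrative names: `QueryRow` is equivalent to passing `context.Background()` and cannot be interrupted, while `QueryRowContext` is abandoned as soon as the test context is cancelled.

```
package roachtestutil // hypothetical package for illustration

import (
	"context"
	gosql "database/sql"
)

// countRows ties the query to the caller's context so that cancelling ctx
// (e.g. on VM preemption) unblocks the call instead of letting it hang.
func countRows(ctx context.Context, db *gosql.DB) (int, error) {
	var n int
	// Preferred: cancellable via ctx.
	err := db.QueryRowContext(ctx, "SELECT count(*) FROM t").Scan(&n)
	// Risky alternative: db.QueryRow(...) uses context.Background() internally
	// and can block for the lifetime of the process.
	return n, err
}
```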

Fixes: #133625

Release note: None

133693: kvserver: deflake TestSnapshotsToDrainingNodes r=kvoli a=arulajmani

This test was making tight assertions about the size of the snapshot that was sent. To do so, it was trying to reimplement the actual snapshot sending logic in `kvBatchSnapshotStrategy.Send()`. So these tight assertions weren't of much use -- they were asserting that we were correctly re-implementing `kvBatchSnapshotStrategy.Send()` in `getExpectedSnapshotSizeBytes`. We weren't, as evidenced by some rare flakes.

This patch loosens assertions to deflake the test.

Closes #133517
Release note: None

Co-authored-by: Shailendra Patel <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
8 people committed Oct 29, 2024
11 parents 0f9175f + 5d6fc67 + 2c3e186 + 4b26b96 + 5c455fe + 865919a + 2d6c13f + 089eb3c + 5d74d15 + 192a365 + e38cf40 commit 2e37122
Showing 20 changed files with 225 additions and 98 deletions.
@@ -8,6 +8,7 @@ package logical
import (
"context"
"fmt"
"runtime/pprof"
"slices"
"time"

@@ -57,6 +58,22 @@ var flushBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

var writerWorkers = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.consumer.flush_worker_per_proc",
"the maximum number of workers per processor to use to flush each batch",
32,
settings.NonNegativeInt,
)

var minChunkSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.consumer.flush_chunk_size",
"the minimum number of row updates to pass to each flush worker",
64,
settings.NonNegativeInt,
)

// logicalReplicationWriterProcessor consumes a cross-cluster replication stream
// by decoding kvs in it to logical changes and applying them by executing DMLs.
type logicalReplicationWriterProcessor struct {
@@ -107,6 +124,13 @@ var (

const logicalReplicationWriterProcessorName = "logical-replication-writer-processor"

var batchSizeSetting = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"logical_replication.stream_batch_size",
"target batch size for logical replication stream",
1<<20,
)

func newLogicalReplicationWriterProcessor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
@@ -138,7 +162,7 @@ func newLogicalReplicationWriterProcessor(
tableID: descpb.ID(dstTableID),
}
}
bhPool := make([]BatchHandler, maxWriterWorkers)
bhPool := make([]BatchHandler, writerWorkers.Get(&flowCtx.Cfg.Settings.SV))
for i := range bhPool {
var rp RowProcessor
var err error
@@ -315,6 +339,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes ||
lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardAllDeletes),
streamclient.WithDiff(true),
streamclient.WithBatchSize(batchSizeSetting.Get(&lrw.FlowCtx.Cfg.Settings.SV)),
)
if err != nil {
lrw.MoveToDrainingAndLogError(errors.Wrapf(err, "subscribing to partition from %s", redactedAddr))
@@ -336,10 +361,12 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
})
lrw.workerGroup.GoCtx(func(ctx context.Context) error {
defer close(lrw.checkpointCh)
if err := lrw.consumeEvents(ctx); err != nil {
log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
pprof.Do(ctx, pprof.Labels("proc", fmt.Sprintf("%d", lrw.ProcessorID)), func(ctx context.Context) {
if err := lrw.consumeEvents(ctx); err != nil {
log.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
lrw.sendError(errors.Wrap(err, "consume events"))
}
})
return nil
})
}
@@ -595,8 +622,6 @@ func filterRemaining(kvs []streampb.StreamEvent_KV) []streampb.StreamEvent_KV {
return remaining[:j]
}

const maxWriterWorkers = 32

// flushBuffer processes some or all of the events in the passed buffer, and
// zeros out each event in the passed buffer for which it successfully completed
// processing either by applying it or by sending it to a DLQ. If mustProcess is
@@ -649,15 +674,20 @@ func (lrw *logicalReplicationWriterProcessor) flushBuffer(
return a.KeyValue.Value.Timestamp.Compare(b.KeyValue.Value.Timestamp)
})

const minChunkSize = 64
chunkSize := max((len(kvs)/len(lrw.bh))+1, minChunkSize)
minChunk := 64
if lrw.FlowCtx != nil {
minChunk = int(minChunkSize.Get(&lrw.FlowCtx.Cfg.Settings.SV))
}

chunkSize := max((len(kvs)/len(lrw.bh))+1, minChunk)

perChunkStats := make([]flushStats, len(lrw.bh))

todo := kvs
g := ctxgroup.WithContext(ctx)
for worker := range lrw.bh {
if len(todo) == 0 {
perChunkStats = perChunkStats[:worker]
break
}
// The chunk should end after the first new key after chunk size.
@@ -779,6 +809,9 @@ func (lrw *logicalReplicationWriterProcessor) flushChunk(
) (flushStats, error) {
batchSize := lrw.getBatchSize()

lrw.debug.RecordChunkStart()
defer lrw.debug.RecordChunkComplete()

var stats flushStats
// TODO: The batching here in production would need to be much
// smarter. Namely, we don't want to include updates to the
4 changes: 3 additions & 1 deletion pkg/ccl/crosscluster/producer/event_stream.go
@@ -508,7 +508,9 @@ func streamPartition(
if len(spec.Spans) == 0 {
return nil, errors.AssertionFailedf("expected at least one span, got none")
}
spec.Config.BatchByteSize = defaultBatchSize
if spec.Config.BatchByteSize == 0 {
spec.Config.BatchByteSize = defaultBatchSize
}
spec.Config.MinCheckpointFrequency = crosscluster.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV)

execCfg := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
10 changes: 9 additions & 1 deletion pkg/ccl/crosscluster/producer/replication_manager.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -172,6 +173,13 @@ func getUDTs(
return typeDescriptors, nil, nil
}

var useStreaksInLDR = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"logical_replication.producer.group_adjacent_spans.enabled",
"controls whether to attempt adjacent spans in the same stream",
true,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
@@ -199,7 +207,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
return nil, err
}
}
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans)
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
13 changes: 9 additions & 4 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -283,7 +283,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

@@ -293,16 +293,21 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)

// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
noLoc := roachpb.Locality{}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, kvfollowerreadsccl.StreakConfig{
var streaks kvfollowerreadsccl.StreakConfig
if useStreaks {
streaks = kvfollowerreadsccl.StreakConfig{
Min: 10, SmallPlanMin: 3, SmallPlanThreshold: 3, MaxSkew: 0.95,
},
}
}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, streaks,
)

planCtx := dsp.NewPlanningCtxWithOracle(
10 changes: 10 additions & 0 deletions pkg/ccl/crosscluster/streamclient/client.go
@@ -126,6 +126,9 @@ type subscribeConfig struct {
// NB: Callers should note that initial scan results will not
// contain a diff.
withDiff bool

// batchByteSize requests the producer emit batches up to the specified size.
batchByteSize int64
}

type SubscribeOption func(*subscribeConfig)
@@ -150,6 +153,13 @@ func WithDiff(enableDiff bool) SubscribeOption {
}
}

// WithBatchSize requests the producer emit batches up to the specified size.
func WithBatchSize(bytes int64) SubscribeOption {
return func(cfg *subscribeConfig) {
cfg.batchByteSize = bytes
}
}

// Topology is a configuration of stream partitions. These are particular to a
// stream. It specifies the number and addresses of partitions of the stream.
//
@@ -253,6 +253,7 @@ func (p *partitionedStreamClient) Subscribe(
sps.WithDiff = cfg.withDiff
sps.WithFiltering = cfg.withFiltering
sps.Type = streampb.ReplicationType_PHYSICAL
sps.Config.BatchByteSize = cfg.batchByteSize
if p.logical {
sps.Type = streampb.ReplicationType_LOGICAL
}
2 changes: 2 additions & 0 deletions pkg/cli/zip_table_registry.go
@@ -1095,6 +1095,8 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
"flush_bytes",
"flush_batches",
"last_flush_time",
"chunks_running",
"chunks_done",
"last_kvs_done",
"last_kvs_todo",
"last_batches",
46 changes: 44 additions & 2 deletions pkg/cmd/roachtest/tests/admission_control_latency.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -901,6 +902,47 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
t.L().Printf("validating stats after the perturbation")
failures = append(failures, isAcceptableChange(t.L(), baselineStats, afterStats, v.acceptableChange)...)
require.True(t, len(failures) == 0, strings.Join(failures, "\n"))
v.validateTokensReturned(ctx, t)
}

// validateTokensReturned ensures that all RAC tokens are returned to the pool
// at the end of the test.
func (v variations) validateTokensReturned(ctx context.Context, t test.Test) {
t.L().Printf("validating all tokens returned")
for _, node := range v.stableNodes() {
// Wait for the tokens to be returned to the pool. Normally this will
// pass immediately however it is possible that there is still some
// recovery so loop a few times.
testutils.SucceedsWithin(t, func() error {
db := v.Conn(ctx, t.L(), node)
defer db.Close()
for _, sType := range []string{"send", "eval"} {
for _, tType := range []string{"elastic", "regular"} {
statPrefix := fmt.Sprintf("kvflowcontrol.tokens.%s.%s", sType, tType)
query := fmt.Sprintf(`
SELECT d.value::INT8 AS deducted, r.value::INT8 AS returned
FROM
crdb_internal.node_metrics d,
crdb_internal.node_metrics r
WHERE
d.name='%s.deducted' AND
r.name='%s.returned'`,
statPrefix, statPrefix)
rows, err := db.QueryContext(ctx, query)
require.NoError(t, err)
require.True(t, rows.Next())
var deducted, returned int64
if err := rows.Scan(&deducted, &returned); err != nil {
return err
}
if deducted != returned {
return errors.Newf("tokens not returned for %s: deducted %d returned %d", statPrefix, deducted, returned)
}
}
}
return nil
}, 5*time.Second)
}
}

// trackedStat is a collection of the relevant values from the histogram. The
@@ -994,7 +1036,7 @@ func (v variations) waitForRebalanceToStop(ctx context.Context, t test.Test) {
Multiplier: 1,
}
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if row := db.QueryRow(q); row != nil {
if row := db.QueryRowContext(ctx, q); row != nil {
var secondsSinceLastEvent int
if err := row.Scan(&secondsSinceLastEvent); err != nil && !errors.Is(err, gosql.ErrNoRows) {
t.Fatal(err)
@@ -1021,7 +1063,7 @@ func (v variations) waitForIOOverloadToEnd(ctx context.Context, t test.Test) {
anyOverloaded := false
for _, nodeId := range v.targetNodes() {
db := v.Conn(ctx, t.L(), nodeId)
if row := db.QueryRow(q); row != nil {
if row := db.QueryRowContext(ctx, q); row != nil {
var overload float64
if err := row.Scan(&overload); err != nil && !errors.Is(err, gosql.ErrNoRows) {
db.Close()
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/tests/rebalance_load.go
@@ -344,7 +344,7 @@ func makeStoreCPUFn(
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
name: "cr.node.sys.cpu.host.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
tenantID: roachpb.SystemTenantID,
@@ -376,8 +376,8 @@
// as much to avoid any surprises.
if cpu < 0 || cpu > 1 {
return nil, errors.Newf(
"node %d has core count normalized CPU utilization ts datapoint "+
"not in [0%,100%] (impossible!): %f [resp=%+v]", node, cpu, resp)
"node idx %d has core count normalized CPU utilization ts datapoint "+
"not in [0\\%,100\\%] (impossible!): %v [resp=%+v]", node, cpu, resp)
}

nodeIdx := node * storesPerNode
Expand Down
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/util.go
@@ -151,7 +151,7 @@ func profileTopStatements(

// Enable continuous statement diagnostics rather than just the first one.
sql := "SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled=true"
if _, err := db.Exec(sql); err != nil {
if _, err := db.ExecContext(ctx, sql); err != nil {
return err
}

@@ -199,7 +199,7 @@ FROM (
dbName,
minNumExpectedStmts,
)
if _, err := db.Exec(sql); err != nil {
if _, err := db.ExecContext(ctx, sql); err != nil {
return err
}
return nil
@@ -217,7 +217,7 @@ func downloadProfiles(
query := "SELECT id, collected_at FROM system.statement_diagnostics"
db := cluster.Conn(ctx, logger, 1)
defer db.Close()
idRow, err := db.Query(query)
idRow, err := db.QueryContext(ctx, query)
if err != nil {
return err
}
@@ -236,7 +236,7 @@ return err
return err
}
url := urlPrefix + diagID
resp, err := client.Get(context.Background(), url)
resp, err := client.Get(ctx, url)
if err != nil {
return err
}