Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
134671: storage: ingest columnar-block sstables when enabled r=RaduBerinde a=jbowens

When the active cluster verison is sufficiently high and the columnar blocks cluster setting is enabled, build sstables for ingestion in table format TableFormatPebblev5 with columnar blocks.

Epic: none
Release note: none

135312: roachtest: fix TestVMPreemptionPolling data race r=srosenberg,herkolategan a=DarrylWong

This change switches to pollPreemptionInterval to be a mutex protected struct instead, as multiple unit tests modify it and can lead to a data race without.

Fixes: #135267
Epic: none
Release note: none

135715: ccl/schemachangerccl: deflake TestBackupSuccess r=fqazi a=fqazi

Previously, the backup and restore tests for the declarative schema changer could flake because the latest completed schema job could not be sufficiently determined by the finish_time. This was because this time only has resolution of seconds at most, so multiple jobs could have the same finish time. To address this, this patch also sorts the jobs by job_id as well, which should be increasing for later jobs.

Fixes: #131583

Release note: None

Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: DarrylWong <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
4 people committed Nov 19, 2024
4 parents a520670 + fa505ac + 8e3466d + a7f9797 commit a7a0350
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 25 deletions.
21 changes: 18 additions & 3 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ import (
"github.com/petermattis/goid"
)

func init() {
pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
pollPreemptionInterval.interval = 5 * time.Minute
}

var (
errTestsFailed = fmt.Errorf("some tests failed")

Expand Down Expand Up @@ -2132,20 +2138,29 @@ var getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger
return c.GetPreemptedVMs(ctx, l)
}

// pollPreemptionInterval is how often to poll for preempted VMs.
var pollPreemptionInterval = 5 * time.Minute
// pollPreemptionInterval is how often to poll for preempted VMs. We use a
// mutex protected struct to allow for unit tests to safely modify it.
// Interval defaults to 5 minutes if not set.
var pollPreemptionInterval struct {
syncutil.Mutex
interval time.Duration
}

func monitorForPreemptedVMs(ctx context.Context, t test.Test, c cluster.Cluster, l *logger.Logger) {
if c.IsLocal() || !c.Spec().UseSpotVMs {
return
}

pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
interval := pollPreemptionInterval.interval

go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(pollPreemptionInterval):
case <-time.After(interval):
preemptedVMs, err := getPreemptedVMsHook(c, ctx, l)
if err != nil {
l.Printf("WARN: monitorForPreemptedVMs: failed to check preempted VMs:\n%+v", err)
Expand Down
12 changes: 9 additions & 3 deletions pkg/cmd/roachtest/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,12 @@ func TestVMPreemptionPolling(t *testing.T) {
},
}

setPollPreemptionInterval := func(interval time.Duration) {
pollPreemptionInterval.Lock()
defer pollPreemptionInterval.Unlock()
pollPreemptionInterval.interval = interval
}

getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
preemptedVMs := []vm.PreemptedVM{{
Name: "test_node",
Expand All @@ -731,13 +737,13 @@ func TestVMPreemptionPolling(t *testing.T) {
getPreemptedVMsHook = func(c cluster.Cluster, ctx context.Context, l *logger.Logger) ([]vm.PreemptedVM, error) {
return c.GetPreemptedVMs(ctx, l)
}
pollPreemptionInterval = 5 * time.Minute
setPollPreemptionInterval(5 * time.Minute)
}()

// Test that if a VM is preempted, the VM preemption monitor will catch
// it and cancel the test before it times out.
t.Run("polling cancels test", func(t *testing.T) {
pollPreemptionInterval = 50 * time.Millisecond
setPollPreemptionInterval(50 * time.Millisecond)

err := runner.Run(ctx, []registry.TestSpec{mockTest}, 1, /* count */
defaultParallelism, copt, testOpts{}, lopt)
Expand All @@ -750,7 +756,7 @@ func TestVMPreemptionPolling(t *testing.T) {
// test finished first, the post failure checks will check again and mark it as a flake.
t.Run("polling doesn't catch preemption", func(t *testing.T) {
// Set the interval very high so we don't poll for preemptions.
pollPreemptionInterval = 1 * time.Hour
setPollPreemptionInterval(1 * time.Hour)

mockTest.Run = func(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Error("Should be ignored")
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ func (op DeleteRangeUsingTombstoneOperation) format(w *strings.Builder, fctx for
}

func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AddSSTable(%s%s, %s, ... /* @%s */) // %d bytes`,
fctx.receiver, fctx.maybeCtx(), fmtKey(op.Span.Key), fmtKey(op.Span.EndKey), op.Seq, len(op.Data))
fmt.Fprintf(w, `%s.AddSSTable(%s%s, %s, ... /* @%s */)`,
fctx.receiver, fctx.maybeCtx(), fmtKey(op.Span.Key), fmtKey(op.Span.EndKey), op.Seq)
if op.AsWrites {
fmt.Fprintf(w, ` (as writes)`)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/addsstable
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo
----
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */) // 1004 bytes (as writes)
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */) (as writes)
// ^-- tk(1) -> sv(s1): /Table/100/"0000000000000001"/<ts> -> /BYTES/v1
// ^-- tk(2) -> sv(s1): /Table/100/"0000000000000002"/<ts> -> /<empty>
// ^-- [tk(3), tk(4)) -> sv(s1): /Table/100/"000000000000000{3"-4"} -> /<empty>
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestOperationsFormat/4
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo
----
···db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */) // 1004 bytes (as writes)
···db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */) (as writes)
···// ^-- tk(1) -> sv(s1): /Table/100/"0000000000000001"/0.000000001,0 -> /BYTES/v1
···// ^-- tk(2) -> sv(s1): /Table/100/"0000000000000002"/0.000000001,0 -> /<empty>
···// ^-- [tk(3), tk(4)) -> sv(s1): /Table/100/"000000000000000{3"-4"} -> /<empty>
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo
----
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */) // 1004 bytes
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s1 */)
// ^-- tk(1) -> sv(s1): /Table/100/"0000000000000001"/0.000000001,0 -> /BYTES/v1
// ^-- tk(2) -> sv(s1): /Table/100/"0000000000000002"/0.000000001,0 -> /<empty>
// ^-- [tk(3), tk(4)) -> sv(s1): /Table/100/"000000000000000{3"-4"} -> /<empty>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 <nil>
db0.Put(ctx, tk(2), sv(1)) // @0.000000001,0 <nil>
db0.Put(ctx, tk(3), sv(1)) // @0.000000001,0 <nil>
db0.Put(ctx, tk(4), sv(1)) // @0.000000001,0 <nil>
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s2 */) // 1005 bytes
db0.AddSSTable(ctx, tk(1), tk(4), ... /* @s2 */)
// ^-- tk(1) -> sv(s2): /Table/100/"0000000000000001"/0.000000001,0 -> /BYTES/v2
// ^-- tk(2) -> sv(s2): /Table/100/"0000000000000002"/0.000000001,0 -> /<empty>
// ^-- [tk(3), tk(4)) -> sv(s2): /Table/100/"000000000000000{3"-4"} -> /<empty>
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func TestEvalAddSSTable(t *testing.T) {
expectErr: []string{
`unexpected timestamp 2.000000000,0 (expected 1.000000000,0) for key "c"`,
`key has suffix "\x00\x00\x00\x00w5\x94\x00\t", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`,
`has suffix 0x000000007735940009; require 0x000000003b9aca0009`,
},
},
"SSTTimestampToRequestTimestamp rejects incorrect SST timestamp for range keys": {
Expand All @@ -187,6 +188,7 @@ func TestEvalAddSSTable(t *testing.T) {
expectErr: []string{
`unexpected timestamp 2.000000000,0 (expected 1.000000000,0) for range key {c-d}`,
`key has suffix "\x00\x00\x00\x00w5\x94\x00\t", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`,
`has suffix 0x000000007735940009; require 0x000000003b9aca0009`,
},
},
"SSTTimestampToRequestTimestamp rejects incorrect 0 SST timestamp": {
Expand All @@ -196,6 +198,7 @@ func TestEvalAddSSTable(t *testing.T) {
expectErr: []string{
`unexpected timestamp 0,0 (expected 1.000000000,0) for key "c"`,
`key has suffix "", expected "\x00\x00\x00\x00;\x9a\xca\x00\t"`,
`has suffix 0x; require 0x000000003b9aca0009`,
},
expectErrRace: `SST contains inline value or intent for key "c"/0,0`,
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/rangefeed/event_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package rangefeed

import (
"context"
"math/rand"
"testing"

Expand All @@ -16,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -94,6 +96,7 @@ func generateStaticTestdata() testData {
func TestEventSizeCalculation(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
data := generateStaticTestdata()
storage.ColumnarBlocksEnabled.Override(context.Background(), &st.SV, true)

key := data.key
timestamp := data.timestamp
Expand Down Expand Up @@ -217,10 +220,10 @@ func TestEventSizeCalculation(t *testing.T) {
{
name: "sstEvent event",
ev: event{sst: &sstEvent{data: sst, span: span, ts: timestamp}},
expectedCurrMemUsage: int64(1962),
expectedCurrMemUsage: int64(2218),
actualCurrMemUsage: eventOverhead + sstEventOverhead +
int64(cap(sst)+cap(span.Key)+cap(span.EndKey)),
expectedFutureMemUsage: int64(1978),
expectedFutureMemUsage: int64(2234),
actualFutureMemUsage: futureEventBaseOverhead + rangefeedSSTTableOverhead +
int64(cap(sst)+cap(span.Key)+cap(span.EndKey)),
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/sctest/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func waitForSchemaChangesToFinish(t *testing.T, tdb *sqlutils.SQLRunner) {

func hasLatestSchemaChangeSucceeded(t *testing.T, tdb *sqlutils.SQLRunner) bool {
result := tdb.QueryStr(t, fmt.Sprintf(
`SELECT status FROM [SHOW JOBS] WHERE job_type IN ('%s') ORDER BY finished DESC LIMIT 1`,
`SELECT status FROM [SHOW JOBS] WHERE job_type IN ('%s') ORDER BY finished DESC, job_id DESC LIMIT 1`,
jobspb.TypeNewSchemaChange,
))
return result[0][0] == "succeeded"
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ func TestMVCCHistories(t *testing.T) {
// SST iterator in order to accurately represent the raw SST data.
reportSSTEntries := func(buf *redact.StringBuilder, name string, sst []byte) error {
r, err := sstable.NewMemReader(sst, sstable.ReaderOptions{
Comparer: storage.EngineComparer,
Comparer: storage.EngineComparer,
KeySchemas: sstable.MakeKeySchemas(storage.KeySchemas...),
})
if err != nil {
return err
Expand All @@ -275,7 +276,7 @@ func TestMVCCHistories(t *testing.T) {
return err
}
defer func() { _ = iter.Close() }()
for kv := iter.SeekGE(nil, 0); kv != nil; kv = iter.Next() {
for kv := iter.First(); kv != nil; kv = iter.Next() {
if err := iter.Error(); err != nil {
return err
}
Expand Down Expand Up @@ -337,7 +338,7 @@ func TestMVCCHistories(t *testing.T) {
for _, k := range s.Keys {
buf.Printf("%s: %s", strings.ToLower(k.Kind().String()),
roachpb.Span{Key: start.Key, EndKey: end.Key})
if k.Suffix != nil {
if len(k.Suffix) > 0 {
ts, err := storage.DecodeMVCCTimestampSuffix(k.Suffix)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ var IngestSplitEnabled = settings.RegisterBoolSetting(
settings.WithPublic,
)

// columnarBlocksEnabled controls whether columnar-blocks are enabled in Pebble.
var columnarBlocksEnabled = settings.RegisterBoolSetting(
// ColumnarBlocksEnabled controls whether columnar-blocks are enabled in Pebble.
var ColumnarBlocksEnabled = settings.RegisterBoolSetting(
settings.SystemVisible,
"storage.columnar_blocks.enabled",
"set to true to enable columnar-blocks to store KVs in a columnar format",
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
return IngestSplitEnabled.Get(&cfg.settings.SV)
}
cfg.opts.Experimental.EnableColumnarBlocks = func() bool {
return columnarBlocksEnabled.Get(&cfg.settings.SV)
return ColumnarBlocksEnabled.Get(&cfg.settings.SV)
}
cfg.opts.Experimental.EnableDeleteOnlyCompactionExcises = func() bool {
return deleteCompactionsCanExcise.Get(&cfg.settings.SV)
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/pebble_key_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ func (ks *cockroachKeySeeker) IsLowerBound(k []byte, syntheticSuffix []byte) boo
func (ks *cockroachKeySeeker) SeekGE(
key []byte, boundRow int, searchDir int8,
) (row int, equalPrefix bool) {
if buildutil.CrdbTestBuild && len(key) == 0 {
panic(errors.AssertionFailedf("seeking to empty key"))
}
// TODO(jackson): Inline EngineKeySplit.
si := EngineKeySplit(key)
row, eq := ks.roachKeys.Search(key[:si-1])
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"io"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -80,11 +81,14 @@ func (*noopFinishAbort) Abort() {}
// MakeIngestionWriterOptions returns writer options suitable for writing SSTs
// that will subsequently be ingested (e.g. with AddSSTable).
func MakeIngestionWriterOptions(ctx context.Context, cs *cluster.Settings) sstable.WriterOptions {
// By default, take a conservative approach and assume we don't have newer
// table features available. Upgrade to an appropriate version only if the
// cluster supports it. Currently, all supported versions understand
// TableFormatPebblev4.
// All supported versions understand TableFormatPebblev4. If columnar blocks
// are enabled and the active cluster version is at least 24.3, use
// TableFormatPebblev5.
format := sstable.TableFormatPebblev4
if cs.Version.IsActive(ctx, clusterversion.V24_3) && ColumnarBlocksEnabled.Get(&cs.SV) {
format = sstable.TableFormatPebblev5
}

opts := DefaultPebbleOptions().MakeWriterOptions(0, format)
// By default, compress with the algorithm used for storage in a Pebble store.
// There are other, more specific, use cases that may call for a different
Expand Down
28 changes: 28 additions & 0 deletions pkg/storage/sst_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func TestMakeIngestionWriterOptions(t *testing.T) {
st: func() *cluster.Settings {
st := cluster.MakeTestingClusterSettings()
IngestionValueBlocksEnabled.Override(context.Background(), &st.SV, true)
ColumnarBlocksEnabled.Override(context.Background(), &st.SV, false)
return st
}(),
want: want{
Expand All @@ -99,13 +100,40 @@ func TestMakeIngestionWriterOptions(t *testing.T) {
st: func() *cluster.Settings {
st := cluster.MakeTestingClusterSettings()
IngestionValueBlocksEnabled.Override(context.Background(), &st.SV, false)
ColumnarBlocksEnabled.Override(context.Background(), &st.SV, false)
return st
}(),
want: want{
format: sstable.TableFormatPebblev4,
disableValueBlocks: true,
},
},
{
name: "enable columnar blocks",
st: func() *cluster.Settings {
st := cluster.MakeTestingClusterSettings()
IngestionValueBlocksEnabled.Override(context.Background(), &st.SV, false)
ColumnarBlocksEnabled.Override(context.Background(), &st.SV, true)
return st
}(),
want: want{
format: sstable.TableFormatPebblev5,
disableValueBlocks: true,
},
},
{
name: "enable columnar blocks with value blocks",
st: func() *cluster.Settings {
st := cluster.MakeTestingClusterSettings()
IngestionValueBlocksEnabled.Override(context.Background(), &st.SV, true)
ColumnarBlocksEnabled.Override(context.Background(), &st.SV, true)
return st
}(),
want: want{
format: sstable.TableFormatPebblev5,
disableValueBlocks: false,
},
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit a7a0350

Please sign in to comment.