38211: changefeedccl: remove deprecated poller r=tbg a=danhhz

We switched the default to push-based rangefeeds in 19.1. This removes
the old pull-based poller fallback entirely.
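
For readers unfamiliar with the distinction, the two delivery styles can be sketched roughly as follows. This is a toy illustration, not CockroachDB code: `event`, `pollChanges`, and `pushChanges` are hypothetical names, and logical timestamps are plain integers. The pull side advances a high-water mark and re-queries the interval since the last one; the push side simply consumes changes as the source delivers them.

```go
package main

import "fmt"

// event is a hypothetical change record: a key written at a logical timestamp.
type event struct {
	Key string
	TS  int
}

// pollChanges models the pull approach: return every change with a
// timestamp in the half-open interval (lo, hi].
func pollChanges(log []event, lo, hi int) []event {
	var out []event
	for _, e := range log {
		if e.TS > lo && e.TS <= hi {
			out = append(out, e)
		}
	}
	return out
}

// pushChanges models the push approach: the source sends each change on a
// channel and closes the channel when it has nothing more to send.
func pushChanges(log []event) <-chan event {
	ch := make(chan event)
	go func() {
		defer close(ch)
		for _, e := range log {
			ch <- e
		}
	}()
	return ch
}

func main() {
	log := []event{{"a", 1}, {"b", 2}, {"c", 5}}

	// Pull: pick the next high-water mark, then ask for everything between
	// the previous mark and the new one.
	highWater := 0
	for _, next := range []int{2, 4, 6} {
		for _, e := range pollChanges(log, highWater, next) {
			fmt.Println("polled", e.Key, e.TS)
		}
		highWater = next
	}

	// Push: no interval bookkeeping; just consume what arrives.
	for e := range pushChanges(log) {
		fmt.Println("pushed", e.Key, e.TS)
	}
}
```

In the pull loop, a change is only seen once a later tick covers its timestamp, which is one reason a push-based feed can reduce latency.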

Details of the removal:
- The relevant code is removed
- Several poller-related hacks are removed
- The changefeed.run.push.enabled telemetry metric is removed
- The changefeed.push.enabled cluster setting is removed
- The poller subtest is removed from each changefeedccl test
- The cdc/poller roachtest is skipped on 19.2+
- TestValidations is removed; it's redundant with the much
  higher-quality TestChangefeedNemeses

Note that the table history still does some polling; switching it to
RangeFeed would cause an unacceptable increase in the commit-to-emit
latency of rows. This bit of polling will be removed as part of #36289.
This commit also leaves the structure of the changefeed code mostly
unchanged. There is an opportunity for cleanup here, but that will also
wait until after #36289.

Closes #36914

Release note: None

38418: storage: fix null pointer dereference in AdminMerge r=jeffrey-xiao a=jeffrey-xiao

Fixes #38427.

I noticed that `TestRepartitioning` was failing under stress on master with the following error:

```
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x20894f6]

...

        /usr/lib/go-1.12/src/runtime/panic.go:522 +0x1b5
github.com/cockroachdb/cockroach/pkg/storage.(*Replica).AdminMerge.func1(0xc00441e750, 0xc002a83e60, 0xc00020a880)
        /home/cockroach/go/src/github.com/cockroachdb/cockroach/pkg/storage/replica_command.go:536 +0x226

...
```

The logic for retrieving the RHS descriptor before and after #38302 is not the same in the case where `rightDesc` does not exist. This PR changes it so the behavior is consistent.

It seems possible that the following branch can evaluate to true, but I might be missing something here.
```go
// Verify that the two ranges are mergeable.
if !bytes.Equal(origLeftDesc.EndKey, rightDesc.StartKey) {
  // Should never happen, but just in case.
  return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey)
}
```
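
As a rough illustration of the failure mode, the sketch below guards against a missing right-hand descriptor before comparing keys. It is not the actual fix from this PR: `rangeDesc`, `checkMergeable`, and `errNoRHS` are hypothetical stand-ins, and the real code operates on CockroachDB's own descriptor types.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// rangeDesc is a hypothetical stand-in for a range descriptor.
type rangeDesc struct {
	StartKey, EndKey []byte
}

// errNoRHS is a hypothetical sentinel for a missing right-hand descriptor.
var errNoRHS = errors.New("right-hand range descriptor not found")

// checkMergeable returns an error if either descriptor is missing or if the
// ranges are not adjacent. The nil checks run before any field access, so a
// missing descriptor yields an error rather than a nil pointer dereference.
func checkMergeable(left, right *rangeDesc) error {
	if left == nil {
		return errors.New("left-hand range descriptor not found")
	}
	if right == nil {
		return errNoRHS
	}
	if !bytes.Equal(left.EndKey, right.StartKey) {
		return fmt.Errorf("ranges are not adjacent; %s != %s", left.EndKey, right.StartKey)
	}
	return nil
}

func main() {
	left := &rangeDesc{StartKey: []byte("a"), EndKey: []byte("m")}
	right := &rangeDesc{StartKey: []byte("m"), EndKey: []byte("z")}

	fmt.Println(checkMergeable(left, right)) // adjacent ranges
	fmt.Println(checkMergeable(left, nil))   // missing RHS: an error, not a panic
}
```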


38435: sql: Fixing interleave check for loose index scan r=rohany a=rohany

Fixing the interleave check for the loose index scan while interleave support is in progress.

Co-authored-by: Daniel Harrison <[email protected]>
Co-authored-by: Jeffrey Xiao <[email protected]>
Co-authored-by: Rohan Yadav <[email protected]>
4 people committed Jun 26, 2019
4 parents 5f358ed + c7a195c + 0b76aa6 + d3e7871 commit 56c18d8
Showing 16 changed files with 60 additions and 342 deletions.
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
@@ -2,7 +2,6 @@
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>changefeed.experimental_poll_interval</code></td><td>duration</td><td><code>1s</code></td><td>polling interval for the prototype changefeed implementation (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>changefeed.push.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, changed are pushed instead of pulled. This requires the kv.rangefeed.enabled setting. See https://www.cockroachlabs.com/docs/v19.2/change-data-capture.html#enable-rangefeeds-to-reduce-latency</td></tr>
<tr><td><code>cloudstorage.gs.default.key</code></td><td>string</td><td><code></code></td><td>if set, JSON key to use during Google Cloud Storage operations</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><code>cloudstorage.timeout</code></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
11 changes: 10 additions & 1 deletion pkg/ccl/changefeedccl/bench_test.go
@@ -42,6 +42,15 @@ func BenchmarkChangefeedTicks(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// In PR #38211, we removed the polling based data watcher in changefeeds in
// favor of RangeFeed. This benchmark worked by writing a bunch of data at
// certain timestamps and manipulating clocks at runtime so the polling
// grabbed a little of it at a time. There's fundamentally no way for this to
// work with RangeFeed without a rewrite, but it's not being used for anything
// right now, so the rewrite isn't worth it. We should fix this if we need to
// start doing changefeed perf work at some point.
b.Skip(`broken in #38211`)

ctx := context.Background()
s, sqlDBRaw, _ := serverutils.StartServer(b, base.TestServerArgs{UseDatabase: "d"})
defer s.Stopper().Stop(ctx)
@@ -212,7 +221,7 @@ func createBenchmarkChangefeed(
s.ClusterSettings(), details, spans, encoder, sink, rowsFn, TestingKnobs{}, metrics)

ctx, cancel := context.WithCancel(ctx)
go func() { _ = poller.Run(ctx) }()
go func() { _ = poller.RunUsingRangefeeds(ctx) }()
go func() { _ = thUpdater.PollTableDescs(ctx) }()

errCh := make(chan error, 1)
42 changes: 5 additions & 37 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -13,7 +13,6 @@ import (
gosql "database/sql"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -43,17 +42,9 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB) (Validator, error) {
ctx := context.Background()
rng, _ := randutil.NewPseudoRand()

var usingRangeFeed bool
if err := db.QueryRow(
`SHOW CLUSTER SETTING changefeed.push.enabled`,
).Scan(&usingRangeFeed); err != nil {
return nil, err
}

ns := &nemeses{
rowCount: 4,
db: db,
usingPoller: !usingRangeFeed,
rowCount: 4,
db: db,
// eventMix does not have to add to 100
eventMix: map[fsm.Event]int{
// eventTransact opens an UPSERT transaction is there is not one open. If
@@ -172,10 +163,9 @@ const (
)

type nemeses struct {
rowCount int
eventMix map[fsm.Event]int
mixTotal int
usingPoller bool
rowCount int
eventMix map[fsm.Event]int
mixTotal int

v *CountValidator
db *gosql.DB
@@ -377,28 +367,6 @@ func transact(a fsm.Args) error {
func noteFeedMessage(a fsm.Args) error {
ns := a.Extended.(*nemeses)

// The poller works by continually selecting a timestamp to be the next
// high-water and polling for changes between the last high-water and the new
// one. It doesn't push any unresolved intents (it would enter the txnwaitq,
// which would see the txn as live and hence not try to push it), so if we
// have an open transaction, it's possible that the poller is stuck waiting on
// it to resolve, which would cause the below call to `Next` to deadlock. This
// breaks that deadlock.
if ns.usingPoller {
nextDone := make(chan struct{})
defer close(nextDone)
go func() {
select {
case <-time.After(5 * time.Second):
log.Info(a.Ctx, "pushed open txn to break deadlock")
if err := push(a); err != nil {
panic(err)
}
case <-nextDone:
}
}()
}

m, err := ns.f.Next()
if err != nil {
return err
12 changes: 0 additions & 12 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -12,7 +12,6 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -37,17 +36,6 @@ var changefeedPollInterval = func() *settings.DurationSetting {
return s
}()

// PushEnabled is a cluster setting that triggers all subsequently
// created/unpaused changefeeds to receive kv changes via RangeFeed push
// (instead of ExportRequest polling).
var PushEnabled = settings.RegisterBoolSetting(
"changefeed.push.enabled",
"if set, changed are pushed instead of pulled. This requires the "+
"kv.rangefeed.enabled setting. See "+
base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`),
true,
)

const (
jsonMetaSentinel = `__crdb__`
)
9 changes: 1 addition & 8 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -13,7 +13,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
@@ -72,13 +71,6 @@ func distChangefeedFlow(
return err
}

execCfg := phs.ExecCfg()
if PushEnabled.Get(&execCfg.Settings.SV) {
telemetry.Count(`changefeed.run.push.enabled`)
} else {
telemetry.Count(`changefeed.run.push.disabled`)
}

spansTS := details.StatementTime
var initialHighWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) {
@@ -88,6 +80,7 @@ func distChangefeedFlow(
spansTS = initialHighWater
}

execCfg := phs.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS)
if err != nil {
return err
7 changes: 1 addition & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -190,12 +190,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ca.pollerDoneCh = make(chan struct{})
if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
defer close(ca.pollerDoneCh)
var err error
if PushEnabled.Get(&ca.flowCtx.Settings.SV) {
err = ca.poller.RunUsingRangefeeds(ctx)
} else {
err = ca.poller.Run(ctx)
}
err := ca.poller.RunUsingRangefeeds(ctx)

// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.