Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: remove deprecated poller #38211

Merged
merged 1 commit into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>changefeed.experimental_poll_interval</code></td><td>duration</td><td><code>1s</code></td><td>polling interval for the prototype changefeed implementation (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>changefeed.push.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, changed are pushed instead of pulled. This requires the kv.rangefeed.enabled setting. See https://www.cockroachlabs.com/docs/v19.2/change-data-capture.html#enable-rangefeeds-to-reduce-latency</td></tr>
<tr><td><code>cloudstorage.gs.default.key</code></td><td>string</td><td><code></code></td><td>if set, JSON key to use during Google Cloud Storage operations</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
<tr><td><code>cloudstorage.timeout</code></td><td>duration</td><td><code>10m0s</code></td><td>the timeout for import/export storage operations</td></tr>
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func BenchmarkChangefeedTicks(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// In PR #38211, we removed the polling-based data watcher in changefeeds in
// favor of RangeFeed. This benchmark worked by writing a bunch of data at
// certain timestamps and manipulating clocks at runtime so the polling
// grabbed a little of it at a time. There's fundamentally no way for this to
// work with RangeFeed without a rewrite, but it's not being used for anything
// right now, so the rewrite isn't worth it. We should fix this if we need to
// start doing changefeed perf work at some point.
b.Skip(`broken in #38211`)

ctx := context.Background()
s, sqlDBRaw, _ := serverutils.StartServer(b, base.TestServerArgs{UseDatabase: "d"})
defer s.Stopper().Stop(ctx)
Expand Down Expand Up @@ -212,7 +221,7 @@ func createBenchmarkChangefeed(
s.ClusterSettings(), details, spans, encoder, sink, rowsFn, TestingKnobs{}, metrics)

ctx, cancel := context.WithCancel(ctx)
go func() { _ = poller.Run(ctx) }()
go func() { _ = poller.RunUsingRangefeeds(ctx) }()
go func() { _ = thUpdater.PollTableDescs(ctx) }()

errCh := make(chan error, 1)
Expand Down
42 changes: 5 additions & 37 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
gosql "database/sql"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -43,17 +42,9 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB) (Validator, error) {
ctx := context.Background()
rng, _ := randutil.NewPseudoRand()

var usingRangeFeed bool
if err := db.QueryRow(
`SHOW CLUSTER SETTING changefeed.push.enabled`,
).Scan(&usingRangeFeed); err != nil {
return nil, err
}

ns := &nemeses{
rowCount: 4,
db: db,
usingPoller: !usingRangeFeed,
rowCount: 4,
db: db,
// eventMix does not have to add to 100
eventMix: map[fsm.Event]int{
// eventTransact opens an UPSERT transaction if there is not one open. If
Expand Down Expand Up @@ -172,10 +163,9 @@ const (
)

type nemeses struct {
rowCount int
eventMix map[fsm.Event]int
mixTotal int
usingPoller bool
rowCount int
eventMix map[fsm.Event]int
mixTotal int

v *CountValidator
db *gosql.DB
Expand Down Expand Up @@ -377,28 +367,6 @@ func transact(a fsm.Args) error {
func noteFeedMessage(a fsm.Args) error {
ns := a.Extended.(*nemeses)

// The poller works by continually selecting a timestamp to be the next
// high-water and polling for changes between the last high-water and the new
// one. It doesn't push any unresolved intents (it would enter the txnwaitq,
// which would see the txn as live and hence not try to push it), so if we
// have an open transaction, it's possible that the poller is stuck waiting on
// it to resolve, which would cause the below call to `Next` to deadlock. This
// breaks that deadlock.
if ns.usingPoller {
nextDone := make(chan struct{})
defer close(nextDone)
go func() {
select {
case <-time.After(5 * time.Second):
log.Info(a.Ctx, "pushed open txn to break deadlock")
if err := push(a); err != nil {
panic(err)
}
case <-nextDone:
}
}()
}

m, err := ns.f.Next()
if err != nil {
return err
Expand Down
12 changes: 0 additions & 12 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -37,17 +36,6 @@ var changefeedPollInterval = func() *settings.DurationSetting {
return s
}()

// PushEnabled is a cluster setting that triggers all subsequently
// created/unpaused changefeeds to receive kv changes via RangeFeed push
// (instead of ExportRequest polling).
var PushEnabled = settings.RegisterBoolSetting(
"changefeed.push.enabled",
"if set, changed are pushed instead of pulled. This requires the "+
"kv.rangefeed.enabled setting. See "+
base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`),
true,
)

const (
jsonMetaSentinel = `__crdb__`
)
Expand Down
9 changes: 1 addition & 8 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
Expand Down Expand Up @@ -72,13 +71,6 @@ func distChangefeedFlow(
return err
}

execCfg := phs.ExecCfg()
if PushEnabled.Get(&execCfg.Settings.SV) {
telemetry.Count(`changefeed.run.push.enabled`)
} else {
telemetry.Count(`changefeed.run.push.disabled`)
}

spansTS := details.StatementTime
var initialHighWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil && *h != (hlc.Timestamp{}) {
Expand All @@ -88,6 +80,7 @@ func distChangefeedFlow(
spansTS = initialHighWater
}

execCfg := phs.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS)
if err != nil {
return err
Expand Down
7 changes: 1 addition & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ca.pollerDoneCh = make(chan struct{})
if err := ca.flowCtx.Stopper().RunAsyncTask(ctx, "changefeed-poller", func(ctx context.Context) {
defer close(ca.pollerDoneCh)
var err error
if PushEnabled.Get(&ca.flowCtx.Settings.SV) {
err = ca.poller.RunUsingRangefeeds(ctx)
} else {
err = ca.poller.Run(ctx)
}
err := ca.poller.RunUsingRangefeeds(ctx)

// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
Expand Down
Loading