From 5cbf7751f3eb65153b9a137f4b45028ef0b3b648 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 14 Aug 2020 18:41:50 -0400 Subject: [PATCH 1/3] kvserver,rangefeed: ensure that iterators are only constructed under tasks Prior to this change, it was possible for a rangefeed request to be issued concurrently with shutting down which could lead to an iterator being constructed after the engine has been closed. Touches #51544 Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 20 +++--- pkg/kv/kvserver/rangefeed/processor_test.go | 9 ++- pkg/kv/kvserver/rangefeed/registry.go | 72 +++++++++++---------- pkg/kv/kvserver/rangefeed/registry_test.go | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 65 ++++++++++--------- 5 files changed, 93 insertions(+), 77 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 02cbc9cae902..3079743a5267 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -158,6 +158,10 @@ func NewProcessor(cfg Config) *Processor { } } +// IteratorConstructor is used to construct an iterator. It should be called +// from underneath a stopper task to ensure that the engine has not been closed. +type IteratorConstructor func() storage.SimpleIterator + // Start launches a goroutine to process rangefeed events and send them to // registrations. // @@ -167,10 +171,10 @@ func NewProcessor(cfg Config) *Processor { // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { - p.run(ctx, rtsIter, stopper) + p.run(ctx, rtsIterFunc, stopper) }); err != nil { pErr := roachpb.NewError(err) p.reg.DisconnectWithErr(all, pErr) @@ -180,7 +184,7 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) // run is called from Start and runs the rangefeed. func (p *Processor) run( - ctx context.Context, rtsIter storage.SimpleIterator, stopper *stop.Stopper, + ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper, ) { defer close(p.stoppedC) ctx, cancelOutputLoops := context.WithCancel(ctx) @@ -188,7 +192,8 @@ func (p *Processor) run( // Launch an async task to scan over the resolved timestamp iterator and // initialize the unresolvedIntentQueue. Ignore error if quiescing. - if rtsIter != nil { + if rtsIterFunc != nil { + rtsIter := rtsIterFunc() initScan := newInitResolvedTSScan(p, rtsIter) err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run) if err != nil { @@ -239,9 +244,6 @@ func (p *Processor) run( } } if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil { - if r.catchupIter != nil { - r.catchupIter.Close() // clean up - } r.disconnect(roachpb.NewError(err)) p.reg.Unregister(&r) } @@ -368,7 +370,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) { func (p *Processor) Register( span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIterConstructor IteratorConstructor, withDiff bool, stream Stream, errC chan<- *roachpb.Error, @@ -379,7 +381,7 @@ func (p *Processor) Register( p.syncEventC() r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff, + span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff, p.Config.EventChanCap, p.Metrics, stream, errC, ) select { diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 7fbc63531354..e4bf0062547b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -152,10 +152,17 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, rtsIter) + p.Start(stopper, makeIteratorConstructor(rtsIter)) return p, stopper } +func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor { + if rtsIter == nil { + return nil + } + return func() storage.SimpleIterator { return rtsIter } +} + func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) { return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */) } diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 3774860830bd..dbc66daf7fd9 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -53,11 +53,11 @@ type Stream interface { // has finished. type registration struct { // Input. - span roachpb.Span - catchupTimestamp hlc.Timestamp - catchupIter storage.SimpleIterator - withDiff bool - metrics *Metrics + span roachpb.Span + catchupTimestamp hlc.Timestamp + catchupIterConstructor func() storage.SimpleIterator + withDiff bool + metrics *Metrics // Output. stream Stream @@ -86,7 +86,7 @@ type registration struct { func newRegistration( span roachpb.Span, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIterConstructor func() storage.SimpleIterator, withDiff bool, bufferSz int, metrics *Metrics, @@ -94,14 +94,14 @@ func newRegistration( errC chan<- *roachpb.Error, ) registration { r := registration{ - span: span, - catchupTimestamp: startTS, - catchupIter: catchupIter, - withDiff: withDiff, - metrics: metrics, - stream: stream, - errC: errC, - buf: make(chan *roachpb.RangeFeedEvent, bufferSz), + span: span, + catchupTimestamp: startTS, + catchupIterConstructor: catchupIterConstructor, + withDiff: withDiff, + metrics: metrics, + stream: stream, + errC: errC, + buf: make(chan *roachpb.RangeFeedEvent, bufferSz), } r.mu.Locker = &syncutil.Mutex{} r.mu.caughtUp = true @@ -231,13 +231,11 @@ func (r *registration) disconnect(pErr *roachpb.Error) { // canceled, or when the buffer has overflowed and all pre-overflow entries // have been emitted. func (r *registration) outputLoop(ctx context.Context) error { - // If the registration has a catch-up scan, - if r.catchupIter != nil { - if err := r.runCatchupScan(); err != nil { - err = errors.Wrap(err, "catch-up scan failed") - log.Errorf(ctx, "%v", err) - return err - } + // If the registration has a catch-up scan, run it. + if err := r.maybeRunCatchupScan(); err != nil { + err = errors.Wrap(err, "catch-up scan failed") + log.Errorf(ctx, "%v", err) + return err } // Normal buffered output loop. @@ -274,18 +272,22 @@ func (r *registration) runOutputLoop(ctx context.Context) { r.disconnect(roachpb.NewError(err)) } -// runCatchupScan starts a catchup scan which will output entries for all +// maybeRunCatchupScan starts a catchup scan which will output entries for all // recorded changes in the replica that are newer than the catchupTimestamp. // This uses the iterator provided when the registration was originally created; // after the scan completes, the iterator will be closed. -func (r *registration) runCatchupScan() error { - if r.catchupIter == nil { +// +// If the registration does not have a catchUpIteratorConstructor, this method +// is a no-op. +func (r *registration) maybeRunCatchupScan() error { + if r.catchupIterConstructor == nil { return nil } + catchupIter := r.catchupIterConstructor() + r.catchupIterConstructor = nil start := timeutil.Now() defer func() { - r.catchupIter.Close() - r.catchupIter = nil + catchupIter.Close() r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() @@ -323,16 +325,16 @@ func (r *registration) runCatchupScan() error { // versions of each key that are after the registration's startTS, so we // can't use NextKey. var meta enginepb.MVCCMetadata - r.catchupIter.SeekGE(startKey) + catchupIter.SeekGE(startKey) for { - if ok, err := r.catchupIter.Valid(); err != nil { + if ok, err := catchupIter.Valid(); err != nil { return err - } else if !ok || !r.catchupIter.UnsafeKey().Less(endKey) { + } else if !ok || !catchupIter.UnsafeKey().Less(endKey) { break } - unsafeKey := r.catchupIter.UnsafeKey() - unsafeVal := r.catchupIter.UnsafeValue() + unsafeKey := catchupIter.UnsafeKey() + unsafeVal := catchupIter.UnsafeValue() if !unsafeKey.IsValue() { // Found a metadata key. if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil { @@ -344,7 +346,7 @@ func (r *registration) runCatchupScan() error { // past the corresponding provisional key-value. To do this, // scan to the timestamp immediately before (i.e. the key // immediately after) the provisional key. - r.catchupIter.SeekGE(storage.MVCCKey{ + catchupIter.SeekGE(storage.MVCCKey{ Key: unsafeKey.Key, Timestamp: hlc.Timestamp(meta.Timestamp).Prev(), }) @@ -375,7 +377,7 @@ func (r *registration) runCatchupScan() error { if ignore && !r.withDiff { // Skip all the way to the next key. // NB: fast-path to avoid value copy when !r.withDiff. - r.catchupIter.NextKey() + catchupIter.NextKey() continue } @@ -388,10 +390,10 @@ func (r *registration) runCatchupScan() error { if ignore { // Skip all the way to the next key. - r.catchupIter.NextKey() + catchupIter.NextKey() } else { // Move to the next version of this key. - r.catchupIter.Next() + catchupIter.Next() var event roachpb.RangeFeedEvent event.MustSetValue(&roachpb.RangeFeedValue{ diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index d003b488030b..54bfc0c8d769 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -104,7 +104,7 @@ func newTestRegistration( registration: newRegistration( span, ts, - catchup, + makeIteratorConstructor(catchup), withDiff, 5, NewMetrics(), @@ -253,7 +253,7 @@ func TestRegistrationCatchUpScan(t *testing.T) { }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */) require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) - require.NoError(t, r.runCatchupScan()) + require.NoError(t, r.maybeRunCatchupScan()) require.True(t, iter.closed) require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 2a43ed9fc21d..e6298f70720a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -194,28 +194,32 @@ func (r *Replica) RangeFeed( } // Register the stream with a catch-up iterator. - var catchUpIter storage.SimpleIterator + var catchUpIterFunc rangefeed.IteratorConstructor if usingCatchupIter { - innerIter := r.Engine().NewIterator(storage.IterOptions{ - UpperBound: args.Span.EndKey, - // RangeFeed originally intended to use the time-bound iterator - // performance optimization. However, they've had correctness issues in - // the past (#28358, #34819) and no-one has the time for the due-diligence - // necessary to be confidant in their correctness going forward. Not using - // them causes the total time spent in RangeFeed catchup on changefeed - // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still - // workable. See #35122 for details. - // MinTimestampHint: args.Timestamp, - }) - catchUpIter = iteratorWithCloser{ - SimpleIterator: innerIter, - close: iterSemRelease, + catchUpIterFunc = func() storage.SimpleIterator { + + innerIter := r.Engine().NewIterator(storage.IterOptions{ + UpperBound: args.Span.EndKey, + // RangeFeed originally intended to use the time-bound iterator + // performance optimization. However, they've had correctness issues in + // the past (#28358, #34819) and no-one has the time for the due-diligence + // necessary to be confidant in their correctness going forward. Not using + // them causes the total time spent in RangeFeed catchup on changefeed + // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still + // workable. See #35122 for details. + // MinTimestampHint: args.Timestamp, + }) + catchUpIter := iteratorWithCloser{ + SimpleIterator: innerIter, + close: iterSemRelease, + } + // Responsibility for releasing the semaphore now passes to the iterator. + iterSemRelease = nil + return catchUpIter } - // Responsibility for releasing the semaphore now passes to the iterator. - iterSemRelease = nil } p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, errC, + ctx, rSpan, args.Timestamp, catchUpIterFunc, args.WithDiff, lockedStream, errC, ) r.raftMu.Unlock() @@ -296,7 +300,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( ctx context.Context, span roachpb.RSpan, startTS hlc.Timestamp, - catchupIter storage.SimpleIterator, + catchupIter rangefeed.IteratorConstructor, withDiff bool, stream rangefeed.Stream, errC chan<- *roachpb.Error, @@ -341,16 +345,18 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( p = rangefeed.NewProcessor(cfg) // Start it with an iterator to initialize the resolved timestamp. - rtsIter := r.Engine().NewIterator(storage.IterOptions{ - UpperBound: desc.EndKey.AsRawKey(), - // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed - // we should periodically persist the resolved timestamp so that we - // can initialize the rangefeed using an iterator that only needs to - // observe timestamps back to the last recorded resolved timestamp. - // This is safe because we know that there are no unresolved intents - // at times before a resolved timestamp. - // MinTimestampHint: r.ResolvedTimestamp, - }) + rtsIter := func() storage.SimpleIterator { + return r.Engine().NewIterator(storage.IterOptions{ + UpperBound: desc.EndKey.AsRawKey(), + // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed + // we should periodically persist the resolved timestamp so that we + // can initialize the rangefeed using an iterator that only needs to + // observe timestamps back to the last recorded resolved timestamp. + // This is safe because we know that there are no unresolved intents + // at times before a resolved timestamp. + // MinTimestampHint: r.ResolvedTimestamp, + }) + } p.Start(r.store.Stopper(), rtsIter) // Register with the processor *before* we attach its reference to the @@ -360,7 +366,6 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // server shutdown. reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC) if !reg { - catchupIter.Close() // clean up select { case <-r.store.Stopper().ShouldQuiesce(): errC <- roachpb.NewError(&roachpb.NodeUnavailableError{}) From 66c5f48baf1c14636b6f2c312cf0f9fe17fcb75a Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Mon, 13 Jul 2020 17:30:05 -0700 Subject: [PATCH 2/3] partialidx: prove implication for comparisons with two variables This commit adds support for proving partial index predicates are implied by query filters when they contain comparison expressions with two variables and they are not identical expressions. Below are some examples where the left expression implies (=>) the right expression. The right is guaranteed to contain the left despite both expressions having no constant values. a > b => a >= b a = b => a >= b b < a => a >= b a > b => a != b Release note: None --- pkg/sql/opt/partialidx/implicator.go | 140 ++++++++++++++--- .../opt/partialidx/testdata/implicator/atom | 144 +++++++++++++++++- 2 files changed, 257 insertions(+), 27 deletions(-) diff --git a/pkg/sql/opt/partialidx/implicator.go b/pkg/sql/opt/partialidx/implicator.go index 958ba70aa370..2eb9601c967c 100644 --- a/pkg/sql/opt/partialidx/implicator.go +++ b/pkg/sql/opt/partialidx/implicator.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/norm" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/errors" ) // Implicator is used to 1) prove that query filters imply a partial index @@ -450,28 +451,36 @@ func (im *Implicator) atomImpliesPredicate( default: // atom A => atom B iff B contains A. - return im.atomContainsAtom(pred, e) + return im.atomImpliesAtom(e, pred, exactMatches) } } -// atomContainsAtom returns true if atom expression a contains atom expression -// b, meaning that all values for variables in which b evaluates to true, a also -// evaluates to true. +// atomImpliesAtom returns true if the predicate atom expression, pred, contains +// atom expression a, meaning that all values for variables in which e evaluates +// to true, pred also evaluates to true. // // Constraints are used to prove containment because they make it easy to assess // if one expression contains another, handling many types of expressions // including comparison operators, IN operators, and tuples. -func (im *Implicator) atomContainsAtom(a, b opt.ScalarExpr) bool { - // Build constraint sets for a and b, unless they have been cached. - aSet, aTight, ok := im.fetchConstraint(a) +func (im *Implicator) atomImpliesAtom( + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, +) bool { + // Check for containment of comparison expressions with two variables, like + // a = b. + if res, ok := im.twoVarComparisonImpliesTwoVarComparison(e, pred, exactMatches); ok { + return res + } + + // Build constraint sets for e and pred, unless they have been cached. + eSet, eTight, ok := im.fetchConstraint(e) if !ok { - aSet, aTight = memo.BuildConstraints(a, im.md, im.evalCtx) - im.cacheConstraint(a, aSet, aTight) + eSet, eTight = memo.BuildConstraints(e, im.md, im.evalCtx) + im.cacheConstraint(e, eSet, eTight) } - bSet, bTight, ok := im.fetchConstraint(b) + predSet, predTight, ok := im.fetchConstraint(pred) if !ok { - bSet, bTight = memo.BuildConstraints(b, im.md, im.evalCtx) - im.cacheConstraint(b, bSet, bTight) + predSet, predTight = memo.BuildConstraints(pred, im.md, im.evalCtx) + im.cacheConstraint(pred, predSet, predTight) } // If either set has more than one constraint, then constraints cannot be @@ -484,28 +493,105 @@ func (im *Implicator) atomContainsAtom(a, b opt.ScalarExpr) bool { // // /1: (/NULL - ]; /2: (/NULL - ] // - // TODO(mgartner): Prove implication in cases like (a > b) => (a >= b), - // without using constraints. - if aSet.Length() > 1 || bSet.Length() > 1 { + if eSet.Length() > 1 || predSet.Length() > 1 { return false } // Containment cannot be proven if either constraint is not tight, because // the constraint does not fully represent the expression. - if !aTight || !bTight { + if !eTight || !predTight { return false } - ac := aSet.Constraint(0) - bc := bSet.Constraint(0) + eConstraint := eSet.Constraint(0) + predConstraint := predSet.Constraint(0) - // If the columns in ac are not a prefix of the columns in bc, then ac - // cannot contain bc. - if !ac.Columns.IsPrefixOf(&bc.Columns) { + // If the columns in predConstraint are not a prefix of the columns in + // eConstraint, then predConstraint cannot contain eConstraint. + if !predConstraint.Columns.IsPrefixOf(&eConstraint.Columns) { return false } - return ac.Contains(im.evalCtx, bc) + return predConstraint.Contains(im.evalCtx, eConstraint) +} + +// twoVarComparisonImpliesTwoVarComparison returns true if pred contains e, +// where both expressions are comparisons (=, <, >, <=, >=, !=) of two +// variables. If either expressions is not a comparison of two variables, this +// function returns ok=false. +// +// For example, it can be prove that (a > b) implies (a >= b) because all +// values of a and b that satisfy the first expression also satisfy the second +// expression. +func (im *Implicator) twoVarComparisonImpliesTwoVarComparison( + e opt.ScalarExpr, pred opt.ScalarExpr, exactMatches map[opt.Expr]struct{}, +) (containment bool, ok bool) { + if !isTwoVarComparison(e) || !isTwoVarComparison(pred) { + return false, false + } + + commutedOp := func(op opt.Operator) opt.Operator { + switch op { + case opt.EqOp: + return opt.EqOp + case opt.NeOp: + return opt.NeOp + case opt.LtOp: + return opt.GtOp + case opt.GtOp: + return opt.LtOp + case opt.LeOp: + return opt.GeOp + case opt.GeOp: + return opt.LeOp + default: + panic(errors.AssertionFailedf("%s has no commuted operator", op)) + } + } + + predLeftCol := pred.Child(0).(*memo.VariableExpr).Col + predRightCol := pred.Child(1).(*memo.VariableExpr).Col + impliesPred := func(a opt.ColumnID, b opt.ColumnID, op opt.Operator) bool { + // If the columns are not the same, then pred is not implied. + if a != predLeftCol || b != predRightCol { + return false + } + + // If the columns are the same and the ops are the same, then pred is + // implied. + if op == pred.Op() { + return true + } + + switch op { + case opt.EqOp: + // a = b implies a <= b and a >= b + return pred.Op() == opt.LeOp || pred.Op() == opt.GeOp + case opt.LtOp: + // a < b implies a <= b and a != b + return pred.Op() == opt.LeOp || pred.Op() == opt.NeOp + case opt.GtOp: + // a > b implies a >= b and a != b + return pred.Op() == opt.GeOp || pred.Op() == opt.NeOp + default: + return false + } + } + + eLeftCol := e.Child(0).(*memo.VariableExpr).Col + eRightCol := e.Child(1).(*memo.VariableExpr).Col + if impliesPred(eLeftCol, eRightCol, e.Op()) || impliesPred(eRightCol, eLeftCol, commutedOp(e.Op())) { + // If both operators are equal, or e's commuted operator is equal to + // pred's operator, then e is an exact match to pred and it should be + // removed from the remaining filters. For example, (a > b) and + // (b < a) both individually imply (a > b) with no remaining filters. + if e.Op() == pred.Op() || commutedOp(e.Op()) == pred.Op() { + exactMatches[e] = struct{}{} + } + return true, true + } + + return false, true } // cacheConstraint caches a constraint set and a tight boolean for the given @@ -638,3 +724,13 @@ func flattenOrExpr(or *memo.OrExpr) []opt.ScalarExpr { return ors } + +// isTwoVarComparison returns true if the expression is a comparison +// expression (=, <, >, <=, >=, !=) and both side of the comparison are +// variables. +func isTwoVarComparison(e opt.ScalarExpr) bool { + op := e.Op() + return (op == opt.EqOp || op == opt.LtOp || op == opt.GtOp || op == opt.LeOp || op == opt.GeOp || op == opt.NeOp) && + e.Child(0).Op() == opt.VariableOp && + e.Child(1).Op() == opt.VariableOp +} diff --git a/pkg/sql/opt/partialidx/testdata/implicator/atom b/pkg/sql/opt/partialidx/testdata/implicator/atom index 7aab4c64da07..ed43dc2ccc15 100644 --- a/pkg/sql/opt/partialidx/testdata/implicator/atom +++ b/pkg/sql/opt/partialidx/testdata/implicator/atom @@ -130,15 +130,29 @@ predtest vars=(int, int) true └── remaining filters: none -# TODO(mgartner): This filter should imply the predicate. The current logic does -# not support this because it relies solely on constraints which can only -# represent variable constraints in relation to constants. predtest vars=(int, int) -@1 > @2 +@1 = @2 +=> +@2 = @1 +---- +true +└── remaining filters: none + +predtest vars=(int, int) +@1 = @2 +=> +@1 <= @2 +---- +true +└── remaining filters: @1 = @2 + +predtest vars=(int, int) +@1 = @2 => @1 >= @2 ---- -false +true +└── remaining filters: @1 = @2 predtest vars=(int) @1 = 1 @@ -173,6 +187,110 @@ predtest vars=(int, int) true └── remaining filters: none +predtest vars=(int, int) +@1 < @2 +=> +@2 > @1 +---- +true +└── remaining filters: none + +predtest vars=(int, int) +@1 < @2 +=> +@1 <= @2 +---- +true +└── remaining filters: @1 < @2 + +predtest vars=(int, int) +@1 < @2 +=> +@2 >= @1 +---- +true +└── remaining filters: @1 < @2 + +predtest vars=(int, int) +@1 < @2 +=> +@1 != @2 +---- +true +└── remaining filters: @1 < @2 + +predtest vars=(int, int) +@1 < @2 +=> +@2 != @1 +---- +true +└── remaining filters: @1 < @2 + +predtest vars=(int, int) +@1 <= @2 +=> +@2 >= @1 +---- +true +└── remaining filters: none + +predtest vars=(int, int) +@1 > @2 +=> +@2 < @1 +---- +true +└── remaining filters: none + +predtest vars=(int, int) +@1 > @2 +=> +@1 >= @2 +---- +true +└── remaining filters: @1 > @2 + +predtest vars=(int, int) +@1 > @2 +=> +@2 <= @1 +---- +true +└── remaining filters: @1 > @2 + +predtest vars=(int, int) +@1 > @2 +=> +@1 != @2 +---- +true +└── remaining filters: @1 > @2 + +predtest vars=(int, int) +@1 > @2 +=> +@2 != @1 +---- +true +└── remaining filters: @1 > @2 + +predtest vars=(int, int) +@1 >= @2 +=> +@2 <= @1 +---- +true +└── remaining filters: none + +predtest vars=(int, int) +@1 != @2 +=> +@2 != @1 +---- +true +└── remaining filters: none + predtest vars=(int) @1 > 10 => @@ -390,6 +508,22 @@ predtest vars=(bool, bool) true └── remaining filters: @2 +predtest vars=(string, string, string) +@1 = @2 AND @3 = 'foo' +=> +@2 = @1 +---- +true +└── remaining filters: @3 = 'foo' + +predtest vars=(string, string, string) +@1 = @2 AND @3 = @1 +=> +@1 = @3 +---- +true +└── remaining filters: @1 = @2 + predtest vars=(bool, bool) @1 AND NOT @2 => From b0fe4350c1f8ca8eda31235d70f7cd2d45cb0a28 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 18 Aug 2020 11:36:17 -0400 Subject: [PATCH 3/3] roachprod: introduce --skip-init to `roachprod start` ..and `roachprod init`. I attempted to originally introduce this flag in \#51329, and ultimately abandoned it. I still think it's a good idea to have such a thing, especially given now we're writing integration tests that want to control `init` behaviour. It's much easier to write them with this --skip-init flag than it is to work around roachprod's magical auto-init behavior. To do what's skipped when using --skip-init, we introduce a `roachprod init` sub command. Release note: None --- pkg/cmd/roachprod/install/cluster_synced.go | 41 +++++++++++++++++ pkg/cmd/roachprod/install/cockroach.go | 50 ++++++++++++--------- pkg/cmd/roachprod/main.go | 33 +++++++++++++- 3 files changed, 103 insertions(+), 21 deletions(-) diff --git a/pkg/cmd/roachprod/install/cluster_synced.go b/pkg/cmd/roachprod/install/cluster_synced.go index c1b79c6664bf..2e2a1a67c791 100644 --- a/pkg/cmd/roachprod/install/cluster_synced.go +++ b/pkg/cmd/roachprod/install/cluster_synced.go @@ -37,6 +37,7 @@ import ( clog "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/version" "github.com/cockroachdb/errors" crdberrors "github.com/cockroachdb/errors" "golang.org/x/sync/errgroup" @@ -1736,3 +1737,43 @@ func (c *SyncedCluster) Parallel( func (c *SyncedCluster) escapedTag() string { return strings.Replace(c.Tag, "/", "\\/", -1) } + +// Init initializes the cluster. It does it through node 1 (as per ServerNodes) +// to maintain parity with auto-init behavior of `roachprod start` (when +// --skip-init) is not specified. The implementation should be kept in +// sync with Cockroach.Start. +func (c *SyncedCluster) Init() { + r := c.Impl.(Cockroach) + h := &crdbInstallHelper{c: c, r: r} + + // See (Cockroach).Start. We reserve a few special operations for the first + // node, so we strive to maintain the same here for interoperability. + const firstNodeIdx = 0 + + vers, err := getCockroachVersion(c, c.ServerNodes()[firstNodeIdx]) + if err != nil { + log.Fatalf("unable to retrieve cockroach version: %v", err) + } + + if !vers.AtLeast(version.MustParse("v20.1.0")) { + log.Fatal("`roachprod init` only supported for v20.1 and beyond") + } + + fmt.Printf("%s: initializing cluster\n", h.c.Name) + initOut, err := h.initializeCluster(firstNodeIdx) + if err != nil { + log.Fatalf("unable to initialize cluster: %v", err) + } + if initOut != "" { + fmt.Println(initOut) + } + + fmt.Printf("%s: setting cluster settings\n", h.c.Name) + clusterSettingsOut, err := h.setClusterSettings(firstNodeIdx) + if err != nil { + log.Fatalf("unable to set cluster settings: %v", err) + } + if clusterSettingsOut != "" { + fmt.Println(clusterSettingsOut) + } +} diff --git a/pkg/cmd/roachprod/install/cockroach.go b/pkg/cmd/roachprod/install/cockroach.go index 4070bd939342..d94ae3f114e1 100644 --- a/pkg/cmd/roachprod/install/cockroach.go +++ b/pkg/cmd/roachprod/install/cockroach.go @@ -31,6 +31,7 @@ import ( var StartOpts struct { Encrypt bool Sequential bool + SkipInit bool } // Cockroach TODO(peter): document @@ -119,7 +120,7 @@ func argExists(args []string, target string) int { // `start-single-node` (this was written to provide a short hand to start a // single node cluster with a replication factor of one). func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) { - h := &crdbStartHelper{c: c, r: r} + h := &crdbInstallHelper{c: c, r: r} h.distributeCerts() nodes := c.ServerNodes() @@ -150,11 +151,17 @@ func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) { // NB: The code blocks below are not parallelized, so it's safe for us // to use fmt.Printf style logging. - // 1. We don't init when invoking with `start-single-node`. - // 2. For nodes running <20.1, the --join flags are constructed in a manner - // such that the first node doesn't have any (see `generateStartArgs`), - // which prompts CRDB to auto-initialize. For nodes running >=20.1, we - // need to explicitly initialize. + // 1. We don't init invoked using `--skip-init`. + // 2. We don't init when invoking with `start-single-node`. + // 3. For nodes running <20.1, the --join flags are constructed in a + // manner such that the first node doesn't have any (see + // `generateStartArgs`),which prompts CRDB to auto-initialize. For + // nodes running >=20.1, we need to explicitly initialize. + + if StartOpts.SkipInit { + return nil, nil + } + shouldInit := !h.useStartSingleNode(vers) && vers.AtLeast(version.MustParse("v20.1.0")) if shouldInit { fmt.Printf("%s: initializing cluster\n", h.c.Name) @@ -188,6 +195,9 @@ func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) { } } + // We're sure to set cluster settings after having initialized the + // cluster. + fmt.Printf("%s: setting cluster settings\n", h.c.Name) clusterSettingsOut, err := h.setClusterSettings(nodeIdx) if err != nil { @@ -316,12 +326,12 @@ func (r Cockroach) SQL(c *SyncedCluster, args []string) error { return nil } -type crdbStartHelper struct { +type crdbInstallHelper struct { c *SyncedCluster r Cockroach } -func (h *crdbStartHelper) startNode( +func (h *crdbInstallHelper) startNode( nodeIdx int, extraArgs []string, vers *version.Version, ) (string, error) { startCmd, err := h.generateStartCmd(nodeIdx, extraArgs, vers) @@ -343,7 +353,7 @@ func (h *crdbStartHelper) startNode( return strings.TrimSpace(string(out)), nil } -func (h *crdbStartHelper) generateStartCmd( +func (h *crdbInstallHelper) generateStartCmd( nodeIdx int, extraArgs []string, vers *version.Version, ) (string, error) { args, err := h.generateStartArgs(nodeIdx, extraArgs, vers) @@ -390,7 +400,7 @@ func (h *crdbStartHelper) generateStartCmd( return cmd, nil } -func (h *crdbStartHelper) generateStartArgs( +func (h *crdbInstallHelper) generateStartArgs( nodeIdx int, extraArgs []string, vers *version.Version, ) ([]string, error) { var args []string @@ -491,7 +501,7 @@ func (h *crdbStartHelper) generateStartArgs( return args, nil } -func (h *crdbStartHelper) initializeCluster(nodeIdx int) (string, error) { +func (h *crdbInstallHelper) initializeCluster(nodeIdx int) (string, error) { nodes := h.c.ServerNodes() initCmd := h.generateInitCmd(nodeIdx) @@ -508,7 +518,7 @@ func (h *crdbStartHelper) initializeCluster(nodeIdx int) (string, error) { return strings.TrimSpace(string(out)), nil } -func (h *crdbStartHelper) setClusterSettings(nodeIdx int) (string, error) { +func (h *crdbInstallHelper) setClusterSettings(nodeIdx int) (string, error) { nodes := h.c.ServerNodes() clusterSettingCmd := h.generateClusterSettingCmd(nodeIdx) @@ -525,7 +535,7 @@ func (h *crdbStartHelper) setClusterSettings(nodeIdx int) (string, error) { return strings.TrimSpace(string(out)), nil } -func (h *crdbStartHelper) generateClusterSettingCmd(nodeIdx int) string { +func (h *crdbInstallHelper) generateClusterSettingCmd(nodeIdx int) string { nodes := h.c.ServerNodes() license := envutil.EnvOrDefaultString("COCKROACH_DEV_LICENSE", "") if license == "" { @@ -553,7 +563,7 @@ func (h *crdbStartHelper) generateClusterSettingCmd(nodeIdx int) string { return clusterSettingCmd } -func (h *crdbStartHelper) generateInitCmd(nodeIdx int) string { +func (h *crdbInstallHelper) generateInitCmd(nodeIdx int) string { nodes := h.c.ServerNodes() var initCmd string @@ -572,7 +582,7 @@ func (h *crdbStartHelper) generateInitCmd(nodeIdx int) string { return initCmd } -func (h *crdbStartHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string { +func (h *crdbInstallHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string { if !StartOpts.Encrypt { return "" } @@ -594,13 +604,13 @@ func (h *crdbStartHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string return keyCmd } -func (h *crdbStartHelper) useStartSingleNode(vers *version.Version) bool { +func (h *crdbInstallHelper) useStartSingleNode(vers *version.Version) bool { return len(h.c.VMs) == 1 && vers.AtLeast(version.MustParse("v19.2.0")) } // distributeCerts, like the name suggests, distributes certs if it's a secure // cluster and we're starting n1. -func (h *crdbStartHelper) distributeCerts() { +func (h *crdbInstallHelper) distributeCerts() { for _, node := range h.c.ServerNodes() { if node == 1 && h.c.Secure { h.c.DistributeCerts() @@ -609,7 +619,7 @@ func (h *crdbStartHelper) distributeCerts() { } } -func (h *crdbStartHelper) shouldAdvertisePublicIP() bool { +func (h *crdbInstallHelper) shouldAdvertisePublicIP() bool { // If we're creating nodes that span VPC (e.g. AWS multi-region or // multi-cloud), we'll tell the nodes to advertise their public IPs // so that attaching nodes to the cluster Just Works. @@ -621,7 +631,7 @@ func (h *crdbStartHelper) shouldAdvertisePublicIP() bool { return false } -func (h *crdbStartHelper) getEnvVars() string { +func (h *crdbInstallHelper) getEnvVars() string { var buf strings.Builder for _, v := range os.Environ() { if strings.HasPrefix(v, "COCKROACH_") { @@ -640,7 +650,7 @@ func (h *crdbStartHelper) getEnvVars() string { return buf.String() } -func (h *crdbStartHelper) run(nodeIdx int, cmd string) (string, error) { +func (h *crdbInstallHelper) run(nodeIdx int, cmd string) (string, error) { nodes := h.c.ServerNodes() sess, err := h.c.newSession(nodes[nodeIdx]) diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go index 11715df45c15..e4b73ee178ba 100644 --- a/pkg/cmd/roachprod/main.go +++ b/pkg/cmd/roachprod/main.go @@ -89,6 +89,7 @@ var ( adminurlIPs = false useTreeDist = true encrypt = false + skipInit = false quiet = false sig = 9 waitFlag = false @@ -984,7 +985,9 @@ environment variables to the cockroach process. ` + tagHelp + ` The "start" command takes care of setting up the --join address and specifying reasonable defaults for other flags. One side-effect of this convenience is -that node 1 is special and must be started for the cluster to be initialized. +that node 1 is special and if started, is used to auto-initialize the cluster. +The --skip-init flag can be used to avoid auto-initialization (which can then +separately be done using the "init" command). If the COCKROACH_DEV_LICENSE environment variable is set the enterprise.license cluster setting will be set to its value. @@ -1035,6 +1038,31 @@ other signals. }), } +var initCmd = &cobra.Command{ + Use: "init ", + Short: "initialize the cluster", + Long: `Initialize the cluster. + +The "init" command bootstraps the cluster (using "cockroach init"). It also sets +default cluster settings. It's intended to be used in conjunction with +'roachprod start --skip-init'. +`, + Args: cobra.ExactArgs(1), + Run: wrap(func(cmd *cobra.Command, args []string) error { + clusterName, err := verifyClusterName(args[0]) + if err != nil { + return err + } + + c, err := newCluster(clusterName) + if err != nil { + return err + } + c.Init() + return nil + }), +} + var statusCmd = &cobra.Command{ Use: "status ", Short: "retrieve the status of nodes in a cluster", @@ -1558,6 +1586,7 @@ func main() { monitorCmd, startCmd, stopCmd, + initCmd, runCmd, wipeCmd, reformatCmd, @@ -1754,6 +1783,8 @@ func main() { &clusterType, "type", "t", clusterType, `cluster type ("cockroach" or "cassandra")`) cmd.Flags().BoolVar( &install.StartOpts.Encrypt, "encrypt", encrypt, "start nodes with encryption at rest turned on") + cmd.Flags().BoolVar( + &install.StartOpts.SkipInit, "skip-init", skipInit, "skip initializing the cluster") fallthrough case sqlCmd: cmd.Flags().StringVarP(