Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
52844: kvserver,rangefeed: ensure that iterators are only constructed under tasks r=andreimatei a=ajwerner


Prior to this change, it was possible for a rangefeed request to be issued
concurrently with shutting down which could lead to an iterator being
constructed after the engine has been closed.

Touches #51544

Release note: None

52996: partialidx: prove implication for comparisons with two variables r=RaduBerinde a=mgartner

This commit adds support for proving partial index predicates are
implied by query filters when they contain comparison expressions with
two variables and they are not identical expressions.

Below are some examples where the left expression implies (=>) the right
expression. The right is guaranteed to contain the left despite both
expressions having no constant values.

    a > b  =>  a >= b
    a = b  =>  a >= b
    b < a  =>  a >= b
    a > b  =>  a != b

Release note: None

53113: roachprod: introduce --skip-init to `roachprod start` r=irfansharif a=irfansharif

..and `roachprod init`. I attempted to originally introduce this flag in
\#51329, and ultimately abandoned it. I still think it's a good idea to
have such a thing, especially given now we're writing integration tests
that want to control `init` behaviour. It's much easier to write them
with this --skip-init flag than it is to work around roachprod's magical
auto-init behavior.

To do what's skipped when using --skip-init, we introduce a `roachprod
init` sub command.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
4 people committed Aug 20, 2020
4 parents ab8a6bd + 5cbf775 + 66c5f48 + b0fe435 commit 545c4fe
Show file tree
Hide file tree
Showing 10 changed files with 453 additions and 125 deletions.
41 changes: 41 additions & 0 deletions pkg/cmd/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
clog "github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
crdberrors "github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1736,3 +1737,43 @@ func (c *SyncedCluster) Parallel(
func (c *SyncedCluster) escapedTag() string {
return strings.Replace(c.Tag, "/", "\\/", -1)
}

// Init initializes the cluster. It does it through node 1 (as per ServerNodes)
// to maintain parity with auto-init behavior of `roachprod start` (when
// --skip-init) is not specified. The implementation should be kept in
// sync with Cockroach.Start.
func (c *SyncedCluster) Init() {
r := c.Impl.(Cockroach)
h := &crdbInstallHelper{c: c, r: r}

// See (Cockroach).Start. We reserve a few special operations for the first
// node, so we strive to maintain the same here for interoperability.
const firstNodeIdx = 0

vers, err := getCockroachVersion(c, c.ServerNodes()[firstNodeIdx])
if err != nil {
log.Fatalf("unable to retrieve cockroach version: %v", err)
}

if !vers.AtLeast(version.MustParse("v20.1.0")) {
log.Fatal("`roachprod init` only supported for v20.1 and beyond")
}

fmt.Printf("%s: initializing cluster\n", h.c.Name)
initOut, err := h.initializeCluster(firstNodeIdx)
if err != nil {
log.Fatalf("unable to initialize cluster: %v", err)
}
if initOut != "" {
fmt.Println(initOut)
}

fmt.Printf("%s: setting cluster settings\n", h.c.Name)
clusterSettingsOut, err := h.setClusterSettings(firstNodeIdx)
if err != nil {
log.Fatalf("unable to set cluster settings: %v", err)
}
if clusterSettingsOut != "" {
fmt.Println(clusterSettingsOut)
}
}
50 changes: 30 additions & 20 deletions pkg/cmd/roachprod/install/cockroach.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
var StartOpts struct {
Encrypt bool
Sequential bool
SkipInit bool
}

// Cockroach TODO(peter): document
Expand Down Expand Up @@ -119,7 +120,7 @@ func argExists(args []string, target string) int {
// `start-single-node` (this was written to provide a short hand to start a
// single node cluster with a replication factor of one).
func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) {
h := &crdbStartHelper{c: c, r: r}
h := &crdbInstallHelper{c: c, r: r}
h.distributeCerts()

nodes := c.ServerNodes()
Expand Down Expand Up @@ -150,11 +151,17 @@ func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) {
// NB: The code blocks below are not parallelized, so it's safe for us
// to use fmt.Printf style logging.

// 1. We don't init when invoking with `start-single-node`.
// 2. For nodes running <20.1, the --join flags are constructed in a manner
// such that the first node doesn't have any (see `generateStartArgs`),
// which prompts CRDB to auto-initialize. For nodes running >=20.1, we
// need to explicitly initialize.
// 1. We don't init invoked using `--skip-init`.
// 2. We don't init when invoking with `start-single-node`.
// 3. For nodes running <20.1, the --join flags are constructed in a
// manner such that the first node doesn't have any (see
// `generateStartArgs`),which prompts CRDB to auto-initialize. For
// nodes running >=20.1, we need to explicitly initialize.

if StartOpts.SkipInit {
return nil, nil
}

shouldInit := !h.useStartSingleNode(vers) && vers.AtLeast(version.MustParse("v20.1.0"))
if shouldInit {
fmt.Printf("%s: initializing cluster\n", h.c.Name)
Expand Down Expand Up @@ -188,6 +195,9 @@ func (r Cockroach) Start(c *SyncedCluster, extraArgs []string) {
}
}

// We're sure to set cluster settings after having initialized the
// cluster.

fmt.Printf("%s: setting cluster settings\n", h.c.Name)
clusterSettingsOut, err := h.setClusterSettings(nodeIdx)
if err != nil {
Expand Down Expand Up @@ -316,12 +326,12 @@ func (r Cockroach) SQL(c *SyncedCluster, args []string) error {
return nil
}

type crdbStartHelper struct {
type crdbInstallHelper struct {
c *SyncedCluster
r Cockroach
}

func (h *crdbStartHelper) startNode(
func (h *crdbInstallHelper) startNode(
nodeIdx int, extraArgs []string, vers *version.Version,
) (string, error) {
startCmd, err := h.generateStartCmd(nodeIdx, extraArgs, vers)
Expand All @@ -343,7 +353,7 @@ func (h *crdbStartHelper) startNode(
return strings.TrimSpace(string(out)), nil
}

func (h *crdbStartHelper) generateStartCmd(
func (h *crdbInstallHelper) generateStartCmd(
nodeIdx int, extraArgs []string, vers *version.Version,
) (string, error) {
args, err := h.generateStartArgs(nodeIdx, extraArgs, vers)
Expand Down Expand Up @@ -390,7 +400,7 @@ func (h *crdbStartHelper) generateStartCmd(
return cmd, nil
}

func (h *crdbStartHelper) generateStartArgs(
func (h *crdbInstallHelper) generateStartArgs(
nodeIdx int, extraArgs []string, vers *version.Version,
) ([]string, error) {
var args []string
Expand Down Expand Up @@ -491,7 +501,7 @@ func (h *crdbStartHelper) generateStartArgs(
return args, nil
}

func (h *crdbStartHelper) initializeCluster(nodeIdx int) (string, error) {
func (h *crdbInstallHelper) initializeCluster(nodeIdx int) (string, error) {
nodes := h.c.ServerNodes()
initCmd := h.generateInitCmd(nodeIdx)

Expand All @@ -508,7 +518,7 @@ func (h *crdbStartHelper) initializeCluster(nodeIdx int) (string, error) {
return strings.TrimSpace(string(out)), nil
}

func (h *crdbStartHelper) setClusterSettings(nodeIdx int) (string, error) {
func (h *crdbInstallHelper) setClusterSettings(nodeIdx int) (string, error) {
nodes := h.c.ServerNodes()
clusterSettingCmd := h.generateClusterSettingCmd(nodeIdx)

Expand All @@ -525,7 +535,7 @@ func (h *crdbStartHelper) setClusterSettings(nodeIdx int) (string, error) {
return strings.TrimSpace(string(out)), nil
}

func (h *crdbStartHelper) generateClusterSettingCmd(nodeIdx int) string {
func (h *crdbInstallHelper) generateClusterSettingCmd(nodeIdx int) string {
nodes := h.c.ServerNodes()
license := envutil.EnvOrDefaultString("COCKROACH_DEV_LICENSE", "")
if license == "" {
Expand Down Expand Up @@ -553,7 +563,7 @@ func (h *crdbStartHelper) generateClusterSettingCmd(nodeIdx int) string {
return clusterSettingCmd
}

func (h *crdbStartHelper) generateInitCmd(nodeIdx int) string {
func (h *crdbInstallHelper) generateInitCmd(nodeIdx int) string {
nodes := h.c.ServerNodes()

var initCmd string
Expand All @@ -572,7 +582,7 @@ func (h *crdbStartHelper) generateInitCmd(nodeIdx int) string {
return initCmd
}

func (h *crdbStartHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string {
func (h *crdbInstallHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string {
if !StartOpts.Encrypt {
return ""
}
Expand All @@ -594,13 +604,13 @@ func (h *crdbStartHelper) generateKeyCmd(nodeIdx int, extraArgs []string) string
return keyCmd
}

func (h *crdbStartHelper) useStartSingleNode(vers *version.Version) bool {
func (h *crdbInstallHelper) useStartSingleNode(vers *version.Version) bool {
return len(h.c.VMs) == 1 && vers.AtLeast(version.MustParse("v19.2.0"))
}

// distributeCerts, like the name suggests, distributes certs if it's a secure
// cluster and we're starting n1.
func (h *crdbStartHelper) distributeCerts() {
func (h *crdbInstallHelper) distributeCerts() {
for _, node := range h.c.ServerNodes() {
if node == 1 && h.c.Secure {
h.c.DistributeCerts()
Expand All @@ -609,7 +619,7 @@ func (h *crdbStartHelper) distributeCerts() {
}
}

func (h *crdbStartHelper) shouldAdvertisePublicIP() bool {
func (h *crdbInstallHelper) shouldAdvertisePublicIP() bool {
// If we're creating nodes that span VPC (e.g. AWS multi-region or
// multi-cloud), we'll tell the nodes to advertise their public IPs
// so that attaching nodes to the cluster Just Works.
Expand All @@ -621,7 +631,7 @@ func (h *crdbStartHelper) shouldAdvertisePublicIP() bool {
return false
}

func (h *crdbStartHelper) getEnvVars() string {
func (h *crdbInstallHelper) getEnvVars() string {
var buf strings.Builder
for _, v := range os.Environ() {
if strings.HasPrefix(v, "COCKROACH_") {
Expand All @@ -640,7 +650,7 @@ func (h *crdbStartHelper) getEnvVars() string {
return buf.String()
}

func (h *crdbStartHelper) run(nodeIdx int, cmd string) (string, error) {
func (h *crdbInstallHelper) run(nodeIdx int, cmd string) (string, error) {
nodes := h.c.ServerNodes()

sess, err := h.c.newSession(nodes[nodeIdx])
Expand Down
33 changes: 32 additions & 1 deletion pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var (
adminurlIPs = false
useTreeDist = true
encrypt = false
skipInit = false
quiet = false
sig = 9
waitFlag = false
Expand Down Expand Up @@ -984,7 +985,9 @@ environment variables to the cockroach process.
` + tagHelp + `
The "start" command takes care of setting up the --join address and specifying
reasonable defaults for other flags. One side-effect of this convenience is
that node 1 is special and must be started for the cluster to be initialized.
that node 1 is special and if started, is used to auto-initialize the cluster.
The --skip-init flag can be used to avoid auto-initialization (which can then
separately be done using the "init" command).
If the COCKROACH_DEV_LICENSE environment variable is set the enterprise.license
cluster setting will be set to its value.
Expand Down Expand Up @@ -1035,6 +1038,31 @@ other signals.
}),
}

var initCmd = &cobra.Command{
Use: "init <cluster>",
Short: "initialize the cluster",
Long: `Initialize the cluster.
The "init" command bootstraps the cluster (using "cockroach init"). It also sets
default cluster settings. It's intended to be used in conjunction with
'roachprod start --skip-init'.
`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) error {
clusterName, err := verifyClusterName(args[0])
if err != nil {
return err
}

c, err := newCluster(clusterName)
if err != nil {
return err
}
c.Init()
return nil
}),
}

var statusCmd = &cobra.Command{
Use: "status <cluster>",
Short: "retrieve the status of nodes in a cluster",
Expand Down Expand Up @@ -1558,6 +1586,7 @@ func main() {
monitorCmd,
startCmd,
stopCmd,
initCmd,
runCmd,
wipeCmd,
reformatCmd,
Expand Down Expand Up @@ -1754,6 +1783,8 @@ func main() {
&clusterType, "type", "t", clusterType, `cluster type ("cockroach" or "cassandra")`)
cmd.Flags().BoolVar(
&install.StartOpts.Encrypt, "encrypt", encrypt, "start nodes with encryption at rest turned on")
cmd.Flags().BoolVar(
&install.StartOpts.SkipInit, "skip-init", skipInit, "skip initializing the cluster")
fallthrough
case sqlCmd:
cmd.Flags().StringVarP(
Expand Down
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func NewProcessor(cfg Config) *Processor {
}
}

// IteratorConstructor is used to construct an iterator. It should be called
// from underneath a stopper task to ensure that the engine has not been closed.
type IteratorConstructor func() storage.SimpleIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
//
Expand All @@ -167,10 +171,10 @@ func NewProcessor(cfg Config) *Processor {
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, rtsIter, stopper)
p.run(ctx, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
Expand All @@ -180,15 +184,16 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter storage.SimpleIterator)

// run is called from Start and runs the rangefeed.
func (p *Processor) run(
ctx context.Context, rtsIter storage.SimpleIterator, stopper *stop.Stopper,
ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
defer cancelOutputLoops()

// Launch an async task to scan over the resolved timestamp iterator and
// initialize the unresolvedIntentQueue. Ignore error if quiescing.
if rtsIter != nil {
if rtsIterFunc != nil {
rtsIter := rtsIterFunc()
initScan := newInitResolvedTSScan(p, rtsIter)
err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
Expand Down Expand Up @@ -239,9 +244,6 @@ func (p *Processor) run(
}
}
if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
if r.catchupIter != nil {
r.catchupIter.Close() // clean up
}
r.disconnect(roachpb.NewError(err))
p.reg.Unregister(&r)
}
Expand Down Expand Up @@ -368,7 +370,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter storage.SimpleIterator,
catchupIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
Expand All @@ -379,7 +381,7 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, rtsIter)
p.Start(stopper, makeIteratorConstructor(rtsIter))
return p, stopper
}

func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor {
if rtsIter == nil {
return nil
}
return func() storage.SimpleIterator { return rtsIter }
}

func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
}
Expand Down
Loading

0 comments on commit 545c4fe

Please sign in to comment.