…db#79334

79296: kvserver: log when acquiring lease as a draining node r=aayushshah15 a=aayushshah15

Generally, draining nodes are not allowed to acquire leases. Additionally,
replicas that are not Raft leaders are not allowed to acquire leases. This
means that draining nodes have to be permitted to acquire leases for replicas
for which they are the Raft leader (otherwise, we risk a deadlock, since no
replica would be allowed to acquire the lease).

This commit adds a verbose log event when a draining store acquires a lease
because it is the Raft leader.
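For illustration only, here is a minimal Go sketch of that decision. The helper and its names are hypothetical; the actual check lives in `requestLeaseLocked` in the `replica_range_lease.go` hunk below.

```go
// Hypothetical sketch (not the kvserver API): a draining store refuses new
// leases unless it is the Raft leader for the range, since refusing in that
// case could leave the range with no replica able to acquire the lease.
package main

import "fmt"

type raftState int

const (
	follower raftState = iota
	leader
)

// shouldAcquireLease mirrors the reasoning above.
func shouldAcquireLease(draining bool, state raftState) (bool, string) {
	if !draining {
		return true, "not draining; lease acquisition allowed"
	}
	if state != leader {
		return false, "refusing to take the lease; node is draining"
	}
	return true, "trying to take the lease while draining since we're the raft leader"
}

func main() {
	ok, reason := shouldAcquireLease(true, leader)
	fmt.Println(ok, reason)
}
```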

Release note: none


79300: roachtest: fix drain test setup r=ZhouXing19 a=rafiss

fixes cockroachdb#79287
fixes cockroachdb#79285

Due to a recent change, these SET CLUSTER SETTING statements can no longer
all be run in one batched statement, since batching causes them all to run
in the same implicit transaction.
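As a rough sketch of the new shape of the setup (function name and wiring here are hypothetical; the real change is in `prepareCluster` in the diff below), each setting is issued as its own statement so each runs in its own implicit transaction:

```go
// Hypothetical sketch: issue each SET CLUSTER SETTING separately instead of
// batching them into a single multi-statement string.
package drainsetup

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

func setShutdownWaits(
	ctx context.Context, db *sql.DB, drainWait, queryWait, connectionWait time.Duration,
) error {
	stmts := []string{
		fmt.Sprintf("SET CLUSTER SETTING server.shutdown.drain_wait = '%fs'", drainWait.Seconds()),
		fmt.Sprintf("SET CLUSTER SETTING server.shutdown.query_wait = '%fs'", queryWait.Seconds()),
		fmt.Sprintf("SET CLUSTER SETTING server.shutdown.connection_wait = '%fs'", connectionWait.Seconds()),
	}
	for _, stmt := range stmts {
		// Each ExecContext call is its own implicit transaction.
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}
```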

Release note: None

79325: changefeedccl: remove the default values from SHOW CHANGEFEED JOB output r=sherman-grewal a=sherman-grewal

changefeedccl: remove the default values from SHOW CHANGEFEED JOB output

Resolves cockroachdb#78420 

Currently, when a user alters a changefeed, we
include the default options in the SHOW CHANGEFEED
JOB output. In this PR we prevent the default values
from being displayed.
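As a rough illustration of the idea (this sketch is not the changefeed code itself; the real logic parses the original statement via `getPrevOpts`, shown in the diff below), only the options the user actually specified are kept when rebuilding the displayed statement:

```go
// Hypothetical sketch: given the option keys the user wrote in the original
// CREATE CHANGEFEED statement, drop everything else (i.e. the defaults) from
// the stored options map before rebuilding the SHOW CHANGEFEED JOB output.
package main

import "fmt"

func userSpecifiedOpts(userKeys []string, allOpts map[string]string) map[string]string {
	out := make(map[string]string, len(userKeys))
	for _, k := range userKeys {
		if v, ok := allOpts[k]; ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	all := map[string]string{
		"resolved": "5s",      // set by the user
		"envelope": "wrapped", // default filled in by the changefeed
		"format":   "json",    // default filled in by the changefeed
	}
	fmt.Println(userSpecifiedOpts([]string{"resolved"}, all)) // map[resolved:5s]
}
```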

Release note (enterprise change): Remove the default
values from the SHOW CHANGEFEED JOB output

Jira issue: CRDB-14694

79334: dev: have `doctor` recommend running `bazel clean --expunge` once r=mari-crl a=rickystewart

It is sometimes necessary to run `bazel clean --expunge` once you've set up
your machine, but we don't want to suggest this more than once. So we
update the code to make sure that we only print that message if the
doctor status file is newly created.
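A hedged sketch of the one-time hint logic follows; the file name, permissions, and version constant are placeholders, not the real `dev` tooling constants (see the `doctor.go` hunk below for the actual change).

```go
// Hypothetical sketch: if no status file existed before (previous status 0),
// print the `bazel clean --expunge` suggestion once, then write the new status.
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

const statusVersion = 7 // placeholder version number

func readStatus(path string) (int, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return 0, nil // clean checkout: no status file yet
		}
		return -1, err
	}
	return strconv.Atoi(strings.TrimSpace(string(b)))
}

func writeStatus(path string) error {
	prev, err := readStatus(path)
	if err != nil {
		return err
	}
	if prev <= 0 {
		fmt.Println("It is recommended to run `bazel clean --expunge` once now that your machine is set up.")
	}
	return os.WriteFile(path, []byte(strconv.Itoa(statusVersion)), 0644)
}

func main() {
	_ = writeStatus(".doctor-status")
}
```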

Closes cockroachdb#74746.

Release note: None

Jira issue: CRDB-14713

Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Sherman Grewal <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
5 people committed Apr 4, 2022
5 parents bb2c29c + 5ffaa9e + 41a1af7 + 21b8aa4 + 4f9d05e commit 2108206
Showing 6 changed files with 89 additions and 46 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -67,6 +67,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
47 changes: 30 additions & 17 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -78,7 +79,11 @@ func alterChangefeedPlanHook(

newChangefeedStmt := &tree.CreateChangefeed{}

newOptions, newSinkURI, err := generateNewOpts(ctx, p, alterChangefeedStmt.Cmds, prevDetails)
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
if err != nil {
return err
}
newOptions, newSinkURI, err := generateNewOpts(ctx, p, alterChangefeedStmt.Cmds, prevOpts, prevDetails.SinkURI)
if err != nil {
return err
}
@@ -191,22 +196,11 @@ func generateNewOpts(
ctx context.Context,
p sql.PlanHookState,
alterCmds tree.AlterChangefeedCmds,
details jobspb.ChangefeedDetails,
prevOpts map[string]string,
prevSinkURI string,
) (map[string]string, string, error) {
sinkURI := details.SinkURI
newOptions := make(map[string]string, len(details.Opts))

// pull the options that are set for the existing changefeed.
for key, value := range details.Opts {
// There are some options (e.g. topics) that we set during the creation of
// a changefeed, but we do not allow these options to be set by the user.
// Hence, we can not include these options in our new CREATE CHANGEFEED
// statement.
if _, ok := changefeedbase.ChangefeedOptionExpectValues[key]; !ok {
continue
}
newOptions[key] = value
}
sinkURI := prevSinkURI
newOptions := prevOpts

for _, cmd := range alterCmds {
switch v := cmd.(type) {
@@ -231,7 +225,7 @@
return nil, ``, err
}

prevSinkURI, err := url.Parse(details.SinkURI)
prevSinkURI, err := url.Parse(sinkURI)
if err != nil {
return nil, ``, err
}
@@ -652,3 +646,22 @@ func fetchSpansForDescs(

return spans, err
}

func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
prevStmt, err := parser.ParseOne(prevDescription)
if err != nil {
return nil, err
}

prevChangefeedStmt, ok := prevStmt.AST.(*tree.CreateChangefeed)
if !ok {
return nil, errors.Errorf(`could not parse job description`)
}

prevOpts := make(map[string]string, len(prevChangefeedStmt.Options))
for _, opt := range prevChangefeedStmt.Options {
prevOpts[opt.Key.String()] = opts[opt.Key.String()]
}

return prevOpts, nil
}
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
@@ -422,7 +422,7 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) {
out = obtainJobRowFn()

require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id)
require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description)
require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description)
require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI)
require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics)
require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames))
@@ -433,7 +433,7 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) {
out = obtainJobRowFn()

require.Equal(t, jobID, out.id, "Expected id:%d but found id:%d", jobID, out.id)
require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH envelope = 'wrapped', format = 'json', on_error = 'fail', resolved = '5s', schema_change_events = 'default', schema_change_policy = 'backfill', virtual_columns = 'omitted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/'", out.description)
require.Equal(t, "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/' WITH resolved = '5s'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE bar INTO 'kafka://does.not.matter/ WITH resolved = '5s''", out.description)
require.Equal(t, sinkURI, out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", sinkURI, out.SinkURI)
require.Equal(t, "bar", out.topics, "Expected topics:%s but found topics:%s", "bar", sortedTopics)
require.Equal(t, "{d.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{d.public.bar}", string(out.FullTableNames))
39 changes: 29 additions & 10 deletions pkg/cmd/dev/doctor.go
@@ -37,35 +37,54 @@ const (
noCacheFlag = "no-cache"
)

func (d *dev) checkDoctorStatus(ctx context.Context) error {
if d.knobs.skipDoctorCheck {
return nil
}

// getDoctorStatus returns the current doctor status number. This function only
// returns an error in exceptional situations -- if the status file does not
// already exist (as would be the case for a clean checkout), this function
// simply returns 0, nil.
func (d *dev) getDoctorStatus(ctx context.Context) (int, error) {
dir, err := d.getWorkspace(ctx)
if err != nil {
return err
return -1, err
}
statusFile := filepath.Join(dir, doctorStatusFile)
content, err := ioutil.ReadFile(statusFile)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
content = []byte("0")
} else {
return err
return -1, err
}
}
status, err := strconv.Atoi(strings.TrimSpace(string(content)))
return strconv.Atoi(strings.TrimSpace(string(content)))
}

// checkDoctorStatus returns an error iff the current doctor status is not the
// latest.
func (d *dev) checkDoctorStatus(ctx context.Context) error {
if d.knobs.skipDoctorCheck {
return nil
}

status, err := d.getDoctorStatus(ctx)
if err != nil {
return err
}

if status < doctorStatusVersion {
return errors.New("please run `dev doctor` to refresh dev status, then try again")
}
return nil
}

func (d *dev) writeDoctorStatus(ctx context.Context, ex *exec.Exec) error {
func (d *dev) writeDoctorStatus(ctx context.Context) error {
prevStatus, err := d.getDoctorStatus(ctx)
if err != nil {
return err
}
if prevStatus <= 0 {
// In this case recommend the user `bazel clean --expunge`.
log.Println("It is recommended to run `bazel clean --expunge` to avoid any spurious failures now that your machine is set up. (You only have to do this once.)")
}
dir, err := d.getWorkspace(ctx)
if err != nil {
return err
@@ -264,7 +283,7 @@ slightly slower and introduce a noticeable delay in first-time build setup.`
return errors.New("please address the errors described above and try again")
}

if err := d.writeDoctorStatus(ctx, d.exec); err != nil {
if err := d.writeDoctorStatus(ctx); err != nil {
return err
}
log.Println("You are ready to build :)")
19 changes: 9 additions & 10 deletions pkg/cmd/roachtest/tests/drain.go
@@ -212,14 +212,13 @@ func prepareCluster(
db := c.Conn(ctx, t.L(), 1)
defer db.Close()

waitPhasesSettingStmt := fmt.Sprintf(
"SET CLUSTER SETTING server.shutdown.drain_wait = '%fs'; "+
"SET CLUSTER SETTING server.shutdown.connection_wait = '%fs';"+
"SET CLUSTER SETTING server.shutdown.query_wait = '%fs';",
drainWait.Seconds(),
connectionWait.Seconds(),
queryWait.Seconds(),
)
_, err = db.ExecContext(ctx, waitPhasesSettingStmt)
require.NoError(t, err, "cannot set cluster setting")
waitPhasesSettingStmts := []string{
fmt.Sprintf("SET CLUSTER SETTING server.shutdown.drain_wait = '%fs';", drainWait.Seconds()),
fmt.Sprintf("SET CLUSTER SETTING server.shutdown.query_wait = '%fs'", queryWait.Seconds()),
fmt.Sprintf("SET CLUSTER SETTING server.shutdown.connection_wait = '%fs'", connectionWait.Seconds()),
}
for _, stmt := range waitPhasesSettingStmts {
_, err = db.ExecContext(ctx, stmt)
require.NoError(t, err, "cannot set cluster setting")
}
}
25 changes: 18 additions & 7 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -772,13 +772,24 @@ func (r *Replica) requestLeaseLocked(
// trying to move leases away elsewhere). But if we're the leader, we don't
// really have a choice and we take the lease - there might not be any other
// replica available to take this lease (perhaps they're all draining).
if r.store.IsDraining() && (r.raftBasicStatusRLocked().RaftState != raft.StateLeader) {
// TODO(andrei): If we start refusing to take leases on followers elsewhere,
// this code can go away.
log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
newNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
"refusing to take the lease; node is draining")))
if r.store.IsDraining() {
// NB: Replicas that are not the Raft leader will not take leases anyway
// (see the check inside propBuf.FlushLockedWithRaftGroup()), so we don't
// really need any special behavior for draining nodes here. This check
// serves mostly as a means to get more granular logging and as a defensive
// precaution.
if r.raftBasicStatusRLocked().RaftState != raft.StateLeader {
log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
return r.mu.pendingLeaseRequest.newResolvedHandle(
roachpb.NewError(
newNotLeaseHolderError(
roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
"refusing to take the lease; node is draining",
),
),
)
}
log.Info(ctx, "trying to take the lease while we're draining since we're the raft leader")
}

// Propose a Raft command to get a lease for this replica.
