Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69501: liveness: make CreateLivenessRecord retry on a TransactionError r=lunevalex a=lunevalex

Fixes #50977

CreateLivenessRecord has no fault tolerance built in, which makes
the server startup facility flaky. The startup procesure allocates a
nodeId, storeId and then creates a liveness record. If there
is a hiccup or brief period of unavailability during the liveness
creation we end up cycing through node/store ids which is undesirable.
This made the TestRemoveDeadReplicas test flaky because nodes
ended up cycling through a few node ids on startup, making it no possible
to match epected node ids with the actual ones. To fix this issue
we retry liveness creation when it encounters an AmbigiousError
or TransactionError. This is similar behavior to the update code
path for the liveness record.

Release note: None

71736: roachtest: fix schemachange/during/kv r=ajwerner a=otan

Resolves #71608

Some more fallout from the PRIMARY KEY rename.

Release note: None

71762: ui: update url params when switching tabs on sql activity r=maryliag a=maryliag

When the user clicks on a tab on the new SQL Activity page
now the search params on the url is updated with the tab
selection.
This commit also create a common function to sync history
so all the pages can use it and remove duplicate code.

Release note: None

Co-authored-by: Alex Lunev <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
  • Loading branch information
4 people committed Oct 20, 2021
4 parents 736e65f + 3505f62 + 4e2384e + e573415 commit 830283b
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 184 deletions.
1 change: 0 additions & 1 deletion pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestOpenReadOnlyStore(t *testing.T) {

func TestRemoveDeadReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 50977, "flaky test")
defer log.Scope(t).Close(t)

// This test is pretty slow under race (200+ cpu-seconds) because it
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func waitForSchemaChanges(ctx context.Context, l *logger.Logger, db *gosql.DB) e
}
// Queries to hone in on index validation problems.
indexValidationQueries := []string{
"SELECT count(k) FROM test.kv@primary AS OF SYSTEM TIME %s WHERE created_at > $1 AND created_at <= $2",
"SELECT count(k) FROM test.kv@kv_pkey AS OF SYSTEM TIME %s WHERE created_at > $1 AND created_at <= $2",
"SELECT count(v) FROM test.kv@foo AS OF SYSTEM TIME %s WHERE created_at > $1 AND created_at <= $2",
}
return runValidationQueries(ctx, l, db, start, validationQueries, indexValidationQueries)
Expand Down
101 changes: 57 additions & 44 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ func (e *errRetryLiveness) Error() string {
return fmt.Sprintf("%T: %s", *e, e.error)
}

func isErrRetryLiveness(ctx context.Context, err error) bool {
if errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) {
// We generally want to retry ambiguous errors immediately, except if the
// ctx is canceled - in which case the ambiguous error is probably caused
// by the cancellation (and in any case it's pointless to retry with a
// canceled ctx).
return ctx.Err() == nil
} else if errors.HasType(err, (*roachpb.TransactionStatusError)(nil)) {
// 21.2 nodes can return a TransactionStatusError when they should have
// returned an AmbiguousResultError.
// TODO(andrei): Remove this in 22.2.
return true
}
return false
}

// Node liveness metrics counter names.
var (
metaLiveNodes = metric.Metadata{
Expand Down Expand Up @@ -527,41 +543,50 @@ type livenessUpdate struct {
// NB: An existing liveness record is not overwritten by this method, we return
// an error instead.
func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb.NodeID) error {
// We start off at epoch=0, entrusting the initial heartbeat to increment it
// to epoch=1 to signal the very first time the node is up and running.
liveness := livenesspb.Liveness{NodeID: nodeID, Epoch: 0}

// We skip adding an expiration, we only really care about the liveness
// record existing within KV.
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
// We start off at epoch=0, entrusting the initial heartbeat to increment it
// to epoch=1 to signal the very first time the node is up and running.
liveness := livenesspb.Liveness{NodeID: nodeID, Epoch: 0}

// We skip adding an expiration, we only really care about the liveness
// record existing within KV.

v := new(roachpb.Value)
err := nl.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
key := keys.NodeLivenessKey(nodeID)
if err := v.SetProto(&liveness); err != nil {
log.Fatalf(ctx, "failed to marshall proto: %s", err)
}
// Given we're looking to create a new liveness record here, we don't
// expect to find anything.
b.CPut(key, v, nil)

// We don't bother adding a gossip trigger, that'll happen with the
// first heartbeat. We still keep it as a 1PC commit to avoid leaving
// write intents.
b.AddRawRequest(&roachpb.EndTxnRequest{
Commit: true,
Require1PC: true,
})
return txn.Run(ctx, b)
})

v := new(roachpb.Value)
if err := nl.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
key := keys.NodeLivenessKey(nodeID)
if err := v.SetProto(&liveness); err != nil {
log.Fatalf(ctx, "failed to marshall proto: %s", err)
if err == nil {
// We'll learn about this liveness record through gossip eventually, so we
// don't bother updating our in-memory view of node liveness.
log.Infof(ctx, "created liveness record for n%d", nodeID)
return nil
}
// Given we're looking to create a new liveness record here, we don't
// expect to find anything.
b.CPut(key, v, nil)

// We don't bother adding a gossip trigger, that'll happen with the
// first heartbeat. We still keep it as a 1PC commit to avoid leaving
// write intents.
b.AddRawRequest(&roachpb.EndTxnRequest{
Commit: true,
Require1PC: true,
})
return txn.Run(ctx, b)
}); err != nil {
if !isErrRetryLiveness(ctx, err) {
return err
}
log.VEventf(ctx, 2, "failed to create liveness record for node %d, because of %s. retrying...", nodeID, err)
}
if err := ctx.Err(); err != nil {
return err
}

// We'll learn about this liveness record through gossip eventually, so we
// don't bother updating our in-memory view of node liveness.

log.Infof(ctx, "created liveness record for n%d", nodeID)
return nil
return errors.AssertionFailedf("unexpected problem while creating liveness record for node %d", nodeID)
}

func (nl *NodeLiveness) setMembershipStatusInternal(
Expand Down Expand Up @@ -1318,19 +1343,7 @@ func (nl *NodeLiveness) updateLivenessAttempt(
return Record{}, errors.Wrapf(err, "couldn't update node liveness from CPut actual value")
}
return Record{}, handleCondFailed(Record{Liveness: actualLiveness, raw: tErr.ActualValue.TagAndDataBytes()})
} else if errors.HasType(err, (*roachpb.AmbiguousResultError)(nil)) {
// We generally want to retry ambiguous errors immediately, except if the
// ctx is canceled - in which case the ambiguous error is probably caused
// by the cancellation (and in any case it's pointless to retry with a
// canceled ctx).
if ctx.Err() != nil {
return Record{}, err
}
return Record{}, &errRetryLiveness{err}
} else if errors.HasType(err, (*roachpb.TransactionStatusError)(nil)) {
// 21.2 nodes can return a TransactionStatusError when they should have
// returned an AmbiguousResultError.
// TODO(andrei): Remove this in 22.2.
} else if isErrRetryLiveness(ctx, err) {
return Record{}, &errRetryLiveness{err}
}
return Record{}, err
Expand Down
70 changes: 70 additions & 0 deletions pkg/kv/kvserver/node_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,76 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) {
}
}

// This tests the create code path for node liveness, for that we need to create
// a cluster of nodes, because we need to exercise to join RPC codepath.
func TestNodeLivenessRetryAmbiguousResultOnCreateError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var injectError atomic.Value
type injectErrorData struct {
shouldError bool
count int32
}

injectError.Store(map[roachpb.NodeID]injectErrorData{
roachpb.NodeID(1): {true, 0},
roachpb.NodeID(2): {true, 0},
roachpb.NodeID(3): {true, 0},
})
testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if req, ok := args.Req.(*roachpb.ConditionalPutRequest); ok {
if val := injectError.Load(); val != nil {
var liveness livenesspb.Liveness
if err := req.Value.GetProto(&liveness); err != nil {
return nil
}
injectErrorMap := val.(map[roachpb.NodeID]injectErrorData)
if injectErrorMap[liveness.NodeID].shouldError {
if liveness.NodeID != 1 {
// We expect this to come from the create code path on all nodes
// except the first. Make sure that is actually true.
assert.Equal(t, liveness.Epoch, int64(0))
}
injectErrorMap[liveness.NodeID] = injectErrorData{false, injectErrorMap[liveness.NodeID].count + 1}
injectError.Store(injectErrorMap)
return roachpb.NewError(roachpb.NewAmbiguousResultError("test"))
}
}
}
return nil
}
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: testingEvalFilter,
},
},
},
},
})
defer tc.Stopper().Stop(ctx)

for _, s := range tc.Servers {
// Verify retry of the ambiguous result for heartbeat loop.
testutils.SucceedsSoon(t, func() error {
return verifyLivenessServer(s, 3)
})
nl := s.NodeLiveness().(*liveness.NodeLiveness)
_, ok := nl.Self()
assert.True(t, ok)

injectErrorMap := injectError.Load().(map[roachpb.NodeID]injectErrorData)
assert.NotNil(t, injectErrorMap)
assert.Equal(t, int32(1), injectErrorMap[s.NodeID()].count)
}
}

// Test that, although a liveness heartbeat is generally retried on
// AmbiguousResultError (see test above), it is not retried when the error is
// caused by a canceled context.
Expand Down
7 changes: 2 additions & 5 deletions pkg/ui/workspaces/cluster-ui/src/sessions/sessionDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,9 @@ export class SessionDetails extends React.Component<SessionDetailsProps> {
}

backToSessionsPage = (): void => {
const { history, location, onBackButtonClick } = this.props;
const { history, onBackButtonClick } = this.props;
onBackButtonClick && onBackButtonClick();
history.push({
...location,
pathname: "/sql-activity/sessions",
});
history.push("/sql-activity?tab=sessions");
};

render(): React.ReactElement {
Expand Down
32 changes: 9 additions & 23 deletions pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
// licenses/APL.txt.

import React from "react";
import { forIn, isNil, merge } from "lodash";
import { isNil, merge } from "lodash";

import { getMatchParamByName } from "src/util/query";
import { getMatchParamByName, syncHistory } from "src/util/query";
import { appAttr } from "src/util/constants";
import {
makeSessionsColumns,
Expand Down Expand Up @@ -43,11 +43,9 @@ import {
ICancelQueryRequest,
} from "src/store/terminateQuery";

import sortedTableStyles from "src/sortedtable/sortedtable.module.scss";
import statementsPageStyles from "src/statementsPage/statementsPage.module.scss";
import sessionPageStyles from "./sessionPage.module.scss";

const sortableTableCx = classNames.bind(sortedTableStyles);
const statementsPageCx = classNames.bind(statementsPageStyles);
const sessionsPageCx = classNames.bind(sessionPageStyles);

Expand Down Expand Up @@ -113,21 +111,6 @@ export class SessionsPage extends React.Component<
};
};

syncHistory = (params: Record<string, string | undefined>): void => {
const { history } = this.props;
const currentSearchParams = new URLSearchParams(history.location.search);
forIn(params, (value, key) => {
if (!value) {
currentSearchParams.delete(key);
} else {
currentSearchParams.set(key, value);
}
});

history.location.search = currentSearchParams.toString();
history.replace(history.location);
};

changeSortSetting = (ss: SortSetting): void => {
const { onSortingChange } = this.props;
onSortingChange && onSortingChange(ss.columnTitle);
Expand All @@ -136,10 +119,13 @@ export class SessionsPage extends React.Component<
sortSetting: ss,
});

this.syncHistory({
ascending: ss.ascending.toString(),
columnTitle: ss.columnTitle,
});
syncHistory(
{
ascending: ss.ascending.toString(),
columnTitle: ss.columnTitle,
},
this.props.history,
);
};

resetPagination = (): void => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ export class StatementDetails extends React.Component<
};

backToStatementsClick = (): void => {
this.props.history.push("/sql-activity/statements");
this.props.history.push("/sql-activity?tab=statements");
if (this.props.onBackToStatementsClick) {
this.props.onBackToStatementsClick();
}
Expand Down
Loading

0 comments on commit 830283b

Please sign in to comment.