87244: kv: campaign on rejected lease request when leader not live in node liveness r=erikgrinaker a=nvanbenschoten

Fixes #84655.
Related to #49220.

This commit extends the logic introduced in 8aa1c14 to simultaneously campaign for Raft leadership when rejecting a lease request on a Raft follower that observes that the current Raft leader is not live according to node liveness. These kinds of cases are often associated with asymmetric network partitions. 

In such cases, the Raft leader will be unable to acquire an epoch-based lease until it heartbeats its liveness record. As a result, the range can only regain availability if a different replica acquires the lease. However, the protection added in 8aa1c14 prevents followers from acquiring leases to protect against a different form of unavailability.

After this commit, the followers will attempt to acquire Raft leadership when detecting such cases by campaigning. This allows these ranges to recover availability once Raft leadership moves off the partitioned Raft leader to one of the followers that can reach node liveness and can subsequently acquire the lease.

Campaigning for Raft leadership is safer than blindly allowing the lease request to be proposed (through a redirected proposal). This is because the follower may be arbitrarily far behind on its Raft log and acquiring the lease in such cases could cause unavailability (the kind we saw in #37906). By instead calling a Raft pre-vote election, the follower can determine whether it is behind on its log without risking disruption. If so, we don't want it to acquire the lease — one of the other followers that is caught up on its log can. If not, it will eventually become leader and can proceed with a future attempt to acquire the lease.
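
As a rough illustration of that decision, here is a minimal Go sketch with hypothetical names (`maybeCampaignOnRejectedLease`, `livenessChecker`, `raftGroup`); it is not the actual CockroachDB code, just the shape of the check described above:
```go
package leasesketch

// raftGroup abstracts the one Raft operation the sketch needs.
type raftGroup interface {
	// Campaign starts an election. With pre-vote enabled, a follower whose
	// log is behind cannot win and therefore cannot disrupt the range.
	Campaign() error
}

// livenessChecker reports whether a node's liveness record is current.
type livenessChecker interface {
	IsLive(nodeID int32) bool
}

// maybeCampaignOnRejectedLease runs after a follower rejects a redirected
// lease acquisition. If the current Raft leader is non-live according to
// node liveness, the follower campaigns: a pre-vote election reveals whether
// its log is caught up without risking disruption, and a follower that wins
// can then acquire the lease itself.
func maybeCampaignOnRejectedLease(leaderNodeID int32, nl livenessChecker, rg raftGroup) error {
	if nl.IsLive(leaderNodeID) {
		// The leader can heartbeat liveness and reacquire the lease itself.
		return nil
	}
	return rg.Campaign()
}
```
The key design point is that campaigning goes through Raft pre-vote, so a lagging follower learns it is behind instead of forcing an unsafe lease acquisition.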

The commit adds a test that reproduces the failure mode described in #84655. It creates an asymmetric network partition scenario that looks like:
```
        [0]       raft leader / leaseholder
         ^
        / \
       /   \
      v     v
    [1]<--->[2]   raft followers
      ^     ^
       \   /
        \ /
         v
        [3]       liveness range
```
It then waits for the raft leader's lease to expire and demonstrates that one of the raft followers will now call a Raft election, which allows it to safely grab Raft leadership, acquire the lease, and recover availability. Without this change, the test fails.

----

Release justification: None. Too risky for the stability period. Potential backport candidate after sufficient baking on master.

Release note (bug fix): A bug causing ranges to remain without a leaseholder in cases of asymmetric network partitions has been resolved.

88101: kv,sql: simplify the Txn API by removing 2 cleanup functions r=lidorcarmel a=lidorcarmel

Txn.CleanupOnError() basically does a rollback, and in addition takes an error only for the purpose of logging it.

Txn.CommitOrCleanup() tries to commit and, if unsuccessful, tries a rollback. The error from the rollback is logged but not returned; the error from the commit is returned.
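
For reference, the two helpers behaved roughly as in the sketch below. This is a reconstruction from the descriptions above using the existing kv.Txn Commit/Rollback methods; the free-function names and log calls are illustrative, not the actual implementation:
```go
package txnsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// cleanupOnError mirrors what Txn.CleanupOnError() did: roll back, and use
// the caller's error only for logging.
func cleanupOnError(ctx context.Context, txn *kv.Txn, err error) {
	if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
		log.Warningf(ctx, "rollback failed: %v (cleanup triggered by: %v)", rollbackErr, err)
	}
}

// commitOrCleanup mirrors what Txn.CommitOrCleanup() did: try to commit and,
// on failure, roll back. The rollback error is logged but swallowed; only
// the commit error is returned.
func commitOrCleanup(ctx context.Context, txn *kv.Txn) error {
	err := txn.Commit(ctx)
	if err != nil {
		if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
			log.Warningf(ctx, "rollback after failed commit also failed: %v", rollbackErr)
		}
	}
	return err
}
```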

Removing these two functions means that callers should call Commit and Rollback directly when needed and handle the returned errors. For example, SQL may need to log errors to a different channel from the one used by Txn, and tests may want to fail when a Rollback fails unexpectedly. This PR removes those functions.
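
A minimal sketch of the pattern callers adopt instead, mirroring the runTxn change in pkg/kv/db.go in this commit (the helper name runAndCommit is hypothetical):
```go
package txnsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// runAndCommit calls Commit and Rollback directly and decides per call site
// how each error is surfaced, instead of relying on the removed helpers.
func runAndCommit(ctx context.Context, txn *kv.Txn, work func(context.Context, *kv.Txn) error) error {
	if err := work(ctx, txn); err != nil {
		if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
			// Log to whatever channel is appropriate for the caller; a test
			// might instead fail on an unexpected rollback error.
			log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err)
		}
		return err
	}
	return txn.Commit(ctx)
}
```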

Release note: None
Epic: None

91011: storage: adjust range tombstone language to encourage caution r=nicktrav a=jbowens

Adjust the MVCC range tombstone cluster setting description to highlight its experimental nature and to be appropriately cautious about the consequence of enabling range tombstones.

Cockroach 22.2.0 will ship with the cluster setting and likely at least one bug that may induce corruption if the setting is enabled (#90948).

Epic: None
Release note: None

Closes #91001.

91077: sql/schemachange: revert TableZoneConfig field ID r=fqazi a=fqazi

Fixes: #91053

Backwards compatibility with 22.2 was accidentally regressed when the field ID for table_zone_config was changed for older declarative schema changer states. To address this, this patch reverts the field ID back to the same value used in 22.2.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
5 people committed Nov 1, 2022
5 parents 02ff871 + 503c731 + 8b00b5d + d28b3d8 + 770e217 commit 14249a5
Showing 20 changed files with 444 additions and 97 deletions.
4 changes: 3 additions & 1 deletion pkg/kv/db.go
@@ -951,7 +951,9 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn)
return retryable(ctx, txn)
})
if err != nil {
txn.CleanupOnError(ctx, err)
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.Eventf(ctx, "failure aborting transaction: %s; abort caused by: %s", rollbackErr, err)
}
}
// Terminate TransactionRetryWithProtoRefreshError here, so it doesn't cause a higher-level
// txn to be retried. We don't do this in any of the other functions in DB; I
7 changes: 3 additions & 4 deletions pkg/kv/kvclient/kvcoord/integration_test.go
@@ -22,11 +22,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// This file contains contains integration tests that don't fit anywhere else.
@@ -177,9 +177,8 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
<-readerBlocked
<-readerBlocked

if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
t.Fatalf("expected injected err, got: %v", err)
}
require.ErrorContains(t, txn.Commit(ctx), "test injected err", "expected injected error")
require.NoError(t, txn.Rollback(ctx))
// Wait for the txn wait queue to be pinged and check the status.
if status := <-txnUpdate; status != roachpb.ABORTED {
t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
@@ -94,7 +94,7 @@ func TestSavepoints(t *testing.T) {
ptxn()

case "commit":
if err := txn.CommitOrCleanup(ctx); err != nil {
if err := txn.Commit(ctx); err != nil {
fmt.Fprintf(&buf, "(%T) %v\n", err, err)
}

4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
@@ -81,7 +81,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {
if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil {
return err
}
return conflictTxn.CommitOrCleanup(ctx)
return conflictTxn.Commit(ctx)
}

// Make a db with a short heartbeat interval.
@@ -128,7 +128,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {

// Check that further sends through the aborted txn are rejected. The
// TxnCoordSender is supposed to synthesize a TransactionAbortedError.
if err := txn.CommitOrCleanup(ctx); !testutils.IsError(
if err := txn.Commit(ctx); !testutils.IsError(
err, "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
) {
t.Fatalf("expected aborted error, got: %s", err)
24 changes: 12 additions & 12 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -432,7 +432,9 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
err := txn.UpdateDeadline(ctx, pushedTimestamp.Next())
require.NoError(t, err, "Deadline update to future failed")
}
err = txn.CommitOrCleanup(ctx)
if err = txn.Commit(ctx); err != nil {
require.NoError(t, txn.Rollback(ctx))
}

switch i {
case 0:
@@ -624,11 +626,9 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {

// Now end the transaction and verify we've cleanup up, even though
// end transaction failed.
err := txn1.CommitOrCleanup(ctx)
err := txn1.Commit(ctx)
assertTransactionAbortedError(t, err)
if err := txn2.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, txn2.Commit(ctx))
verifyCleanup(key, s.Eng, t, txn1.Sender().(*kvcoord.TxnCoordSender), txn2.Sender().(*kvcoord.TxnCoordSender))
}

@@ -655,9 +655,7 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now())

// Now immediately commit.
if err := txn.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, txn.Commit(ctx))
verifyCleanup(key, s.Eng, t, txn.Sender().(*kvcoord.TxnCoordSender))
}

@@ -1322,7 +1320,10 @@ func TestTxnRestartCount(t *testing.T) {
})

// Commit (should cause restart metric to increase).
err := txn.CommitOrCleanup(ctx)
err := txn.Commit(ctx)
if err != nil {
require.NoError(t, txn.Rollback(ctx))
}
assertTransactionRetryError(t, err)
checkTxnMetrics(t, metrics, "restart txn", 0, 0, 1 /* aborts */, 1 /* restarts */)
}
@@ -1620,9 +1621,8 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
if pErr := txn.Put(ctx, "a", "b"); pErr != nil {
t.Fatalf("put failed: %s", pErr)
}
if pErr := txn.CommitOrCleanup(ctx); pErr == nil {
t.Fatalf("unexpected commit success")
}
require.Error(t, txn.Commit(ctx), "unexpected commit success")
require.NoError(t, txn.Rollback(ctx))

if !commit.Load().(bool) {
t.Errorf("%T: failed to find initial commit request", test.err)
134 changes: 134 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1287,6 +1287,140 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
require.Equal(t, leaderReplicaID, nlhe.Lease.Replica.ReplicaID)
}

// TestRequestsOnFollowerWithNonLiveLeaseholder tests the availability of a
// range that has an expired epoch-based lease and a live Raft leader that is
// unable to heartbeat its liveness record. Such a range should recover once
// Raft leadership moves off the partitioned Raft leader to one of the followers
// that can reach node liveness.
//
// This test relies on follower replicas campaigning for Raft leadership in
// certain cases when refusing to forward lease acquisition requests to the
// leader. In these cases where they determine that the leader is non-live
// according to node liveness, they will attempt to steal Raft leadership and,
// if successful, will be able to perform future lease acquisition attempts.
func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var installPartition int32
partitionFilter := func(_ context.Context, ba *roachpb.BatchRequest) *roachpb.Error {
if atomic.LoadInt32(&installPartition) == 0 {
return nil
}
if ba.GatewayNodeID == 1 && ba.Replica.NodeID == 4 {
return roachpb.NewError(context.Canceled)
}
return nil
}

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
// Reduce the election timeout some to speed up the test.
RaftConfig: base.RaftConfig{RaftElectionTimeoutTicks: 10},
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're
// setting the liveness duration as low as possible while still
// keeping the test stable.
LivenessDuration: 3000 * time.Millisecond,
RenewalDuration: 1500 * time.Millisecond,
},
Store: &kvserver.StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period
// of leases, in order to speed up the test.
MaxOffset: time.Nanosecond,
TestingRequestFilter: partitionFilter,
},
},
},
}

tc := testcluster.StartTestCluster(t, 4, clusterArgs)
defer tc.Stopper().Stop(ctx)

{
// Move the liveness range to node 4.
desc := tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix)
tc.RebalanceVoterOrFatal(ctx, t, desc.StartKey.AsRawKey(), tc.Target(0), tc.Target(3))
}

// Create a new range.
_, rngDesc, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)
key := rngDesc.StartKey.AsRawKey()
// Add replicas on all the stores.
tc.AddVotersOrFatal(t, rngDesc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))

// Store 0 holds the lease.
store0 := tc.GetFirstStoreFromServer(t, 0)
store0Repl, err := store0.GetReplica(rngDesc.RangeID)
require.NoError(t, err)
leaseStatus := store0Repl.CurrentLeaseStatus(ctx)
require.True(t, leaseStatus.OwnedBy(store0.StoreID()))

{
// Write a value so that the respective key is present in all stores and we
// can increment it again later.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for initial values...")
tc.WaitForValues(t, key, []int64{1, 1, 1, 0})
log.Infof(ctx, "test: waiting for initial values... done")
}

// Begin dropping all node liveness heartbeats from the original raft leader
// while allowing the leader to maintain Raft leadership and otherwise behave
// normally. This mimics cases where the raft leader is partitioned away from
// the liveness range but can otherwise reach its followers. In these cases,
// it is still possible that the followers can reach the liveness range and
// see that the leader becomes non-live. For example, the configuration could
// look like:
//
// [0] raft leader
// ^
// / \
// / \
// v v
// [1]<--->[2] raft followers
// ^ ^
// \ /
// \ /
// v
// [3] liveness range
//
log.Infof(ctx, "test: partitioning node")
atomic.StoreInt32(&installPartition, 1)

// Wait until the lease expires.
log.Infof(ctx, "test: waiting for lease expiration")
testutils.SucceedsSoon(t, func() error {
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
if leaseStatus.IsValid() {
return errors.New("lease still valid")
}
return nil
})
log.Infof(ctx, "test: lease expired")

{
// Increment the initial value again, which requires range availability. To
// get there, the request will need to trigger a lease request on a follower
// replica, which will call a Raft election, acquire Raft leadership, then
// acquire the range lease.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for new lease...")
tc.WaitForValues(t, key, []int64{2, 2, 2, 0})
log.Infof(ctx, "test: waiting for new lease... done")
}

// Store 0 no longer holds the lease.
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
require.False(t, leaseStatus.OwnedBy(store0.StoreID()))
}

type fakeSnapshotStream struct {
nextReq *kvserverpb.SnapshotRequest
nextErr error
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -2505,7 +2505,7 @@ func TestDistributedTxnCleanup(t *testing.T) {
return errors.New("forced abort")
}
if err := txnFn(ctx, txn); err != nil {
txn.CleanupOnError(ctx, err)
require.NoError(t, txn.Rollback(ctx))
if !force && commit {
t.Fatalf("expected success with commit == true; got %v", err)
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -802,7 +802,9 @@ func (r *Replica) AdminMerge(
err := runMergeTxn(txn)
if err != nil {
log.VEventf(ctx, 2, "merge txn failed: %s", err)
txn.CleanupOnError(ctx, err)
if rollbackErr := txn.Rollback(ctx); rollbackErr != nil {
log.VEventf(ctx, 2, "merge txn rollback failed: %s", rollbackErr)
}
}
if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) {
if err != nil {