Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: ensure Replica objects never change replicaID #40751

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/storage/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package apply

import (
"context"
"errors"

"go.etcd.io/etcd/raft/raftpb"
)
Expand Down Expand Up @@ -54,6 +55,10 @@ type StateMachine interface {
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the
// task from processing more commands and return immediately.
var ErrRemoved = errors.New("replica removed")

// Batch accumulates a series of updates from Commands and performs them
// all at once to its StateMachine when applied. Groups of Commands will be
// staged in the Batch such that one or more trivial Commands are staged or
Expand Down
84 changes: 10 additions & 74 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -1638,6 +1637,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 2)
defer mtc.Stop()
Expand All @@ -1662,10 +1662,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
for _, rangeID := range []roachpb.RangeID{lhsDesc.RangeID, rhsDesc.RangeID} {
repl, err := store1.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
continue
}
if err := store1.ManualReplicaGC(repl); err != nil {
t.Fatal(err)
t.Logf("replica was already removed: %v", err)
}
if _, err := store1.GetReplica(rangeID); err == nil {
t.Fatalf("replica of r%d not gc'd from s1", rangeID)
Expand Down Expand Up @@ -2041,6 +2041,7 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) {
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.DisableSplitQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
Expand Down Expand Up @@ -2808,74 +2809,6 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) {
}
}

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, can return false to avoid dropping a msg to rangeID
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
log.Infof(
ctx,
"dropping Raft message %s",
raft.DescribeMessage(req.Message, func([]byte) string {
return "<omitted>"
}),
)

return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
}

func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -3353,9 +3286,12 @@ func TestMergeQueue(t *testing.T) {
t.Run("non-collocated", func(t *testing.T) {
reset(t)
verifyUnmerged(t)
mtc.replicateRange(rhs().RangeID, 1)
mtc.transferLease(ctx, rhs().RangeID, 0, 1)
mtc.unreplicateRange(rhs().RangeID, 0)
rhsRangeID := rhs().RangeID
mtc.replicateRange(rhsRangeID, 1)
mtc.transferLease(ctx, rhsRangeID, 0, 1)
mtc.unreplicateRange(rhsRangeID, 0)
require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t)
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func checkGauge(t *testing.T, id string, g *metric.Gauge, e int64) {
Expand Down Expand Up @@ -313,8 +314,9 @@ func TestStoreMetrics(t *testing.T) {
return mtc.unreplicateRangeNonFatal(replica.RangeID, 0)
})

// Force GC Scan on store 0 in order to fully remove range.
mtc.stores[1].MustForceReplicaGCScanAndProcess()
// Wait until we're sure that store 0 has successfully processed its removal.
require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0))

mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5})

// Verify range count is as expected.
Expand Down
115 changes: 115 additions & 0 deletions pkg/storage/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage_test

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"go.etcd.io/etcd/raft"
)

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, can return false to avoid dropping a msg to rangeID
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
log.Infof(
ctx,
"dropping Raft message %s",
raft.DescribeMessage(req.Message, func([]byte) string {
return "<omitted>"
}),
)

return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
}

// mtcStoreRaftMessageHandler exists to allows a store to be stopped and
// restarted while maintaining a partition using an unreliableRaftHandler.
type mtcStoreRaftMessageHandler struct {
mtc *multiTestContext
storeIdx int
}

func (h *mtcStoreRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
return h.mtc.Store(h.storeIdx).HandleRaftRequest(ctx, req, respStream)
}

func (h *mtcStoreRaftMessageHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
return h.mtc.Store(h.storeIdx).HandleRaftResponse(ctx, resp)
}

func (h *mtcStoreRaftMessageHandler) HandleSnapshot(
header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream,
) error {
return h.mtc.Store(h.storeIdx).HandleSnapshot(header, respStream)
}
Loading