-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
client_replica_gc_test.go
213 lines (190 loc) · 6.49 KB
/
client_replica_gc_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver_test
import (
"context"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
)
// TestReplicaGCQueueDropReplicaDirect verifies that a removed replica is
// immediately cleaned up.
//
// It starts a three-store cluster on on-disk engines, plants a bogus file in
// the sideloaded directory of the scratch range's replica on the second
// server, removes that replica, and then checks both that the store no longer
// knows the replica and that the sideloaded directory is gone.
func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const numStores = 3

	// Use actual on-disk engines rather than in-memory ones because the
	// in-memory ones don't write to disk, and this test inspects the replica's
	// sideloaded directory through the store's engine.
	tempDir, cleanup := testutils.TempDir(t)
	defer cleanup()

	// All nodes share one knobs struct; its eval filter is installed below,
	// before the cluster is started.
	testKnobs := kvserver.StoreTestingKnobs{}
	// tc is declared up front because the eval filter closure refers to it; it
	// is only assigned once the cluster has been started.
	var tc *testcluster.TestCluster

	serverArgsPerNode := make(map[int]base.TestServerArgs)
	for i := 0; i < numStores; i++ {
		testServerArgs := base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Store: &testKnobs,
			},
			StoreSpecs: []base.StoreSpec{
				{
					// One subdirectory of the temp dir per store.
					Path:     filepath.Join(tempDir, strconv.Itoa(i)),
					InMemory: false,
				},
			},
		}
		serverArgsPerNode[i] = testServerArgs
	}

	// In this test, the Replica on the second Node is removed, and the test
	// verifies that that Node adds this Replica to its RangeGCQueue. However,
	// the queue does a consistent lookup which will usually be read from
	// Node 1. Hence, if Node 1 hasn't processed the removal when Node 2 has,
	// no GC will take place since the consistent RangeLookup hits the first
	// Node. We use the TestingEvalFilter to make sure that the second Node
	// waits for the first.
	testKnobs.EvalKnobs.TestingEvalFilter =
		func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
			// Only intercept EndTxn requests evaluated with Sid == 2.
			et, ok := filterArgs.Req.(*roachpb.EndTxnRequest)
			if !ok || filterArgs.Sid != 2 {
				return nil
			}
			// ... and only those whose commit trigger removes a replica.
			crt := et.InternalCommitTrigger.GetChangeReplicasTrigger()
			if crt == nil || crt.DeprecatedChangeType != roachpb.REMOVE_REPLICA {
				return nil
			}
			// Block until the looked-up scratch range descriptor no longer
			// lists a replica for store 2.
			testutils.SucceedsSoon(t, func() error {
				k := tc.ScratchRange(t)
				desc, err := tc.LookupRange(k)
				if err != nil {
					return err
				}
				if _, ok := desc.GetReplicaDescriptor(2); ok {
					return errors.New("expected second node gone from first node's known replicas")
				}
				return nil
			})
			return nil
		}

	tc = testcluster.StartTestCluster(t, numStores,
		base.TestClusterArgs{
			ReplicationMode:   base.ReplicationAuto,
			ServerArgsPerNode: serverArgsPerNode,
		},
	)
	defer tc.Stopper().Stop(context.Background())

	k := tc.ScratchRange(t)
	desc := tc.LookupRangeOrFatal(t, k)

	// The replica under test lives on the second server's first store.
	ts := tc.Servers[1]
	store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
	if pErr != nil {
		t.Fatal(pErr)
	}

	{
		repl1, err := store.GetReplica(desc.RangeID)
		if err != nil {
			t.Fatal(err)
		}

		eng := store.Engine()

		// Put some bogus sideloaded data on the replica which we're about to
		// remove. Then, at the end of the test, check that that sideloaded
		// storage is now empty (in other words, GC'ing the Replica took care of
		// cleanup).
		repl1.RaftLock()
		dir := repl1.SideloadedRaftMuLocked().Dir()
		repl1.RaftUnlock()

		if dir == "" {
			t.Fatal("no sideloaded directory")
		}
		if err := eng.MkdirAll(dir); err != nil {
			t.Fatal(err)
		}
		if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil {
			t.Fatal(err)
		}

		// Registered after the cluster-stop defer above, so this check runs
		// first, while the cluster is still up. It is skipped if the test has
		// already failed.
		defer func() {
			if !t.Failed() {
				testutils.SucceedsSoon(t, func() error {
					// Verify that the whole directory for the replica is gone.
					repl1.RaftLock()
					dir := repl1.SideloadedRaftMuLocked().Dir()
					repl1.RaftUnlock()

					// Bind the Stat error to its own variable so that the
					// message below reports it. Declaring it in an `if`
					// initializer would leave it out of scope there, and the
					// message would silently format the (nil) GetReplica error
					// from the enclosing block instead.
					_, statErr := eng.Stat(dir)
					if os.IsNotExist(statErr) {
						return nil
					}
					// statErr is nil here when the directory still exists.
					return errors.Errorf("replica still has sideloaded files despite GC: %v", statErr)
				})
			}
		}()
	}

	desc = tc.RemoveReplicasOrFatal(t, k, tc.Target(1))

	// Make sure the range is removed from the store.
	testutils.SucceedsSoon(t, func() error {
		if _, err := store.GetReplica(desc.RangeID); !testutils.IsError(err, "r[0-9]+ was not found") {
			return errors.Errorf("expected range removal: %v", err) // NB: errors.Wrapf(nil, ...) returns nil.
		}
		return nil
	})
}
// TestReplicaGCQueueDropReplicaGCOnScan verifies that the range GC queue
// removes a range from a store that no longer should have a replica.
//
// Eager replica removal is disabled and the replica GC queue is paused while
// the replica is removed from the range, so the replica must still be present
// on the store afterwards; once the queue is re-enabled, a forced scan is
// expected to drop it.
func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) {
	defer leaktest.AfterTest(t)()

	storeKnobs := &kvserver.StoreTestingKnobs{
		DisableEagerReplicaRemoval: true,
		// Override the garbage collection threshold to something small,
		// so this test can trigger the GC without relying on moving time.
		ReplicaGCQueueInactivityThreshold: 100 * time.Millisecond,
	}
	clusterArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationAuto,
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{Store: storeKnobs},
		},
	}
	tc := testcluster.StartTestCluster(t, 3, clusterArgs)
	defer tc.Stopper().Stop(context.Background())

	// The replica under test lives on the second server's first store.
	secondServer := tc.Servers[1]
	store, storeErr := secondServer.Stores().GetStore(secondServer.GetFirstStoreID())
	if storeErr != nil {
		t.Fatal(storeErr)
	}

	// Disable the replica gc queue to prevent direct removal of replica.
	store.SetReplicaGCQueueActive(false)

	scratchKey := tc.ScratchRange(t)
	removedDesc := tc.RemoveReplicasOrFatal(t, scratchKey, tc.Target(1))
	rangeID := removedDesc.RangeID

	// Wait long enough for the direct replica GC to have had a chance and been
	// discarded because the queue is disabled.
	time.Sleep(10 * time.Millisecond)

	// With the queue paused, the store must still hold the replica.
	_, lookupErr := store.GetReplica(rangeID)
	if lookupErr != nil {
		t.Error("unexpected range removal")
	}

	// Enable the queue.
	store.SetReplicaGCQueueActive(true)

	// Make sure the range is removed from the store, forcing a scan on every
	// retry.
	testutils.SucceedsSoon(t, func() error {
		store.MustForceReplicaGCScanAndProcess()

		_, err := store.GetReplica(rangeID)
		if testutils.IsError(err, "r[0-9]+ was not found") {
			return nil
		}
		// NB: errors.Wrapf(nil, ...) returns nil, hence Errorf.
		return errors.Errorf("expected range removal: %v", err)
	})
}