// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package server_test

import (
	"context"
	"fmt"
	"net"
	"sort"
	"sync"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/rpc"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

// TestClusterConnectivity sets up an uninitialized cluster with custom join
// flags (individual nodes point to specific others, instead of all pointing to
// n1), and tests that the cluster/node IDs are distributed correctly
// throughout.
func TestClusterConnectivity(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// TODO(irfansharif): Teach testServer to accept a list of join addresses
	// instead of just one.
	var testConfigurations = []struct {
		// bootstrapNode controls which node is `cockroach init`-ialized.
		// Everything is 0-indexed.
		bootstrapNode int

		// joinConfig[i] is the node that the i-th node points to through its
		// join flags. Everything is 0-indexed.
		joinConfig []int
	}{
		// 0. Every node points to the first, including the first.
		{0, []int{0, 0, 0, 0, 0}},

		// 1. Every node points to the previous, except the first, which
		// points to itself.
		//
		//   0 <-- 1 <-- 2 <-- 3 <-- 4
		{0, []int{0, 0, 1, 2, 3}},

		// 2. Same as previous, but with a few links switched around.
		//
		//   0 <-- 2 <-- 1 <-- 3 <-- 4
		{0, []int{0, 2, 0, 1, 3}},

		// 3. Introduce a bidirectional link.
		//
		//   1 <-> 2 <-- 0 <-- 3
		//   1 <-- 4
		{1, []int{2, 2, 1, 0, 1}},

		// 4. Same as above, but bootstrap the other node in the bidirectional
		// link.
		//
		//   1 <-> 2 <-- 0 <-- 3
		//   1 <-- 4
		{2, []int{2, 2, 1, 0, 1}},

		// 5. Another topology centered around node 1, which itself points to
		// node 0.
		//
		//   0 <-> 1 <-- 2
		//   1 <-- 3
		{0, []int{1, 0, 1, 1}},

		// 6. Same as above, but bootstrapping the centered node directly.
		//
		//   0 <-> 1 <-- 2
		//   1 <-- 3
		{1, []int{1, 0, 1, 1}},

		// TODO(irfansharif): We would really like to be able to set up test
		// clusters that are only partially connected, and assert that only
		// nodes that are supposed to find out about bootstrap actually do.
		// Something like:
		//
		//   0 <-> 1 <-- 2
		//   5 <-- 4 <-- 3 <-- 5
		//
		// A version of this was originally prototyped in #52526, but the
		// changes required in Test{Cluster,Server} were too invasive to
		// justify at the time.
	}
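
	// As an illustration of the mapping above: topology 1 ({0, []int{0, 0, 1,
	// 2, 3}}) bootstraps node 0 and points node 4's join flag at node 3's
	// address, node 3's at node 2's, and so on down the chain.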

	// getListener is shorthand for allocating a listener on an OS-assigned
	// port.
	getListener := func() net.Listener {
		t.Helper()

		listener, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			t.Fatal(err)
		}
		return listener
	}

	baseServerArgs := base.TestServerArgs{
		// We're going to manually control initialization in this test.
		NoAutoInitializeCluster: true,
		StoreSpecs:              []base.StoreSpec{{InMemory: true}},
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				ContextTestingKnobs: rpc.ContextTestingKnobs{
					// Disable the special RPC loopback dial logic.
					// This is needed because the test starts to perform
					// RPC dials before the PreStart() function is called,
					// and that is where the loopback dial function is defined.
					NoLoopbackDialer: true,
				},
			},
		},
	}

	for i, test := range testConfigurations {
		t.Run(fmt.Sprintf("topology=%d", i), func(t *testing.T) {
			numNodes := len(test.joinConfig)
			var serverArgsPerNode = make(map[int]base.TestServerArgs)

			// We start off with installing a listener for each server. We
			// pre-bind a listener so the kernel can go ahead and assign an
			// address for us. We'll later use this address to populate join
			// flags for neighboring nodes.
			var listeners = make([]net.Listener, numNodes)
			for i := 0; i < numNodes; i++ {
				listener := getListener()

				serverArg := baseServerArgs
				serverArg.Listener = listener
				serverArg.Addr = listener.Addr().String()
				serverArgsPerNode[i] = serverArg
				listeners[i] = listener
			}

			// We'll annotate the server args with the right join flags.
			for i := 0; i < numNodes; i++ {
				joinNode := test.joinConfig[i]
				joinAddr := listeners[joinNode].Addr().String()

				serverArg := serverArgsPerNode[i]
				serverArg.JoinAddr = joinAddr
				serverArgsPerNode[i] = serverArg
			}

			tcArgs := base.TestClusterArgs{
				// Saves time in this test.
				ReplicationMode:   base.ReplicationManual,
				ServerArgsPerNode: serverArgsPerNode,

				// We have to start servers in parallel because we're looking to
				// bootstrap the cluster manually in a separate thread. Each
				// individual Server.Start is a blocking call (it waits for
				// init). We want to start all of them in parallel to simulate a
				// bunch of servers each waiting for init.
				ParallelStart: true,
			}

			// The test structure here is a bit convoluted, but necessary given
			// the current implementation of TestCluster. TestCluster.Start
			// wants to wait for all the nodes in the test cluster to be fully
			// initialized before returning. Given we're testing initialization
			// behavior, we do all the real work in a separate thread and keep
			// the main thread limited to simply starting and stopping the test
			// cluster.
			//
			// NB: That aside, TestCluster very much wants to live on the main
			// goroutine running the test. That's mostly to do with its internal
			// error handling and the limitations imposed by
			// https://golang.org/pkg/testing/#T.FailNow (which sits underneath
			// t.Fatal).
			tc := testcluster.NewTestCluster(t, numNodes, tcArgs)

			var wg sync.WaitGroup
			wg.Add(1)
			go func(bootstrapNode int) {
				defer wg.Done()

				// Attempt to bootstrap the cluster through the configured node.
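				// The attempt has to be retried because the servers are only
				// started (in parallel) by tc.Start further below, so the Init
				// RPC endpoint may not be reachable on the first few tries.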
				testutils.SucceedsSoon(t, func() (e error) {
					ctx := context.Background()
					serv := tc.Server(bootstrapNode)

					target := serv.AdvRPCAddr()
					dialOpts, err := tc.Server(bootstrapNode).RPCContext().GRPCDialOptions(ctx, target, rpc.SystemClass)
					if err != nil {
						return err
					}

					conn, err := grpc.DialContext(ctx, target, dialOpts...)
					if err != nil {
						return err
					}
					defer func() {
						_ = conn.Close() // nolint:grpcconnclose
					}()

					client := serverpb.NewInitClient(conn)
					_, err = client.Bootstrap(context.Background(), &serverpb.BootstrapRequest{})
					return err
				})

				// Wait to get a real cluster ID (doesn't always get populated
				// right after bootstrap).
				testutils.SucceedsSoon(t, func() error {
					clusterID := tc.Server(bootstrapNode).StorageClusterID()
					if clusterID.Equal(uuid.UUID{}) {
						return errors.New("cluster ID still not recorded")
					}
					return nil
				})

				clusterID := tc.Server(bootstrapNode).StorageClusterID()
				testutils.SucceedsSoon(t, func() error {
					var nodeIDs []roachpb.NodeID
					var storeIDs []roachpb.StoreID

					// Sanity check that all the nodes we expect to join this
					// network actually do (by checking they discover the right
					// cluster ID). Also collect node/store IDs for below.
					for i := 0; i < numNodes; i++ {
						if got := tc.Server(i).StorageClusterID(); got != clusterID {
							return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)",
								clusterID.String(), bootstrapNode, got.String(), i)
						}

						nodeIDs = append(nodeIDs, tc.Server(i).NodeID())
						storeIDs = append(storeIDs, tc.Server(i).GetFirstStoreID())
					}

					sort.Slice(nodeIDs, func(i, j int) bool {
						return nodeIDs[i] < nodeIDs[j]
					})
					sort.Slice(storeIDs, func(i, j int) bool {
						return storeIDs[i] < storeIDs[j]
					})

					// Double check that we have the full set of node/store IDs
					// we expect.
					for i := 1; i <= len(nodeIDs); i++ {
						expNodeID := roachpb.NodeID(i)
						if got := nodeIDs[i-1]; got != expNodeID {
							return errors.Newf("unexpected node ID; expected %s, got %s", expNodeID.String(), got.String())
						}

						expStoreID := roachpb.StoreID(i)
						if got := storeIDs[i-1]; got != expStoreID {
							return errors.Newf("unexpected store ID; expected %s, got %s", expStoreID.String(), got.String())
						}
					}

					return nil
				})
			}(test.bootstrapNode)

			// Start the test cluster. This is a blocking call, and expects the
			// configured number of servers in the cluster to be fully
			// initialized before it returns. Given that the initialization
			// happens in the other thread, we'll only get past it after having
			// bootstrapped the test cluster in the thread above.
			tc.Start(t)
			defer tc.Stopper().Stop(context.Background())

			wg.Wait()
		})
	}
}

// TestJoinVersionGate checks that improperly versioned cockroach nodes are
// not able to join a cluster.
func TestJoinVersionGate(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	commonArg := base.TestServerArgs{
		StoreSpecs: []base.StoreSpec{
			{InMemory: true},
		},
	}

	numNodes := 3
	tcArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual, // Saves time in this test.
		ServerArgs:      commonArg,
		ParallelStart:   true,
	}

	tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
	defer tc.Stopper().Stop(context.Background())
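
	// The three nodes auto-initialize into a single cluster; wait for them
	// all to report the same cluster ID before adding the incompatible node
	// below.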
	testutils.SucceedsSoon(t, func() error {
		for i := 0; i < numNodes; i++ {
			clusterID := tc.Server(0).StorageClusterID()
			got := tc.Server(i).StorageClusterID()
			if got != clusterID {
				return errors.Newf("mismatched cluster IDs; %s (for node %d) != %s (for node %d)", clusterID.String(), 0, got.String(), i)
			}
		}

		return nil
	})
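
	// Simulate a node at the previous release by overriding its cluster
	// version through the testing knobs; prev is a helper defined elsewhere
	// in this test package.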
	var newVersion = clusterversion.Latest.Version()
	var oldVersion = prev(newVersion)

	knobs := base.TestingKnobs{
		Server: &server.TestingKnobs{
			ClusterVersionOverride: oldVersion,
		},
	}
	oldVersionServerArgs := commonArg
	oldVersionServerArgs.Knobs = knobs
	oldVersionServerArgs.JoinAddr = tc.Servers[0].AdvRPCAddr()

	serv, err := tc.AddServer(oldVersionServerArgs)
	if err != nil {
		t.Fatal(err)
	}
	defer serv.Stop(context.Background())

	ctx := context.Background()
	if err := serv.Start(ctx); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) {
		t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion, err)
	}
}

func TestDecommissionedNodeCannotConnect(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	numNodes := 3
	tcArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual, // saves time
	}

	tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
	defer tc.Stopper().Stop(ctx)

	scratchKey := tc.ScratchRange(t)
	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
	require.Len(t, scratchRange.InternalReplicas, 1)
	require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

	decomSrv := tc.Server(2)
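
	// Flip n3 through DECOMMISSIONING to DECOMMISSIONED. The scratch range's
	// only replica lives on n1, so decommissioning n3 doesn't depend on any
	// replica movement.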
	for _, status := range []livenesspb.MembershipStatus{
		livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
	} {
		require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
	}
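
	// Once the decommissioned status propagates, connections should fail in
	// both directions, though with different codes: n1/n2 dialing n3 see
	// FailedPrecondition, while n3 dialing n1/n2 sees PermissionDenied.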
	testutils.SucceedsSoon(t, func() error {
		for _, idx := range []int{0, 1} {
			clusterSrv := tc.Server(idx)

			// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
			_, err := clusterSrv.RPCContext().GRPCDialNode(
				decomSrv.RPCAddr(), decomSrv.NodeID(), decomSrv.Locality(), rpc.DefaultClass,
			).Connect(ctx)
			s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
			if !ok || s.Code() != codes.FailedPrecondition {
				// NB: errors.Wrapf(nil, ...) returns nil.
				// nolint:errwrap
				return errors.Errorf("expected failed precondition for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
			}

			// And similarly, n3 will be refused by n1, n2.
			_, err = decomSrv.RPCContext().GRPCDialNode(
				clusterSrv.RPCAddr(), clusterSrv.NodeID(), clusterSrv.Locality(), rpc.DefaultClass,
			).Connect(ctx)
			s, ok = grpcstatus.FromError(errors.UnwrapAll(err))
			if !ok || s.Code() != codes.PermissionDenied {
				// NB: errors.Wrapf(nil, ...) returns nil.
				// nolint:errwrap
				return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
			}
		}

		// Trying to scan the scratch range from the decommissioned node should
		// now result in a permission denied error.
		_, err := decomSrv.DB().Scan(ctx, scratchKey, keys.MaxKey, 1)
		s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
		if !ok || s.Code() != codes.PermissionDenied {
			// NB: errors.Wrapf(nil, ...) returns nil.
			// nolint:errwrap
			return errors.Errorf("expected permission denied for scan, got %v", err)
		}

		return nil
	})
}