// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package main
import (
"bufio"
"bytes"
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/url"
"os"
"os/exec"
"os/user"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/armon/circbuf"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
_ "github.com/lib/pq"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
var (
local bool
cockroach string
cloud = "gce"
encrypt encryptValue = "false"
workload string
roachprod string
buildTag string
clusterName string
clusterWipe bool
zonesF string
teamCity bool
)
type encryptValue string
func (v *encryptValue) String() string {
return string(*v)
}
func (v *encryptValue) Set(s string) error {
if s == "random" {
*v = encryptValue(s)
return nil
}
t, err := strconv.ParseBool(s)
if err != nil {
return err
}
*v = encryptValue(fmt.Sprint(t))
return nil
}
func (v *encryptValue) asBool() bool {
if *v == "random" {
return rand.Intn(2) == 0
}
t, err := strconv.ParseBool(string(*v))
if err != nil {
return false
}
return t
}
func (v *encryptValue) Type() string {
return "string"
}
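// A minimal illustrative sketch (not part of the original file; the
// exampleEncryptValue name is hypothetical): "true" and "false" parse
// strictly via strconv.ParseBool, while "random" defers the decision to each
// asBool call.
func exampleEncryptValue() bool {
	var v encryptValue
	_ = v.Set("random") // also accepts "true", "false", "1", "0", ...
	return v.asBool()   // with "random", a coin flip on every call
}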
func ifLocal(trueVal, falseVal string) string {
if local {
return trueVal
}
return falseVal
}
func filepathAbs(path string) (string, error) {
path, err := filepath.Abs(path)
if err != nil {
return "", errors.WithStack(err)
}
return path, nil
}
func findBinary(binary, defValue string) (string, error) {
if binary == "" {
binary = defValue
}
// Check to see if binary exists and is a regular file and executable.
if fi, err := os.Stat(binary); err == nil && fi.Mode().IsRegular() && (fi.Mode()&0111) != 0 {
return filepathAbs(binary)
}
// Find the binary to run and translate it to an absolute path. First, look
// for the binary in PATH.
path, err := exec.LookPath(binary)
if err != nil {
if strings.HasPrefix(binary, "/") {
return "", errors.WithStack(err)
}
// We're unable to find the binary in PATH and "binary" is a relative path:
// look in the cockroach repo.
gopath := os.Getenv("GOPATH")
if gopath == "" {
gopath = filepath.Join(os.Getenv("HOME"), "go")
}
var binSuffix string
if !local {
binSuffix = ".docker_amd64"
}
dirs := []string{
filepath.Join(gopath, "/src/github.com/cockroachdb/cockroach/"),
filepath.Join(gopath, "/src/github.com/cockroachdb/cockroach/bin"+binSuffix),
filepath.Join(os.ExpandEnv("$PWD"), "bin"+binSuffix),
}
for _, dir := range dirs {
path = filepath.Join(dir, binary)
var err2 error
path, err2 = exec.LookPath(path)
if err2 == nil {
return filepathAbs(path)
}
}
return "", fmt.Errorf("failed to find %q in $PATH or any of %s", binary, dirs)
}
return filepathAbs(path)
}
func initBinaries() {
// If we're running against an existing "local" cluster, force the local flag
// to true in order to get the "local" test configurations.
if clusterName == "local" {
local = true
}
cockroachDefault := "cockroach"
if !local {
cockroachDefault = "cockroach-linux-2.6.32-gnu-amd64"
}
var err error
cockroach, err = findBinary(cockroach, cockroachDefault)
if err != nil {
fmt.Fprintf(os.Stderr, "%+v\n", err)
os.Exit(1)
}
workload, err = findBinary(workload, "workload")
if err != nil {
fmt.Fprintf(os.Stderr, "%+v\n", err)
os.Exit(1)
}
roachprod, err = findBinary(roachprod, "roachprod")
if err != nil {
fmt.Fprintf(os.Stderr, "%+v\n", err)
os.Exit(1)
}
}
type clusterRegistry struct {
mu struct {
syncutil.Mutex
clusters map[string]*cluster
tagCount map[string]int
// savedClusters keeps track of clusters that have been saved for further
// debugging. Each cluster comes with a message about the test failure
// causing it to be saved for debugging.
savedClusters map[*cluster]string
}
}
func newClusterRegistry() *clusterRegistry {
cr := &clusterRegistry{}
cr.mu.clusters = make(map[string]*cluster)
cr.mu.savedClusters = make(map[*cluster]string)
return cr
}
func (r *clusterRegistry) registerCluster(c *cluster) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.mu.clusters[c.name] != nil {
return fmt.Errorf("cluster named %q already exists in registry", c.name)
}
r.mu.clusters[c.name] = c
return nil
}
func (r *clusterRegistry) unregisterCluster(c *cluster) bool {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.mu.clusters[c.name]; !ok {
// If the cluster is not registered, no-op. This allows the
// method to be called defensively.
return false
}
delete(r.mu.clusters, c.name)
if c.tag != "" {
if _, ok := r.mu.tagCount[c.tag]; !ok {
panic(fmt.Sprintf("tagged cluster not accounted for: %s", c))
}
r.mu.tagCount[c.tag]--
}
return true
}
func (r *clusterRegistry) countForTag(tag string) int {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.tagCount[tag]
}
// markClusterAsSaved marks c such that it will not be destroyed by
// destroyAllClusters.
// msg is a message recording the reason why the cluster is being saved (i.e.
// generally a test failure error).
func (r *clusterRegistry) markClusterAsSaved(c *cluster, msg string) {
r.mu.Lock()
r.mu.savedClusters[c] = msg
r.mu.Unlock()
}
// savedClusters returns the list of clusters that have been saved for
// debugging.
func (r *clusterRegistry) savedClusters() map[*cluster]string {
r.mu.Lock()
defer r.mu.Unlock()
res := make(map[*cluster]string, len(r.mu.savedClusters))
for c, msg := range r.mu.savedClusters {
res[c] = msg
}
return res
}
// destroyAllClusters destroys all the clusters (except for "saved" ones) and
// blocks until they're destroyed. It responds to context cancelation by
// interrupting the waiting; the cluster destruction itself does not inherit the
// cancelation.
func (r *clusterRegistry) destroyAllClusters(ctx context.Context, l *logger) {
// Fire off a goroutine to destroy all of the clusters.
done := make(chan struct{})
go func() {
defer close(done)
var clusters []*cluster
savedClusters := make(map[*cluster]struct{})
r.mu.Lock()
for _, c := range r.mu.clusters {
clusters = append(clusters, c)
}
for c := range r.mu.savedClusters {
savedClusters[c] = struct{}{}
}
r.mu.Unlock()
var wg sync.WaitGroup
wg.Add(len(clusters))
for _, c := range clusters {
go func(c *cluster) {
defer wg.Done()
if _, ok := savedClusters[c]; !ok {
// We don't close the logger here since the cluster may be still in use
// by a test, and so the logger might still be needed.
c.Destroy(ctx, dontCloseLogger, l)
}
}(c)
}
wg.Wait()
}()
select {
case <-done:
case <-ctx.Done():
}
}
func execCmd(ctx context.Context, l *logger, args ...string) error {
// NB: It is important that this waitgroup Waits after cancel() below.
var wg sync.WaitGroup
defer wg.Wait()
var cancel func()
ctx, cancel = context.WithCancel(ctx)
defer cancel()
l.Printf("> %s\n", strings.Join(args, " "))
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
debugStdoutBuffer, _ := circbuf.NewBuffer(1024)
debugStderrBuffer, _ := circbuf.NewBuffer(1024)
// Do a dance around https://github.com/golang/go/issues/23019.
// Briefly put, passing os.Std{out,err} to subprocesses isn't great for
// context cancellation as Run() will wait for any subprocesses to finish.
// For example, "roachprod run x -- sleep 20" would wait 20 seconds, even
// if the context got canceled right away. Work around the problem by passing
// pipes to the command on which we set aggressive deadlines once the context
// expires.
{
rOut, wOut, err := os.Pipe()
if err != nil {
return err
}
defer rOut.Close()
defer wOut.Close()
rErr, wErr, err := os.Pipe()
if err != nil {
return err
}
defer rErr.Close()
defer wErr.Close()
cmd.Stdout = wOut
wg.Add(3)
go func() {
defer wg.Done()
_, _ = io.Copy(l.stdout, io.TeeReader(rOut, debugStdoutBuffer))
}()
if l.stderr == l.stdout {
// If l.stderr == l.stdout, we use only one pipe to avoid
// duplicating everything.
wg.Done()
cmd.Stderr = wOut
} else {
cmd.Stderr = wErr
go func() {
defer wg.Done()
_, _ = io.Copy(l.stderr, io.TeeReader(rErr, debugStderrBuffer))
}()
}
go func() {
defer wg.Done()
<-ctx.Done()
// NB: setting a more aggressive deadline here makes TestClusterMonitor flaky.
now := timeutil.Now().Add(3 * time.Second)
_ = rOut.SetDeadline(now)
_ = wOut.SetDeadline(now)
_ = rErr.SetDeadline(now)
_ = wErr.SetDeadline(now)
}()
}
if err := cmd.Run(); err != nil {
cancel()
wg.Wait() // synchronize access to ring buffer
// Context deadline exceeded errors opaquely appear as "signal killed" when
// manifested. We surface this error explicitly.
if ctx.Err() == context.DeadlineExceeded {
return ctx.Err()
}
return errors.Wrapf(
err,
"%s returned:\nstderr:\n%s\nstdout:\n%s",
strings.Join(args, " "),
debugStderrBuffer.String(),
debugStdoutBuffer.String(),
)
}
return nil
}
// execCmdWithBuffer executes the given command and returns its stdout/stderr
// output. If the return code is not 0, an error is also returned.
// l is used to log the command before running it. No output is logged.
func execCmdWithBuffer(ctx context.Context, l *logger, args ...string) ([]byte, error) {
l.Printf("> %s\n", strings.Join(args, " "))
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
out, err := cmd.CombinedOutput()
if err != nil {
return out, errors.Wrapf(err, `%s`, strings.Join(args, ` `))
}
return out, nil
}
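// A minimal illustrative sketch (not part of the original file; the function
// name and the "echo" command are stand-ins): execCmdWithBuffer is the
// variant to use when the caller wants the combined output back for
// inspection rather than streamed through the logger.
func exampleExecCmdWithBuffer(ctx context.Context, l *logger) ([]byte, error) {
	return execCmdWithBuffer(ctx, l, "echo", "hello")
}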
func makeGCEClusterName(name string) string {
name = strings.ToLower(name)
name = regexp.MustCompile(`[^-a-z0-9]+`).ReplaceAllString(name, "-")
name = regexp.MustCompile(`-+`).ReplaceAllString(name, "-")
return name
}
func makeClusterName(name string) string {
return makeGCEClusterName(name)
}
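// A small illustrative sketch (not part of the original file; the function
// name and input are hypothetical): cluster names are lowercased and runs of
// characters outside [-a-z0-9] collapse into single dashes.
func exampleClusterNameSanitization() string {
	return makeClusterName("Alice's Test_Cluster") // "alice-s-test-cluster"
}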
// MachineTypeToCPUs returns a CPU count for either a GCE or AWS
// machine type.
func MachineTypeToCPUs(s string) int {
{
// GCE machine types.
var v int
if _, err := fmt.Sscanf(s, "n1-standard-%d", &v); err == nil {
return v
}
if _, err := fmt.Sscanf(s, "n1-highcpu-%d", &v); err == nil {
return v
}
if _, err := fmt.Sscanf(s, "n1-highmem-%d", &v); err == nil {
return v
}
}
typeAndSize := strings.Split(s, ".")
if len(typeAndSize) == 2 {
size := typeAndSize[1]
switch size {
case "large":
return 2
case "xlarge":
return 4
case "2xlarge":
return 8
case "4xlarge":
return 16
case "9xlarge":
return 36
case "12xlarge":
return 48
case "18xlarge":
return 72
case "24xlarge":
return 96
}
}
fmt.Fprintf(os.Stderr, "unknown machine type: %s\n", s)
os.Exit(1)
return -1
}
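// Illustrative only (not part of the original file; the function name is
// hypothetical): both GCE and AWS machine type names resolve to a vCPU count.
func exampleMachineTypeToCPUs() (int, int) {
	return MachineTypeToCPUs("n1-standard-16"), MachineTypeToCPUs("c5d.4xlarge") // 16, 16
}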
func awsMachineType(cpus int) string {
switch {
case cpus <= 2:
return "c5d.large"
case cpus <= 4:
return "c5d.xlarge"
case cpus <= 8:
return "c5d.2xlarge"
case cpus <= 16:
return "c5d.4xlarge"
case cpus <= 36:
return "c5d.9xlarge"
case cpus <= 72:
return "c5d.18xlarge"
case cpus <= 96:
// There is no c5d.24xlarge.
return "m5d.24xlarge"
default:
panic(fmt.Sprintf("no aws machine type with %d cpus", cpus))
}
}
func gceMachineType(cpus int) string {
// TODO(peter): This is awkward: below 16 cpus, use n1-standard so that the
// machines have a decent amount of RAM. We could use custom machine
// configurations, but the rules for the amount of RAM per CPU need to be
// determined (you can't request any arbitrary amount of RAM).
if cpus < 16 {
return fmt.Sprintf("n1-standard-%d", cpus)
}
return fmt.Sprintf("n1-highcpu-%d", cpus)
}
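// Illustrative only (not part of the original file; the function name is
// hypothetical): the CPU count a test asks for maps to a cloud-specific
// machine type, which MachineTypeToCPUs above can invert.
func exampleMachineTypeSelection() (string, string) {
	return gceMachineType(32), awsMachineType(32) // "n1-highcpu-32", "c5d.9xlarge"
}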
type testI interface {
Name() string
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Failed() bool
// Path to a directory where the test is supposed to store its log and other
// artifacts.
ArtifactsDir() string
logger() *logger
}
// TODO(tschottdorf): Consider using a more idiomatic approach in which options
// act upon a config struct:
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type option interface {
option()
}
type nodeSelector interface {
option
merge(nodeListOption) nodeListOption
}
type nodeListOption []int
func (n nodeListOption) option() {}
func (n nodeListOption) merge(o nodeListOption) nodeListOption {
t := make(nodeListOption, 0, len(n)+len(o))
t = append(t, n...)
t = append(t, o...)
sort.Ints([]int(t))
r := t[:1]
for i := 1; i < len(t); i++ {
if r[len(r)-1] != t[i] {
r = append(r, t[i])
}
}
return r
}
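// Illustrative sketch (not part of the original file; the function name is
// hypothetical): merge yields the sorted, de-duplicated union of two node
// selections.
func exampleMergeNodes() nodeListOption {
	return nodeListOption{3, 1}.merge(nodeListOption{1, 2}) // [1 2 3]
}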
func (n nodeListOption) randNode() nodeListOption {
return nodeListOption{n[rand.Intn(len(n))]}
}
func (n nodeListOption) String() string {
if len(n) == 0 {
return ""
}
var buf bytes.Buffer
buf.WriteByte(':')
appendRange := func(start, end int) {
if buf.Len() > 1 {
buf.WriteByte(',')
}
if start == end {
fmt.Fprintf(&buf, "%d", start)
} else {
fmt.Fprintf(&buf, "%d-%d", start, end)
}
}
start, end := -1, -1
for _, i := range n {
if start != -1 && end == i-1 {
end = i
continue
}
if start != -1 {
appendRange(start, end)
}
start, end = i, i
}
if start != -1 {
appendRange(start, end)
}
return buf.String()
}
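// Illustrative sketch (not part of the original file; the function name is
// hypothetical): consecutive node IDs collapse into ranges when rendered as
// a roachprod node-selector suffix.
func exampleNodeListString() string {
	return nodeListOption{1, 2, 3, 5}.String() // ":1-3,5"
}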
// clusterSpec represents a test's description of what its cluster needs to
// look like. It becomes part of a clusterConfig when the cluster is created.
type clusterSpec struct {
NodeCount int
// CPUs is the number of CPUs per node.
CPUs int
Zones string
Geo bool
Lifetime time.Duration
ReusePolicy clusterReusePolicy
}
func makeClusterSpec(nodeCount int, opts ...createOption) clusterSpec {
spec := clusterSpec{NodeCount: nodeCount}
defaultOpts := []createOption{cpu(4), nodeLifetimeOption(12 * time.Hour), reuseAny()}
for _, o := range append(defaultOpts, opts...) {
o.apply(&spec)
}
return spec
}
func clustersCompatible(s1, s2 clusterSpec) bool {
s1.Lifetime = 0
s2.Lifetime = 0
return s1 == s2
}
func (s clusterSpec) String() string {
str := fmt.Sprintf("n%dcpu%d", s.NodeCount, s.CPUs)
if s.Geo {
str += "-geo"
}
return str
}
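// Illustrative sketch (not part of the original file; the function name is
// hypothetical): a test requesting a 4-node, 16-CPU, geo-distributed
// cluster. makeClusterSpec fills in the defaults (12h lifetime, reuse-any
// policy), and the spec renders as "n4cpu16-geo", which becomes part of the
// generated cluster name.
func exampleClusterSpec() string {
	return makeClusterSpec(4, cpu(16), geo()).String() // "n4cpu16-geo"
}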
func (s *clusterSpec) args() []string {
var args []string
switch cloud {
case "aws":
if s.Zones != "" {
fmt.Fprintf(os.Stderr, "zones spec not yet supported on AWS: %s\n", s.Zones)
os.Exit(1)
}
if s.Geo {
fmt.Fprintf(os.Stderr, "geo-distributed clusters not yet supported on AWS\n")
os.Exit(1)
}
args = append(args, "--clouds=aws")
}
if !local && s.CPUs != 0 {
switch cloud {
case "aws":
args = append(args, "--aws-machine-type-ssd="+awsMachineType(s.CPUs))
case "gce":
args = append(args, "--gce-machine-type="+gceMachineType(s.CPUs))
}
}
if s.Zones != "" {
args = append(args, "--gce-zones="+s.Zones)
}
if s.Geo {
args = append(args, "--geo")
}
if s.Lifetime != 0 {
args = append(args, "--lifetime="+s.Lifetime.String())
}
return args
}
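// Illustrative sketch (not part of the original file; the function name is
// hypothetical): with the default "gce" cloud and a non-local run, a 16-CPU
// spec expands to roachprod create flags such as
// "--gce-machine-type=n1-highcpu-16" and "--lifetime=12h0m0s".
func exampleSpecArgs() []string {
	spec := makeClusterSpec(4, cpu(16))
	return spec.args()
}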
func (s *clusterSpec) expiration() time.Time {
l := s.Lifetime
if l == 0 {
l = 12 * time.Hour
}
return timeutil.Now().Add(l)
}
type createOption interface {
apply(spec *clusterSpec)
}
type nodeCPUOption int
func (o nodeCPUOption) apply(spec *clusterSpec) {
spec.CPUs = int(o)
}
// cpu is a node option which requests nodes with the specified number of CPUs.
func cpu(n int) nodeCPUOption {
return nodeCPUOption(n)
}
type nodeGeoOption struct{}
func (o nodeGeoOption) apply(spec *clusterSpec) {
spec.Geo = true
}
// geo is a node option which requests geo-distributed nodes.
func geo() nodeGeoOption {
return nodeGeoOption{}
}
type nodeZonesOption string
func (o nodeZonesOption) apply(spec *clusterSpec) {
spec.Zones = string(o)
}
// zones is a node option which requests nodes in the specified zones. Note
// that this overrides the --zones flag and is useful for tests that require
// running in specific zones.
func zones(s string) nodeZonesOption {
return nodeZonesOption(s)
}
type nodeLifetimeOption time.Duration
func (o nodeLifetimeOption) apply(spec *clusterSpec) {
spec.Lifetime = time.Duration(o)
}
// clusterReusePolicy indicates what clusters a particular test can run on and
// who (if anybody) can reuse the cluster after the test has finished running
// (either passing or failing). See the individual policies for details.
//
// Only tests whose cluster spec matches can ever run on the same
// cluster, regardless of this policy.
//
// Clean clusters (freshly-created clusters or clusters on which a test with the
// Any policy ran) are accepted by all policies.
//
// Note that not all combinations of "what cluster can I accept" and "how am I
// soiling this cluster" can be expressed. For example, there's no way to
// express that I'll accept a cluster that was tagged a certain way but after me
// nobody else can reuse the cluster at all.
type clusterReusePolicy interface {
clusterReusePolicy()
}
// reusePolicyAny means that only clean clusters are accepted and the cluster
// can be used by any other test (i.e. the cluster remains "clean").
type reusePolicyAny struct{}
// reusePolicyNone means that only clean clusters are accepted and the cluster
// cannot be reused afterwards.
type reusePolicyNone struct{}
// reusePolicyTagged means that clusters left over by similarly-tagged tests are
// accepted in addition to clean clusters and, regardless of how the cluster
// started up, it will be tagged with the given tag at the end (so only
// similarly-tagged tests can use it afterwards).
//
// The idea is that a tag identifies a particular way in which a test is soiled,
// since it's common for groups of tests to mess clusters up in similar ways and
// to also be able to reset the cluster when the test starts. It's like a virus
// - if you carry it, you infect a clean host and can otherwise intermingle with
// other hosts that are already infected. Note that using this policy assumes
// that the way in which every test soils the cluster is idempotent.
type reusePolicyTagged struct{ tag string }
func (reusePolicyAny) clusterReusePolicy() {}
func (reusePolicyNone) clusterReusePolicy() {}
func (reusePolicyTagged) clusterReusePolicy() {}
type clusterReusePolicyOption struct {
p clusterReusePolicy
}
func reuseAny() clusterReusePolicyOption {
return clusterReusePolicyOption{p: reusePolicyAny{}}
}
func reuseNone() clusterReusePolicyOption {
return clusterReusePolicyOption{p: reusePolicyNone{}}
}
func reuseTagged(tag string) clusterReusePolicyOption {
return clusterReusePolicyOption{p: reusePolicyTagged{tag: tag}}
}
func (p clusterReusePolicyOption) apply(spec *clusterSpec) {
spec.ReusePolicy = p.p
}
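// Illustrative sketch (not part of the original file; the function name and
// the "restore" tag are hypothetical): a test that soils its cluster in a
// well-known, resettable way can tag it so that only similarly-tagged tests
// reuse the cluster afterwards.
func exampleTaggedSpec() clusterSpec {
	return makeClusterSpec(3, reuseTagged("restore"))
}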
// cluster provides an interface for interacting with a set of machines,
// starting and stopping a cockroach cluster on a subset of those machines, and
// running load generators and other operations on the machines.
//
// A cluster is safe for concurrent use by multiple goroutines.
type cluster struct {
name string
tag string
spec clusterSpec
status func(...interface{})
t testI
// r is the registry tracking this cluster. Destroying the cluster will
// unregister it.
r *clusterRegistry
// l is the logger used to log various cluster operations.
// DEPRECATED for use outside of cluster methods: Use a test's t.l instead.
// This is generally set to the current test's logger.
l *logger
expiration time.Time
// encryptDefault is true if the cluster should default to having encryption
// at rest enabled. The default only applies if encryption is not explicitly
// enabled or disabled by options passed to Start.
encryptDefault bool
// destroyState contains state related to the cluster's destruction.
destroyState destroyState
}
func (c *cluster) String() string {
return fmt.Sprintf("%s [tag:%s] (%d nodes)", c.name, c.tag, c.spec.NodeCount)
}
type destroyState struct {
// owned is set if this instance is responsible for `roachprod destroy`ing the
// cluster. It is set when a new cluster is created, but not when we attach to
// an existing roachprod cluster.
// If not set, Destroy() only wipes the cluster.
owned bool
// alloc is set if owned is set. If set, it represents resources in a
// QuotaPool that need to be released when the cluster is destroyed.
alloc *quotapool.IntAlloc
mu struct {
syncutil.Mutex
loggerClosed bool
// destroyed is used to coordinate between different goroutines that want to
// destroy a cluster. It is set once the destroy process starts. It is
// closed when the destruction is complete.
destroyed chan struct{}
// saved is set if this cluster should not be wiped or destroyed. It should
// be left alone for further debugging. This is kept in sync with the
// clusterRegistry which maintains a list of all saved clusters.
saved bool
// savedMsg records a message describing the reason why the cluster is being
// saved.
savedMsg string
}
}
// closeLogger closes c.l. It can be called multiple times.
func (c *cluster) closeLogger() {
c.destroyState.mu.Lock()
defer c.destroyState.mu.Unlock()
if c.destroyState.mu.loggerClosed {
return
}
c.destroyState.mu.loggerClosed = true
c.l.close()
}
type clusterConfig struct {
spec clusterSpec
// artifactsDir is the path where log file will be stored.
artifactsDir string
localCluster bool
useIOBarrier bool
alloc *quotapool.IntAlloc
}
// clusterFactory is a creator of clusters.
type clusterFactory struct {
// namePrefix is prepended to all cluster names.
namePrefix string
// counter is incremented with every new cluster. It's used as part of the cluster's name.
// Accessed atomically.
counter uint64
// r is the registry with which all clusters will be registered.
r *clusterRegistry
// artifactsDir is the directory in which the cluster creation log file will be placed.
artifactsDir string
// sem is a semaphore throttling the creation of clusters (because AWS has
// ridiculous API call limits).
sem chan struct{}
}
func newClusterFactory(
user string, clustersID string, artifactsDir string, r *clusterRegistry, concurrentCreations int,
) *clusterFactory {
secs := timeutil.Now().Unix()
var prefix string
if clustersID != "" {
prefix = fmt.Sprintf("%s-%s-%d-", user, clustersID, secs)
} else {
prefix = fmt.Sprintf("%s-%d-", user, secs)
}
return &clusterFactory{
sem: make(chan struct{}, concurrentCreations),
namePrefix: prefix,
artifactsDir: artifactsDir,
r: r,
}
}
// acquireSem blocks until the semaphore allows a new cluster creation. The
// returned function must be called when cluster creation finishes.
func (f *clusterFactory) acquireSem() func() {
f.sem <- struct{}{}
return f.releaseSem
}
func (f *clusterFactory) releaseSem() {
<-f.sem
}
// newCluster creates a new roachprod cluster.
//
// setStatus is called with status messages indicating the stage of cluster
// creation.
//
// NOTE: setTest() needs to be called before a test can use this cluster.
func (f *clusterFactory) newCluster(
ctx context.Context, cfg clusterConfig, setStatus func(string), teeOpt teeOptType,
) (*cluster, error) {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "newCluster")
}
var name string
if cfg.localCluster {
name = "local" // The roachprod tool understands this magic name.
} else {
count := atomic.AddUint64(&f.counter, 1)
name = makeClusterName(
fmt.Sprintf("%s-%02d-%s", f.namePrefix, count, cfg.spec.String()))
}
if cfg.spec.NodeCount == 0 {
// For tests. Return the minimum that makes them happy.
c := &cluster{
name: name,
expiration: timeutil.Now().Add(24 * time.Hour),
status: func(...interface{}) {},
r: f.r,
}
if err := f.r.registerCluster(c); err != nil {
return nil, err
}
return c, nil
}
exp := cfg.spec.expiration()
if cfg.localCluster {
// Local clusters never expire.
exp = timeutil.Now().Add(100000 * time.Hour)
}
c := &cluster{
name: name,
spec: cfg.spec,
status: func(...interface{}) {},
expiration: exp,
encryptDefault: encrypt.asBool(),
r: f.r,
destroyState: destroyState{
owned: true,
alloc: cfg.alloc,
},
}
sargs := []string{roachprod, "create", c.name, "-n", fmt.Sprint(c.spec.NodeCount)}
sargs = append(sargs, cfg.spec.args()...)
if !local && zonesF != "" && cfg.spec.Zones == "" {
sargs = append(sargs, "--gce-zones="+zonesF)
}
if !cfg.useIOBarrier {
sargs = append(sargs, "--local-ssd-no-ext4-barrier")
}
setStatus("acquring cluster creation semaphore")
release := f.acquireSem()
defer release()
setStatus("roachprod create")
c.status("creating cluster")
// Logs for creating a new cluster go to a dedicated log file.
logPath := filepath.Join(f.artifactsDir, runnerLogsDir, "cluster-create", name+".log")
l, err := rootLogger(logPath, teeOpt)
if err != nil {
log.Fatal(ctx, err)
}
success := false
// Attempt to create a cluster several times, since cloud provisioning can be
// flaky.
for i := 0; i < 3; i++ {
err = execCmd(ctx, l, sargs...)
if err == nil {
success = true
break
}
l.PrintfCtx(ctx, "Failed to create cluster.")
if !strings.Contains(err.Error(), "already exists") {
l.PrintfCtx(ctx, "Cleaning up in case it was partially created.")
c.Destroy(ctx, closeLogger, l)
} else {
break
}
}
if !success {
return nil, err
}
if err := f.r.registerCluster(c); err != nil {
return nil, err
}
c.status("idle")
return c, nil
}
type attachOpt struct {
skipValidation bool
// Implies skipWipe.
skipStop bool