worker.go
package registration
import (
"context"
"crypto/x509"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
fileSigner "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/file"
"github.com/oasislabs/oasis-core/go/common/entity"
"github.com/oasislabs/oasis-core/go/common/identity"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
"github.com/oasislabs/oasis-core/go/common/persistent"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
"github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags"
registry "github.com/oasislabs/oasis-core/go/registry/api"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
sentryClient "github.com/oasislabs/oasis-core/go/sentry/client"
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
)
const (
workerRegistrationDBBucketName = "worker/registration"
// CfgRegistrationEntity configures the registration worker entity.
CfgRegistrationEntity = "worker.registration.entity"
// CfgDebugRegistrationPrivateKey configures the registration worker private key.
CfgDebugRegistrationPrivateKey = "worker.registration.debug.private_key"
// CfgRegistrationForceRegister overrides a previously saved deregistration
// request.
CfgRegistrationForceRegister = "worker.registration.force_register"
// CfgRegistrationRotateCerts sets the number of epochs between rotations
// of the node's TLS certificate (0 disables rotation).
CfgRegistrationRotateCerts = "worker.registration.rotate_certs"
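//
// For example, passing "--worker.registration.rotate_certs 2" on the
// command line (see the flag registered in init below) rotates the
// node's TLS certificates every other epoch.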
)
var (
deregistrationRequestStoreKey = []byte("deregistration requested")
// Flags has the configuration flags.
Flags = flag.NewFlagSet("", flag.ContinueOnError)
allowUnroutableAddresses bool
)
// RegisterNodeHook is a function that is used to update the node descriptor.
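//
// A hypothetical hook sketch; the registration worker invokes it with the
// descriptor about to be signed, and the hook may amend it:
//
//	var hook RegisterNodeHook = func(n *node.Node) error {
//		// Amend n (e.g., append runtime descriptors) as needed.
//		return nil
//	}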
type RegisterNodeHook func(*node.Node) error
// Delegate is the interface for objects that wish to know about the worker's events.
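//
// A minimal implementation sketch that signals a channel when the
// registration loop exits cleanly:
//
//	type stopNotifier struct{ ch chan struct{} }
//
//	func (s *stopNotifier) RegistrationStopped() { close(s.ch) }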
type Delegate interface {
// RegistrationStopped is called by the worker when the registration loop exits cleanly.
RegistrationStopped()
}
// RoleProvider is the node descriptor role provider interface.
//
// It is used to reserve a slot in the node descriptor that will be filled when the role provider
// decides that it is available. This is used so that the registration worker knows when certain
// roles are ready to be serviced by the node.
//
// An unavailable role provider will prevent the node from being (re-)registered.
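//
// A hypothetical usage sketch (w is a *Worker in scope):
//
//	rp, err := w.NewRoleProvider(node.RoleValidator)
//	if err != nil {
//		return err
//	}
//	// Signal readiness; the hook can further amend the descriptor on
//	// each (re-)registration.
//	rp.SetAvailable(func(n *node.Node) error { return nil })
//	// Later, block further (re-)registrations on failure.
//	rp.SetUnavailable()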
type RoleProvider interface {
// SetAvailable signals that the role provider is available and that node registration can
// thus proceed.
SetAvailable(hook RegisterNodeHook)
// SetUnavailable signals that the role provider is unavailable and that node registration
// should be blocked until the role provider becomes available.
SetUnavailable()
}
type roleProvider struct {
sync.Mutex
w *Worker
role node.RolesMask
runtimeID *common.Namespace
hook RegisterNodeHook
}
func (rp *roleProvider) SetAvailable(hook RegisterNodeHook) {
rp.Lock()
rp.hook = hook
rp.Unlock()
rp.w.registerCh <- struct{}{}
}
func (rp *roleProvider) SetUnavailable() {
rp.SetAvailable(nil)
}
// Worker is a service handling worker node registration.
type Worker struct { // nolint: maligned
sync.RWMutex
workerCommonCfg *workerCommon.Config
store *persistent.ServiceStore
storedDeregister bool
deregRequested uint32
delegate Delegate
entityID signature.PublicKey
registrationSigner signature.Signer
sentryAddresses []node.Address
sentryCerts []*x509.Certificate
runtimeRegistry runtimeRegistry.Registry
epochtime epochtime.Backend
registry registry.Backend
identity *identity.Identity
p2p *p2p.P2P
ctx context.Context
// Bandaid: Idempotent Stop for testing.
stopped uint32
stopCh chan struct{} // closed internally to trigger stop
quitCh chan struct{} // closed after stopped
initialRegCh chan struct{} // closed after initial registration
stopRegCh chan struct{} // closed internally to trigger clean registration lapse
logger *logging.Logger
consensus consensus.Backend
roleProviders []*roleProvider
registerCh chan struct{}
}
// DebugForceAllowUnroutableAddresses allows unroutable addresses.
func DebugForceAllowUnroutableAddresses() {
allowUnroutableAddresses = true
}
func (w *Worker) registrationLoop() { // nolint: gocyclo
// If we have any sentry nodes, let them know about our TLS certs.
sentryAddrs := w.sentryAddresses
sentryCerts := w.sentryCerts
if len(sentryAddrs) > 0 {
for i, sentryAddr := range sentryAddrs {
var numRetries uint
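// Give up on this sentry permanently after 60 failed attempts to
// create the client; other errors keep retrying on the constant
// one-second backoff below until the context is canceled.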
pushCerts := func() error {
client, err := sentryClient.New(&sentryAddr, sentryCerts[i], w.identity)
if err != nil {
if numRetries < 60 {
numRetries++
return err
}
return backoff.Permanent(err)
}
defer client.Close()
certs := [][]byte{}
if c := w.identity.GetTLSCertificate(); c != nil {
certs = append(certs, c.Certificate[0])
}
if c := w.identity.GetNextTLSCertificate(); c != nil {
certs = append(certs, c.Certificate[0])
}
err = client.SetUpstreamTLSCertificates(w.ctx, certs)
if err != nil {
return err
}
return nil
}
sched := backoff.NewConstantBackOff(1 * time.Second)
err := backoff.Retry(pushCerts, backoff.WithContext(sched, w.ctx))
if err != nil {
w.logger.Error("unable to push upstream TLS certificates to sentry node!",
"err", err,
"sentry_address", sentryAddr,
)
}
}
}
// Delay node registration until after the consensus service has
// finished initial synchronization, if applicable.
if w.consensus != nil {
select {
case <-w.stopCh:
return
case <-w.consensus.Synced():
}
}
// (re-)register the node on each epoch transition. This doesn't
// need to be strict block-epoch time, since it just serves to
// extend the node's expiration.
ch, sub := w.epochtime.WatchEpochs()
defer sub.Close()
regFn := func(epoch epochtime.EpochTime, hook RegisterNodeHook, retry bool) error {
var off backoff.BackOff
switch retry {
case true:
expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = 0
off = expBackoff
case false:
off = &backoff.StopBackOff{}
}
off = backoff.WithContext(off, w.ctx)
// WARNING: This can potentially retry forever, on certain
// "shouldn't be possible" pathological failures.
//
// w.ctx being canceled will break out of the loop correctly
// but it's entirely possible to sit around in an infinite
// retry loop with no hope of success.
return backoff.Retry(func() error {
// Update the epoch if it happens to change while retrying.
var ok bool
select {
case <-w.stopCh:
return context.Canceled
case epoch, ok = <-ch:
if !ok {
return context.Canceled
}
default:
}
return w.registerNode(epoch, hook)
}, off)
}
var epoch epochtime.EpochTime
first := true
Loop:
for {
select {
case <-w.stopCh:
return
case <-w.stopRegCh:
w.logger.Info("node deregistration and eventual shutdown requested")
return
case epoch = <-ch:
// Epoch updated, check if we can submit a registration.
// Check if we need to rotate the node's TLS certificate.
if !w.identity.DoNotRotateTLS {
// Determine how many epochs should elapse between rotations.
rotateTLSCertsPer := epochtime.EpochTime(viper.GetUint64(CfgRegistrationRotateCerts))
if rotateTLSCertsPer != 0 && epoch%rotateTLSCertsPer == 0 {
baseEpoch, err := w.epochtime.GetBaseEpoch(w.ctx)
if err != nil {
w.logger.Error("failed to get base epoch, node TLS certificate rotation failed",
"new_epoch", epoch,
"err", err,
)
} else {
// Rotate node TLS certificates (but not on the
// first epoch).
// TODO: Make this time-based instead.
if epoch != baseEpoch {
err := w.identity.RotateCertificates()
if err != nil {
w.logger.Error("node TLS certificate rotation failed",
"new_epoch", epoch,
"err", err,
)
} else {
w.logger.Info("node TLS certificates have been rotated",
"new_epoch", epoch,
)
}
}
}
}
}
case <-w.registerCh:
// Notification that a role provider has been updated.
}
// If there are any role providers which are still not ready, we must wait for more
// notifications.
hooks := func() (h []RegisterNodeHook) {
w.RLock()
defer w.RUnlock()
for _, rp := range w.roleProviders {
rp.Lock()
role := rp.role
hook := rp.hook
rp.Unlock()
if hook == nil {
return nil
}
h = append(h, func(n *node.Node) error {
n.AddRoles(role)
return hook(n)
})
}
return
}()
if hooks == nil {
continue Loop
}
// Package all per-role/runtime hooks into a metahook.
hook := func(n *node.Node) error {
for _, hook := range hooks {
if err := hook(n); err != nil {
return fmt.Errorf("hook failed: %w", err)
}
}
return nil
}
// Attempt a registration.
if err := regFn(epoch, hook, first); err != nil {
if first {
w.logger.Error("failed to register node",
"err", err,
)
// This is by definition a cancellation as the first
// registration retries until success. So we can avoid
// another iteration of the loop to figure this out
// and abort early.
return
}
w.logger.Error("failed to re-register node",
"err", err,
)
continue
}
if first {
close(w.initialRegCh)
first = false
}
}
}
func (w *Worker) doNodeRegistration() {
defer close(w.quitCh)
if !w.storedDeregister {
w.registrationLoop()
}
// Loop broken; shutdown requested.
publicKey := w.identity.NodeSigner.Public()
regCh, sub, err := w.registry.WatchNodes(w.ctx)
if err != nil {
w.logger.Error("failed to watch nodes",
"err", err,
)
return
}
defer sub.Close()
// Check if the node is already deregistered.
_, err = w.registry.GetNode(w.ctx, &registry.IDQuery{ID: publicKey, Height: consensus.HeightLatest})
if err == registry.ErrNoSuchNode {
w.registrationStopped()
return
}
if err != nil {
w.logger.Error("can't get this node from the registry during shutdown wait",
"err", err,
)
return
}
w.logger.Info("waiting for node to deregister")
for {
select {
case ev := <-regCh:
if !ev.IsRegistration && ev.Node.ID.Equal(publicKey) {
w.registrationStopped()
return
}
case <-w.ctx.Done():
return
case <-w.stopCh:
return
}
}
}
func (w *Worker) registrationStopped() {
if w.delegate != nil {
w.delegate.RegistrationStopped()
}
}
// InitialRegistrationCh returns the initial registration channel.
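//
// The channel is closed after the first successful registration. A caller
// sketch, assuming a ctx in scope:
//
//	select {
//	case <-w.InitialRegistrationCh():
//		// Registered at least once; dependent services may start.
//	case <-ctx.Done():
//		return ctx.Err()
//	}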
func (w *Worker) InitialRegistrationCh() chan struct{} {
return w.initialRegCh
}
// NewRoleProvider creates a new role provider slot.
//
// Each part of the code that wishes to contribute something to the node descriptor can use this
// method to ask the registration worker to create a slot. The slot can (later) be toggled to be
// either available or unavailable. An unavailable slot will prevent the node registration from
// taking place.
//
// The returned role provider starts in the unavailable state.
func (w *Worker) NewRoleProvider(role node.RolesMask) (RoleProvider, error) {
return w.newRoleProvider(role, nil)
}
// NewRuntimeRoleProvider creates a new runtime role provider slot.
//
// Each part of the code that wishes to contribute something to the node descriptor can use this
// method to ask the registration worker to create a slot. The slot can (later) be toggled to be
// either available or unavailable. An unavailable slot will prevent the node registration from
// taking place.
//
// The returned role provider starts in the unavailable state.
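//
// A hypothetical usage sketch (the runtime ID would come from the node's
// runtime configuration):
//
//	var runtimeID common.Namespace // parsed from configuration
//	rp, err := w.NewRuntimeRoleProvider(node.RoleComputeWorker, runtimeID)
//	if err != nil {
//		return err
//	}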
func (w *Worker) NewRuntimeRoleProvider(role node.RolesMask, runtimeID common.Namespace) (RoleProvider, error) {
return w.newRoleProvider(role, &runtimeID)
}
func (w *Worker) newRoleProvider(role node.RolesMask, runtimeID *common.Namespace) (RoleProvider, error) {
if !role.IsSingleRole() {
return nil, fmt.Errorf("RegisterRole: registration role mask does not encode a single role. RoleMask: '%s'", role)
}
rp := &roleProvider{
w: w,
role: role,
runtimeID: runtimeID,
}
w.Lock()
w.roleProviders = append(w.roleProviders, rp)
w.Unlock()
return rp, nil
}
func (w *Worker) gatherConsensusAddresses(sentryConsensusAddrs []node.ConsensusAddress) ([]node.ConsensusAddress, error) {
var consensusAddrs []node.ConsensusAddress
var err error
switch len(w.sentryAddresses) > 0 {
// If sentry nodes are used, use sentry addresses.
case true:
consensusAddrs = sentryConsensusAddrs
// Otherwise gather consensus addresses.
case false:
consensusAddrs, err = w.consensus.GetAddresses()
if err != nil {
return nil, fmt.Errorf("worker/registration: failed to get validator's consensus address(es): %w", err)
}
}
// Filter out any potentially invalid addresses.
var validatedAddrs []node.ConsensusAddress
for _, addr := range consensusAddrs {
if !addr.ID.IsValid() {
w.logger.Error("worker/registration: skipping validator address due to invalid ID",
"addr", addr,
)
continue
}
if err = registry.VerifyAddress(addr.Address, allowUnroutableAddresses); err != nil {
w.logger.Error("worker/registration: skipping validator address due to invalid address",
"addr", addr,
"err", err,
)
continue
}
validatedAddrs = append(validatedAddrs, addr)
}
if len(validatedAddrs) == 0 {
return nil, fmt.Errorf("worker/registration: node has no valid consensus addresses")
}
return validatedAddrs, nil
}
func (w *Worker) gatherCommitteeAddresses(sentryCommitteeAddrs []node.CommitteeAddress) ([]node.CommitteeAddress, error) {
var committeeAddresses []node.CommitteeAddress
switch len(w.sentryAddresses) > 0 {
// If sentry nodes are used, use sentry addresses.
case true:
committeeAddresses = sentryCommitteeAddrs
// Otherwise gather committee addresses.
case false:
addrs, err := w.workerCommonCfg.GetNodeAddresses()
if err != nil {
return nil, fmt.Errorf("worker/registration: failed to register node: unable to get node addresses: %w", err)
}
for _, addr := range addrs {
committeeAddresses = append(committeeAddresses, node.CommitteeAddress{
Certificate: w.identity.GetTLSCertificate().Certificate[0],
Address: addr,
})
// Make sure to also include the certificate that will be valid
// in the next epoch, so that the node remains reachable.
if nextCert := w.identity.GetNextTLSCertificate(); nextCert != nil {
committeeAddresses = append(committeeAddresses, node.CommitteeAddress{
Certificate: nextCert.Certificate[0],
Address: addr,
})
}
}
}
// Filter out any potentially invalid addresses.
var validatedAddrs []node.CommitteeAddress
for _, addr := range committeeAddresses {
if _, err := addr.ParseCertificate(); err != nil {
w.logger.Error("worker/registration: skipping node address due to invalid certificate",
"addr", addr,
"err", err,
)
continue
}
if err := registry.VerifyAddress(addr.Address, allowUnroutableAddresses); err != nil {
w.logger.Error("worker/registration: skipping node address due to invalid address",
"addr", addr,
"err", err,
)
continue
}
validatedAddrs = append(validatedAddrs, addr)
}
if len(validatedAddrs) == 0 {
return nil, fmt.Errorf("worker/registration: node has no valid committee addresses")
}
return validatedAddrs, nil
}
func (w *Worker) registerNode(epoch epochtime.EpochTime, hook RegisterNodeHook) error {
identityPublic := w.identity.NodeSigner.Public()
w.logger.Info("performing node (re-)registration",
"epoch", epoch,
"node_id", identityPublic.String(),
)
var nextCert []byte
if c := w.identity.GetNextTLSCertificate(); c != nil {
nextCert = c.Certificate[0]
}
nodeDesc := node.Node{
ID: identityPublic,
EntityID: w.entityID,
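// The descriptor expires two epochs from now; each successful
// re-registration on an epoch transition extends it.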
Expiration: uint64(epoch) + 2,
Committee: node.CommitteeInfo{
Certificate: w.identity.GetTLSCertificate().Certificate[0],
NextCertificate: nextCert,
},
P2P: node.P2PInfo{
ID: w.identity.P2PSigner.Public(),
},
Consensus: node.ConsensusInfo{
ID: w.identity.ConsensusSigner.Public(),
},
}
if err := hook(&nodeDesc); err != nil {
return err
}
// Sanity check to prevent an invalid registration when no role provider added any runtimes but
// runtimes are required due to the specified role.
if nodeDesc.HasRoles(registry.RuntimesRequiredRoles) && len(nodeDesc.Runtimes) == 0 {
w.logger.Error("not registering: no runtimes provided while runtimes are required",
"node_descriptor", nodeDesc,
)
return fmt.Errorf("registration: no runtimes provided while runtimes are required")
}
var sentryConsensusAddrs []node.ConsensusAddress
var sentryCommitteeAddrs []node.CommitteeAddress
if len(w.sentryAddresses) > 0 {
sentryConsensusAddrs, sentryCommitteeAddrs = w.querySentries()
}
// Add Consensus Addresses if required.
if nodeDesc.HasRoles(registry.ConsensusAddressRequiredRoles) {
addrs, err := w.gatherConsensusAddresses(sentryConsensusAddrs)
if err != nil {
return fmt.Errorf("error gathering consensus addresses: %w", err)
}
nodeDesc.Consensus.Addresses = addrs
}
// Add Committee Addresses if required.
if nodeDesc.HasRoles(registry.CommitteeAddressRequiredRoles) {
addrs, err := w.gatherCommitteeAddresses(sentryCommitteeAddrs)
if err != nil {
return fmt.Errorf("error gathering committee addresses: %w", err)
}
nodeDesc.Committee.Addresses = addrs
}
// Add P2P Addresses if required.
if nodeDesc.HasRoles(registry.P2PAddressRequiredRoles) {
nodeDesc.P2P.Addresses = w.p2p.Addresses()
}
nodeSigners := []signature.Signer{
w.registrationSigner,
w.identity.P2PSigner,
w.identity.ConsensusSigner,
w.identity.GetTLSSigner(),
}
if !w.identity.NodeSigner.Public().Equal(w.registrationSigner.Public()) {
// The registration signer is the entity signer, so prepend the
// node signer to ensure that the descriptor is always also
// signed by the node itself.
nodeSigners = append([]signature.Signer{w.identity.NodeSigner}, nodeSigners...)
}
sigNode, err := node.MultiSignNode(nodeSigners, registry.RegisterNodeSignatureContext, &nodeDesc)
if err != nil {
w.logger.Error("failed to register node: unable to sign node descriptor",
"err", err,
)
return err
}
tx := registry.NewRegisterNodeTx(0, nil, sigNode)
if err := consensus.SignAndSubmitTx(w.ctx, w.consensus, w.registrationSigner, tx); err != nil {
w.logger.Error("failed to register node",
"err", err,
)
return err
}
w.logger.Info("node registered with the registry")
return nil
}
func (w *Worker) querySentries() ([]node.ConsensusAddress, []node.CommitteeAddress) {
var consensusAddrs []node.ConsensusAddress
var committeeAddrs []node.CommitteeAddress
var err error
sentryAddrs := w.sentryAddresses
sentryCerts := w.sentryCerts
for i, sentryAddr := range sentryAddrs {
var client *sentryClient.Client
client, err = sentryClient.New(&sentryAddr, sentryCerts[i], w.identity)
if err != nil {
w.logger.Warn("failed to create client to a sentry node",
"err", err,
"sentry_address", sentryAddr,
)
continue
}
defer client.Close()
// Query sentry node for addresses.
sentryAddresses, err := client.GetAddresses(w.ctx)
if err != nil {
w.logger.Warn("failed to obtain addresses from sentry node",
"err", err,
"sentry_address", sentryAddr,
)
}
// Keep sentries updated with our latest TLS certificates.
certs := [][]byte{}
if c := w.identity.GetTLSCertificate(); c != nil {
certs = append(certs, c.Certificate[0])
}
if c := w.identity.GetNextTLSCertificate(); c != nil {
certs = append(certs, c.Certificate[0])
}
err = client.SetUpstreamTLSCertificates(w.ctx, certs)
if err != nil {
w.logger.Warn("failed to provide upstream TLS certificates to sentry node",
"err", err,
"sentry_address", sentryAddr,
)
}
consensusAddrs = append(consensusAddrs, sentryAddresses.Consensus...)
committeeAddrs = append(committeeAddrs, sentryAddresses.Committee...)
}
if len(consensusAddrs) == 0 {
errMsg := "failed to obtain any consensus address from the configured sentry nodes"
w.logger.Error(errMsg,
"sentry_addresses", sentryAddrs,
)
}
if len(committeeAddrs) == 0 {
errMsg := "failed to obtain any committee address from the configured sentry nodes"
w.logger.Error(errMsg,
"sentry_addresses", sentryAddrs,
)
}
return consensusAddrs, committeeAddrs
}
// RequestDeregistration requests that the node not register itself in the next epoch.
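//
// A graceful-shutdown sketch: request deregistration, then wait for the
// worker to observe the node's removal from the registry:
//
//	if err := w.RequestDeregistration(); err != nil {
//		return err
//	}
//	<-w.Quit() // closed once the worker has fully stopped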
func (w *Worker) RequestDeregistration() error {
if !atomic.CompareAndSwapUint32(&w.deregRequested, 0, 1) {
// Deregistration already requested, don't do anything.
return nil
}
storedDeregister := true
err := w.store.PutCBOR(deregistrationRequestStoreKey, &storedDeregister)
if err != nil {
w.logger.Error("can't persist deregistration request",
"err", err,
)
// Let them request it again in this case.
atomic.StoreUint32(&w.deregRequested, 0)
return err
}
close(w.stopRegCh)
return nil
}
// GetRegistrationSigner loads the signing credentials as configured by this package's flags.
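//
// Resolution order, per the checks below: the test entity signer when the
// test entity is enabled; no signer when no entity descriptor is configured;
// the node identity key when the entity disallows entity-signed nodes or
// already lists this node; otherwise entity-signed registration requires
// DebugDontBlameOasis, and uses the configured entity signing key, falling
// back to the node identity key when none is provided.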
func GetRegistrationSigner(logger *logging.Logger, dataDir string, identity *identity.Identity) (signature.PublicKey, signature.Signer, error) {
var defaultPk signature.PublicKey
// If the test entity is enabled, use the entity signing key for signing
// registrations.
if flags.DebugTestEntity() {
testEntity, testSigner, _ := entity.TestEntity()
return testEntity.ID, testSigner, nil
}
// Load the registration entity descriptor.
f := viper.GetString(CfgRegistrationEntity)
if f == "" {
// TODO: Certain configurations (e.g., the test client) spin up
// workers that pull in the registration worker without actually
// needing it, and have no owning entity. The registration worker
// should not be initialized in this case.
return defaultPk, nil, nil
}
// Attempt to load the entity descriptor.
entity, err := entity.LoadDescriptor(f)
if err != nil {
return defaultPk, nil, errors.Wrap(err, "worker/registration: failed to load entity descriptor")
}
if !entity.AllowEntitySignedNodes {
// If the entity does not allow any entity-signed nodes, then
// registrations will always be node-signed.
return entity.ID, identity.NodeSigner, nil
}
for _, v := range entity.Nodes {
if v.Equal(identity.NodeSigner.Public()) {
// If the node is in the entity's list of allowed nodes
// then registrations MUST be node-signed.
return entity.ID, identity.NodeSigner, nil
}
}
if !flags.DebugDontBlameOasis() {
return defaultPk, nil, fmt.Errorf("worker/registration: entity signed nodes disallowed by node config")
}
// At this point, the entity allows entity-signed registrations,
// and the node is not in the entity's list of allowed
// node-signed nodes.
//
// TODO: The only reason an entity descriptor ever needs to be
// provided is for this check. For the common case it would be
// better to just query the entity descriptor from the registry,
// given an entity ID.
// The entity allows entity-signed nodes; try to load the entity private key.
f = viper.GetString(CfgDebugRegistrationPrivateKey)
if f == "" {
// If the private key is not provided, try using a node-signed
// registration, the local copy of the entity descriptor may
// just be stale.
logger.Warn("no entity signing key provided, falling back to the node identity key")
return entity.ID, identity.NodeSigner, nil
}
logger.Warn("using the entity signing key for node registration")
factory, err := fileSigner.NewFactory(dataDir, signature.SignerEntity)
if err != nil {
return defaultPk, nil, errors.Wrap(err, "worker/registration: failed to create entity signer factory")
}
fileFactory := factory.(*fileSigner.Factory)
entitySigner, err := fileFactory.ForceLoad(f)
if err != nil {
return defaultPk, nil, errors.Wrap(err, "worker/registration: failed to load entity signing key")
}
return entity.ID, entitySigner, nil
}
// New constructs a new worker node registration service.
func New(
dataDir string,
epochtime epochtime.Backend,
registry registry.Backend,
identity *identity.Identity,
consensus consensus.Backend,
p2p *p2p.P2P,
workerCommonCfg *workerCommon.Config,
store *persistent.CommonStore,
delegate Delegate,
runtimeRegistry runtimeRegistry.Registry,
) (*Worker, error) {
logger := logging.GetLogger("worker/registration")
serviceStore, err := store.GetServiceStore(workerRegistrationDBBucketName)
if err != nil {
logger.Error("can't get registration worker store bucket",
"err", err,
)
return nil, err
}
entityID, registrationSigner, err := GetRegistrationSigner(logger, dataDir, identity)
if err != nil {
return nil, err
}
storedDeregister := false
err = serviceStore.GetCBOR(deregistrationRequestStoreKey, &storedDeregister)
if err != nil && err != persistent.ErrNotFound {
return nil, err
}
if viper.GetBool(CfgRegistrationForceRegister) {
storedDeregister = false
err = serviceStore.PutCBOR(deregistrationRequestStoreKey, &storedDeregister)
if err != nil {
return nil, err
}
}
if viper.GetUint64(CfgRegistrationRotateCerts) != 0 && identity.DoNotRotateTLS {
return nil, fmt.Errorf("node TLS certificate rotation must not be enabled if using pre-generated TLS certificates")
}
w := &Worker{
workerCommonCfg: workerCommonCfg,
store: serviceStore,
storedDeregister: storedDeregister,
delegate: delegate,
entityID: entityID,
sentryAddresses: workerCommonCfg.SentryAddresses,
sentryCerts: workerCommonCfg.SentryCertificates,
registrationSigner: registrationSigner,
runtimeRegistry: runtimeRegistry,
epochtime: epochtime,
registry: registry,
identity: identity,
stopCh: make(chan struct{}),
quitCh: make(chan struct{}),
initialRegCh: make(chan struct{}),
stopRegCh: make(chan struct{}),
ctx: context.Background(),
logger: logger,
consensus: consensus,
p2p: p2p,
registerCh: make(chan struct{}, 64),
}
if flags.ConsensusValidator() {
rp, err := w.NewRoleProvider(node.RoleValidator)
if err != nil {
return nil, err
}
// The consensus validator is available immediately.
rp.SetAvailable(func(*node.Node) error { return nil })
}
return w, nil
}
// Name returns the service name.
func (w *Worker) Name() string {
return "worker node registration service"
}
// Start starts the registration service.
func (w *Worker) Start() error {
w.logger.Info("starting node registration service")
// HACK: This can be ok in certain configurations.
if !w.entityID.IsValid() || w.registrationSigner == nil {
w.logger.Warn("no entity/signer for this node, registration will NEVER succeed")
// Make sure the node is stopped on quit.
go func() {
<-w.stopCh
close(w.quitCh)
}()
return nil
}
go w.doNodeRegistration()
return nil
}
// Stop halts the service.
func (w *Worker) Stop() {
if !atomic.CompareAndSwapUint32(&w.stopped, 0, 1) {
return
}
close(w.stopCh)
}
// Quit returns a channel that will be closed when the service terminates.
func (w *Worker) Quit() <-chan struct{} {
return w.quitCh
}
// Cleanup performs the service specific post-termination cleanup.
func (w *Worker) Cleanup() {
}
func init() {
Flags.String(CfgRegistrationEntity, "", "entity to use as the node owner in registrations")
Flags.String(CfgDebugRegistrationPrivateKey, "", "private key to use to sign node registrations")
Flags.Bool(CfgRegistrationForceRegister, false, "override a previously saved deregistration request")
Flags.Uint64(CfgRegistrationRotateCerts, 0, "rotate node TLS certificates every N epochs (0 to disable)")
_ = Flags.MarkHidden(CfgDebugRegistrationPrivateKey)
_ = viper.BindPFlags(Flags)
}