//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Cluster Membership open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift Cluster Membership project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import ClusterMembership
import CoreMetrics
import Logging
import struct Dispatch.DispatchTime
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
import Darwin
#else
import Glibc
#endif
extension SWIM {
/// The `SWIM.Instance` encapsulates the complete algorithm implementation of the `SWIM` protocol.
///
/// **Please refer to `SWIM` for an in-depth discussion of the algorithm and extensions implemented in this package.**
///
/// - SeeAlso: `SWIM` for a complete and in-depth discussion of the protocol.
public struct Instance<
Peer: SWIMPeer,
PingOrigin: SWIMPingOriginPeer,
PingRequestOrigin: SWIMPingRequestOriginPeer
>: SWIMProtocol {
/// The settings currently in use by this instance.
public let settings: SWIM.Settings
/// Struct containing all metrics a SWIM Instance (and implementation Shell) should emit.
public let metrics: SWIM.Metrics
/// Node which this SWIM.Instance is representing in the cluster.
public var swimNode: ClusterMembership.Node {
self.peer.node
}
// Convenience overload for internal use so we don't have to repeat "swim" all the time.
internal var node: ClusterMembership.Node {
self.swimNode
}
private var log: Logger {
self.settings.logger
}
/// The `SWIM.Member` representing this instance, also referred to as "myself".
public var member: SWIM.Member<Peer> {
if let storedMyself = self.member(forNode: self.swimNode),
!storedMyself.status.isAlive
{
return storedMyself // it is something special, like .dead
} else {
// return the always up-to-date "our view" of ourselves
return SWIM.Member(
peer: self.peer,
status: .alive(incarnation: self.incarnation),
protocolPeriod: self.protocolPeriod
)
}
}
// We store the owning SWIMShell peer in order to avoid adding it to the `membersToPing` list
private let peer: Peer
/// Main members storage, map to values to obtain current members.
internal var _members: [ClusterMembership.Node: SWIM.Member<Peer>] {
didSet {
self.metrics.updateMembership(self.members)
}
}
/// List of members maintained in random yet stable order, see `addMember` for details.
internal var membersToPing: [SWIM.Member<Peer>]
/// Constantly mutated by `nextMemberToPing` in an effort to keep the order in which we ping nodes evenly distributed.
private var _membersToPingIndex: Int = 0
private var membersToPingIndex: Int {
self._membersToPingIndex
}
/// Tombstones are needed to avoid accidentally re-adding a member that we confirmed as dead already.
internal var removedDeadMemberTombstones: Set<MemberTombstone> = [] {
didSet {
self.metrics.removedDeadMemberTombstones.record(self.removedDeadMemberTombstones.count)
}
}
private var _sequenceNumber: SWIM.SequenceNumber = 0
/// Sequence numbers are used to identify messages and pair them up into request/replies.
/// - SeeAlso: `SWIM.SequenceNumber`
public mutating func nextSequenceNumber() -> SWIM.SequenceNumber {
// TODO: can we make it internal? it does not really hurt having it public
// TODO: sequence numbers per-target node? https://github.com/apple/swift-cluster-membership/issues/39
self._sequenceNumber += 1
return self._sequenceNumber
}
/// Lifeguard IV.A. Local Health Multiplier (LHM)
/// > These different sources of feedback are combined in a Local Health Multiplier (LHM).
/// > LHM is a saturating counter, with a max value S and min value zero, meaning it will not
/// > increase above S or decrease below zero.
///
/// The local health multiplier (LHM for short) is designed to relax the `probeInterval` and `pingTimeout`.
///
/// The value MUST be >= 0.
///
/// - SeeAlso: `SWIM.Instance.LHModifierEvent` for details how and when the LHM is adjusted.
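///
/// A minimal sketch of the saturating-counter rule applied by `adjustLHMultiplier` (the values here are hypothetical):
///
/// ```swift
/// let maxLHM = 8 // e.g. settings.lifeguard.maxLocalHealthMultiplier
/// var lhm = 0
/// // negative feedback (e.g. a failed probe) bumps the multiplier, saturating at maxLHM:
/// lhm = min(max(0, lhm + 1), maxLHM) // 0 -> 1
/// // positive feedback (e.g. a successful probe) lowers it, never below zero:
/// lhm = min(max(0, lhm - 1), maxLHM) // 1 -> 0
/// lhm = min(max(0, lhm - 1), maxLHM) // stays 0, saturated at the bottom
/// ```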
public var localHealthMultiplier = 0 {
didSet {
assert(
self.localHealthMultiplier >= 0,
"localHealthMultiplier MUST NOT be < 0, but was: \(self.localHealthMultiplier)"
)
self.metrics.localHealthMultiplier.record(self.localHealthMultiplier)
}
}
/// Dynamically adjusted probing interval.
///
/// Usually this interval will be yielded with a directive at appropriate spots, so it should not be
/// necessary to invoke it manually.
///
/// - SeeAlso: `localHealthMultiplier` for more detailed documentation.
/// - SeeAlso: Lifeguard IV.A. Local Health Multiplier (LHM)
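///
/// For example, assuming a `probeInterval` of 1 second and a current LHM of 2 (hypothetical values),
/// the effective interval is scaled by `1 + LHM`:
///
/// ```swift
/// // probeInterval.nanoseconds * Int64(1 + localHealthMultiplier)
/// let nanos = 1_000_000_000 * Int64(1 + 2) // == 3_000_000_000, i.e. a 3 second interval
/// ```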
var dynamicLHMProtocolInterval: Duration {
.nanoseconds(Int(self.settings.probeInterval.nanoseconds * Int64(1 + self.localHealthMultiplier)))
}
/// Dynamically adjusted (based on Local Health) timeout to be used when sending `ping` messages.
///
/// Usually this interval will be yielded with a directive at appropriate spots, so it should not be
/// necessary to invoke it manually.
///
/// - SeeAlso: `localHealthMultiplier` for more detailed documentation.
/// - SeeAlso: Lifeguard IV.A. Local Health Multiplier (LHM)
var dynamicLHMPingTimeout: Duration {
.nanoseconds(Int(self.settings.pingTimeout.nanoseconds * Int64(1 + self.localHealthMultiplier)))
}
/// The incarnation number is used to get a sense of ordering of events, so if an `.alive` or `.suspect`
/// state with a lower incarnation than the one currently known by a node is received, it can be dropped
/// as outdated and we don't accidentally override state with older events. The incarnation can only
/// be incremented by the respective node itself and will happen if that node receives a `.suspect` for
/// itself, to which it will respond with an `.alive` with the incremented incarnation.
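///
/// An illustrative ordering (incarnation numbers hypothetical):
///
/// ```swift
/// // known status of member B:  .suspect(incarnation: 2)
/// // incoming gossip about B:   .alive(incarnation: 2)   // accepted, refutes the suspicion
/// // incoming gossip about B:   .suspect(incarnation: 1) // dropped as outdated, since 1 < 2
/// ```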
var incarnation: SWIM.Incarnation {
self._incarnation
}
private var _incarnation: SWIM.Incarnation = 0 {
didSet {
self.metrics.incarnation.record(self._incarnation)
}
}
private mutating func nextIncarnation() {
self._incarnation += 1
}
/// Creates a new SWIM algorithm instance.
public init(settings: SWIM.Settings, myself: Peer) {
self.settings = settings
self.peer = myself
self._members = [:]
self.membersToPing = []
self.metrics = SWIM.Metrics(settings: settings)
_ = self.addMember(myself, status: .alive(incarnation: 0))
self.metrics.incarnation.record(self.incarnation)
self.metrics.localHealthMultiplier.record(self.localHealthMultiplier)
self.metrics.updateMembership(self.members)
}
func makeSuspicion(incarnation: SWIM.Incarnation) -> SWIM.Status {
.suspect(incarnation: incarnation, suspectedBy: [self.node])
}
func mergeSuspicions(
suspectedBy: Set<ClusterMembership.Node>,
previouslySuspectedBy: Set<ClusterMembership.Node>
) -> Set<ClusterMembership.Node> {
var newSuspectedBy = previouslySuspectedBy
for suspectedBy in suspectedBy.sorted()
where newSuspectedBy.count < self.settings.lifeguard.maxIndependentSuspicions {
newSuspectedBy.update(with: suspectedBy)
}
return newSuspectedBy
}
/// Adjust the Local Health-aware Multiplier based on the event causing it.
///
/// - Parameter event: event which causes the LHM adjustment.
public mutating func adjustLHMultiplier(_ event: LHModifierEvent) {
defer {
self.settings.logger.trace(
"Adjusted LHM multiplier",
metadata: [
"swim/lhm/event": "\(event)",
"swim/lhm": "\(self.localHealthMultiplier)",
]
)
}
self.localHealthMultiplier =
min(
max(0, self.localHealthMultiplier + event.lhmAdjustment),
self.settings.lifeguard.maxLocalHealthMultiplier
)
}
// The protocol period represents the number of times we have pinged a random member
// of the cluster. At the end of every ping cycle, the number will be incremented.
// Suspicion timeouts are based on the protocol period, i.e. if a probe did not
// reply within any of the `suspicionTimeoutPeriodsMax` rounds, it would be marked as `.suspect`.
private var _protocolPeriod: UInt64 = 0
/// In order to speed up the spreading of "fresh" rumors, we order gossips in their "number of times gossiped",
/// and thus are able to easily pick the least spread rumor and pick it for the next gossip round.
///
/// This is tremendously important in order to spread information about e.g. newly added members to others,
/// before members which are aware of them could have a chance to all terminate, leaving the rest of the cluster
/// unaware about those new members. For disseminating suspicions this is less urgent, however it also serves as a
/// useful optimization.
///
/// - SeeAlso: SWIM 4.1. Infection-Style Dissemination Component
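///
/// An illustrative pick (gossip counts hypothetical):
///
/// ```swift
/// // heap ordered by numberOfTimesGossiped: [about A (0x), about B (2x), about C (5x)]
/// // removeRoot() yields the gossip about A first - the least spread, i.e. "freshest", rumor
/// ```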
private var _messagesToGossip: Heap<SWIM.Gossip<Peer>> = Heap(
comparator: {
$0.numberOfTimesGossiped < $1.numberOfTimesGossiped
}
)
/// Note that peers without UID (in their `Node`) will NOT be added to the membership.
///
/// This is because a cluster member must be a _specific_ peer instance, and not some arbitrary "some peer on that host/port",
/// which a Node without UID represents. The only reason we allow peers and nodes without UID is to simplify making
/// initial contact with a node - i.e. one can construct a peer meaning "there should be a peer on this host/port" to send an initial ping,
/// however in reply a peer's gossip must ALWAYS include its unique identifier in the node - such that we can tell it apart from
/// any new instance of a process on the same host/port pair.
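///
/// For example (addresses and UID hypothetical):
///
/// ```swift
/// // initial contact: a UID-less node, usable as a ping target but never stored as a member:
/// //   127.0.0.1:7337
/// // the reply's gossip carries the responder's UID, and only this member is stored:
/// //   127.0.0.1:7337 (uid: 1234)
/// ```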
internal mutating func addMember(_ peer: Peer, status: SWIM.Status) -> [AddMemberDirective] {
var directives: [AddMemberDirective] = []
// Guard 1) protect against adding already known dead members
if self.hasTombstone(peer.node) {
// We saw this member already and even confirmed it dead; it shall never be added again
self.log.debug("Attempt to re-add already confirmed dead peer \(peer), ignoring it.")
directives.append(.memberAlreadyKnownDead(Member(peer: peer, status: .dead, protocolPeriod: 0)))
return directives
}
// Guard 2) protect against adding non UID members
guard peer.node.uid != nil else {
self.log.warning("Ignoring attempt to add peer representing node without UID: \(peer)")
return directives
}
let maybeExistingMember = self.member(for: peer)
if let existingMember = maybeExistingMember, existingMember.status.supersedes(status) {
// we already have a newer state for this member
directives.append(.newerMemberAlreadyPresent(existingMember))
return directives
}
// If we're adding a node, it may be a reason to declare the previous "incarnation" of it (same host/port, different UID) as dead.
// TODO: could solve by another dictionary without the UIDs?
if let withoutUIDMatchMember = self._members.first(where: {
$0.value.node.withoutUID == peer.node.withoutUID
})?.value,
peer.node.uid != nil, // the incoming node has UID, so it definitely is a real peer
peer.node.uid != withoutUIDMatchMember.node.uid
{ // the peers don't agree on UID, it must be a new node on same host/port
switch self.confirmDead(peer: withoutUIDMatchMember.peer) {
case .ignored:
() // should not happen?
case .applied(let change):
directives.append(.previousHostPortMemberConfirmedDead(change))
}
}
// just in case we had this peer added manually, and thus did not know its UID, let us remove that entry
// maybe we replaced a mismatching UID node already, but let's check and remove also if we stored any "without UID" node
if let removed = self._members.removeValue(forKey: peer.node.withoutUID) {
switch self.confirmDead(peer: removed.peer) {
case .ignored:
() // should not happen?
case .applied(let change):
directives.append(.previousHostPortMemberConfirmedDead(change))
}
}
let member = SWIM.Member(peer: peer, status: status, protocolPeriod: self.protocolPeriod)
self._members[member.node] = member
if self.notMyself(member), !member.isDead {
// We know this is a new member.
//
// Newly added members are inserted at a random spot in the list of members
// to ping, to have a better distribution of messages to this node from all
// other nodes. If for example all nodes would add it to the end of the list,
// it would take a longer time until it would be pinged for the first time
// and also likely receive multiple pings within a very short time frame.
let insertIndex = Int.random(in: self.membersToPing.startIndex...self.membersToPing.endIndex)
self.membersToPing.insert(member, at: insertIndex)
if insertIndex <= self.membersToPingIndex {
// If we inserted the new member before the current `membersToPingIndex`,
// we need to advance the index to avoid pinging the same member multiple
// times in a row. This is especially critical when inserting a larger
// number of members, e.g. when the cluster is just being formed, or
// on a rolling restart.
self.advanceMembersToPingIndex()
}
}
// upon each membership change we reset the gossip counters
// such that nodes have a chance to be notified about others,
// even if a node joined an otherwise quiescent cluster.
self.resetGossipPayloads(member: member)
directives.append(.added(member))
return directives
}
enum AddMemberDirective {
/// Informs an implementation that a new member was added and now has the following state.
/// An implementation should react to this by emitting a cluster membership change event.
case added(SWIM.Member<Peer>)
/// By adding a node with a new UID on the same host/port, we may actually invalidate any previous member that
/// existed on this host/port pair. If this is the case, we confirm the "previous" member on the same host/port
/// pair as dead immediately.
case previousHostPortMemberConfirmedDead(SWIM.MemberStatusChangedEvent<Peer>)
/// We already have information about this exact `Member`, and our information is more recent (higher incarnation number).
/// The incoming information was discarded, and the member returned here is the most up-to-date information we have.
case newerMemberAlreadyPresent(SWIM.Member<Peer>)
/// Member already was part of the cluster, became dead and we removed it.
/// It shall never be part of the cluster again.
///
/// This is only enforced by tombstones which are kept in the system for a period of time,
/// in the hope that all other nodes stop gossiping about this known dead member until then as well.
case memberAlreadyKnownDead(SWIM.Member<Peer>)
}
/// Implements the round-robin yet shuffled member to probe selection as proposed in the SWIM paper.
///
/// This mechanism should reduce the time until state is spread across the whole cluster,
/// by guaranteeing that each node will be gossiped to within N cycles (where N is the cluster size).
///
/// - Note:
/// SWIM 4.3: [...] The failure detection protocol at a member works by maintaining a list (intuitively, an array) of the known
/// elements of the current membership list, and selecting ping targets not randomly from this list,
/// but in a round-robin fashion. Instead, a newly joining member is inserted in the membership list at
/// a position that is chosen uniformly at random. On completing a traversal of the entire list,
/// the member rearranges the membership list to a random reordering.
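///
/// An illustrative traversal (member names hypothetical):
///
/// ```swift
/// // membersToPing: [B, C, D], membersToPingIndex: 1 -> the next peer to ping is C
/// // addMember(E) inserts at a uniformly random index, e.g. yielding [B, E, C, D];
/// // since E landed at index 1 <= membersToPingIndex, the index is advanced so C is still pinged next.
/// // Each full traversal thus probes every member exactly once before the order is reshuffled.
/// ```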
mutating func nextPeerToPing() -> Peer? {
if self.membersToPing.isEmpty {
return nil
}
defer {
self.advanceMembersToPingIndex()
}
return self.membersToPing[self.membersToPingIndex].peer
}
/// Selects `settings.indirectProbeCount` members to send a `ping-req` to.
func membersToPingRequest(target: SWIMAddressablePeer) -> ArraySlice<SWIM.Member<Peer>> {
func notTarget(_ peer: SWIMAddressablePeer) -> Bool {
peer.node != target.node
}
func isReachable(_ status: SWIM.Status) -> Bool {
status.isAlive || status.isSuspect
}
let candidates = self._members
.values
.filter {
notTarget($0.peer) && notMyself($0.peer) && isReachable($0.status)
}
.shuffled()
return candidates.prefix(self.settings.indirectProbeCount)
}
/// Mark a specific peer/member with the new status.
mutating func mark(_ peer: Peer, as status: SWIM.Status) -> MarkedDirective {
let previousStatusOption = self.status(of: peer)
var status = status
var protocolPeriod = self.protocolPeriod
var suspicionStartedAt: DispatchTime?
if case .suspect(let incomingIncarnation, let incomingSuspectedBy) = status,
case .suspect(let previousIncarnation, let previousSuspectedBy)? = previousStatusOption,
let member = self.member(for: peer),
incomingIncarnation == previousIncarnation
{
let suspicions = self.mergeSuspicions(
suspectedBy: incomingSuspectedBy,
previouslySuspectedBy: previousSuspectedBy
)
status = .suspect(incarnation: incomingIncarnation, suspectedBy: suspicions)
// we should keep old protocol period when member is already a suspect
protocolPeriod = member.protocolPeriod
suspicionStartedAt = member.localSuspicionStartedAt
} else if case .suspect = status {
suspicionStartedAt = self.now()
} else if case .unreachable = status,
case SWIM.Settings.UnreachabilitySettings.disabled = self.settings.unreachability
{
self.log.warning(
"Attempted to mark \(peer.node) as `.unreachable`, but unreachability is disabled! Promoting to `.dead`!"
)
status = .dead
}
if let previousStatus = previousStatusOption, previousStatus.supersedes(status) {
// we already have a newer status for this member
return .ignoredDueToOlderStatus(currentStatus: previousStatus)
}
let member = SWIM.Member(
peer: peer,
status: status,
protocolPeriod: protocolPeriod,
suspicionStartedAt: suspicionStartedAt
)
self._members[peer.node] = member
if status.isDead {
if let _ = self._members.removeValue(forKey: peer.node) {
self.metrics.membersTotalDead.increment()
}
self.removeFromMembersToPing(member)
if let uid = member.node.uid {
let deadline = self.protocolPeriod + self.settings.tombstoneTimeToLiveInTicks
let tombstone = MemberTombstone(uid: uid, deadlineProtocolPeriod: deadline)
self.removedDeadMemberTombstones.insert(tombstone)
}
}
self.resetGossipPayloads(member: member)
return .applied(previousStatus: previousStatusOption, member: member)
}
enum MarkedDirective: Equatable {
/// The status that was meant to be set is "old" and was ignored.
/// We already have newer information about this peer (`currentStatus`).
case ignoredDueToOlderStatus(currentStatus: SWIM.Status)
case applied(previousStatus: SWIM.Status?, member: SWIM.Member<Peer>)
}
private mutating func resetGossipPayloads(member: SWIM.Member<Peer>) {
// seems we gained a new member, and we need to reset gossip counts in order to ensure it also receives information about all nodes
// TODO: this would be a good place to trigger a full state sync, to speed up convergence; see https://github.com/apple/swift-cluster-membership/issues/37
self.members.forEach { self.addToGossip(member: $0) }
}
mutating func incrementProtocolPeriod() {
self._protocolPeriod += 1
}
mutating func advanceMembersToPingIndex() {
self._membersToPingIndex = (self._membersToPingIndex + 1) % self.membersToPing.count
}
mutating func removeFromMembersToPing(_ member: SWIM.Member<Peer>) {
if let index = self.membersToPing.firstIndex(where: { $0.peer.node == member.peer.node }) {
self.membersToPing.remove(at: index)
if index < self.membersToPingIndex {
self._membersToPingIndex -= 1
}
if self.membersToPingIndex >= self.membersToPing.count {
self._membersToPingIndex = self.membersToPing.startIndex
}
}
}
/// Current SWIM protocol period (i.e. which round of gossip the instance is in).
public var protocolPeriod: UInt64 {
self._protocolPeriod
}
/// Debug only. The actual suspicion timeout depends on the number of suspicions and is calculated in `suspicionTimeout`.
/// This only shows the current estimate of how many protocol intervals should pass before the suspicion timeout is reached; it may change as more data arrives.
var timeoutSuspectsBeforePeriodMax: Int64 {
self.settings.lifeguard.suspicionTimeoutMax.nanoseconds / self.dynamicLHMProtocolInterval.nanoseconds + 1
}
/// Debug only. The actual suspicion timeout depends on the number of suspicions and is calculated in `suspicionTimeout`.
/// This only shows the current estimate of how many protocol intervals should pass before the suspicion timeout is reached; it may change as more data arrives.
var timeoutSuspectsBeforePeriodMin: Int64 {
self.settings.lifeguard.suspicionTimeoutMin.nanoseconds / self.dynamicLHMProtocolInterval.nanoseconds + 1
}
/// Local Health Aware Suspicion timeout calculation, as defined Lifeguard IV.B.
///
/// Suspicion timeout is logarithmically decaying from `suspicionTimeoutPeriodsMax` to `suspicionTimeoutPeriodsMin`
/// depending on a number of suspicion confirmations.
///
/// Suspicion timeout adjusted according to number of known independent suspicions of given member.
///
/// See: Lifeguard IV-B: Local Health Aware Suspicion
///
/// The timeout for a given suspicion is calculated as follows:
///
/// ```
/// SuspicionTimeout = max(Min, Max − (Max − Min) · log(C + 1) / log(K + 1))
/// ```
///
/// where:
/// - `Min` and `Max` are the minimum and maximum Suspicion timeout.
/// See Section `V-C` for discussion of their configuration.
/// - `K` is the number of independent suspicions required to be received before setting the suspicion timeout to `Min`.
/// We default `K` to `3`.
/// - `C` is the number of independent suspicions about that member received since the local suspicion was raised.
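///
/// Worked example, with hypothetical settings `Min = 3s`, `Max = 30s`, `K = 3`:
///
/// ```
/// C = 1: max(3, 30 − (30 − 3) · log(2)/log(4)) = max(3, 30 − 13.5) = 16.5s
/// C = 3: max(3, 30 − (30 − 3) · log(4)/log(4)) = max(3, 3)         = 3s
/// ```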
public func suspicionTimeout(suspectedByCount: Int) -> Duration {
let minTimeout = self.settings.lifeguard.suspicionTimeoutMin.nanoseconds
let maxTimeout = self.settings.lifeguard.suspicionTimeoutMax.nanoseconds
return .nanoseconds(
Int(
max(
minTimeout,
maxTimeout
- Int64(
round(
Double(maxTimeout - minTimeout)
* (log2(Double(suspectedByCount + 1))
/ log2(Double(self.settings.lifeguard.maxIndependentSuspicions + 1)))
)
)
)
)
)
}
/// Checks if a deadline has expired (relative to the current time).
///
/// - Parameter deadline: the deadline to check for expiration
/// - Returns: true if the `now()` time is "past" the deadline
public func isExpired(deadline: DispatchTime) -> Bool {
deadline < self.now()
}
/// Returns the current point in time on this machine.
/// - Note: `DispatchTime` is simply a number of nanoseconds since boot on this machine, and thus is not comparable across machines.
/// We use it on purpose, as we do not intend to share our local time observations with any other peers.
private func now() -> DispatchTime {
self.settings.timeSourceNow()
}
/// Create a gossip payload (i.e. a set of `SWIM.Gossip` messages) that should be gossiped with failure detector
/// messages, or using some other medium.
///
/// - Parameter target: Allows passing the target peer this gossip will be sent to.
/// If gossiping to a specific peer, and the given peer is suspect, we will always prioritize
/// letting it know that it is being suspected, such that it can refute the suspicion as soon as possible,
/// if it still is alive.
/// - Returns: The gossip payload to be gossiped.
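///
/// An illustrative round (gossip counts hypothetical), with `maxNumberOfMessagesPerGossip = 2`:
///
/// ```swift
/// // pending gossips: [about A (gossiped 0x), about B (1x), about C (4x)]
/// // -> the two least-gossiped entries, A and B, are selected and their counters incremented;
/// //    each is re-inserted only while `needsToBeGossipedMoreTimes` still admits it.
/// ```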
public mutating func makeGossipPayload(to target: SWIMAddressablePeer?) -> SWIM.GossipPayload<Peer> {
var membersToGossipAbout: [SWIM.Member<Peer>] = []
// Lifeguard IV. Buddy System
// Always send to a suspect its suspicion.
// The reason for that is to ensure the suspect will be notified it is being suspected,
// even if the suspicion has already been disseminated "enough times".
let targetIsSuspect: Bool
if let target = target,
let member = self.member(forNode: target.node),
member.isSuspect
{
// the member is suspect, and we must inform it about this, thus including in gossip payload:
membersToGossipAbout.append(member)
targetIsSuspect = true
} else {
targetIsSuspect = false
}
guard self._messagesToGossip.count > 0 else {
if membersToGossipAbout.isEmpty {
// if we have no pending gossips to share, at least inform the member about our state.
return .membership([self.member])
} else {
return .membership(membersToGossipAbout)
}
}
// In order to avoid duplicates within a single gossip payload, we first collect all messages we need to
// gossip out and only then re-insert them into `messagesToGossip`. Otherwise, we may end up selecting the
// same message multiple times, if e.g. the total number of messages is smaller than the maximum gossip
// size, or for newer messages that have a lower `numberOfTimesGossiped` counter than the other messages.
var gossipRoundMessages: [SWIM.Gossip<Peer>] = []
gossipRoundMessages.reserveCapacity(
min(self.settings.gossip.maxNumberOfMessagesPerGossip, self._messagesToGossip.count)
)
while gossipRoundMessages.count < self.settings.gossip.maxNumberOfMessagesPerGossip,
let gossip = self._messagesToGossip.removeRoot()
{
gossipRoundMessages.append(gossip)
}
membersToGossipAbout.reserveCapacity(gossipRoundMessages.count)
for var gossip in gossipRoundMessages {
if targetIsSuspect, target?.node == gossip.member.node {
// We do NOT add gossip to payload if it's a gossip about target and target is suspect,
// this case was handled earlier and doing it here will lead to duplicate messages
()
} else {
membersToGossipAbout.append(gossip.member)
}
gossip.numberOfTimesGossiped += 1
if self.settings.gossip.needsToBeGossipedMoreTimes(gossip, members: self.members.count) {
self._messagesToGossip.append(gossip)
}
}
return .membership(membersToGossipAbout)
}
/// Adds `Member` to gossip messages.
internal mutating func addToGossip(member: SWIM.Member<Peer>) {
// we need to remove old state before we add the new gossip, so we don't gossip out stale state
self._messagesToGossip.remove(where: { $0.member.peer.node == member.peer.node })
self._messagesToGossip.append(.init(member: member, numberOfTimesGossiped: 0))
}
}
}
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: SWIM Member helper functions
extension SWIM.Instance {
func notMyself(_ member: SWIM.Member<Peer>) -> Bool {
self.whenMyself(member) == nil
}
func notMyself(_ peer: SWIMAddressablePeer) -> Bool {
!self.isMyself(peer.node)
}
func isMyself(_ member: SWIM.Member<Peer>) -> Bool {
self.isMyself(member.node)
}
func whenMyself(_ member: SWIM.Member<Peer>) -> SWIM.Member<Peer>? {
if self.isMyself(member.peer) {
return member
} else {
return nil
}
}
func isMyself(_ peer: SWIMAddressablePeer) -> Bool {
self.isMyself(peer.node)
}
func isMyself(_ node: Node) -> Bool {
// we are exactly that node:
self.node == node
// ...or, the incoming node has no UID; there was no handshake made,
// and thus the other side does not know which specific node it is going to talk to; as such, "we" are that node.
// We should never add such a peer to our members, but we will reply to that node with "us" and thus
// inform it about our specific UID; from then onwards it will know about specifically this node (by replacing its UID-less version with our UID-ful version).
|| self.node.withoutUID == node
}
/// Returns status of the passed in peer's member of the cluster, if known.
///
/// - Parameter peer: the peer to look up the status for.
/// - Returns: Status of the peer, if known.
public func status(of peer: SWIMAddressablePeer) -> SWIM.Status? {
if self.notMyself(peer) {
return self._members[peer.node]?.status
} else {
// we always consider ourselves as alive (this enables us to refute others' suspicions about us)
return .alive(incarnation: self.incarnation)
}
}
/// Checks if the passed in peer is already a known member of the swim cluster.
///
/// Note: `.dead` members are eventually removed from the swim instance and as such peers are not remembered forever!
///
/// - parameters:
/// - peer: Peer to check if it currently is a member
/// - ignoreUID: Whether or not to ignore the peer's UID, e.g. this is useful when issuing a "join 127.0.0.1:7337"
/// command, while being unaware of the node's specific UID. When it joins, it joins with the specific UID after all.
/// - Returns: true if the peer is currently a member of the swim cluster (regardless of status it is in)
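///
/// For example (node values hypothetical):
///
/// ```swift
/// // stored member:  127.0.0.1:7337 (uid: 1234)
/// // querying peer:  127.0.0.1:7337 (no uid)
/// // isMember(peer)                  -> false, the UIDs do not match
/// // isMember(peer, ignoreUID: true) -> true, matched on host/port alone
/// ```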
public func isMember(_ peer: SWIMAddressablePeer, ignoreUID: Bool = false) -> Bool {
// the peer could be either:
self.isMyself(peer) // 1) "us" (i.e. the peer which hosts this SWIM instance, or
|| self._members[peer.node] != nil // 2) a "known member"
|| (ignoreUID && peer.node.uid == nil
&& self._members.contains {
// 3) a known member, however the querying peer did not know the real UID of the peer yet
$0.key.withoutUID == peer.node
})
}
/// Returns specific `SWIM.Member` instance for the passed in peer.
///
/// - Parameter peer: peer whose member should be looked up (by its node identity, including the UID)
/// - Returns: the peer's member instance, if it currently is a member of this cluster
public func member(for peer: Peer) -> SWIM.Member<Peer>? {
self.member(forNode: peer.node)
}
/// Returns specific `SWIM.Member` instance for the passed in node.
///
/// - Parameter node: node whose member should be looked up (matching also by node UID)
/// - Returns: the peer's member instance, if it currently is a member of this cluster
public func member(forNode node: ClusterMembership.Node) -> SWIM.Member<Peer>? {
self._members[node]
}
/// Count of only non-dead members.
///
/// - SeeAlso: `SWIM.Status`
public var notDeadMemberCount: Int {
self._members.lazy.filter {
!$0.value.isDead
}.count
}
/// Count of all "other" members known to this instance (meaning members other than `myself`).
///
/// This is equal to `n-1` where `n` is the number of nodes in the cluster.
public var otherMemberCount: Int {
self.allMemberCount - 1
}
/// Count of all members, including the myself node as well as any unreachable and dead nodes which are still kept in the membership.
public var allMemberCount: Int {
self._members.count
}
/// Lists all members known to this SWIM instance currently, potentially including even `.dead` nodes.
///
/// - Complexity: O(1)
/// - Returns: All current members of the cluster, including suspect, unreachable and potentially dead members.
public var members: SWIM.Membership<Peer> {
self._members.values
}
/// Lists all `SWIM.Status.suspect` members.
///
/// The `myself` member will never be suspect, as we always assume ourselves to be alive,
/// even if all other cluster members think otherwise - this is what allows us to refute
/// suspicions about our unreachability after all.
///
/// - SeeAlso: `SWIM.Status.suspect`
internal var suspects: [SWIM.Member<Peer>] {
self.members.filter { $0.isSuspect }
}
}
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Handling SWIM protocol interactions
extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Periodic Ping Tick Handler
public mutating func onPeriodicPingTick() -> [PeriodicPingTickDirective] {
defer {
self.incrementProtocolPeriod()
}
var directives: [PeriodicPingTickDirective] = []
// 1) always check suspicion timeouts, even if we no longer have anyone else to ping
directives.append(contentsOf: self.checkSuspicionTimeouts())
// 2) if we have someone to ping, let's do so
if let toPing = self.nextPeerToPing() {
directives.append(
.sendPing(
target: toPing,
payload: self.makeGossipPayload(to: toPing),
timeout: self.dynamicLHMPingTimeout,
sequenceNumber: self.nextSequenceNumber()
)
)
}
// 3) periodic cleanup of tombstones
// TODO: could be optimized a bit, e.g. keep the "oldest one" so we know whether we have to scan already or not yet, etc.
if self.protocolPeriod % UInt64(self.settings.tombstoneCleanupIntervalInTicks) == 0 {
cleanupTombstones()
}
// 4) ALWAYS schedule the next tick
directives.append(.scheduleNextTick(delay: self.dynamicLHMProtocolInterval))
return directives
}
/// Describes how a periodic tick should be handled.
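///
/// A shell implementation would typically interpret these directives along the following lines
/// (a sketch only; `instance` and `shell` with its methods are hypothetical, the directive cases are the ones below):
///
/// ```swift
/// for directive in instance.onPeriodicPingTick() {
///     switch directive {
///     case .membershipChanged(let event):
///         shell.publish(event) // surface the membership change to user code
///     case .sendPing(let target, let payload, let timeout, let sequenceNumber):
///         shell.sendPing(to: target, payload: payload, timeout: timeout, sequenceNumber: sequenceNumber)
///     case .scheduleNextTick(let delay):
///         shell.scheduleNextPeriodicTick(in: delay)
///     }
/// }
/// ```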
public enum PeriodicPingTickDirective {
/// The membership has changed, e.g. a member was declared unreachable or dead and an event may need to be emitted.
case membershipChanged(SWIM.MemberStatusChangedEvent<Peer>)
/// Send a ping to the requested `target` peer using the provided timeout and sequenceNumber.
case sendPing(
target: Peer,
payload: SWIM.GossipPayload<Peer>,
timeout: Duration,
sequenceNumber: SWIM.SequenceNumber
)
/// Schedule the next timer `onPeriodicPingTick` invocation in `delay` time.
case scheduleNextTick(delay: Duration)
}
/// Check all suspects if any of them have been suspect for long enough that we should promote them to unreachable or dead.
///
/// Suspicion timeouts are calculated taking into account the number of peers suspecting a given member (LHA-Suspicion).
private mutating func checkSuspicionTimeouts() -> [PeriodicPingTickDirective] {
var directives: [PeriodicPingTickDirective] = []
for suspect in self.suspects {
if case .suspect(_, let suspectedBy) = suspect.status {
let suspicionTimeout = self.suspicionTimeout(suspectedByCount: suspectedBy.count)
// proceed with suspicion escalation to .unreachable if the timeout period has been exceeded
// We don't use Deadline because tests can override TimeSource
guard let suspectSince = suspect.localSuspicionStartedAt,
self.isExpired(
deadline: DispatchTime(
uptimeNanoseconds: suspectSince.uptimeNanoseconds + UInt64(suspicionTimeout.nanoseconds)
)
)
else {
continue // skip, this suspect is not timed-out yet
}
guard let incarnation = suspect.status.incarnation else {
// suspect had no incarnation number? that means it is .dead already and should be recycled soon
continue
}
let newStatus: SWIM.Status
if self.settings.unreachability == .enabled {
newStatus = .unreachable(incarnation: incarnation)
} else {
newStatus = .dead
}
switch self.mark(suspect.peer, as: newStatus) {
case .applied(let previousStatus, let member):
directives.append(
.membershipChanged(
SWIM.MemberStatusChangedEvent(previousStatus: previousStatus, member: member)
)
)
case .ignoredDueToOlderStatus:
continue
}
}
}
self.metrics.updateMembership(self.members)
return directives
}
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Ping Handler
public mutating func onPing(
pingOrigin: PingOrigin,
payload: SWIM.GossipPayload<Peer>,
sequenceNumber: SWIM.SequenceNumber
) -> [PingDirective] {
var directives: [PingDirective]
// 1) Process gossip
directives = self.onGossipPayload(payload).map { g in
.gossipProcessed(g)
}
// 2) Prepare reply
directives.append(
.sendAck(
to: pingOrigin,
pingedTarget: self.peer,
incarnation: self.incarnation,
payload: self.makeGossipPayload(to: pingOrigin),
acknowledging: sequenceNumber
)
)
return directives
}
/// Directs a shell implementation about how to handle an incoming `.ping`.
public enum PingDirective {
/// Indicates that incoming gossip was processed and the membership may have changed because of it;
/// inspect the `GossipProcessedDirective` to learn more about what change was applied.
case gossipProcessed(GossipProcessedDirective)
/// Send an `ack` message.
///
/// - parameters:
/// - to: the peer to which an `ack` should be sent
/// - pingedTarget: the `myself` peer, should be passed as `target` when sending the ack message
/// - incarnation: the incarnation number of this peer; used to determine which status is "the latest"
/// when comparing acknowledgement with suspicions
/// - payload: additional gossip payload to include in the ack message
/// - acknowledging: the sequence number of the ping that this ack acknowledges
case sendAck(
to: PingOrigin,
pingedTarget: Peer,
incarnation: SWIM.Incarnation,
payload: SWIM.GossipPayload<Peer>,
acknowledging: SWIM.SequenceNumber
)
}
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Ping Response Handlers
public mutating func onPingResponse(
response: SWIM.PingResponse<Peer, PingRequestOrigin>,
pingRequestOrigin: PingRequestOrigin?,
pingRequestSequenceNumber: SWIM.SequenceNumber?
) -> [PingResponseDirective] {
switch response {
case .ack(let target, let incarnation, let payload, let sequenceNumber):
return self.onPingAckResponse(
target: target,
incarnation: incarnation,
payload: payload,
pingRequestOrigin: pingRequestOrigin,
pingRequestSequenceNumber: pingRequestSequenceNumber,
sequenceNumber: sequenceNumber
)
case .nack(let target, let sequenceNumber):
return self.onPingNackResponse(
target: target,
pingRequestOrigin: pingRequestOrigin,
sequenceNumber: sequenceNumber
)
case .timeout(let target, let pingRequestOrigin, let timeout, _):
return self.onPingResponseTimeout(
target: target,
timeout: timeout,
pingRequestOrigin: pingRequestOrigin,
pingRequestSequenceNumber: pingRequestSequenceNumber
)
}
}
mutating func onPingAckResponse(
target pingedNode: Peer,
incarnation: SWIM.Incarnation,
payload: SWIM.GossipPayload<Peer>,
pingRequestOrigin: PingRequestOrigin?,
pingRequestSequenceNumber: SWIM.SequenceNumber?,
sequenceNumber: SWIM.SequenceNumber
) -> [PingResponseDirective] {
self.metrics.successfulPingProbes.increment()
var directives: [PingResponseDirective] = []
// We're proxying an ack payload from the ping target back to the ping origin.
// If the ping target was a suspect, there'll be a refutation in the payload
// and we probably want to process it asap. And since the data is already here,
// processing this payload will just make gossip convergence faster.
let gossipDirectives = self.onGossipPayload(payload)
directives.append(
contentsOf: gossipDirectives.map {
PingResponseDirective.gossipProcessed($0)
}
)
self.log.debug(
"Received ack from [\(pingedNode)] with incarnation [\(incarnation)] and payload [\(payload)]",
metadata: self.metadata
)
// The shell is already informed that the member moved -> alive by the gossipProcessed directive
_ = self.mark(pingedNode, as: .alive(incarnation: incarnation))
if let pingRequestOrigin = pingRequestOrigin,
let pingRequestSequenceNumber = pingRequestSequenceNumber
{
directives.append(
.sendAck(
peer: pingRequestOrigin,
acknowledging: pingRequestSequenceNumber,