diff --git a/app/vmock.go b/app/vmock.go index 795bd68df..e00e6f8e2 100644 --- a/app/vmock.go +++ b/app/vmock.go @@ -29,6 +29,7 @@ import ( "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/eth2util" "github.com/obolnetwork/charon/eth2util/keystore" "github.com/obolnetwork/charon/testutil/validatormock" ) @@ -48,13 +49,13 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch onStartup := true sched.SubscribeSlots(func(ctx context.Context, slot core.Slot) error { // Prepare attestations when slots tick. - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { return state.Attester.Prepare(ctx) }) // Prepare sync committee message when epoch tick. if onStartup || slot.FirstInEpoch() { - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { // Either call if it is first slot in epoch or on charon startup. return state.SyncCommMember.PrepareEpoch(ctx) }) @@ -63,13 +64,13 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch onStartup = false // Prepare sync committee selections when slots tick. - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { // Either call if it is first slot in epoch or on charon startup. return state.SyncCommMember.PrepareSlot(ctx, eth2p0.Slot(slot.Slot)) }) // Submit sync committee message 1/3 into the slot. 
- vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { thirdDuration := slot.SlotDuration / 3 thirdTime := slot.Time.Add(thirdDuration) @@ -86,7 +87,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch // Handle duties when triggered. sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { - vMockWrap(ctx, duty.Slot, 0, func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, duty.Slot, func(ctx context.Context, state vMockState) error { return handleVMockDuty(ctx, duty, state.Eth2Cl, state.SignFunc, pubshares, state.Attester, state.SyncCommMember) }) @@ -96,7 +97,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch go func() { // TODO(corver): Improve registrations to use lock file and trigger on epoch transitions. for registration := range conf.TestConfig.BuilderRegistration { - vMockWrap(context.Background(), 0, 0, func(ctx context.Context, state vMockState) error { + vMockWrap(context.Background(), 0, func(ctx context.Context, state vMockState) error { return validatormock.Register(ctx, state.Eth2Cl, state.SignFunc, registration, pubshares[0]) }) } @@ -117,7 +118,7 @@ type vMockState struct { type vMockCallback func(context.Context, vMockState) error // newVMockWrapper returns a stateful validator mock wrapper function. -func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx context.Context, slot int64, epoch int64, callback vMockCallback), error) { +func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx context.Context, slot int64, callback vMockCallback), error) { // Immutable state and providers. 
signFunc, err := newVMockSigner(conf, pubshares) if err != nil { @@ -133,7 +134,7 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx contex syncCommMem = new(validatormock.SyncCommMember) ) - return func(ctx context.Context, slot, epoch int64, fn vMockCallback) { + return func(ctx context.Context, slot int64, fn vMockCallback) { mu.Lock() defer mu.Unlock() @@ -145,12 +146,18 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx contex return } + epoch, err := eth2util.EpochFromSlot(ctx, eth2Cl, eth2p0.Slot(slot)) + if err != nil { + log.Error(ctx, "Epoch from slot", err) + return + } + // Create new slot attester on new slots if slot != 0 && attester.Slot() != eth2p0.Slot(slot) { attester = validatormock.NewSlotAttester(eth2Cl, eth2p0.Slot(slot), signFunc, pubshares) } - if epoch != 0 && syncCommMem.Epoch() != eth2p0.Epoch(epoch) { - syncCommMem = validatormock.NewSyncCommMember(eth2Cl, eth2p0.Epoch(epoch), signFunc, pubshares) + if epoch != 0 && syncCommMem.Epoch() != epoch { + syncCommMem = validatormock.NewSyncCommMember(eth2Cl, epoch, signFunc, pubshares) } state := vMockState{ diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 64859cdb9..0994820a7 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -14,6 +14,19 @@ // this program. If not, see <http://www.gnu.org/licenses/>. // Package priority implements the priority protocol that resolves arbitrary cluster wide priorities. +// +// Protocol overview: +// - Priorities are arbitrary protobufs (data). +// - Priorities are grouped by a topic (also arbitrary protobuf data). +// - Peers in the cluster participate in priority protocol instances. +// - The protocol consists of two steps: priority exchange followed by priority consensus. +// - All peers propose their own set of priorities for an instance. +// - These are exchanged with all other peers. +// - All peers also respond with their priorities. 
+// - The exchange step is complete when the priorities of all peers have been received or on timeout. +// - Each peer calculates what they consider as the cluster wide priorities based on the priorities available to them at that point. +// - Each peer then starts a consensus instance proposing this deterministically calculated result. +// - Consensus is reached if quorum peers propose the same value. package priority import ( @@ -25,6 +38,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" @@ -33,7 +47,10 @@ import ( "github.com/obolnetwork/charon/p2p" ) -const ProtocolID = "charon/priority/1.0.0" +const ( + ProtocolID = "charon/priority/1.0.0" + deleteAfter = time.Minute +) // Instance identifies an instance of the priority protocol. type Instance proto.Message @@ -46,9 +63,13 @@ type Priority proto.Message // instanceData contains an Instance and its data. type instanceData struct { - Instance Instance - Msgs map[string]received // map[peerID]msg - Timeout time.Time + OwnID string + Instance Instance + Key [32]byte // Hash of instance + Pending []chan<- *pbv1.PriorityMsg // Pending exchange requests from peers + Msgs map[string]*pbv1.PriorityMsg // Received messages by peers (including own) + Timeout time.Time // Timeout starts consensus even if all messages not received + ConsStarted bool // Whether consensus was started } type Consensus interface { @@ -65,9 +86,8 @@ type tickerProvider func() (<-chan time.Time, func()) // subscriber abstracts the output subscriber callbacks of Prioritiser. type subscriber func(context.Context, Instance, *pbv1.PriorityResult) error -// received contains a received peer message and a channel to provide response. -type received struct { - Own bool +// request contains a received peer request and a channel to provide response. 
+type request struct { Msg *pbv1.PriorityMsg Response chan<- *pbv1.PriorityMsg } @@ -100,8 +120,9 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, sendFunc p msgValidator: msgValidator, consensusTimeout: consensusTimeout, tickerProvider: tickerProvider, - proposals: make(chan *pbv1.PriorityMsg), - receives: make(chan received), + own: make(chan *pbv1.PriorityMsg), + responses: make(chan *pbv1.PriorityMsg), + requests: make(chan request), quit: make(chan struct{}), noSupportFilters: noSupportFilters, skipAllFilter: log.Filter(), @@ -137,9 +158,12 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, sendFunc p // Prioritiser resolves cluster wide priorities. type Prioritiser struct { + // All state immutable wrt Run. + quit chan struct{} - proposals chan *pbv1.PriorityMsg - receives chan received + own chan *pbv1.PriorityMsg // Own proposed messages to exchange + requests chan request // Other peers requesting to exchange messages. + responses chan *pbv1.PriorityMsg // Responses from exchanging with peers. minRequired int consensusTimeout time.Duration tcpNode host.Host @@ -162,7 +186,7 @@ func (p *Prioritiser) Subscribe(fn subscriber) { // Prioritise starts a new prioritisation instance for the provided message or returns an error. func (p *Prioritiser) Prioritise(ctx context.Context, msg *pbv1.PriorityMsg) error { select { - case p.proposals <- msg: + case p.own <- msg: return nil case <-ctx.Done(): return ctx.Err() @@ -180,14 +204,14 @@ func (p *Prioritiser) Run(ctx context.Context) error { ticker, stopTicker := p.tickerProvider() defer stopTicker() + // Mutable state instances := make(map[[32]byte]instanceData) - startConsensus := func(key [32]byte) { - data := instances[key] - + // startConsensus starts consensus and marks the instance as such. 
+ startConsensus := func(data instanceData) { var msgs []*pbv1.PriorityMsg for _, msg := range data.Msgs { - msgs = append(msgs, msg.Msg) + msgs = append(msgs, msg) } result, err := calculateResult(msgs, p.minRequired) if err != nil { @@ -195,57 +219,100 @@ func (p *Prioritiser) Run(ctx context.Context) error { return } - err = p.consensus.ProposePriority(ctx, data.Instance, result) + go func() { + err = p.consensus.ProposePriority(ctx, data.Instance, result) + if err != nil { + log.Warn(ctx, "Propose priority consensus", err) // Unexpected + return + } + }() + + data.ConsStarted = true + instances[data.Key] = data + } + + // processInstance calls the callback with new or existing instance data and + // stores the result after processing any pending requests. It also starts consensus + // if all messages were received. + processInstance := func(instance *anypb.Any, callback func(instanceData) (instanceData, error)) { + // TODO(corver): Instance needs a duty/slot so we can filter out unexpected instances. 
+ instancePB, err := instance.UnmarshalNew() + if err != nil { + log.Error(ctx, "Priority unmarshal any", err) + return + } + key, err := hashProto(instancePB) if err != nil { - log.Warn(ctx, "Propose priority consensus", err) // Unexpected + log.Error(ctx, "Priority hash proto", err) return } - delete(instances, key) + data, ok := instances[key] + if !ok { + data = instanceData{ + OwnID: p.tcpNode.ID().String(), + Instance: instancePB, + Key: key, + Msgs: make(map[string]*pbv1.PriorityMsg), + Timeout: time.Now().Add(p.consensusTimeout), + } + } + + data, err = callback(data) + if err != nil { + log.Error(ctx, "Priority instance error", err) + return + } + + data = processPending(data) + instances[key] = data + + if !data.ConsStarted && len(data.Msgs) == len(p.peers) { + // All messages received before timeout + log.Debug(ctx, "Priority instance received all messages, starting consensus") + startConsensus(data) + } } for { select { case <-ctx.Done(): return ctx.Err() - case msg := <-p.proposals: - p.prioritiseOnce(ctx, msg) - - case recv := <-p.receives: - instance, err := recv.Msg.Instance.UnmarshalNew() - if err != nil { - log.Error(ctx, "Priority instance from any proto", err) - continue - } - key, err := hashProto(instance) - if err != nil { - log.Error(ctx, "Priority instance key", err) - continue - } - - data, ok := instances[key] - if !ok { - data = instanceData{ - Instance: instance, - Msgs: make(map[string]received), - Timeout: time.Now().Add(p.consensusTimeout), - } - } - sendResponse(data.Msgs, recv) - data.Msgs[recv.Msg.PeerId] = recv - instances[key] = data - - if len(data.Msgs) == len(p.peers) { - // All messages received before timeout - startConsensus(key) - } + case msg := <-p.own: + log.Debug(ctx, "Priority protocol triggered") + processInstance(msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[msg.PeerId] = msg + return data, nil + }) + p.exchangeOnce(ctx, msg) + case req := <-p.requests: + 
processInstance(req.Msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[req.Msg.PeerId] = req.Msg + data.Pending = append(data.Pending, req.Response) + + return data, nil + }) + case msg := <-p.responses: + processInstance(msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[msg.PeerId] = msg + return data, nil + }) case now := <-ticker: - for key, data := range instances { + for _, data := range instances { if now.Before(data.Timeout) { + continue // Not timed out yet. + } + if !data.ConsStarted { // Timed out and consensus not started yet. + log.Debug(ctx, "Priority instance timeout, starting consensus") + startConsensus(data) + continue } + if now.Before(data.Timeout.Add(deleteAfter)) { + continue // Not deletable yet + } - startConsensus(key) // Note that iterating and deleting from a map from a single goroutine is fine. + delete(instances, data.Key) } } } @@ -260,13 +327,13 @@ func (p *Prioritiser) handleRequest(ctx context.Context, pID peer.ID, msg *pbv1. } response := make(chan *pbv1.PriorityMsg, 1) // Ensure responding goroutine never blocks. - recv := received{ + req := request{ Msg: msg, Response: response, } select { - case p.receives <- recv: + case p.requests <- req: case <-ctx.Done(): return nil, ctx.Err() case <-p.quit: @@ -283,24 +350,12 @@ func (p *Prioritiser) handleRequest(ctx context.Context, pID peer.ID, msg *pbv1. } } -// prioritiseOnce initiates a priority message exchange with all peers. -func (p *Prioritiser) prioritiseOnce(ctx context.Context, msg *pbv1.PriorityMsg) { +// exchangeOnce initiates a priority message exchange with all peers. +func (p *Prioritiser) exchangeOnce(ctx context.Context, msg *pbv1.PriorityMsg) { if !p.quorumSupported(ctx) { log.Warn(ctx, "Skipping non-critical priority protocol not supported by quorum peers", nil, p.skipAllFilter) return } - log.Debug(ctx, "Priority protocol triggered") - - // Send our own message first to start consensus timeout. 
- go func() { // Async since unbuffered - select { - case <-ctx.Done(): - case p.receives <- received{ - Own: true, - Msg: msg, - }: - } - }() for _, pID := range p.peers { if pID == p.tcpNode.ID() { @@ -327,7 +382,7 @@ func (p *Prioritiser) prioritiseOnce(ctx context.Context, msg *pbv1.PriorityMsg) select { case <-ctx.Done(): - case p.receives <- received{Msg: response}: + case p.responses <- response: } }(pID) } @@ -391,29 +446,22 @@ func hashProto(msg proto.Message) ([32]byte, error) { return hash, nil } -// sendResponse sends own response to any awaiting peers. -func sendResponse(msgs map[string]received, recv received) { - if recv.Own { // Send our message to all waiting peers. - for _, other := range msgs { - if other.Response == nil { - continue - } - other.Response <- recv.Msg - } - - return +// processPending sends own proposed msg to any awaiting/pending peers removing them from the returned instance. +func processPending(data instanceData) instanceData { + // Get own message + own, ok := data.Msgs[data.OwnID] + if !ok { + // Own message not received yet + return data } - if recv.Response == nil { - // This peer doesn't need a response - return + // Send own to any awaiting peers + for _, ch := range data.Pending { + ch <- own } - // Send own response to this peer. 
- for _, other := range msgs { - if !other.Own { - continue - } - recv.Response <- other.Msg - } + // Clear pending + data.Pending = nil + + return data } diff --git a/core/priority/prioritiser_test.go b/core/priority/prioritiser_test.go index a56f5f99e..4f1312480 100644 --- a/core/priority/prioritiser_test.go +++ b/core/priority/prioritiser_test.go @@ -17,7 +17,7 @@ package priority_test import ( "context" - "reflect" + "strings" "sync" "testing" "time" @@ -26,10 +26,10 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" - "github.com/obolnetwork/charon/app/errors" pbv1 "github.com/obolnetwork/charon/core/corepb/v1" "github.com/obolnetwork/charon/core/priority" "github.com/obolnetwork/charon/p2p" @@ -40,10 +40,10 @@ func TestPrioritiser(t *testing.T) { var ( ctx, cancel = context.WithCancel(context.Background()) n = 3 - instance = &pbv1.Duty{Slot: 99} + instances = []*pbv1.Duty{{Slot: 97}, {Slot: 98}, {Slot: 99}} tcpNodes []host.Host peers []peer.ID - consensus = new(testConsensus) + consensus = &testConsensus{t: t} msgValidator = func(*pbv1.PriorityMsg) error { return nil } noTicks = func() (<-chan time.Time, func()) { return nil, func() {} } results = make(chan []*pbv1.PriorityScoredResult, n) @@ -81,29 +81,30 @@ func TestPrioritiser(t *testing.T) { resTopic, err := result.Topics[0].Topic.UnmarshalNew() require.NoError(t, err) - requireProtoEqual(t, instance, resInstance) + requireAnyDuty(t, instances, resInstance) requireProtoEqual(t, topic, resTopic) results <- result.Topics[0].Priorities return nil }) - msg := &pbv1.PriorityMsg{ - Topics: []*pbv1.PriorityTopicProposal{{Topic: mustAny(topic), Priorities: priorities}}, - Instance: mustAny(instance), - PeerId: tcpNode.ID().String(), - } - go func() { require.ErrorIs(t, prio.Run(ctx), context.Canceled) 
}() - go func() { - require.NoError(t, prio.Prioritise(ctx, msg)) - }() + for _, instance := range instances { + msg := &pbv1.PriorityMsg{ + Topics: []*pbv1.PriorityTopicProposal{{Topic: mustAny(topic), Priorities: priorities}}, + Instance: mustAny(instance), + PeerId: tcpNode.ID().String(), + } + go func() { + require.NoError(t, prio.Prioritise(ctx, msg)) + }() + } } - for i := 0; i < n; i++ { + for i := 0; i < n*len(instances); i++ { res := <-results require.Len(t, res, 1) require.EqualValues(t, n*1000, res[0].Score) @@ -116,8 +117,9 @@ func TestPrioritiser(t *testing.T) { // testConsensus is a mock consensus implementation that "decides" on the first proposal. // It also expects all proposals to be identical. type testConsensus struct { + t *testing.T mu sync.Mutex - proposed *pbv1.PriorityResult + proposed map[int64]*pbv1.PriorityResult subs []func(ctx context.Context, instance priority.Instance, result *pbv1.PriorityResult) error } @@ -125,10 +127,12 @@ func (t *testConsensus) ProposePriority(ctx context.Context, instance priority.I t.mu.Lock() defer t.mu.Unlock() - if t.proposed != nil { - if !reflect.DeepEqual(t.proposed, result) { - return errors.New("mismatching proposals") - } + slot := instance.(*pbv1.Duty).Slot + + if t.proposed[slot] != nil { + prev := mustResultsToText(t.proposed[slot].Topics) + this := mustResultsToText(result.Topics) + require.Equal(t.t, prev, this) return nil } @@ -139,7 +143,11 @@ func (t *testConsensus) ProposePriority(ctx context.Context, instance priority.I return err } } - t.proposed = result + + if t.proposed == nil { + t.proposed = make(map[int64]*pbv1.PriorityResult) + } + t.proposed[slot] = result return nil } @@ -161,6 +169,32 @@ func prioToAny(prio int) *anypb.Any { return mustAny(&pbv1.Duty{Slot: int64(prio)}) } +func requireAnyDuty(t *testing.T, anyOf []*pbv1.Duty, actual proto.Message) { + t.Helper() + for _, msg := range anyOf { + if proto.Equal(msg, actual) { + return + } + } + require.Failf(t, "unexpected duty", "not anyOf: 
%#v\nactual: %#v\n", anyOf, actual) +} + +func mustResultsToText(msgs []*pbv1.PriorityTopicResult) string { + var resp []string + for _, msg := range msgs { + b, err := prototext.MarshalOptions{ + Multiline: true, + }.Marshal(msg) + if err != nil { + panic(err) + } + + resp = append(resp, string(b)) + } + + return strings.Join(resp, ",") +} + func requireProtoEqual(t *testing.T, expect, actual proto.Message) { t.Helper() require.True(t, proto.Equal(expect, actual), "expected: %#v\nactual: %#v\n", expect, actual) diff --git a/p2p/receive.go b/p2p/receive.go index d5f7e56d0..dabf17afa 100644 --- a/p2p/receive.go +++ b/p2p/receive.go @@ -69,7 +69,7 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID, resp, ok, err := handlerFunc(ctx, s.Conn().RemotePeer(), req) if err != nil { - log.Error(ctx, "LibP2P handler error", err) + log.Error(ctx, "LibP2P handle stream error", err, z.Str("protocol", string(protocol))) return }