From befd7587ec80c11de3fbacb0fce8d1a30e684e35 Mon Sep 17 00:00:00 2001 From: ptrus Date: Thu, 2 Jun 2022 13:22:59 +0200 Subject: [PATCH] go/archive-mode: disable runtime P2P if archive mode is used --- .changelog/4775.feature.md | 1 + go/consensus/api/api.go | 3 + go/consensus/tendermint/full/archive.go | 19 ++- go/consensus/tendermint/full/common.go | 47 +++++-- go/consensus/tendermint/full/full.go | 21 ++- go/consensus/tendermint/seed/seed.go | 79 ++++++----- go/oasis-node/cmd/debug/byzantine/executor.go | 2 +- go/oasis-node/cmd/debug/byzantine/p2p.go | 9 +- go/oasis-node/cmd/node/node.go | 13 +- go/worker/common/committee/keymanager.go | 6 +- go/worker/common/committee/node.go | 10 +- go/worker/common/p2p/api/api.go | 132 ++++++++++++++++++ go/worker/common/p2p/{ => api}/crypto.go | 8 +- go/worker/common/p2p/{ => api}/types.go | 2 +- go/worker/common/p2p/dispatch.go | 27 +--- go/worker/common/p2p/nop.go | 91 ++++++++++++ go/worker/common/p2p/p2p.go | 104 ++++++-------- go/worker/common/p2p/peermgmt.go | 65 ++------- go/worker/common/p2p/rpc/client.go | 35 ++++- go/worker/common/p2p/rpc/peermgmt.go | 32 +++++ go/worker/common/worker.go | 8 +- go/worker/compute/executor/committee/node.go | 2 +- go/worker/compute/executor/committee/p2p.go | 2 +- go/worker/keymanager/p2p/client.go | 6 +- go/worker/keymanager/watcher.go | 2 +- go/worker/keymanager/worker.go | 2 +- go/worker/registration/worker.go | 6 +- 27 files changed, 501 insertions(+), 233 deletions(-) create mode 100644 .changelog/4775.feature.md create mode 100644 go/worker/common/p2p/api/api.go rename go/worker/common/p2p/{ => api}/crypto.go (91%) rename go/worker/common/p2p/{ => api}/types.go (98%) create mode 100644 go/worker/common/p2p/nop.go diff --git a/.changelog/4775.feature.md b/.changelog/4775.feature.md new file mode 100644 index 00000000000..2884a54d38f --- /dev/null +++ b/.changelog/4775.feature.md @@ -0,0 +1 @@ +archive-mode: disable runtime P2P if archive mode is used diff --git a/go/consensus/api/api.go b/go/consensus/api/api.go index 0511b0cbaf8..bdf43a1b798 100644 --- a/go/consensus/api/api.go +++ b/go/consensus/api/api.go @@ -355,6 +355,9 @@ type Backend interface { service.BackgroundService ServicesBackend + // Mode returns the configured consensus mode. + Mode() Mode + // SupportedFeatures returns the features supported by this consensus backend. SupportedFeatures() FeatureMask diff --git a/go/consensus/tendermint/full/archive.go b/go/consensus/tendermint/full/archive.go index dd48e52bc05..18c4c159604 100644 --- a/go/consensus/tendermint/full/archive.go +++ b/go/consensus/tendermint/full/archive.go @@ -55,7 +55,7 @@ func (srv *archiveService) started() bool { return srv.isStarted } -// Start starts the service. +// Implements consensusAPI.Backend. func (srv *archiveService) Start() error { if srv.started() { return fmt.Errorf("tendermint: service already started") @@ -90,7 +90,7 @@ func (srv *archiveService) Start() error { return nil } -// Stop halts the service. +// Implements consensusAPI.Backend. func (srv *archiveService) Stop() { if !srv.started() { return @@ -103,12 +103,12 @@ func (srv *archiveService) Stop() { }) } -// Quit returns a channel that will be closed when the service terminates. +// Implements consensusAPI.Backend. func (srv *archiveService) Quit() <-chan struct{} { return srv.quitCh } -// Implements Backend. +// Implements consensusAPI.Backend. func (srv *archiveService) Synced() <-chan struct{} { // Archive node is always considered synced. ch := make(chan struct{}) @@ -116,7 +116,12 @@ func (srv *archiveService) Synced() <-chan struct{} { return ch } -// Implements Backend. +// Implements consensusAPI.Backend. +func (srv *archiveService) Mode() consensusAPI.Mode { + return consensusAPI.ModeArchive +} + +// Implements consensusAPI.Backend. func (srv *archiveService) GetStatus(ctx context.Context) (*consensusAPI.Status, error) { status, err := srv.commonNode.GetStatus(ctx) if err != nil { @@ -128,12 +133,12 @@ func (srv *archiveService) GetStatus(ctx context.Context) (*consensusAPI.Status, return status, nil } -// Implements Backend. +// Implements consensusAPI.Backend. func (srv *archiveService) EstimateGas(ctx context.Context, req *consensusAPI.EstimateGasRequest) (transaction.Gas, error) { return 0, consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (srv *archiveService) GetSignerNonce(ctx context.Context, req *consensusAPI.GetSignerNonceRequest) (uint64, error) { return 0, consensusAPI.ErrUnsupported } diff --git a/go/consensus/tendermint/full/common.go b/go/consensus/tendermint/full/common.go index 292b34b5fb6..df306786f92 100644 --- a/go/consensus/tendermint/full/common.go +++ b/go/consensus/tendermint/full/common.go @@ -116,6 +116,7 @@ func (n *commonNode) ensureStarted(ctx context.Context) error { return nil } +// Implements consensusAPI.Backend. func (n *commonNode) Start() error { n.Lock() defer n.Unlock() @@ -138,6 +139,7 @@ func (n *commonNode) Start() error { return nil } +// Implements consensusAPI.Backend. func (n *commonNode) Stop() { n.Lock() defer n.Unlock() @@ -275,20 +277,23 @@ func (n *commonNode) initialize() error { return nil } -// Implements service.BackgroundService. +// Implements consensusAPI.Backend. func (n *commonNode) Cleanup() { n.serviceClientsWg.Wait() n.svcMgr.Cleanup() } +// Implements consensusAPI.Backend. func (n *commonNode) ConsensusKey() signature.PublicKey { return n.identity.ConsensusSigner.Public() } +// Implements consensusAPI.Backend. func (n *commonNode) SupportedFeatures() consensusAPI.FeatureMask { return consensusAPI.FeatureServices | consensusAPI.FeatureFullNode } +// Implements consensusAPI.Backend. func (n *commonNode) GetAddresses() ([]node.ConsensusAddress, error) { u, err := common.GetExternalAddress() if err != nil { @@ -304,10 +309,12 @@ func (n *commonNode) GetAddresses() ([]node.ConsensusAddress, error) { return []node.ConsensusAddress{addr}, nil } +// Implements consensusAPI.Backend. func (n *commonNode) Checkpointer() checkpoint.Checkpointer { return n.mux.State().Checkpointer() } +// Implements consensusAPI.Backend. func (n *commonNode) StateToGenesis(ctx context.Context, blockHeight int64) (*genesisAPI.Document, error) { blk, err := n.GetTendermintBlock(ctx, blockHeight) if err != nil { @@ -376,62 +383,77 @@ func (n *commonNode) StateToGenesis(ctx context.Context, blockHeight int64) (*ge }, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetGenesisDocument(ctx context.Context) (*genesisAPI.Document, error) { return n.genesis, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetChainContext(ctx context.Context) (string, error) { return n.genesis.ChainContext(), nil } +// Implements consensusAPI.Backend. func (n *commonNode) Beacon() beaconAPI.Backend { return n.beacon } +// Implements consensusAPI.Backend. func (n *commonNode) KeyManager() keymanagerAPI.Backend { return n.keymanager } +// Implements consensusAPI.Backend. func (n *commonNode) Registry() registryAPI.Backend { return n.registry } +// Implements consensusAPI.Backend. func (n *commonNode) RootHash() roothashAPI.Backend { return n.roothash } +// Implements consensusAPI.Backend. func (n *commonNode) Staking() stakingAPI.Backend { return n.staking } +// Implements consensusAPI.Backend. func (n *commonNode) Scheduler() schedulerAPI.Backend { return n.scheduler } +// Implements consensusAPI.Backend. func (n *commonNode) Governance() governanceAPI.Backend { return n.governance } +// Implements consensusAPI.Backend. func (n *commonNode) RegisterApplication(app api.Application) error { return n.mux.Register(app) } +// Implements consensusAPI.Backend. func (n *commonNode) SetTransactionAuthHandler(handler api.TransactionAuthHandler) error { return n.mux.SetTransactionAuthHandler(handler) } +// Implements consensusAPI.Backend. func (n *commonNode) TransactionAuthHandler() consensusAPI.TransactionAuthHandler { return n.mux.TransactionAuthHandler() } +// Implements consensusAPI.Backend. func (n *commonNode) EstimateGas(ctx context.Context, req *consensusAPI.EstimateGasRequest) (transaction.Gas, error) { return n.mux.EstimateGas(req.Signer, req.Transaction) } +// Implements consensusAPI.Backend. func (n *commonNode) Pruner() api.StatePruner { return n.mux.Pruner() } +// Implements consensusAPI.Backend. func (n *commonNode) RegisterHaltHook(hook consensusAPI.HaltHook) { if !n.initialized() { return @@ -458,11 +480,12 @@ func (n *commonNode) heightToTendermintHeight(height int64) (int64, error) { return tmHeight, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetSignerNonce(ctx context.Context, req *consensusAPI.GetSignerNonceRequest) (uint64, error) { return n.mux.TransactionAuthHandler().GetSignerNonce(ctx, req) } -// These method need to be provided. +// Implements consensusAPI.Backend. func (n *commonNode) GetTendermintBlock(ctx context.Context, height int64) (*tmtypes.Block, error) { if err := n.ensureStarted(ctx); err != nil { return nil, err @@ -485,6 +508,7 @@ func (n *commonNode) GetTendermintBlock(ctx context.Context, height int64) (*tmt return result.Block, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetBlockResults(ctx context.Context, height int64) (*tmcoretypes.ResultBlockResults, error) { if err := n.ensureStarted(ctx); err != nil { return nil, err @@ -502,6 +526,7 @@ func (n *commonNode) GetBlockResults(ctx context.Context, height int64) (*tmcore return result, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetLastRetainedVersion(ctx context.Context) (int64, error) { if err := n.ensureStarted(ctx); err != nil { return -1, err @@ -510,7 +535,7 @@ func (n *commonNode) GetLastRetainedVersion(ctx context.Context) (int64, error) return state.Base, nil } -// Following use the provided methods. +// Implements consensusAPI.Backend. func (n *commonNode) GetBlock(ctx context.Context, height int64) (*consensusAPI.Block, error) { blk, err := n.GetTendermintBlock(ctx, height) if err != nil { @@ -523,6 +548,7 @@ func (n *commonNode) GetBlock(ctx context.Context, height int64) (*consensusAPI. return api.NewBlock(blk), nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetTransactions(ctx context.Context, height int64) ([][]byte, error) { blk, err := n.GetTendermintBlock(ctx, height) if err != nil { @@ -539,6 +565,7 @@ func (n *commonNode) GetTransactions(ctx context.Context, height int64) ([][]byt return txs, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetTransactionsWithResults(ctx context.Context, height int64) (*consensusAPI.TransactionsWithResults, error) { var txsWithResults consensusAPI.TransactionsWithResults @@ -624,6 +651,7 @@ func (n *commonNode) GetTransactionsWithResults(ctx context.Context, height int6 return &txsWithResults, nil } +// Implements consensusAPI.Backend. func (n *commonNode) GetStatus(ctx context.Context) (*consensusAPI.Status, error) { status := &consensusAPI.Status{ Version: version.ConsensusProtocol, @@ -703,36 +731,37 @@ func (n *commonNode) GetStatus(ctx context.Context) (*consensusAPI.Status, error // Unimplemented methods. +// Implements consensusAPI.Backend. func (n *commonNode) WatchTendermintBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription, error) { return nil, nil, consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) GetNextBlockState(ctx context.Context) (*consensusAPI.NextBlockState, error) { return nil, consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) SubmitEvidence(ctx context.Context, evidence *consensusAPI.Evidence) error { return consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) SubmitTx(ctx context.Context, tx *transaction.SignedTransaction) error { return consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) GetUnconfirmedTransactions(ctx context.Context) ([][]byte, error) { return nil, consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) WatchBlocks(ctx context.Context) (<-chan *consensusAPI.Block, pubsub.ClosableSubscription, error) { return nil, nil, consensusAPI.ErrUnsupported } -// Implements Backend. +// Implements consensusAPI.Backend. func (n *commonNode) SubmissionManager() consensusAPI.SubmissionManager { return &consensusAPI.NoOpSubmissionManager{} } diff --git a/go/consensus/tendermint/full/full.go b/go/consensus/tendermint/full/full.go index 0b264cf72f7..e8bcbf822ae 100644 --- a/go/consensus/tendermint/full/full.go +++ b/go/consensus/tendermint/full/full.go @@ -167,7 +167,7 @@ func (t *fullService) started() bool { return t.isStarted } -// Implements service.BackgroundService. +// Implements consensusAPI.Backend. func (t *fullService) Start() error { if t.started() { return fmt.Errorf("tendermint: service already started") @@ -223,12 +223,12 @@ func (t *fullService) Start() error { return nil } -// Implements service.BackgroundService. +// Implements consensusAPI.Backend. func (t *fullService) Quit() <-chan struct{} { return t.quitCh } -// Implements service.BackgroundService. +// Implements consensusAPI.Backend. func (t *fullService) Stop() { if !t.initialized() || !t.started() { return @@ -244,14 +244,22 @@ func (t *fullService) Stop() { }) } +// Implements consensusAPI.Backend. func (t *fullService) Started() <-chan struct{} { return t.startedCh } +// Implements consensusAPI.Backend. func (t *fullService) Synced() <-chan struct{} { return t.syncedCh } +// Implements consensusAPI.Backend. +func (t *fullService) Mode() consensusAPI.Mode { + return consensusAPI.ModeFull +} + +// Implements consensusAPI.Backend. func (t *fullService) SubmitTx(ctx context.Context, tx *transaction.SignedTransaction) error { // Subscribe to the transaction being included in a block. data := cbor.Marshal(tx) @@ -331,6 +339,7 @@ func (t *fullService) newSubscriberID() string { return fmt.Sprintf("%s/subscriber-%d", tmSubscriberID, atomic.AddUint64(&t.nextSubscriberID, 1)) } +// Implements consensusAPI.Backend. func (t *fullService) SubmitEvidence(ctx context.Context, evidence *consensusAPI.Evidence) error { var protoEv tmproto.Evidence if err := protoEv.Unmarshal(evidence.Meta); err != nil { @@ -397,10 +406,12 @@ func (t *fullService) unsubscribe(subscriber string, query tmpubsub.Query) error return fmt.Errorf("tendermint: unsubscribe called with no backing service") } +// Implements consensusAPI.Backend. func (t *fullService) SubmissionManager() consensusAPI.SubmissionManager { return t.submissionMgr } +// Implements consensusAPI.Backend. func (t *fullService) GetUnconfirmedTransactions(ctx context.Context) ([][]byte, error) { mempoolTxs := t.node.Mempool().ReapMaxTxs(-1) txs := make([][]byte, 0, len(mempoolTxs)) @@ -410,6 +421,7 @@ func (t *fullService) GetUnconfirmedTransactions(ctx context.Context) ([][]byte, return txs, nil } +// Implements consensusAPI.Backend. func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, error) { status, err := t.commonNode.GetStatus(ctx) if err != nil { @@ -441,6 +453,7 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro return status, nil } +// Implements consensusAPI.Backend. func (t *fullService) GetNextBlockState(ctx context.Context) (*consensusAPI.NextBlockState, error) { if !t.started() { return nil, fmt.Errorf("tendermint: not yet started") @@ -482,6 +495,7 @@ func (t *fullService) GetNextBlockState(ctx context.Context) (*consensusAPI.Next return nbs, nil } +// Implements consensusAPI.Backend. func (t *fullService) WatchBlocks(ctx context.Context) (<-chan *consensusAPI.Block, pubsub.ClosableSubscription, error) { ch, sub, err := t.WatchTendermintBlocks() if err != nil { @@ -508,6 +522,7 @@ func (t *fullService) WatchBlocks(ctx context.Context) (<-chan *consensusAPI.Blo return mapCh, sub, nil } +// Implements consensusAPI.Backend. func (t *fullService) WatchTendermintBlocks() (<-chan *tmtypes.Block, *pubsub.Subscription, error) { typedCh := make(chan *tmtypes.Block) sub := t.blockNotifier.Subscribe() diff --git a/go/consensus/tendermint/seed/seed.go b/go/consensus/tendermint/seed/seed.go index 677e50dcebf..1f0b633b7a5 100644 --- a/go/consensus/tendermint/seed/seed.go +++ b/go/consensus/tendermint/seed/seed.go @@ -69,12 +69,12 @@ type seedService struct { quitCh chan struct{} } -// Name returns the service name. +// Implements consensus.Backend. func (srv *seedService) Name() string { return "tendermint/seed" } -// Start starts the service. +// Implements consensus.Backend. func (srv *seedService) Start() error { if err := srv.transport.Listen(*srv.addr); err != nil { return fmt.Errorf("tendermint/seed: failed to listen on transport: %w", err) @@ -88,7 +88,7 @@ func (srv *seedService) Start() error { return nil } -// Stop halts the service. +// Implements consensus.Backend. func (srv *seedService) Stop() { srv.stopOnce.Do(func() { close(srv.quitCh) @@ -105,17 +105,17 @@ func (srv *seedService) Stop() { }) } -// Quit returns a channel that will be closed when the service terminates. +// Implements consensus.Backend. func (srv *seedService) Quit() <-chan struct{} { return srv.quitCh } -// Cleanup performs the service specific post-termination cleanup. +// Implements consensus.Backend. func (srv *seedService) Cleanup() { // No cleanup in particular. } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Synced() <-chan struct{} { // Seed is always considered synced. ch := make(chan struct{}) @@ -123,12 +123,17 @@ func (srv *seedService) Synced() <-chan struct{} { return ch } -// Implements Backend. +// Implements consensus.Backend. +func (srv *seedService) Mode() consensus.Mode { + return consensus.ModeSeed +} + +// Implements consensus.Backend. func (srv *seedService) SupportedFeatures() consensus.FeatureMask { return consensus.FeatureMask(0) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetStatus(ctx context.Context) (*consensus.Status, error) { status := &consensus.Status{ Status: consensus.StatusStateReady, @@ -150,22 +155,22 @@ func (srv *seedService) GetStatus(ctx context.Context) (*consensus.Status, error return status, nil } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetNextBlockState(ctx context.Context) (*consensus.NextBlockState, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetGenesisDocument(ctx context.Context) (*genesis.Document, error) { return srv.doc, nil } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetChainContext(ctx context.Context) (string, error) { return srv.doc.ChainContext(), nil } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetAddresses() ([]node.ConsensusAddress, error) { u, err := tmcommon.GetExternalAddress() if err != nil { @@ -181,87 +186,87 @@ func (srv *seedService) GetAddresses() ([]node.ConsensusAddress, error) { return []node.ConsensusAddress{addr}, nil } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Checkpointer() checkpoint.Checkpointer { return nil } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) SubmitEvidence(ctx context.Context, evidence *consensus.Evidence) error { return consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) SubmitTx(ctx context.Context, tx *transaction.SignedTransaction) error { return consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) StateToGenesis(ctx context.Context, height int64) (*genesis.Document, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) EstimateGas(ctx context.Context, req *consensus.EstimateGasRequest) (transaction.Gas, error) { return 0, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetBlock(ctx context.Context, height int64) (*consensus.Block, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetTransactions(ctx context.Context, height int64) ([][]byte, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetTransactionsWithResults(ctx context.Context, height int64) (*consensus.TransactionsWithResults, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetUnconfirmedTransactions(ctx context.Context) ([][]byte, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) WatchBlocks(ctx context.Context) (<-chan *consensus.Block, pubsub.ClosableSubscription, error) { return nil, nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetSignerNonce(ctx context.Context, req *consensus.GetSignerNonceRequest) (uint64, error) { return 0, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetLightBlock(ctx context.Context, height int64) (*consensus.LightBlock, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) GetParameters(ctx context.Context, height int64) (*consensus.Parameters, error) { return nil, consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) State() syncer.ReadSyncer { return syncer.NopReadSyncer } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) ConsensusKey() signature.PublicKey { return srv.identity.ConsensusSigner.Public() } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) SubmitTxNoWait(ctx context.Context, tx *transaction.SignedTransaction) error { return consensus.ErrUnsupported } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) RegisterHaltHook(consensus.HaltHook) { panic(consensus.ErrUnsupported) } @@ -270,42 +275,42 @@ func (srv *seedService) RegisterHaltHook(consensus.HaltHook) { // consensus services so the caller is at fault for not adhering to the // SupportedFeatures flag, in case any of the following methods is called. -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Beacon() beacon.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) KeyManager() keymanager.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Registry() registry.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) RootHash() roothash.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Staking() staking.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Scheduler() scheduler.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) Governance() governance.Backend { panic(consensus.ErrUnsupported) } -// Implements Backend. +// Implements consensus.Backend. func (srv *seedService) SubmissionManager() consensus.SubmissionManager { panic(consensus.ErrUnsupported) } diff --git a/go/oasis-node/cmd/debug/byzantine/executor.go b/go/oasis-node/cmd/debug/byzantine/executor.go index 0ad8d402102..5971f0d42f3 100644 --- a/go/oasis-node/cmd/debug/byzantine/executor.go +++ b/go/oasis-node/cmd/debug/byzantine/executor.go @@ -20,7 +20,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/storage/mkvs" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/writelog" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) type computeBatchContext struct { diff --git a/go/oasis-node/cmd/debug/byzantine/p2p.go b/go/oasis-node/cmd/debug/byzantine/p2p.go index c5f54d63172..aa023933999 100644 --- a/go/oasis-node/cmd/debug/byzantine/p2p.go +++ b/go/oasis-node/cmd/debug/byzantine/p2p.go @@ -9,6 +9,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/identity" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2pAPI "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) type p2pReqRes struct { @@ -18,7 +19,7 @@ type p2pReqRes struct { } type p2pHandle struct { - service *p2p.P2P + service p2pAPI.Service requests chan p2pReqRes } @@ -63,7 +64,7 @@ type committeeMsgHandler struct { } func (h *committeeMsgHandler) DecodeMessage(msg []byte) (interface{}, error) { - var dec p2p.CommitteeMessage + var dec p2pAPI.CommitteeMessage if err := cbor.Unmarshal(msg, &dec); err != nil { return nil, err } @@ -100,8 +101,8 @@ func (ph *p2pHandle) start(ht *honestTendermint, id *identity.Identity, runtimeI return fmt.Errorf("P2P service New: %w", err) } - ph.service.RegisterHandler(runtimeID, p2p.TopicKindTx, &txMsgHandler{ph}) - ph.service.RegisterHandler(runtimeID, p2p.TopicKindCommittee, &committeeMsgHandler{ph}) + ph.service.RegisterHandler(runtimeID, p2pAPI.TopicKindTx, &txMsgHandler{ph}) + ph.service.RegisterHandler(runtimeID, p2pAPI.TopicKindCommittee, &committeeMsgHandler{ph}) return nil } diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 076116281eb..fff98318f99 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -52,6 +52,7 @@ import ( workerClient "github.com/oasisprotocol/oasis-core/go/worker/client" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2pAPI "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/compute/executor" workerConsensusRPC "github.com/oasisprotocol/oasis-core/go/worker/consensusrpc" workerKeymanager "github.com/oasisprotocol/oasis-core/go/worker/keymanager" @@ -113,7 +114,7 @@ type Node struct { StorageWorker *workerStorage.Worker ClientWorker *workerClient.Worker SentryWorker *workerSentry.Worker - P2P *p2p.P2P + P2P p2pAPI.Service RegistrationWorker *registration.Worker KeymanagerWorker *workerKeymanager.Worker ConsensusWorker *workerConsensusRPC.Worker @@ -273,16 +274,16 @@ func (n *Node) initRuntimeWorkers() error { // Since the P2P layer does not have a separate Start method and starts // listening immediately when created, make sure that we don't start it if // it is not needed. - if n.RuntimeRegistry.Mode() != runtimeRegistry.RuntimeModeNone { - if genesisDoc.Registry.Parameters.DebugAllowUnroutableAddresses { - p2p.DebugForceAllowUnroutableAddresses() - } + switch { + case n.RuntimeRegistry.Mode() != runtimeRegistry.RuntimeModeNone && n.Consensus.Mode() != consensusAPI.ModeArchive: n.P2P, err = p2p.New(n.Identity, n.Consensus) if err != nil { return err } - n.svcMgr.Register(n.P2P) + default: + n.P2P = p2p.NewMock() } + n.svcMgr.Register(n.P2P) // Initialize the common worker. n.CommonWorker, err = workerCommon.New( diff --git a/go/worker/common/committee/keymanager.go b/go/worker/common/committee/keymanager.go index 866d14c8904..8ba85417fcb 100644 --- a/go/worker/common/committee/keymanager.go +++ b/go/worker/common/committee/keymanager.go @@ -9,7 +9,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/logging" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" keymanagerP2P "github.com/oasisprotocol/oasis-core/go/worker/keymanager/p2p" ) @@ -22,7 +22,7 @@ type KeyManagerClientWrapper struct { l sync.Mutex id *common.Namespace - p2p *p2p.P2P + p2p p2p.Service consensus consensus.Backend cli keymanagerP2P.Client logger *logging.Logger @@ -128,7 +128,7 @@ func (km *KeyManagerClientWrapper) CallEnclave( } // NewKeyManagerClientWrapper creates a new key manager client wrapper. -func NewKeyManagerClientWrapper(p2p *p2p.P2P, consensus consensus.Backend, logger *logging.Logger) *KeyManagerClientWrapper { +func NewKeyManagerClientWrapper(p2p p2p.Service, consensus consensus.Backend, logger *logging.Logger) *KeyManagerClientWrapper { return &KeyManagerClientWrapper{ p2p: p2p, consensus: consensus, diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index fc330371806..d52b6d86855 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -24,7 +24,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" "github.com/oasisprotocol/oasis-core/go/runtime/txpool" "github.com/oasisprotocol/oasis-core/go/worker/common/api" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2pAPI "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" ) @@ -162,7 +162,7 @@ type Node struct { KeyManagerClient *KeyManagerClientWrapper Consensus consensus.Backend Group *Group - P2P *p2p.P2P + P2P p2pAPI.Service TxPool txpool.TransactionPool ctx context.Context @@ -350,7 +350,7 @@ func (n *Node) handleEpochTransitionLocked(height int64) { // Mark all executor nodes in the current committee as important. if ec := epoch.GetExecutorCommittee(); ec != nil { - n.P2P.SetNodeImportance(p2p.ImportantNodeCompute, n.Runtime.ID(), ec.Peers) + n.P2P.SetNodeImportance(p2pAPI.ImportantNodeCompute, n.Runtime.ID(), ec.Peers) } epochNumber.With(n.getMetricLabels()).Set(float64(epoch.epochNumber)) @@ -840,7 +840,7 @@ func NewNode( identity *identity.Identity, keymanager keymanager.Backend, consensus consensus.Backend, - p2pHost *p2p.P2P, + p2pHost p2pAPI.Service, txPoolCfg *txpool.Config, ) (*Node, error) { metricsOnce.Do(func() { @@ -890,7 +890,7 @@ func NewNode( n.TxPool = txPool // Register transaction message handler as that is something that all workers must handle. - p2pHost.RegisterHandler(runtime.ID(), p2p.TopicKindTx, &txMsgHandler{n}) + p2pHost.RegisterHandler(runtime.ID(), p2pAPI.TopicKindTx, &txMsgHandler{n}) // Register transaction sync service. p2pHost.RegisterProtocolServer(txsync.NewServer(runtime.ID(), txPool)) diff --git a/go/worker/common/p2p/api/api.go b/go/worker/common/p2p/api/api.go new file mode 100644 index 00000000000..7da4b5a43c1 --- /dev/null +++ b/go/worker/common/p2p/api/api.go @@ -0,0 +1,132 @@ +// Package api implements the P2P API. +package api + +import ( + "context" + "fmt" + "time" + + core "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/service" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" +) + +// TopicKind is the gossipsub topic kind. +type TopicKind string + +const ( + // TopicKindCommittee is the topic kind for the topic that is used to gossip batch proposals + // and other committee messages. + TopicKindCommittee TopicKind = "committee" + // TopicKindTx is the topic kind for the topic that is used to gossip transactions. + TopicKindTx TopicKind = "tx" +) + +// Service is a P2P node service interface. +type Service interface { + service.BackgroundService + + // Addresses returns the P2P addresses of the node. + Addresses() []node.Address + + // Peers returns a list of connected P2P peers for the given runtime. + Peers(runtimeID common.Namespace) []string + + // PublishCommittee publishes a committee message. + PublishCommittee(ctx context.Context, runtimeID common.Namespace, msg *CommitteeMessage) + + // PublishTx publishes a transaction message. + PublishTx(ctx context.Context, runtimeID common.Namespace, msg TxMessage) + + // RegisterHandler registers a message handler for the specified runtime and topic kind. + RegisterHandler(runtimeID common.Namespace, kind TopicKind, handler Handler) + + // BlockPeer blocks a specific peer from being used by the local node. + BlockPeer(peerID core.PeerID) + + // GetHost returns the P2P host. + GetHost() core.Host + + // RegisterProtocolServer registers a protocol server for the given protocol. + RegisterProtocolServer(srv rpc.Server) + + // GetMinRepublishInterval returns the minimum republish interval that needs to be respected by + // the caller when publishing the same message. If Publish is called for the same message more + // quickly, the message may be dropped and not published. + GetMinRepublishInterval() time.Duration + + // SetNodeImportance configures node importance for the given set of nodes. + // + // This makes it less likely for those nodes to be pruned. + SetNodeImportance(kind ImportanceKind, runtimeID common.Namespace, p2pIDs map[signature.PublicKey]bool) +} + +// Handler is a handler for P2P messages. +type Handler interface { + // DecodeMessage decodes the given incoming message. + DecodeMessage(msg []byte) (interface{}, error) + + // AuthorizeMessage handles authorizing an incoming message. + // + // The message handler will be re-invoked on error with a periodic backoff unless errors are + // wrapped via `p2pError.Permanent`. + AuthorizeMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}) error + + // HandleMessage handles an incoming message from a peer. + // + // The message handler will be re-invoked on error with a periodic backoff unless errors are + // wrapped via `p2pError.Permanent`. + HandleMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}, isOwn bool) error +} + +// PublicKeyToPeerID converts a public key to a peer identifier. +func PublicKeyToPeerID(pk signature.PublicKey) (core.PeerID, error) { + pubKey, err := publicKeyToPubKey(pk) + if err != nil { + return "", err + } + + id, err := peer.IDFromPublicKey(pubKey) + if err != nil { + return "", err + } + + return id, nil +} + +const peerTagImportancePrefix = "oasis-core/importance" + +// ImportanceKind is the node importance kind. +type ImportanceKind uint8 + +const ( + ImportantNodeCompute = 1 + ImportantNodeKeyManager = 2 +) + +// Tag returns the connection manager tag associated with the given importance kind. +func (ik ImportanceKind) Tag(runtimeID common.Namespace) string { + switch ik { + case ImportantNodeCompute: + return peerTagImportancePrefix + "/compute/" + runtimeID.String() + case ImportantNodeKeyManager: + return peerTagImportancePrefix + "/keymanager/" + runtimeID.String() + default: + panic(fmt.Errorf("unsupported tag: %d", ik)) + } +} + +// TagValue returns the connection manager tag value associated with the given importance kind. +func (ik ImportanceKind) TagValue() int { + switch ik { + case ImportantNodeCompute, ImportantNodeKeyManager: + return 1000 + default: + panic(fmt.Errorf("unsupported tag: %d", ik)) + } +} diff --git a/go/worker/common/p2p/crypto.go b/go/worker/common/p2p/api/crypto.go similarity index 91% rename from go/worker/common/p2p/crypto.go rename to go/worker/common/p2p/api/crypto.go index a8b192ca7de..f6f3442530d 100644 --- a/go/worker/common/p2p/crypto.go +++ b/go/worker/common/p2p/api/crypto.go @@ -1,4 +1,4 @@ -package p2p +package api import ( "errors" @@ -54,13 +54,15 @@ func (s *p2pSigner) GetPublic() libp2pCrypto.PubKey { return pubKey } -func signerToPrivKey(signer signature.Signer) libp2pCrypto.PrivKey { +// SignerToPRivKey converts a Signer to a libp2pCrypto.PrivKey. +func SignerToPrivKey(signer signature.Signer) libp2pCrypto.PrivKey { return &p2pSigner{ signer: signer, } } -func pubKeyToPublicKey(pubKey libp2pCrypto.PubKey) (signature.PublicKey, error) { +// PubKeyToPublicKey converts a libp2pCrypto.PubKey to a PublicKey. +func PubKeyToPublicKey(pubKey libp2pCrypto.PubKey) (signature.PublicKey, error) { var pk signature.PublicKey if pubKey.Type() != libp2pCrypto.Ed25519 { return pk, errCryptoNotSupported diff --git a/go/worker/common/p2p/types.go b/go/worker/common/p2p/api/types.go similarity index 98% rename from go/worker/common/p2p/types.go rename to go/worker/common/p2p/api/types.go index d3f2d0fb4ea..1f905a2125f 100644 --- a/go/worker/common/p2p/types.go +++ b/go/worker/common/p2p/api/types.go @@ -1,4 +1,4 @@ -package p2p +package api import ( beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" diff --git a/go/worker/common/p2p/dispatch.go b/go/worker/common/p2p/dispatch.go index f3b72198680..15a280d834a 100644 --- a/go/worker/common/p2p/dispatch.go +++ b/go/worker/common/p2p/dispatch.go @@ -14,6 +14,7 @@ import ( cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" ) @@ -30,33 +31,15 @@ type rawMessage struct { msg []byte } -// Handler is a handler for P2P messages. -type Handler interface { - // DecodeMessage decodes the given incoming message. - DecodeMessage(msg []byte) (interface{}, error) - - // AuthorizeMessage handles authorizing an incoming message. - // - // The message handler will be re-invoked on error with a periodic backoff unless errors are - // wrapped via `p2pError.Permanent`. - AuthorizeMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}) error - - // HandleMessage handles an incoming message from a peer. - // - // The message handler will be re-invoked on error with a periodic backoff unless errors are - // wrapped via `p2pError.Permanent`. - HandleMessage(ctx context.Context, peerID signature.PublicKey, msg interface{}, isOwn bool) error -} - type topicHandler struct { ctx context.Context - p2p *P2P + p2p *p2p topic *pubsub.Topic host core.Host cancelRelay pubsub.RelayCancelFunc - handler Handler + handler api.Handler numWorkers uint64 @@ -265,7 +248,7 @@ func (h *topicHandler) pendingMessagesWorker() { } } -func newTopicHandler(p *P2P, runtimeID common.Namespace, kind TopicKind, handler Handler) (string, *topicHandler, error) { +func newTopicHandler(p *p2p, runtimeID common.Namespace, kind api.TopicKind, handler api.Handler) (string, *topicHandler, error) { topicID := p.topicIDForRuntime(runtimeID, kind) topic, err := p.pubsub.Join(topicID) // Note: Disallows duplicates. if err != nil { @@ -301,5 +284,5 @@ func peerIDToPublicKey(peerID core.PeerID) (signature.PublicKey, error) { if err != nil { return signature.PublicKey{}, err } - return pubKeyToPublicKey(pk) + return api.PubKeyToPublicKey(pk) } diff --git a/go/worker/common/p2p/nop.go b/go/worker/common/p2p/nop.go new file mode 100644 index 00000000000..5285f46afbc --- /dev/null +++ b/go/worker/common/p2p/nop.go @@ -0,0 +1,91 @@ +package p2p + +import ( + "context" + "time" + + core "github.com/libp2p/go-libp2p-core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" +) + +// nopP2P is a no-op mock peer-to-peer node. +type nopP2P struct{} + +// Implements api.Service. +func (*nopP2P) SetNodeImportance(kind api.ImportanceKind, runtimeID common.Namespace, p2pIDs map[signature.PublicKey]bool) { +} + +// Implements api.Service. +func (p *nopP2P) Cleanup() { +} + +// Implements api.Service. +func (p *nopP2P) Name() string { + return "mock p2p" +} + +// Implements api.Service. +func (p *nopP2P) Start() error { + return nil +} + +// Implements api.Service. +func (p *nopP2P) Stop() { +} + +// Implements api.Service. +func (p *nopP2P) Quit() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +// Implements api.Service. +func (p *nopP2P) Addresses() []node.Address { + return nil +} + +// Implements api.Service. +func (p *nopP2P) Peers(runtimeID common.Namespace) []string { + return nil +} + +// Implements api.Service. +func (p *nopP2P) PublishCommittee(ctx context.Context, runtimeID common.Namespace, msg *api.CommitteeMessage) { +} + +// Implements api.Service. +func (p *nopP2P) PublishTx(ctx context.Context, runtimeID common.Namespace, msg api.TxMessage) { +} + +// Implements api.Service. +func (p *nopP2P) RegisterHandler(runtimeID common.Namespace, kind api.TopicKind, handler api.Handler) { +} + +// Implements api.Service. +func (p *nopP2P) BlockPeer(peerID core.PeerID) { +} + +// Implements api.Service. +func (p *nopP2P) GetHost() core.Host { + return nil +} + +// Implements api.Service. +func (p *nopP2P) RegisterProtocolServer(srv rpc.Server) { +} + +// Implements api.Service. +func (p *nopP2P) GetMinRepublishInterval() time.Duration { + return time.Hour +} + +// NewMock creates a new no-op mock P2P node. +func NewMock() api.Service { + return &nopP2P{} +} diff --git a/go/worker/common/p2p/p2p.go b/go/worker/common/p2p/p2p.go index 06bd3deb1fa..463f90da8ee 100644 --- a/go/worker/common/p2p/p2p.go +++ b/go/worker/common/p2p/p2p.go @@ -31,6 +31,7 @@ import ( consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" registryAPI "github.com/oasisprotocol/oasis-core/go/registry/api" "github.com/oasisprotocol/oasis-core/go/worker/common/configparser" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" ) @@ -41,17 +42,6 @@ const peersHighWatermarkDelta = 30 // messageIdContext is the domain separation context for computing message identifier hashes. var messageIdContext = []byte("oasis-core/p2p: message id") -// TopicKind is the gossipsub topic kind. -type TopicKind string - -const ( - // TopicKindCommittee is the topic kind for the topic that is used to gossip batch proposals - // and other committee messages. - TopicKindCommittee TopicKind = "committee" - // TopicKindTx is the topic kind for the topic that is used to gossip transactions. - TopicKindTx TopicKind = "tx" -) - var allowUnroutableAddresses bool // DebugForceAllowUnroutableAddresses allows unroutable addresses. @@ -59,8 +49,8 @@ func DebugForceAllowUnroutableAddresses() { allowUnroutableAddresses = true } -// P2P is a peer-to-peer node using libp2p. -type P2P struct { +// p2p is a peer-to-peer node using libp2p. +type p2p struct { sync.RWMutex *PeerManager @@ -73,44 +63,40 @@ type P2P struct { pubsub *pubsub.PubSub registerAddresses []multiaddr.Multiaddr - topics map[common.Namespace]map[TopicKind]*topicHandler + topics map[common.Namespace]map[api.TopicKind]*topicHandler logger *logging.Logger } -// Cleanup performs the service specific post-termination cleanup. -func (p *P2P) Cleanup() { +// Implements api.Service. +func (p *p2p) Cleanup() { } -// Name returns the service name. -func (p *P2P) Name() string { +// Implements api.Service. +func (p *p2p) Name() string { return "worker p2p" } -// Start starts the service. -func (p *P2P) Start() error { +// Implements api.Service. +func (p *p2p) Start() error { // Unfortunately libp2p starts everything on construction. return nil } -// Stop halts the service. -func (p *P2P) Stop() { +// Implements api.Service. +func (p *p2p) Stop() { p.ctxCancel() _ = p.host.Close() // This blocks until the host stops. close(p.quitCh) } -// Quit returns a channel that will be closed when the service terminates. -func (p *P2P) Quit() <-chan struct{} { +// Implements api.Service. +func (p *p2p) Quit() <-chan struct{} { return p.quitCh } -// Addresses returns the P2P addresses of the node. -func (p *P2P) Addresses() []node.Address { - if p == nil { - return nil - } - +// Implements api.Service. +func (p *p2p) Addresses() []node.Address { var addrs []multiaddr.Multiaddr if len(p.registerAddresses) == 0 { addrs = p.host.Addrs() @@ -143,10 +129,10 @@ func (p *P2P) Addresses() []node.Address { return addresses } -// Peers returns a list of connected P2P peers for the given runtime. -func (p *P2P) Peers(runtimeID common.Namespace) []string { - allPeers := p.pubsub.ListPeers(p.topicIDForRuntime(runtimeID, TopicKindCommittee)) - allPeers = append(allPeers, p.pubsub.ListPeers(p.topicIDForRuntime(runtimeID, TopicKindTx))...) +// Implements api.Service. +func (p *p2p) Peers(runtimeID common.Namespace) []string { + allPeers := p.pubsub.ListPeers(p.topicIDForRuntime(runtimeID, api.TopicKindCommittee)) + allPeers = append(allPeers, p.pubsub.ListPeers(p.topicIDForRuntime(runtimeID, api.TopicKindTx))...) var peers []string peerMap := make(map[core.PeerID]bool) @@ -199,7 +185,7 @@ func filterGloballyReachableAddresses(addrs []multiaddr.Multiaddr) []multiaddr.M return ret } -func (p *P2P) publish(ctx context.Context, runtimeID common.Namespace, kind TopicKind, msg interface{}) { +func (p *p2p) publish(ctx context.Context, runtimeID common.Namespace, kind api.TopicKind, msg interface{}) { rawMsg := cbor.Marshal(msg) p.RLock() @@ -235,24 +221,24 @@ func (p *P2P) publish(ctx context.Context, runtimeID common.Namespace, kind Topi ) } -// PublishCommittee publishes a committee message. -func (p *P2P) PublishCommittee(ctx context.Context, runtimeID common.Namespace, msg *CommitteeMessage) { - p.publish(ctx, runtimeID, TopicKindCommittee, msg) +// Implements api.Service. +func (p *p2p) PublishCommittee(ctx context.Context, runtimeID common.Namespace, msg *api.CommitteeMessage) { + p.publish(ctx, runtimeID, api.TopicKindCommittee, msg) } -// PublishCommittee publishes a transaction message. -func (p *P2P) PublishTx(ctx context.Context, runtimeID common.Namespace, msg TxMessage) { - p.publish(ctx, runtimeID, TopicKindTx, msg) +// Implements api.Service. +func (p *p2p) PublishTx(ctx context.Context, runtimeID common.Namespace, msg api.TxMessage) { + p.publish(ctx, runtimeID, api.TopicKindTx, msg) } -// RegisterHandler registers a message handler for the specified runtime and topic kind. -func (p *P2P) RegisterHandler(runtimeID common.Namespace, kind TopicKind, handler Handler) { +// Implements api.Service. +func (p *p2p) RegisterHandler(runtimeID common.Namespace, kind api.TopicKind, handler api.Handler) { p.Lock() defer p.Unlock() topics := p.topics[runtimeID] if topics == nil { - topics = make(map[TopicKind]*topicHandler) + topics = make(map[api.TopicKind]*topicHandler) p.topics[runtimeID] = topics } @@ -277,7 +263,7 @@ func (p *P2P) RegisterHandler(runtimeID common.Namespace, kind TopicKind, handle ) } -func (p *P2P) topicIDForRuntime(runtimeID common.Namespace, kind TopicKind) string { +func (p *p2p) topicIDForRuntime(runtimeID common.Namespace, kind api.TopicKind) string { return fmt.Sprintf("%s/%d/%s/%s", p.chainContext, version.RuntimeCommitteeProtocol.Major, @@ -286,8 +272,8 @@ func (p *P2P) topicIDForRuntime(runtimeID common.Namespace, kind TopicKind) stri ) } -// BlockPeer blocks a specific peer from being used by the local node. -func (p *P2P) BlockPeer(peerID core.PeerID) { +// Implements api.Service. +func (p *p2p) BlockPeer(peerID core.PeerID) { p.logger.Warn("blocking peer", "peer_id", peerID, ) @@ -296,13 +282,13 @@ func (p *P2P) BlockPeer(peerID core.PeerID) { p.PeerManager.blockPeer(peerID) } -// GetHost returns the P2P host. -func (p *P2P) GetHost() core.Host { +// Implements api.Service. +func (p *p2p) GetHost() core.Host { return p.host } -// RegisterProtocolServer registers a protocol server for the given protocol. -func (p *P2P) RegisterProtocolServer(srv rpc.Server) { +// Implements api.Service. +func (p *p2p) RegisterProtocolServer(srv rpc.Server) { p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream) p.logger.Info("registered protocol server", @@ -310,10 +296,8 @@ func (p *P2P) RegisterProtocolServer(srv rpc.Server) { ) } -// GetMinRepublishInterval returns the minimum republish interval that needs to be respected by -// the caller when publishing the same message. If Publish is called for the same message more -// quickly, the message may be dropped and not published. -func (p *P2P) GetMinRepublishInterval() time.Duration { +// Implements api.Service. +func (p *p2p) GetMinRepublishInterval() time.Duration { return pubsub.TimeCacheDuration + 5*time.Second } @@ -326,7 +310,7 @@ func messageIdFn(pmsg *pb.Message) string { } // New creates a new P2P node. -func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) { +func New(identity *identity.Identity, consensus consensus.Backend) (api.Service, error) { // Instantiate the libp2p host. addresses, err := configparser.ParseAddressList(viper.GetStringSlice(cfgP2pAddresses)) if err != nil { @@ -398,7 +382,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) if grr := pk.UnmarshalText([]byte(pubkey)); grr != nil { return nil, fmt.Errorf("worker/common/p2p: malformed persistent peer address: cannot unmarshal P2P public key (%s): %w", pubkey, grr) } - pid, grr := PublicKeyToPeerID(pk) + pid, grr := api.PublicKeyToPeerID(pk) if grr != nil { return nil, fmt.Errorf("worker/common/p2p: invalid persistent peer public key (%s): %w", pubkey, grr) } @@ -437,7 +421,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) host, err := libp2p.New( libp2p.UserAgent(fmt.Sprintf("oasis-core/%s", version.SoftwareVersion)), libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(signerToPrivKey(identity.P2PSigner)), + libp2p.Identity(api.SignerToPrivKey(identity.P2PSigner)), libp2p.ConnectionManager(cm), libp2p.ConnectionGater(cg), ) @@ -473,7 +457,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) return nil, fmt.Errorf("worker/common/p2p: failed to get consensus chain context: %w", err) } - p := &P2P{ + p := &p2p{ PeerManager: newPeerManager(ctx, host, cg, consensus, persistentPeers), ctxCancel: ctxCancel, quitCh: make(chan struct{}), @@ -481,7 +465,7 @@ func New(identity *identity.Identity, consensus consensus.Backend) (*P2P, error) host: host, pubsub: pubsub, registerAddresses: registerAddresses, - topics: make(map[common.Namespace]map[TopicKind]*topicHandler), + topics: make(map[common.Namespace]map[api.TopicKind]*topicHandler), logger: logging.GetLogger("worker/common/p2p"), } diff --git a/go/worker/common/p2p/peermgmt.go b/go/worker/common/p2p/peermgmt.go index 148e209d0ec..0ec8b8822d9 100644 --- a/go/worker/common/p2p/peermgmt.go +++ b/go/worker/common/p2p/peermgmt.go @@ -22,42 +22,11 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/node" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) const connectionRefreshInterval = 5 * time.Second -const peerTagImportancePrefix = "oasis-core/importance" - -// ImportanceKind is the node importance kind. -type ImportanceKind uint8 - -const ( - ImportantNodeCompute = 1 - ImportantNodeKeyManager = 2 -) - -// Tag returns the connection manager tag associated with the given importance kind. -func (ik ImportanceKind) Tag(runtimeID common.Namespace) string { - switch ik { - case ImportantNodeCompute: - return peerTagImportancePrefix + "/compute/" + runtimeID.String() - case ImportantNodeKeyManager: - return peerTagImportancePrefix + "/keymanager/" + runtimeID.String() - default: - panic(fmt.Errorf("unsupported tag: %d", ik)) - } -} - -// TagValue returns the connection manager tag value associated with the given importance kind. -func (ik ImportanceKind) TagValue() int { - switch ik { - case ImportantNodeCompute, ImportantNodeKeyManager: - return 1000 - default: - panic(fmt.Errorf("unsupported tag: %d", ik)) - } -} - // PeerManager handles managing peers in the gossipsub network. type PeerManager struct { sync.RWMutex @@ -68,7 +37,7 @@ type PeerManager struct { cg *conngater.BasicConnectionGater peers map[core.PeerID]*p2pPeer - importantPeers map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool + importantPeers map[api.ImportanceKind]map[common.Namespace]map[core.PeerID]bool persistentPeers map[core.PeerID]bool initCh chan struct{} @@ -117,7 +86,7 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) { newNodes := make(map[core.PeerID]*node.Node) for _, node := range nodes { - peerID, err := PublicKeyToPeerID(node.P2P.ID) + peerID, err := api.PublicKeyToPeerID(node.P2P.ID) if err != nil { mgr.logger.Warn("error while getting peer ID from public key, skipping", "err", err, @@ -156,7 +125,7 @@ func (mgr *PeerManager) SetNodes(nodes []*node.Node) { // UpdateNode upserts a node into the gossipsub network. func (mgr *PeerManager) UpdateNode(node *node.Node) error { - peerID, err := PublicKeyToPeerID(node.P2P.ID) + peerID, err := api.PublicKeyToPeerID(node.P2P.ID) if err != nil { return fmt.Errorf("worker/common/p2p/peermgr: failed to get peer ID from public key: %w", err) } @@ -184,10 +153,7 @@ func (mgr *PeerManager) blockPeer(peerID core.PeerID) { _ = mgr.host.Network().ClosePeer(peerID) } -// SetNodeImportance configures node importance for the given set of nodes. -// -// This makes it less likely for those nodes to be pruned. -func (mgr *PeerManager) SetNodeImportance(kind ImportanceKind, runtimeID common.Namespace, p2pIDs map[signature.PublicKey]bool) { +func (mgr *PeerManager) SetNodeImportance(kind api.ImportanceKind, runtimeID common.Namespace, p2pIDs map[signature.PublicKey]bool) { mgr.Lock() defer mgr.Unlock() @@ -200,7 +166,7 @@ func (mgr *PeerManager) SetNodeImportance(kind ImportanceKind, runtimeID common. mgr.importantPeers[kind][runtimeID] = make(map[core.PeerID]bool) for p2pID := range p2pIDs { - peerID, err := PublicKeyToPeerID(p2pID) + peerID, err := api.PublicKeyToPeerID(p2pID) if err != nil { return } @@ -375,7 +341,7 @@ func newPeerManager(ctx context.Context, host core.Host, cg *conngater.BasicConn host: host, cg: cg, peers: make(map[core.PeerID]*p2pPeer), - importantPeers: make(map[ImportanceKind]map[common.Namespace]map[core.PeerID]bool), + importantPeers: make(map[api.ImportanceKind]map[common.Namespace]map[core.PeerID]bool), persistentPeers: persistentPeers, initCh: make(chan struct{}), logger: logging.GetLogger("worker/common/p2p/peermgr"), @@ -445,27 +411,12 @@ func (p *p2pPeer) connectWorker(mgr *PeerManager, peerID core.PeerID) { } } -// PublicKeyToPeerID converts a public key to a peer identifier. -func PublicKeyToPeerID(pk signature.PublicKey) (core.PeerID, error) { - pubKey, err := publicKeyToPubKey(pk) - if err != nil { - return "", err - } - - id, err := peer.IDFromPublicKey(pubKey) - if err != nil { - return "", err - } - - return id, nil -} - func nodeToAddrInfo(node *node.Node) (*peer.AddrInfo, error) { var ( ai peer.AddrInfo err error ) - if ai.ID, err = PublicKeyToPeerID(node.P2P.ID); err != nil { + if ai.ID, err = api.PublicKeyToPeerID(node.P2P.ID); err != nil { return nil, fmt.Errorf("failed to extract public key from node P2P ID: %w", err) } for _, nodeAddr := range node.P2P.Addresses { diff --git a/go/worker/common/p2p/rpc/client.go b/go/worker/common/p2p/rpc/client.go index 8ee24f966f8..1d165f275af 100644 --- a/go/worker/common/p2p/rpc/client.go +++ b/go/worker/common/p2p/rpc/client.go @@ -215,6 +215,33 @@ type Client interface { ) ([]interface{}, []PeerFeedback, error) } +type nopClient struct { + *nopPeerManager +} + +// Implements Client. +func (c *nopClient) Call( + ctx context.Context, + method string, + body, rsp interface{}, + maxPeerResponseTime time.Duration, + opts ...CallOption, +) (PeerFeedback, error) { + return nil, fmt.Errorf("unsupported: mock p2p") +} + +// Implements Client. +func (c *nopClient) CallMulti( + ctx context.Context, + method string, + body, rspTyp interface{}, + maxPeerResponseTime time.Duration, + maxParallelRequests uint, + opts ...CallMultiOption, +) ([]interface{}, []PeerFeedback, error) { + return nil, nil, fmt.Errorf("unsupported: mock p2p") +} + type client struct { PeerManager @@ -517,9 +544,15 @@ func NewClient(p2p P2P, runtimeID common.Namespace, protocolID string, version v opt(&co) } + host := p2p.GetHost() + if host == nil { + // Mock P2P service, use the mock client. + return &nopClient{&nopPeerManager{}} + } + return &client{ PeerManager: NewPeerManager(p2p, pid, co.stickyPeers), - host: p2p.GetHost(), + host: host, protocolID: pid, runtimeID: runtimeID, opts: &co, diff --git a/go/worker/common/p2p/rpc/peermgmt.go b/go/worker/common/p2p/rpc/peermgmt.go index 74c3ac2a54d..f5b4d11b92c 100644 --- a/go/worker/common/p2p/rpc/peermgmt.go +++ b/go/worker/common/p2p/rpc/peermgmt.go @@ -10,6 +10,7 @@ import ( core "github.com/libp2p/go-libp2p-core" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand" @@ -59,6 +60,33 @@ type PeerManager interface { GetBestPeers() []core.PeerID } +type nopPeerManager struct{} + +// Implements PeerManager. +func (*nopPeerManager) AddPeer(peerID peer.ID) { +} + +// Implements PeerManager. +func (*nopPeerManager) GetBestPeers() []peer.ID { + return nil +} + +// Implements PeerManager. +func (*nopPeerManager) RecordBadPeer(peerID peer.ID) { +} + +// Implements PeerManager. +func (*nopPeerManager) RecordFailure(peerID peer.ID, latency time.Duration) { +} + +// Implements PeerManager. +func (*nopPeerManager) RecordSuccess(peerID peer.ID, latency time.Duration) { +} + +// Implements PeerManager. +func (*nopPeerManager) RemovePeer(peerID peer.ID) { +} + type peerStats struct { successes int failures int @@ -325,6 +353,10 @@ func (mgr *peerManager) peerProtocolWatcher() { // NewPeerManager creates a new peer manager for the given protocol. func NewPeerManager(p2p P2P, protocolID protocol.ID, stickyPeers bool) PeerManager { + if p2p.GetHost() == nil { + // Mock P2P service, use the mock peer manager. + return &nopPeerManager{} + } mgr := &peerManager{ p2p: p2p, host: p2p.GetHost(), diff --git a/go/worker/common/worker.go b/go/worker/common/worker.go index 370bebb92a8..e70f715e84c 100644 --- a/go/worker/common/worker.go +++ b/go/worker/common/worker.go @@ -17,7 +17,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" "github.com/oasisprotocol/oasis-core/go/sentry/policywatcher" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) // Worker is a garbage bag with lower level services and common runtime objects. @@ -31,7 +31,7 @@ type Worker struct { Consensus consensus.Backend Grpc *grpc.Server GrpcPolicyWatcher policyAPI.PolicyWatcher - P2P *p2p.P2P + P2P p2p.Service IAS ias.Endpoint KeyManager keymanagerApi.Backend RuntimeRegistry runtimeRegistry.Registry @@ -198,7 +198,7 @@ func newWorker( consensus consensus.Backend, grpc *grpc.Server, grpcPolicyWatcher policyAPI.PolicyWatcher, - p2p *p2p.P2P, + p2p p2p.Service, ias ias.Endpoint, keyManager keymanagerApi.Backend, rtRegistry runtimeRegistry.Registry, @@ -256,7 +256,7 @@ func New( dataDir string, identity *identity.Identity, consensus consensus.Backend, - p2p *p2p.P2P, + p2p p2p.Service, ias ias.Endpoint, keyManager keymanagerApi.Backend, runtimeRegistry runtimeRegistry.Registry, diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index aca83027924..0e8f0bc7bf7 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -27,7 +27,7 @@ import ( storage "github.com/oasisprotocol/oasis-core/go/storage/api" commonWorker "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" "github.com/oasisprotocol/oasis-core/go/worker/registration" diff --git a/go/worker/compute/executor/committee/p2p.go b/go/worker/compute/executor/committee/p2p.go index d91cfcf5cbc..83fea50b856 100644 --- a/go/worker/compute/executor/committee/p2p.go +++ b/go/worker/compute/executor/committee/p2p.go @@ -7,7 +7,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/crash" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" ) diff --git a/go/worker/keymanager/p2p/client.go b/go/worker/keymanager/p2p/client.go index ef2de6c6a37..dd764e2878c 100644 --- a/go/worker/keymanager/p2p/client.go +++ b/go/worker/keymanager/p2p/client.go @@ -12,7 +12,7 @@ import ( consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" keymanager "github.com/oasisprotocol/oasis-core/go/keymanager/api" registry "github.com/oasisprotocol/oasis-core/go/registry/api" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" ) @@ -56,7 +56,7 @@ func (c *client) Initialized() <-chan struct{} { type nodeTracker struct { sync.Mutex - p2p *p2p.P2P + p2p p2p.Service consensus consensus.Backend keymanagerID common.Namespace @@ -154,7 +154,7 @@ func (nt *nodeTracker) trackKeymanagerNodes() { } // NewClient creates a new keymanager protocol client. -func NewClient(p2p *p2p.P2P, consensus consensus.Backend, keymanagerID common.Namespace) Client { +func NewClient(p2p p2p.Service, consensus consensus.Backend, keymanagerID common.Namespace) Client { // Create a peer filter as we want the client to only talk to known key manager nodes. nt := &nodeTracker{ p2p: p2p, diff --git a/go/worker/keymanager/watcher.go b/go/worker/keymanager/watcher.go index 8975f5a5a47..73fed3e794f 100644 --- a/go/worker/keymanager/watcher.go +++ b/go/worker/keymanager/watcher.go @@ -5,7 +5,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/runtime/nodes" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) type kmNodeWatcher struct { diff --git a/go/worker/keymanager/worker.go b/go/worker/keymanager/worker.go index 67287b3a93b..f730cfcc5b9 100644 --- a/go/worker/keymanager/worker.go +++ b/go/worker/keymanager/worker.go @@ -29,7 +29,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/worker/registration" ) diff --git a/go/worker/registration/worker.go b/go/worker/registration/worker.go index 29e089aa511..db9aeb64517 100644 --- a/go/worker/registration/worker.go +++ b/go/worker/registration/worker.go @@ -35,7 +35,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" sentryClient "github.com/oasisprotocol/oasis-core/go/sentry/client" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" - "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + p2p "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/api" ) const ( @@ -206,7 +206,7 @@ type Worker struct { // nolint: maligned beacon beacon.Backend registry registry.Backend identity *identity.Identity - p2p *p2p.P2P + p2p p2p.Service ctx context.Context // Bandaid: Idempotent Stop for testing. @@ -1202,7 +1202,7 @@ func New( registry registry.Backend, identity *identity.Identity, consensus consensus.Backend, - p2p *p2p.P2P, + p2p p2p.Service, workerCommonCfg *workerCommon.Config, store *persistent.CommonStore, delegate Delegate,