Skip to content

Commit

Permalink
Fix IPC: Remove dead event dispatcher code (ava-labs#1477)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Laine <[email protected]>
  • Loading branch information
StephenButtolph and Dan Laine authored May 16, 2022
1 parent 6f006ef commit 2e85cd5
Show file tree
Hide file tree
Showing 21 changed files with 920 additions and 615 deletions.
15 changes: 7 additions & 8 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
"github.com/ava-labs/avalanchego/snow/networking/timeout"
"github.com/ava-labs/avalanchego/snow/triggers"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
Expand Down Expand Up @@ -143,8 +142,8 @@ type ManagerConfig struct {
Log logging.Logger
LogFactory logging.Factory
VMManager vms.Manager // Manage mappings from vm ID --> vm
DecisionEvents *triggers.EventDispatcher
ConsensusEvents *triggers.EventDispatcher
DecisionAcceptorGroup snow.AcceptorGroup
ConsensusAcceptorGroup snow.AcceptorGroup
DBManager dbManager.Manager
MsgCreator message.Creator // message creator, shared with network
Router router.Router // Routes incoming messages to the appropriate chain
Expand Down Expand Up @@ -380,9 +379,9 @@ func (m *manager) buildChain(chainParams ChainParameters, sb Subnet) (*chain, er
StakingCertLeaf: m.StakingCert.Leaf,
StakingLeafSigner: m.StakingCert.PrivateKey.(crypto.Signer),
},
DecisionDispatcher: m.DecisionEvents,
ConsensusDispatcher: m.ConsensusEvents,
Registerer: consensusMetrics,
DecisionAcceptor: m.DecisionAcceptorGroup,
ConsensusAcceptor: m.ConsensusAcceptorGroup,
Registerer: consensusMetrics,
}
// We set the state to Initializing here because failing to set the state
// before it's first access would cause a panic.
Expand Down Expand Up @@ -569,7 +568,7 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("couldn't initialize sender: %w", err)
}

if err := m.ConsensusEvents.RegisterChain(ctx.ChainID, "gossip", sender, false); err != nil { // Set up the event dipatcher
if err := m.ConsensusAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", sender, false); err != nil { // Set up the event dipatcher
return nil, fmt.Errorf("problem initializing event dispatcher: %w", err)
}

Expand Down Expand Up @@ -752,7 +751,7 @@ func (m *manager) createSnowmanChain(
return nil, fmt.Errorf("couldn't initialize sender: %w", err)
}

if err := m.ConsensusEvents.RegisterChain(ctx.ChainID, "gossip", sender, false); err != nil { // Set up the event dipatcher
if err := m.ConsensusAcceptorGroup.RegisterAcceptor(ctx.ChainID, "gossip", sender, false); err != nil { // Set up the event dipatcher
return nil, fmt.Errorf("problem initializing event dispatcher: %w", err)
}

Expand Down
63 changes: 32 additions & 31 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ava-labs/avalanchego/api/server"
"github.com/ava-labs/avalanchego/chains"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/hashing"
"github.com/ava-labs/avalanchego/utils/json"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/avalanche"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman"
"github.com/ava-labs/avalanchego/snow/triggers"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/gorilla/rpc/v2"
)
Expand Down Expand Up @@ -55,13 +55,14 @@ var (

// Config for an indexer
type Config struct {
DB database.Database
Log logging.Logger
IndexingEnabled bool
AllowIncompleteIndex bool
DecisionDispatcher, ConsensusDispatcher *triggers.EventDispatcher
APIServer server.PathAdder
ShutdownF func()
DB database.Database
Log logging.Logger
IndexingEnabled bool
AllowIncompleteIndex bool
DecisionAcceptorGroup snow.AcceptorGroup
ConsensusAcceptorGroup snow.AcceptorGroup
APIServer server.PathAdder
ShutdownF func()
}

// Indexer causes accepted containers for a given chain
Expand All @@ -77,18 +78,18 @@ type Indexer interface {
// NewIndexer returns a new Indexer and registers a new endpoint on the given API server.
func NewIndexer(config Config) (Indexer, error) {
indexer := &indexer{
codec: codec.NewManager(codecMaxSize),
log: config.Log,
db: config.DB,
allowIncompleteIndex: config.AllowIncompleteIndex,
indexingEnabled: config.IndexingEnabled,
consensusDispatcher: config.ConsensusDispatcher,
decisionDispatcher: config.DecisionDispatcher,
txIndices: map[ids.ID]Index{},
vtxIndices: map[ids.ID]Index{},
blockIndices: map[ids.ID]Index{},
pathAdder: config.APIServer,
shutdownF: config.ShutdownF,
codec: codec.NewManager(codecMaxSize),
log: config.Log,
db: config.DB,
allowIncompleteIndex: config.AllowIncompleteIndex,
indexingEnabled: config.IndexingEnabled,
decisionAcceptorGroup: config.DecisionAcceptorGroup,
consensusAcceptorGroup: config.ConsensusAcceptorGroup,
txIndices: map[ids.ID]Index{},
vtxIndices: map[ids.ID]Index{},
blockIndices: map[ids.ID]Index{},
pathAdder: config.APIServer,
shutdownF: config.ShutdownF,
}

if err := indexer.codec.RegisterCodec(
Expand Down Expand Up @@ -136,10 +137,10 @@ type indexer struct {
// Chain ID --> index of txs of that chain (if applicable)
txIndices map[ids.ID]Index

// Notifies of newly accepted blocks and vertices
consensusDispatcher *triggers.EventDispatcher
// Notifies of newly accepted transactions
decisionDispatcher *triggers.EventDispatcher
decisionAcceptorGroup snow.AcceptorGroup
// Notifies of newly accepted blocks and vertices
consensusAcceptorGroup snow.AcceptorGroup
}

// Assumes [engine]'s context lock is not held
Expand Down Expand Up @@ -224,7 +225,7 @@ func (i *indexer) RegisterChain(name string, engine common.Engine) {

switch engine.(type) {
case snowman.Engine:
index, err := i.registerChainHelper(chainID, blockPrefix, name, "block", i.consensusDispatcher)
index, err := i.registerChainHelper(chainID, blockPrefix, name, "block", i.consensusAcceptorGroup)
if err != nil {
i.log.Fatal("couldn't create block index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -234,7 +235,7 @@ func (i *indexer) RegisterChain(name string, engine common.Engine) {
}
i.blockIndices[chainID] = index
case avalanche.Engine:
vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, name, "vtx", i.consensusDispatcher)
vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, name, "vtx", i.consensusAcceptorGroup)
if err != nil {
i.log.Fatal("couldn't create vertex index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -244,7 +245,7 @@ func (i *indexer) RegisterChain(name string, engine common.Engine) {
}
i.vtxIndices[chainID] = vtxIndex

txIndex, err := i.registerChainHelper(chainID, txPrefix, name, "tx", i.decisionDispatcher)
txIndex, err := i.registerChainHelper(chainID, txPrefix, name, "tx", i.decisionAcceptorGroup)
if err != nil {
i.log.Fatal("couldn't create tx index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -266,7 +267,7 @@ func (i *indexer) registerChainHelper(
chainID ids.ID,
prefixEnd byte,
name, endpoint string,
dispatcher *triggers.EventDispatcher,
acceptorGroup snow.AcceptorGroup,
) (Index, error) {
prefix := make([]byte, hashing.HashLen+wrappers.ByteLen)
copy(prefix, chainID[:])
Expand All @@ -279,7 +280,7 @@ func (i *indexer) registerChainHelper(
}

// Register index to learn about new accepted vertices
if err := dispatcher.RegisterChain(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID), index, true); err != nil {
if err := acceptorGroup.RegisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID), index, true); err != nil {
_ = index.Close()
return nil, err
}
Expand Down Expand Up @@ -322,19 +323,19 @@ func (i *indexer) close() error {
for chainID, txIndex := range i.txIndices {
errs.Add(
txIndex.Close(),
i.decisionDispatcher.DeregisterChain(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
i.decisionAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
)
}
for chainID, vtxIndex := range i.vtxIndices {
errs.Add(
vtxIndex.Close(),
i.consensusDispatcher.DeregisterChain(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
i.consensusAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
)
}
for chainID, blockIndex := range i.blockIndices {
errs.Add(
blockIndex.Close(),
i.consensusDispatcher.DeregisterChain(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
i.consensusAcceptorGroup.DeregisterAcceptor(chainID, fmt.Sprintf("%s%s", indexNamePrefix, chainID)),
)
}
errs.Add(i.db.Close())
Expand Down
91 changes: 44 additions & 47 deletions indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
"github.com/ava-labs/avalanchego/snow/consensus/snowstorm"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/triggers"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"

Expand Down Expand Up @@ -55,14 +54,14 @@ func (a *apiServerMock) AddAliases(string, ...string) error {
func TestNewIndexer(t *testing.T) {
assert := assert.New(t)
config := Config{
IndexingEnabled: true,
AllowIncompleteIndex: true,
Log: logging.NoLog{},
DB: memdb.New(),
ConsensusDispatcher: triggers.New(logging.NoLog{}),
DecisionDispatcher: triggers.New(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
IndexingEnabled: true,
AllowIncompleteIndex: true,
Log: logging.NoLog{},
DB: memdb.New(),
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
}

idxrIntf, err := NewIndexer(config)
Expand All @@ -82,8 +81,8 @@ func TestNewIndexer(t *testing.T) {
assert.Len(idxr.txIndices, 0)
assert.NotNil(idxr.vtxIndices)
assert.Len(idxr.vtxIndices, 0)
assert.NotNil(idxr.consensusDispatcher)
assert.NotNil(idxr.decisionDispatcher)
assert.NotNil(idxr.consensusAcceptorGroup)
assert.NotNil(idxr.decisionAcceptorGroup)
assert.NotNil(idxr.shutdownF)
assert.False(idxr.hasRunBefore)
}
Expand All @@ -96,13 +95,13 @@ func TestMarkHasRunAndShutdown(t *testing.T) {
shutdown := &sync.WaitGroup{}
shutdown.Add(1)
config := Config{
IndexingEnabled: true,
Log: logging.NoLog{},
DB: db,
ConsensusDispatcher: triggers.New(logging.NoLog{}),
DecisionDispatcher: triggers.New(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() { shutdown.Done() },
IndexingEnabled: true,
Log: logging.NoLog{},
DB: db,
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() { shutdown.Done() },
}

idxrIntf, err := NewIndexer(config)
Expand Down Expand Up @@ -130,19 +129,17 @@ func TestIndexer(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cd := triggers.New(logging.NoLog{})
dd := triggers.New(logging.NoLog{})
baseDB := memdb.New()
db := versiondb.New(baseDB)
config := Config{
IndexingEnabled: true,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: db,
ConsensusDispatcher: cd,
DecisionDispatcher: dd,
APIServer: &apiServerMock{},
ShutdownF: func() {},
IndexingEnabled: true,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: db,
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
}

// Create indexer
Expand Down Expand Up @@ -192,7 +189,7 @@ func TestIndexer(t *testing.T) {
Timestamp: now.UnixNano(),
}

assert.NoError(cd.Accept(chain1Ctx, blkID, blkBytes))
assert.NoError(config.ConsensusAcceptorGroup.Accept(chain1Ctx, blkID, blkBytes))

blkIdx := idxr.blockIndices[chain1Ctx.ChainID]
assert.NotNil(blkIdx)
Expand Down Expand Up @@ -303,7 +300,7 @@ func TestIndexer(t *testing.T) {
}, nil,
).Once()

assert.NoError(cd.Accept(chain2Ctx, vtxID, blkBytes))
assert.NoError(config.ConsensusAcceptorGroup.Accept(chain2Ctx, vtxID, blkBytes))

vtxIdx := idxr.vtxIndices[chain2Ctx.ChainID]
assert.NotNil(vtxIdx)
Expand Down Expand Up @@ -352,7 +349,7 @@ func TestIndexer(t *testing.T) {
}, nil,
).Once()

assert.NoError(dd.Accept(chain2Ctx, txID, blkBytes))
assert.NoError(config.DecisionAcceptorGroup.Accept(chain2Ctx, txID, blkBytes))

txIdx := idxr.txIndices[chain2Ctx.ChainID]
assert.NotNil(txIdx)
Expand Down Expand Up @@ -429,14 +426,14 @@ func TestIncompleteIndex(t *testing.T) {

baseDB := memdb.New()
config := Config{
IndexingEnabled: false,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: versiondb.New(baseDB),
ConsensusDispatcher: triggers.New(logging.NoLog{}),
DecisionDispatcher: triggers.New(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
IndexingEnabled: false,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: versiondb.New(baseDB),
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
}
idxrIntf, err := NewIndexer(config)
assert.NoError(err)
Expand Down Expand Up @@ -513,14 +510,14 @@ func TestIgnoreNonDefaultChains(t *testing.T) {
baseDB := memdb.New()
db := versiondb.New(baseDB)
config := Config{
IndexingEnabled: true,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: db,
ConsensusDispatcher: triggers.New(logging.NoLog{}),
DecisionDispatcher: triggers.New(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
IndexingEnabled: true,
AllowIncompleteIndex: false,
Log: logging.NoLog{},
DB: db,
DecisionAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
ConsensusAcceptorGroup: snow.NewAcceptorGroup(logging.NoLog{}),
APIServer: &apiServerMock{},
ShutdownF: func() {},
}

// Create indexer
Expand Down
Loading

0 comments on commit 2e85cd5

Please sign in to comment.