Skip to content

Commit

Permalink
refactor: Rename exchange service to sync service for block and header (
Browse files Browse the repository at this point in the history
cosmos#1245)

<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview
Closes: cosmos#1246 
<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
Manav-Aggarwal authored Oct 13, 2023
1 parent c8ef277 commit 0a8d655
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 156 deletions.
106 changes: 53 additions & 53 deletions block/block_exchange.go → block/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/rollkit/rollkit/types"
)

// P2P Exchange Service for block that implements the go-header interface.
// P2P Sync Service for block that implements the go-header interface.
// Contains a block store where synced blocks are stored.
// Uses the go-header library for handling all P2P logic.
type BlockExchangeService struct {
type BlockSyncService struct {
conf config.NodeConfig
genesis *cmtypes.GenesisDoc
p2p *p2p.Client
Expand All @@ -43,7 +43,7 @@ type BlockExchangeService struct {
ctx context.Context
}

func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockExchangeService, error) {
func NewBlockSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) {
if genesis == nil {
return nil, errors.New("genesis doc cannot be nil")
}
Expand All @@ -56,12 +56,12 @@ func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf co
if !ok {
return nil, errors.New("failed to access the datastore")
}
ss, err := goheaderstore.NewStore[*types.Block](storeBatch, goheaderstore.WithStorePrefix("blockEx"))
ss, err := goheaderstore.NewStore[*types.Block](storeBatch, goheaderstore.WithStorePrefix("blockSync"))
if err != nil {
return nil, fmt.Errorf("failed to initialize the block store: %w", err)
}

return &BlockExchangeService{
return &BlockSyncService{
conf: conf,
genesis: genesis,
p2p: p2p,
Expand All @@ -72,94 +72,94 @@ func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf co
}, nil
}

// BlockStore returns the blockstore of the BlockExchangeService
func (bExService *BlockExchangeService) BlockStore() *goheaderstore.Store[*types.Block] {
return bExService.blockStore
// BlockStore returns the blockstore of the BlockSyncService
func (bSyncService *BlockSyncService) BlockStore() *goheaderstore.Store[*types.Block] {
return bSyncService.blockStore
}

func (bExService *BlockExchangeService) initBlockStoreAndStartSyncer(ctx context.Context, initial *types.Block) error {
func (bSyncService *BlockSyncService) initBlockStoreAndStartSyncer(ctx context.Context, initial *types.Block) error {
if initial == nil {
return fmt.Errorf("failed to initialize the blockstore and start syncer")
}
if err := bExService.blockStore.Init(ctx, initial); err != nil {
if err := bSyncService.blockStore.Init(ctx, initial); err != nil {
return err
}
if err := bExService.StartSyncer(); err != nil {
if err := bSyncService.StartSyncer(); err != nil {
return err
}
return nil
}

// Initialize block store if needed and broadcasts provided block.
// Note: Only returns an error in case block store can't be initialized. Logs error if there's one while broadcasting.
func (bExService *BlockExchangeService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
func (bSyncService *BlockSyncService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
// For genesis block initialize the store and start the syncer
if int64(block.Height()) == bExService.genesis.InitialHeight {
if err := bExService.blockStore.Init(ctx, block); err != nil {
if int64(block.Height()) == bSyncService.genesis.InitialHeight {
if err := bSyncService.blockStore.Init(ctx, block); err != nil {
return fmt.Errorf("failed to initialize block store")
}

if err := bExService.StartSyncer(); err != nil {
if err := bSyncService.StartSyncer(); err != nil {
return fmt.Errorf("failed to start syncer after initializing block store")
}
}

// Broadcast for subscribers
if err := bExService.sub.Broadcast(ctx, block); err != nil {
bExService.logger.Error("failed to broadcast block", "error", err)
if err := bSyncService.sub.Broadcast(ctx, block); err != nil {
bSyncService.logger.Error("failed to broadcast block", "error", err)
}
return nil
}

func (bExService *BlockExchangeService) isInitialized() bool {
return bExService.blockStore.Height() > 0
func (bSyncService *BlockSyncService) isInitialized() bool {
return bSyncService.blockStore.Height() > 0
}

// OnStart is a part of Service interface.
func (bExService *BlockExchangeService) Start() error {
func (bSyncService *BlockSyncService) Start() error {
// have to do the initializations here to utilize the p2p node which is created on start
ps := bExService.p2p.PubSub()
chainIDBlock := bExService.genesis.ChainID + "-block"
bExService.sub = goheaderp2p.NewSubscriber[*types.Block](ps, pubsub.DefaultMsgIdFn, chainIDBlock)
if err := bExService.sub.Start(bExService.ctx); err != nil {
ps := bSyncService.p2p.PubSub()
chainIDBlock := bSyncService.genesis.ChainID + "-block"
bSyncService.sub = goheaderp2p.NewSubscriber[*types.Block](ps, pubsub.DefaultMsgIdFn, chainIDBlock)
if err := bSyncService.sub.Start(bSyncService.ctx); err != nil {
return fmt.Errorf("error while starting subscriber: %w", err)
}
if _, err := bExService.sub.Subscribe(); err != nil {
if _, err := bSyncService.sub.Subscribe(); err != nil {
return fmt.Errorf("error while subscribing: %w", err)
}

if err := bExService.blockStore.Start(bExService.ctx); err != nil {
if err := bSyncService.blockStore.Start(bSyncService.ctx); err != nil {
return fmt.Errorf("error while starting block store: %w", err)
}

var err error
_, _, network, err := bExService.p2p.Info()
_, _, network, err := bSyncService.p2p.Info()
if err != nil {
return fmt.Errorf("error while fetching the network: %w", err)
}
networkIDBlock := network + "-block"

if bExService.p2pServer, err = newBlockP2PServer(bExService.p2p.Host(), bExService.blockStore, networkIDBlock); err != nil {
if bSyncService.p2pServer, err = newBlockP2PServer(bSyncService.p2p.Host(), bSyncService.blockStore, networkIDBlock); err != nil {
return fmt.Errorf("error while creating p2p server: %w", err)
}
if err := bExService.p2pServer.Start(bExService.ctx); err != nil {
if err := bSyncService.p2pServer.Start(bSyncService.ctx); err != nil {
return fmt.Errorf("error while starting p2p server: %w", err)
}

peerIDs := bExService.p2p.PeerIDs()
if bExService.ex, err = newBlockP2PExchange(bExService.p2p.Host(), peerIDs, networkIDBlock, chainIDBlock, bExService.p2p.ConnectionGater()); err != nil {
peerIDs := bSyncService.p2p.PeerIDs()
if bSyncService.ex, err = newBlockP2PExchange(bSyncService.p2p.Host(), peerIDs, networkIDBlock, chainIDBlock, bSyncService.p2p.ConnectionGater()); err != nil {
return fmt.Errorf("error while creating exchange: %w", err)
}
if err := bExService.ex.Start(bExService.ctx); err != nil {
if err := bSyncService.ex.Start(bSyncService.ctx); err != nil {
return fmt.Errorf("error while starting exchange: %w", err)
}

if bExService.syncer, err = newBlockSyncer(bExService.ex, bExService.blockStore, bExService.sub, goheadersync.WithBlockTime(bExService.conf.BlockTime)); err != nil {
if bSyncService.syncer, err = newBlockSyncer(bSyncService.ex, bSyncService.blockStore, bSyncService.sub, goheadersync.WithBlockTime(bSyncService.conf.BlockTime)); err != nil {
return fmt.Errorf("error while creating syncer: %w", err)
}

if bExService.isInitialized() {
if err := bExService.StartSyncer(); err != nil {
if bSyncService.isInitialized() {
if err := bSyncService.StartSyncer(); err != nil {
return fmt.Errorf("error while starting the syncer: %w", err)
}
return nil
Expand All @@ -169,36 +169,36 @@ func (bExService *BlockExchangeService) Start() error {
var trustedBlock *types.Block
// Try fetching the trusted block from peers if exists
if len(peerIDs) > 0 {
if bExService.conf.TrustedHash != "" {
trustedHashBytes, err := hex.DecodeString(bExService.conf.TrustedHash)
if bSyncService.conf.TrustedHash != "" {
trustedHashBytes, err := hex.DecodeString(bSyncService.conf.TrustedHash)
if err != nil {
return fmt.Errorf("failed to parse the trusted hash for initializing the blockstore: %w", err)
}

if trustedBlock, err = bExService.ex.Get(bExService.ctx, header.Hash(trustedHashBytes)); err != nil {
if trustedBlock, err = bSyncService.ex.Get(bSyncService.ctx, header.Hash(trustedHashBytes)); err != nil {
return fmt.Errorf("failed to fetch the trusted block for initializing the blockStore: %w", err)
}
} else {
// Try fetching the genesis block if available, otherwise fallback to blocks
if trustedBlock, err = bExService.ex.GetByHeight(bExService.ctx, uint64(bExService.genesis.InitialHeight)); err != nil {
if trustedBlock, err = bSyncService.ex.GetByHeight(bSyncService.ctx, uint64(bSyncService.genesis.InitialHeight)); err != nil {
// Full/light nodes have to wait for aggregator to publish the genesis block
// proposing aggregator can init the store and start the syncer when the first block is published
return fmt.Errorf("failed to fetch the genesis block: %w", err)
}
}
return bExService.initBlockStoreAndStartSyncer(bExService.ctx, trustedBlock)
return bSyncService.initBlockStoreAndStartSyncer(bSyncService.ctx, trustedBlock)
}
return nil
}

// OnStop is a part of Service interface.
func (bExService *BlockExchangeService) Stop() error {
err := bExService.blockStore.Stop(bExService.ctx)
err = multierr.Append(err, bExService.p2pServer.Stop(bExService.ctx))
err = multierr.Append(err, bExService.ex.Stop(bExService.ctx))
err = multierr.Append(err, bExService.sub.Stop(bExService.ctx))
if bExService.syncerStatus.isStarted() {
err = multierr.Append(err, bExService.syncer.Stop(bExService.ctx))
func (bSyncService *BlockSyncService) Stop() error {
err := bSyncService.blockStore.Stop(bSyncService.ctx)
err = multierr.Append(err, bSyncService.p2pServer.Stop(bSyncService.ctx))
err = multierr.Append(err, bSyncService.ex.Stop(bSyncService.ctx))
err = multierr.Append(err, bSyncService.sub.Stop(bSyncService.ctx))
if bSyncService.syncerStatus.isStarted() {
err = multierr.Append(err, bSyncService.syncer.Stop(bSyncService.ctx))
}
return err
}
Expand Down Expand Up @@ -240,16 +240,16 @@ func newBlockSyncer(
return goheadersync.NewSyncer[*types.Block](ex, store, sub, opt)
}

func (bExService *BlockExchangeService) StartSyncer() error {
bExService.syncerStatus.m.Lock()
defer bExService.syncerStatus.m.Unlock()
if bExService.syncerStatus.started {
func (bSyncService *BlockSyncService) StartSyncer() error {
bSyncService.syncerStatus.m.Lock()
defer bSyncService.syncerStatus.m.Unlock()
if bSyncService.syncerStatus.started {
return nil
}
err := bExService.syncer.Start(bExService.ctx)
err := bSyncService.syncer.Start(bSyncService.ctx)
if err != nil {
return err
}
bExService.syncerStatus.started = true
bSyncService.syncerStatus.started = true
return nil
}
Loading

0 comments on commit 0a8d655

Please sign in to comment.