Skip to content

Commit

Permalink
Merge branch 'development' into kishan/bug/dont-drop-us
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya committed Feb 18, 2022
2 parents 68fcdf2 + 9ac6642 commit 0a1ff2c
Show file tree
Hide file tree
Showing 29 changed files with 1,060 additions and 752 deletions.
2 changes: 1 addition & 1 deletion dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
}()

for {
tot, err := readStream(stream, msgBytes)
tot, err := readStream(stream, &msgBytes)
if errors.Is(err, io.EOF) {
return
} else if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dot/network/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder
peer := stream.Conn().RemotePeer()
buffer := s.bufPool.Get().(*[]byte)
defer s.bufPool.Put(buffer)
msgBytes := *buffer

for {
n, err := readStream(stream, msgBytes[:])
n, err := readStream(stream, buffer)
if err != nil {
logger.Tracef(
"failed to read from stream id %s of peer %s using protocol %s: %s",
Expand All @@ -32,6 +31,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder

// decode message based on message type
// stream should always be inbound if it passes through service.readStream
msgBytes := *buffer
msg, err := decoder(msgBytes[:n], peer, isInbound(stream))
if err != nil {
logger.Tracef("failed to decode message from stream id %s using protocol %s: %s",
Expand Down
4 changes: 2 additions & 2 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,14 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe

buffer := s.bufPool.Get().(*[]byte)
defer s.bufPool.Put(buffer)
msgBytes := *buffer

tot, err := readStream(stream, msgBytes[:])
tot, err := readStream(stream, buffer)
if err != nil {
hsC <- &handshakeReader{hs: nil, err: err}
return
}

msgBytes := *buffer
hs, err := decoder(msgBytes[:tot])
if err != nil {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Expand Down
2 changes: 1 addition & 1 deletion dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"

maxMessageSize = 1024 * 63 // 63kb for now
maxMessageSize = 1024 * 64 // 64kb for now
)

var (
Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockRespo

buf := s.blockResponseBuf

n, err := readStream(stream, buf)
n, err := readStream(stream, &buf)
if err != nil {
return nil, fmt.Errorf("read stream error: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, int, error) {
}

// readStream reads from the stream into the given buffer, returning the number of bytes read
func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte) (int, error) {
if stream == nil {
return 0, errors.New("stream is nil")
}
Expand All @@ -185,6 +185,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
tot int
)

buf := *bufPointer
length, bytesRead, err := readLEB128ToUint64(stream, buf[:1])
if err != nil {
return bytesRead, fmt.Errorf("failed to read length: %w", err)
Expand All @@ -195,8 +196,9 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
}

if length > uint64(len(buf)) {
extraBytes := int(length) - len(buf)
*bufPointer = append(buf, make([]byte, extraBytes)...) // TODO #2288 use bytes.Buffer instead
logger.Warnf("received message with size %d greater than allocated message buffer size %d", length, len(buf))
return 0, fmt.Errorf("message size greater than allocated message buffer: got %d", length)
}

if length > maxBlockResponseSize {
Expand Down
5 changes: 3 additions & 2 deletions dot/rpc/modules/dev_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func newState(t *testing.T) (*state.BlockState, *state.EpochState) {

db := state.NewInMemoryDB(t)

_, _, genesisHeader := genesis.NewTestGenesisWithTrieAndHeader(t)
bs, err := state.NewBlockStateFromGenesis(db, genesisHeader, telemetryMock)
_, genesisTrie, genesisHeader := genesis.NewTestGenesisWithTrieAndHeader(t)
tries := state.NewTries(genesisTrie)
bs, err := state.NewBlockStateFromGenesis(db, tries, genesisHeader, telemetryMock)
require.NoError(t, err)
es, err := state.NewEpochStateFromGenesis(db, bs, genesisBABEConfig)
require.NoError(t, err)
Expand Down
16 changes: 7 additions & 9 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
)

const (
pruneKeyBufferSize = 1000
blockPrefix = "block"
blockPrefix = "block"
)

var (
Expand Down Expand Up @@ -60,6 +59,7 @@ type BlockState struct {
genesisHash common.Hash
lastFinalised common.Hash
unfinalisedBlocks *sync.Map // map[common.Hash]*types.Block
tries *Tries

// block notifiers
imported map[chan *types.Block]struct{}
Expand All @@ -69,21 +69,19 @@ type BlockState struct {
runtimeUpdateSubscriptionsLock sync.RWMutex
runtimeUpdateSubscriptions map[uint32]chan<- runtime.Version

pruneKeyCh chan *types.Header

telemetry telemetry.Client
}

// NewBlockState will create a new BlockState backed by the database located at basePath
func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState, error) {
func NewBlockState(db chaindb.Database, trs *Tries, telemetry telemetry.Client) (*BlockState, error) {
bs := &BlockState{
dbPath: db.Path(),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: trs,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
telemetry: telemetry,
}
Expand All @@ -107,16 +105,16 @@ func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState

// NewBlockStateFromGenesis initialises a BlockState from a genesis header,
// saving it to the database located at basePath
func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header,
telemetryMailer telemetry.Client) (*BlockState, error) {
func NewBlockStateFromGenesis(db chaindb.Database, trs *Tries, header *types.Header,
telemetryMailer telemetry.Client) (*BlockState, error) { // TODO CHECKTEST
bs := &BlockState{
bt: blocktree.NewBlockTreeFromRoot(header),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: trs,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
genesisHash: header.Hash(),
lastFinalised: header.Hash(),
Expand Down
2 changes: 1 addition & 1 deletion dot/state/block_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestGetSet_ReceiptMessageQueue_Justification(t *testing.T) {
s := newTestBlockState(t, nil)
s := newTestBlockState(t, nil, newTriesEmpty())
require.NotNil(t, s)

var genesisHeader = &types.Header{
Expand Down
6 changes: 4 additions & 2 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
continue
}

bs.tries.delete(block.Header.StateRoot)

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -238,8 +239,9 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
continue
}

bs.tries.delete(block.Header.StateRoot)

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

return batch.Flush()
Expand Down
6 changes: 3 additions & 3 deletions dot/state/block_finalisation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestHighestRoundAndSetID(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, newTriesEmpty())
round, setID, err := bs.GetHighestRoundAndSetID()
require.NoError(t, err)
require.Equal(t, uint64(0), round)
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestHighestRoundAndSetID(t *testing.T) {
}

func TestBlockState_SetFinalisedHash(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, newTriesEmpty())
h, err := bs.GetFinalisedHash(0, 0)
require.NoError(t, err)
require.Equal(t, testGenesisHeader.Hash(), h)
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestBlockState_SetFinalisedHash(t *testing.T) {
}

func TestSetFinalisedHash_setFirstSlotOnFinalisation(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, newTriesEmpty())
firstSlot := uint64(42069)

digest := types.NewDigest()
Expand Down
15 changes: 8 additions & 7 deletions dot/state/block_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/runtime"
runtimemocks "github.com/ChainSafe/gossamer/lib/runtime/mocks"
"github.com/ChainSafe/gossamer/lib/trie"
"github.com/stretchr/testify/require"
)

var testMessageTimeout = time.Second * 3

func TestImportChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))
ch := bs.GetImportedBlockNotifierChannel()

defer bs.FreeImportedBlockNotifierChannel(ch)
Expand All @@ -35,7 +36,7 @@ func TestImportChannel(t *testing.T) {
}

func TestFreeImportedBlockNotifierChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))
ch := bs.GetImportedBlockNotifierChannel()
require.Equal(t, 1, len(bs.imported))

Expand All @@ -44,7 +45,7 @@ func TestFreeImportedBlockNotifierChannel(t *testing.T) {
}

func TestFinalizedChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))

ch := bs.GetFinalisedNotifierChannel()

Expand All @@ -66,7 +67,7 @@ func TestFinalizedChannel(t *testing.T) {
}

func TestImportChannel_Multi(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))

num := 5
chs := make([]chan *types.Block, num)
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestImportChannel_Multi(t *testing.T) {
}

func TestFinalizedChannel_Multi(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))

num := 5
chs := make([]chan *types.FinalisationInfo, num)
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestFinalizedChannel_Multi(t *testing.T) {
}

func TestService_RegisterUnRegisterRuntimeUpdatedChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))
ch := make(chan<- runtime.Version)
chID, err := bs.RegisterRuntimeUpdatedChannel(ch)
require.NoError(t, err)
Expand All @@ -147,7 +148,7 @@ func TestService_RegisterUnRegisterRuntimeUpdatedChannel(t *testing.T) {
}

func TestService_RegisterUnRegisterConcurrentCalls(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)
bs := newTestBlockState(t, testGenesisHeader, NewTries(trie.NewEmptyTrie()))

go func() {
for i := 0; i < 100; i++ {
Expand Down
4 changes: 3 additions & 1 deletion dot/state/block_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func TestConcurrencySetHeader(t *testing.T) {
dbs[i] = NewInMemoryDB(t)
}

tries := NewTries(trie.NewEmptyTrie()) // not used in this test

pend := new(sync.WaitGroup)
pend.Add(threads)
for i := 0; i < threads; i++ {
go func(index int) {
defer pend.Done()

bs, err := NewBlockStateFromGenesis(dbs[index], testGenesisHeader, telemetryMock)
bs, err := NewBlockStateFromGenesis(dbs[index], tries, testGenesisHeader, telemetryMock)
require.NoError(t, err)

header := &types.Header{
Expand Down
Loading

0 comments on commit 0a1ff2c

Please sign in to comment.