Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
dhrubabasu committed Nov 29, 2023
2 parents b8bf721 + 1d30f2b commit e2c7817
Show file tree
Hide file tree
Showing 32 changed files with 715 additions and 442 deletions.
26 changes: 12 additions & 14 deletions codec/hierarchycodec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/codec/reflectcodec"
"github.com/ava-labs/avalanchego/utils/bimap"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

Expand Down Expand Up @@ -42,20 +43,18 @@ type typeID struct {
type hierarchyCodec struct {
codec.Codec

lock sync.RWMutex
currentGroupID uint16
nextTypeID uint16
typeIDToType map[typeID]reflect.Type
typeToTypeID map[reflect.Type]typeID
lock sync.RWMutex
currentGroupID uint16
nextTypeID uint16
registeredTypes *bimap.BiMap[typeID, reflect.Type]
}

// New returns a new, concurrency-safe codec
func New(tagNames []string, maxSliceLen uint32) Codec {
hCodec := &hierarchyCodec{
currentGroupID: 0,
nextTypeID: 0,
typeIDToType: map[typeID]reflect.Type{},
typeToTypeID: map[reflect.Type]typeID{},
currentGroupID: 0,
nextTypeID: 0,
registeredTypes: bimap.New[typeID, reflect.Type](),
}
hCodec.Codec = reflectcodec.New(hCodec, tagNames, maxSliceLen)
return hCodec
Expand Down Expand Up @@ -88,7 +87,7 @@ func (c *hierarchyCodec) RegisterType(val interface{}) error {
defer c.lock.Unlock()

valType := reflect.TypeOf(val)
if _, exists := c.typeToTypeID[valType]; exists {
if c.registeredTypes.HasValue(valType) {
return fmt.Errorf("%w: %v", codec.ErrDuplicateType, valType)
}

Expand All @@ -98,8 +97,7 @@ func (c *hierarchyCodec) RegisterType(val interface{}) error {
}
c.nextTypeID++

c.typeIDToType[valTypeID] = valType
c.typeToTypeID[valType] = valTypeID
c.registeredTypes.Put(valTypeID, valType)
return nil
}

Expand All @@ -112,7 +110,7 @@ func (c *hierarchyCodec) PackPrefix(p *wrappers.Packer, valueType reflect.Type)
c.lock.RLock()
defer c.lock.RUnlock()

typeID, ok := c.typeToTypeID[valueType] // Get the type ID of the value being marshaled
typeID, ok := c.registeredTypes.GetKey(valueType) // Get the type ID of the value being marshaled
if !ok {
return fmt.Errorf("can't marshal unregistered type %q", valueType)
}
Expand All @@ -136,7 +134,7 @@ func (c *hierarchyCodec) UnpackPrefix(p *wrappers.Packer, valueType reflect.Type
typeID: typeIDShort,
}
// Get a type that implements the interface
implementingType, ok := c.typeIDToType[t]
implementingType, ok := c.registeredTypes.GetValue(t)
if !ok {
return reflect.Value{}, fmt.Errorf("couldn't unmarshal interface: unknown type ID %+v", t)
}
Expand Down
22 changes: 10 additions & 12 deletions codec/linearcodec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/codec/reflectcodec"
"github.com/ava-labs/avalanchego/utils/bimap"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

Expand All @@ -36,19 +37,17 @@ type Codec interface {
type linearCodec struct {
codec.Codec

lock sync.RWMutex
nextTypeID uint32
typeIDToType map[uint32]reflect.Type
typeToTypeID map[reflect.Type]uint32
lock sync.RWMutex
nextTypeID uint32
registeredTypes *bimap.BiMap[uint32, reflect.Type]
}

// New returns a new, concurrency-safe codec; it allow to specify
// both tagNames and maxSlicelenght
func New(tagNames []string, maxSliceLen uint32) Codec {
hCodec := &linearCodec{
nextTypeID: 0,
typeIDToType: map[uint32]reflect.Type{},
typeToTypeID: map[reflect.Type]uint32{},
nextTypeID: 0,
registeredTypes: bimap.New[uint32, reflect.Type](),
}
hCodec.Codec = reflectcodec.New(hCodec, tagNames, maxSliceLen)
return hCodec
Expand Down Expand Up @@ -78,12 +77,11 @@ func (c *linearCodec) RegisterType(val interface{}) error {
defer c.lock.Unlock()

valType := reflect.TypeOf(val)
if _, exists := c.typeToTypeID[valType]; exists {
if c.registeredTypes.HasValue(valType) {
return fmt.Errorf("%w: %v", codec.ErrDuplicateType, valType)
}

c.typeIDToType[c.nextTypeID] = valType
c.typeToTypeID[valType] = c.nextTypeID
c.registeredTypes.Put(c.nextTypeID, valType)
c.nextTypeID++
return nil
}
Expand All @@ -97,7 +95,7 @@ func (c *linearCodec) PackPrefix(p *wrappers.Packer, valueType reflect.Type) err
c.lock.RLock()
defer c.lock.RUnlock()

typeID, ok := c.typeToTypeID[valueType] // Get the type ID of the value being marshaled
typeID, ok := c.registeredTypes.GetKey(valueType) // Get the type ID of the value being marshaled
if !ok {
return fmt.Errorf("can't marshal unregistered type %q", valueType)
}
Expand All @@ -114,7 +112,7 @@ func (c *linearCodec) UnpackPrefix(p *wrappers.Packer, valueType reflect.Type) (
return reflect.Value{}, fmt.Errorf("couldn't unmarshal interface: %w", p.Err)
}
// Get a type that implements the interface
implementingType, ok := c.typeIDToType[typeID]
implementingType, ok := c.registeredTypes.GetValue(typeID)
if !ok {
return reflect.Value{}, fmt.Errorf("couldn't unmarshal interface: unknown type ID %d", typeID)
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/mocks.mockgen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ github.com/ava-labs/avalanchego/utils/logging=Logger=utils/logging/mock_logger.g
github.com/ava-labs/avalanchego/utils/resource=User=utils/resource/mock_user.go
github.com/ava-labs/avalanchego/vms/avm/block=Block=vms/avm/block/mock_block.go
github.com/ava-labs/avalanchego/vms/avm/metrics=Metrics=vms/avm/metrics/mock_metrics.go
github.com/ava-labs/avalanchego/vms/avm/states=Chain,State,Diff=vms/avm/states/mock_states.go
github.com/ava-labs/avalanchego/vms/avm/state=Chain,State,Diff=vms/avm/state/mock_state.go
github.com/ava-labs/avalanchego/vms/avm/txs/mempool=Mempool=vms/avm/txs/mempool/mock_mempool.go
github.com/ava-labs/avalanchego/vms/components/avax=TransferableIn=vms/components/avax/mock_transferable_in.go
github.com/ava-labs/avalanchego/vms/components/verify=Verifiable=vms/components/verify/mock_verifiable.go
Expand Down
40 changes: 29 additions & 11 deletions snow/engine/avalanche/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/bimap"
"github.com/ava-labs/avalanchego/utils/heap"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand Down Expand Up @@ -57,10 +58,10 @@ func New(
ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log),
AppHandler: config.VM,

outstandingRequests: bimap.New[common.Request, ids.ID](),

processedCache: &cache.LRU[ids.ID, struct{}]{Size: cacheSize},
Fetcher: common.Fetcher{
OnFinished: onFinished,
},
onFinished: onFinished,
}
return b, b.metrics.Initialize("bs", config.Ctx.AvalancheRegisterer)
}
Expand All @@ -81,9 +82,11 @@ type bootstrapper struct {
common.ChitsHandler
common.AppHandler

common.Fetcher
metrics

// tracks which validators were asked for which containers in which requests
outstandingRequests *bimap.BiMap[common.Request, ids.ID]

// IDs of vertices that we will send a GetAncestors request for once we are
// not at the max number of outstanding requests
needToFetch set.Set[ids.ID]
Expand All @@ -93,6 +96,9 @@ type bootstrapper struct {

// Tracks the last requestID that was used in a request
requestID uint32

// Called when bootstrapping is done on a specific chain
onFinished func(ctx context.Context, lastReqID uint32) error
}

func (b *bootstrapper) Context() *snow.ConsensusContext {
Expand Down Expand Up @@ -137,7 +143,10 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
vtxs = vtxs[:b.Config.AncestorsMaxContainersReceived]
}

requestedVtxID, requested := b.OutstandingRequests.Remove(nodeID, requestID)
requestedVtxID, requested := b.outstandingRequests.DeleteKey(common.Request{
NodeID: nodeID,
RequestID: requestID,
})
vtx, err := b.Manager.ParseVtx(ctx, vtxs[0]) // first vertex should be the one we requested in GetAncestors request
if err != nil {
if !requested {
Expand Down Expand Up @@ -177,7 +186,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
)
return b.fetch(ctx, requestedVtxID)
}
if !requested && !b.OutstandingRequests.Contains(vtxID) && !b.needToFetch.Contains(vtxID) {
if !requested && !b.outstandingRequests.HasValue(vtxID) && !b.needToFetch.Contains(vtxID) {
b.Ctx.Log.Debug("received un-needed vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
Expand Down Expand Up @@ -244,7 +253,10 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
}

func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
vtxID, ok := b.OutstandingRequests.Remove(nodeID, requestID)
vtxID, ok := b.outstandingRequests.DeleteKey(common.Request{
NodeID: nodeID,
RequestID: requestID,
})
if !ok {
b.Ctx.Log.Debug("skipping GetAncestorsFailed call",
zap.String("reason", "no matching outstanding request"),
Expand Down Expand Up @@ -388,12 +400,12 @@ func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
// to fetch or we are at the maximum number of outstanding requests.
func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
b.needToFetch.Add(vtxIDs...)
for b.needToFetch.Len() > 0 && b.OutstandingRequests.Len() < maxOutstandingGetAncestorsRequests {
for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < maxOutstandingGetAncestorsRequests {
vtxID := b.needToFetch.CappedList(1)[0]
b.needToFetch.Remove(vtxID)

// Make sure we haven't already requested this vertex
if b.OutstandingRequests.Contains(vtxID) {
if b.outstandingRequests.HasValue(vtxID) {
continue
}

Expand All @@ -409,7 +421,13 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
validatorID := validatorIDs[0]
b.requestID++

b.OutstandingRequests.Add(validatorID, b.requestID, vtxID)
b.outstandingRequests.Put(
common.Request{
NodeID: validatorID,
RequestID: b.requestID,
},
vtxID,
)
b.Config.Sender.SendGetAncestors(ctx, validatorID, b.requestID, vtxID) // request vertex and ancestors
}
return b.checkFinish(ctx)
Expand Down Expand Up @@ -606,7 +624,7 @@ func (b *bootstrapper) checkFinish(ctx context.Context) error {
}

b.processedCache.Flush()
return b.OnFinished(ctx, b.requestID)
return b.onFinished(ctx, b.requestID)
}

// A vertex is less than another vertex if it is unknown. Ties are broken by
Expand Down
14 changes: 0 additions & 14 deletions snow/engine/common/fetcher.go

This file was deleted.

11 changes: 11 additions & 0 deletions snow/engine/common/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package common

import "github.com/ava-labs/avalanchego/ids"

type Request struct {
NodeID ids.NodeID
RequestID uint32
}
Loading

0 comments on commit e2c7817

Please sign in to comment.