Skip to content

Commit

Permalink
beacon/light/sync: print error log if checkpoint retrieval fails (eth…
Browse files Browse the repository at this point in the history
…ereum#29532)


Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
zsfelfoldi and fjl authored Apr 22, 2024
1 parent 1ec7af2 commit e6689fe
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 40 deletions.
10 changes: 8 additions & 2 deletions beacon/blsync/block_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

var (
testServer1 = "testServer1"
testServer2 = "testServer2"
testServer1 = testServer("testServer1")
testServer2 = testServer("testServer2")

testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{
Slot: 123,
Expand All @@ -51,6 +51,12 @@ var (
})
)

type testServer string

func (t testServer) Name() string {
return string(t)
}

func TestBlockSync(t *testing.T) {
ht := &testHeadTracker{}
blockSync := newBeaconBlockSync(ht)
Expand Down
9 changes: 8 additions & 1 deletion beacon/light/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count)
resp = r
case sync.ReqHeader:
var r sync.RespHeader
log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetHeader(common.Hash(data))
r.Header, r.Canonical, r.Finalized, err = s.api.GetHeader(common.Hash(data))
resp = r
case sync.ReqCheckpointData:
log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetCheckpointData(common.Hash(data))
Expand All @@ -101,3 +103,8 @@ func (s *ApiServer) Unsubscribe() {
s.unsubscribe = nil
}
}

// Name implements request.Server
func (s *ApiServer) Name() string {
return s.api.url
}
17 changes: 10 additions & 7 deletions beacon/light/api/light_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) {

// GetHeader fetches and validates the beacon header with the given blockRoot.
// If blockRoot is null hash then the latest head header is fetched.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error) {
// The values of the canonical and finalized flags are also returned. Note that
// these flags are not validated.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
var blockId string
if blockRoot == (common.Hash{}) {
blockId = "head"
Expand All @@ -300,11 +302,12 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
}
resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId)
if err != nil {
return types.Header{}, err
return types.Header{}, false, false, err
}

var data struct {
Data struct {
Finalized bool `json:"finalized"`
Data struct {
Root common.Hash `json:"root"`
Canonical bool `json:"canonical"`
Header struct {
Expand All @@ -314,16 +317,16 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
} `json:"data"`
}
if err := json.Unmarshal(resp, &data); err != nil {
return types.Header{}, err
return types.Header{}, false, false, err
}
header := data.Data.Header.Message
if blockRoot == (common.Hash{}) {
blockRoot = data.Data.Root
}
if header.Hash() != blockRoot {
return types.Header{}, errors.New("retrieved beacon header root does not match")
return types.Header{}, false, false, errors.New("retrieved beacon header root does not match")
}
return header, nil
return header, data.Data.Canonical, data.Finalized, nil
}

// GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint.
Expand Down Expand Up @@ -446,7 +449,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
defer wg.Done()

// Request initial data.
if head, err := api.GetHeader(common.Hash{}); err == nil {
if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
listener.OnNewHead(head.Slot, head.Hash())
}
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {
Expand Down
4 changes: 3 additions & 1 deletion beacon/light/request/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ type (
// the modules that do not interact with them directly.
// In order to make module testing easier, Server interface is used in
// events and modules.
Server any
Server interface {
Name() string
}
Request any
Response any
ID uint64
Expand Down
4 changes: 4 additions & 0 deletions beacon/light/request/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type testServer struct {
canRequest int
}

func (s *testServer) Name() string {
return ""
}

func (s *testServer) subscribe(eventCb func(Event)) {
s.eventCb = eventCb
}
Expand Down
7 changes: 7 additions & 0 deletions beacon/light/request/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
// EvResponse or EvFail. Additionally, it may also send application-defined
// events that the Modules can interpret.
type requestServer interface {
Name() string
Subscribe(eventCallback func(Event))
SendRequest(ID, Request)
Unsubscribe()
Expand All @@ -69,6 +70,7 @@ type requestServer interface {
// limit the number of parallel in-flight requests and temporarily disable
// new requests based on timeouts and response failures.
type server interface {
Server
subscribe(eventCallback func(Event))
canRequestNow() bool
sendRequest(Request) ID
Expand Down Expand Up @@ -138,6 +140,11 @@ type serverWithTimeout struct {
lastID ID
}

// Name implements request.Server
func (s *serverWithTimeout) Name() string {
return s.parent.Name()
}

// init initializes serverWithTimeout
func (s *serverWithTimeout) init(clock mclock.Clock) {
s.clock = clock
Expand Down
1 change: 1 addition & 0 deletions beacon/light/request/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type testRequestServer struct {
eventCb func(Event)
}

func (rs *testRequestServer) Name() string { return "" }
func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
func (rs *testRequestServer) SendRequest(ID, Request) {}
func (rs *testRequestServer) Unsubscribe() {}
14 changes: 10 additions & 4 deletions beacon/light/sync/head_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

var (
testServer1 = "testServer1"
testServer2 = "testServer2"
testServer3 = "testServer3"
testServer4 = "testServer4"
testServer1 = testServer("testServer1")
testServer2 = testServer("testServer2")
testServer3 = testServer("testServer3")
testServer4 = testServer("testServer4")

testHead0 = types.HeadInfo{}
testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}}
Expand All @@ -42,6 +42,12 @@ var (
testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}}
)

type testServer string

func (t testServer) Name() string {
return string(t)
}

func TestValidatedHead(t *testing.T) {
chain := &TestCommitteeChain{}
ht := &TestHeadTracker{}
Expand Down
4 changes: 2 additions & 2 deletions beacon/light/sync/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ts *TestScheduler) Run(testIndex int, exp ...any) {
if count == 0 {
continue
}
ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.(string), testIndex)
ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.Name(), testIndex)
}

if !reflect.DeepEqual(ts.sent[testIndex], expReqs) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (ts *TestScheduler) Send(server request.Server, req request.Request) reques

func (ts *TestScheduler) Fail(server request.Server, desc string) {
if ts.expFail[server] == 0 {
ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.(string), ts.testIndex, desc)
ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.Name(), ts.testIndex, desc)
return
}
ts.expFail[server]--
Expand Down
6 changes: 5 additions & 1 deletion beacon/light/sync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type (
Updates []*types.LightClientUpdate
Committees []*types.SerializedSyncCommittee
}
ReqHeader common.Hash
ReqHeader common.Hash
RespHeader struct {
Header types.Header
Canonical, Finalized bool
}
ReqCheckpointData common.Hash
ReqBeaconBlock common.Hash
)
139 changes: 117 additions & 22 deletions beacon/light/sync/update_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ethereum/go-ethereum/beacon/light"
"github.com/ethereum/go-ethereum/beacon/light/request"
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -42,47 +43,141 @@ type CheckpointInit struct {
checkpointHash common.Hash
locked request.ServerAndID
initialized bool
// per-server state is used to track the state of requesting checkpoint header
// info. Part of this info (canonical and finalized state) is not validated
// and therefore it is requested from each server separately after it has
// reported a missing checkpoint (which is also not validated info).
serverState map[request.Server]serverState
// the following fields are used to determine whether the checkpoint is on
// epoch boundary. This information is validated and therefore stored globally.
parentHash common.Hash
hasEpochInfo, epochBoundary bool
cpSlot, parentSlot uint64
}

const (
ssDefault = iota // no action yet or checkpoint requested
ssNeedHeader // checkpoint req failed, need cp header
ssHeaderRequested // cp header requested
ssNeedParent // cp header slot %32 != 0, need parent to check epoch boundary
ssParentRequested // cp parent header requested
ssPrintStatus // has all necessary info, print log message if init still not successful
ssDone // log message printed, no more action required
)

type serverState struct {
state int
hasHeader, canonical, finalized bool // stored per server because not validated
}

// NewCheckpointInit creates a new CheckpointInit.
func NewCheckpointInit(chain committeeChain, checkpointHash common.Hash) *CheckpointInit {
return &CheckpointInit{
chain: chain,
checkpointHash: checkpointHash,
serverState: make(map[request.Server]serverState),
}
}

// Process implements request.Module.
func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) {
if s.initialized {
return
}
for _, event := range events {
if !event.IsRequestEvent() {
continue
}
sid, req, resp := event.RequestInfo()
if s.locked == sid {
s.locked = request.ServerAndID{}
}
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
switch event.Type {
case request.EvResponse, request.EvFail, request.EvTimeout:
sid, req, resp := event.RequestInfo()
if s.locked == sid {
s.locked = request.ServerAndID{}
}

requester.Fail(event.Server, "invalid checkpoint data")
if event.Type == request.EvTimeout {
continue
}
switch s.serverState[sid.Server].state {
case ssDefault:
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
}
requester.Fail(event.Server, "invalid checkpoint data")
}
s.serverState[sid.Server] = serverState{state: ssNeedHeader}
case ssHeaderRequested:
if resp == nil {
s.serverState[sid.Server] = serverState{state: ssPrintStatus}
continue
}
newState := serverState{
hasHeader: true,
canonical: resp.(RespHeader).Canonical,
finalized: resp.(RespHeader).Finalized,
}
s.cpSlot, s.parentHash = resp.(RespHeader).Header.Slot, resp.(RespHeader).Header.ParentRoot
if s.cpSlot%params.EpochLength == 0 {
s.hasEpochInfo, s.epochBoundary = true, true
}
if s.hasEpochInfo {
newState.state = ssPrintStatus
} else {
newState.state = ssNeedParent
}
s.serverState[sid.Server] = newState
case ssParentRequested:
s.parentSlot = resp.(RespHeader).Header.Slot
s.hasEpochInfo, s.epochBoundary = true, s.cpSlot/params.EpochLength > s.parentSlot/params.EpochLength
newState := s.serverState[sid.Server]
newState.state = ssPrintStatus
s.serverState[sid.Server] = newState
}
case request.EvUnregistered:
delete(s.serverState, event.Server)
}
}
// start a request if possible
if s.initialized || s.locked != (request.ServerAndID{}) {
return
for _, server := range requester.CanSendTo() {
switch s.serverState[server].state {
case ssDefault:
if s.locked == (request.ServerAndID{}) {
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
}
case ssNeedHeader:
requester.Send(server, ReqHeader(s.checkpointHash))
newState := s.serverState[server]
newState.state = ssHeaderRequested
s.serverState[server] = newState
case ssNeedParent:
requester.Send(server, ReqHeader(s.parentHash))
newState := s.serverState[server]
newState.state = ssParentRequested
s.serverState[server] = newState
}
}
cs := requester.CanSendTo()
if len(cs) == 0 {
return
// print log message if necessary
for server, state := range s.serverState {
if state.state != ssPrintStatus {
continue
}
switch {
case !state.hasHeader:
log.Error("blsync: checkpoint block is not available, reported as unknown", "server", server.Name())
case !state.canonical:
log.Error("blsync: checkpoint block is not available, reported as non-canonical", "server", server.Name())
case !s.hasEpochInfo:
// should be available if hasHeader is true and state is ssPrintStatus
panic("checkpoint epoch info not available when printing retrieval status")
case !s.epochBoundary:
log.Error("blsync: checkpoint block is not first of epoch", "slot", s.cpSlot, "parent", s.parentSlot, "server", server.Name())
case !state.finalized:
log.Error("blsync: checkpoint block is reported as non-finalized", "server", server.Name())
default:
log.Error("blsync: checkpoint not available, but reported as finalized; specified checkpoint hash might be too old", "server", server.Name())
}
s.serverState[server] = serverState{state: ssDone}
}
server := cs[0]
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
}

// ForwardUpdateSync implements request.Module; it fetches updates between the
Expand Down

0 comments on commit e6689fe

Please sign in to comment.