Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect slasher streams on beacon node shutdown #5376

Merged
merged 10 commits into from
Apr 10, 2020
62 changes: 61 additions & 1 deletion slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package beaconclient

import (
"context"
"errors"
"io"
"strings"
"time"

ptypes "github.com/gogo/protobuf/types"
Expand All @@ -11,6 +13,10 @@ import (
"go.opencensus.io/trace"
)

// reconnectPeriod is the frequency that we try to restart our
// streams when the beacon chain is node does not respond.
var reconnectPeriod = 5 * time.Second

// receiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
Expand All @@ -33,6 +39,13 @@ func (bs *Service) receiveBlocks(ctx context.Context) {
log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver")
return
}
if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) {
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
stream, err = bs.restartBlockStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
}
if err != nil {
log.WithError(err).Error("Could not receive block from beacon node")
break
Expand Down Expand Up @@ -70,9 +83,16 @@ func (bs *Service) receiveAttestations(ctx context.Context) {
log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver")
return
}
if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) {
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
stream, err = bs.restartIndexedAttestationStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
}
if err != nil {
log.WithError(err).Error("Could not receive attestations from beacon node")
continue
break
}
if res == nil {
continue
Expand Down Expand Up @@ -120,3 +140,43 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
}
}
}

func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) {
ticker := time.NewTicker(reconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart attestation stream")
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
continue
}
log.Info("Attestation stream restarted...")
return stream, nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, no longer attempting to restart stream")
}
}

}

func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) {
ticker := time.NewTicker(reconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, attempting to restart block stream")
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
continue
}
log.Info("Block stream restarted...")
return stream, nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, no longer attempting to restart stream")
}
}

}