Skip to content

Commit

Permalink
p2p receipts (#11010)
Browse files Browse the repository at this point in the history
closes #10320 and closes #11014

---------

Co-authored-by: JkLondon <[email protected]>
Co-authored-by: alex.sharov <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent c8d9143 commit e0f065b
Show file tree
Hide file tree
Showing 26 changed files with 345 additions and 174 deletions.
31 changes: 16 additions & 15 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,24 +322,24 @@ func EmbeddedServices(ctx context.Context,
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, agg *libstate.Aggregator, err error) {
ff *rpchelper.Filters, err error) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified")
}
creds, err := grpcutil.TLS(cfg.TLSCACert, cfg.TLSCertfile, cfg.TLSKeyFile)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("open tls cert: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("open tls cert: %w", err)
}
conn, err := grpcutil.Connect(creds, cfg.PrivateApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to execution service privateApi: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to execution service privateApi: %w", err)
}

remoteBackendClient := remote.NewETHBACKENDClient(conn)
remoteKvClient := remote.NewKVClient(conn)
remoteKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKvClient).Open()
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to remoteKv: %w", err)
}

// Configure DB first
Expand All @@ -364,10 +364,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if compatErr := checkDbCompatibility(ctx, rwKv); compatErr != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, compatErr
return nil, nil, nil, nil, nil, nil, nil, ff, compatErr
}
db = rwKv

Expand All @@ -386,10 +386,10 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
if cc == nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}
cfg.Snap.Enabled = cfg.Snap.Enabled || cfg.Sync.UseSnapshots
if !cfg.Snap.Enabled {
Expand All @@ -407,8 +407,9 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots.LogStat("bor:remote")

cr := rawdb.NewCanonicalReader()
if agg, err = libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB`

Expand Down Expand Up @@ -460,7 +461,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

db, err = temporal.New(rwKv, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
stateCache = kvcache.NewDummy()
}
Expand All @@ -484,7 +485,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cfg.TxPoolApiAddr != cfg.PrivateApiAddr {
txpoolConn, err = grpcutil.Connect(creds, cfg.TxPoolApiAddr)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("could not connect to txpool api: %w", err)
return nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("could not connect to txpool api: %w", err)
}
}

Expand Down Expand Up @@ -515,7 +516,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
logger.Warn("[rpc] Opening Bor db", "path", borDbPath)
borKv, err = kv2.NewMDBX(logger).Path(borDbPath).Label(kv.ConsensusDB).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
return nil, nil, nil, nil, nil, nil, nil, ff, err
}
// Skip the compatibility check, until we have a schema in erigon-lib

Expand Down Expand Up @@ -558,7 +559,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()

ff = rpchelper.New(ctx, cfg.RpcFiltersConfig, eth, txPool, mining, onNewSnapshot, logger)
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, agg, err
return db, eth, txPool, mining, stateCache, blockReader, engine, ff, err
}

func StartRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []rpc.API, logger log.Logger) error {
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
db, backend, txPool, mining, stateCache, blockReader, engine, ff, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("Could not connect to DB", "err", err)
Expand All @@ -49,7 +49,7 @@ func main() {
defer db.Close()
defer engine.Close()

apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, agg, cfg, engine, logger)
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
logger.Error(err.Error())
Expand Down
6 changes: 6 additions & 0 deletions core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,9 @@ func (r *Receipt) DeriveFieldsV3ForSingleReceipt(txnIdx int, blockHash libcommon
}
return nil
}

// TODO: maybe make it more prettier (only for debug purposes)
func (r *Receipt) String() string {
str := fmt.Sprintf("Receipt of tx %+v", *r)
return str
}
11 changes: 3 additions & 8 deletions erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
f.logger.Warn("[txpool.recvMessage] sentry not ready yet", "err", err)
continue
}

if err := f.receiveMessage(f.ctx, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -175,7 +174,6 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
}
return err
}

var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
Expand All @@ -184,17 +182,14 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
return ctx.Err()
default:
}
return err
}
if req == nil {
return nil
return fmt.Errorf("txpool.receiveMessage: %w", err)
}
if err := f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
continue
}
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err)
f.logger.Debug("[txpool.fetch] Handling incoming message", "msg", string(req.Data), "reqID", req.Id.String(), "err", err)
}
if f.wg != nil {
f.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,7 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config, chainConfig
}
}

s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, &httpRpcCfg, s.engine, s.logger)
s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, &httpRpcCfg, s.engine, s.logger)

if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
interface_log_settings := silkworm.RpcInterfaceLogSettings{
Expand Down
23 changes: 19 additions & 4 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ package eth
import (
"context"
"fmt"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/log/v3"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
Expand Down Expand Up @@ -160,12 +159,17 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}

func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
type ReceiptsGetter interface {
GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.Tx, block *types.Block, senders []libcommon.Address) (types.Receipts, error)
}

func AnswerGetReceiptsQuery(ctx context.Context, cfg *chain.Config, receiptsGetter ReceiptsGetter, br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)

for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
Expand All @@ -183,7 +187,12 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
if b == nil {
return nil, nil
}
results := rawdb.ReadReceipts(db, b, s)

results, err := receiptsGetter.GetReceipts(ctx, cfg, db, b, s)
if err != nil {
return nil, err
}

if results == nil {
header, err := rawdb.ReadHeaderByHash(db, hash)
if err != nil {
Expand All @@ -193,6 +202,12 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
continue
}
}
// For debug
//println("receipts:")
//for _, result := range results {
// println(result.String())
//}

// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
return nil, fmt.Errorf("failed to encode receipt: %w", err)
Expand Down
104 changes: 63 additions & 41 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"encoding/hex"
"errors"
"fmt"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/turbo/jsonrpc/receipts"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -291,9 +295,13 @@ type MultiClient struct {
// decouple sentry multi client from header and body downloading logic is done
disableBlockDownload bool

logger log.Logger
logger log.Logger
getReceiptsActiveGoroutineNumber *semaphore.Weighted
ethApiWrapper eth.ReceiptsGetter
}

var _ eth.ReceiptsGetter = new(receipts.Generator) // compile-time interface-check

func NewMultiClient(
db kv.RwDB,
chainConfig *chain.Config,
Expand Down Expand Up @@ -342,6 +350,14 @@ func NewMultiClient(
bd = &bodydownload.BodyDownload{}
}

receiptsCacheLimit := 32
receiptsCache, err := lru.New[common.Hash, []*types.Receipt](receiptsCacheLimit)
if err != nil {
return nil, err
}

receiptsGenerator := receipts.NewGenerator(receiptsCache, blockReader, engine)

cs := &MultiClient{
Hd: hd,
Bd: bd,
Expand All @@ -356,6 +372,8 @@ func NewMultiClient(
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
disableBlockDownload: disableBlockDownload,
logger: logger,
getReceiptsActiveGoroutineNumber: semaphore.NewWeighted(1),
ethApiWrapper: receiptsGenerator,
}

return cs, nil
Expand Down Expand Up @@ -696,45 +714,50 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry
return nil
}

func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320
//var query eth.GetReceiptsPacket66
//if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
// return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
//}
//tx, err := cs.db.BeginRo(ctx)
//if err != nil {
// return err
//}
//defer tx.Rollback()
//receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
//if err != nil {
// return err
//}
//tx.Rollback()
//b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
// RequestId: query.RequestId,
// ReceiptsRLPPacket: receipts,
//})
//if err != nil {
// return fmt.Errorf("encode header response: %w", err)
//}
//outreq := proto_sentry.SendMessageByIdRequest{
// PeerId: inreq.PeerId,
// Data: &proto_sentry.OutboundMessageData{
// Id: proto_sentry.MessageId_RECEIPTS_66,
// Data: b,
// },
//}
//_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
//if err != nil {
// if isPeerNotFoundErr(err) {
// return nil
// }
// return fmt.Errorf("send bodies response: %w", err)
//}
////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
//return nil
func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentryClient direct.SentryClient) error {
err := cs.getReceiptsActiveGoroutineNumber.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.getReceiptsActiveGoroutineNumber.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()

receiptsList, err := eth.AnswerGetReceiptsQuery(ctx, cs.ChainConfig, cs.ethApiWrapper, cs.blockReader, tx, query.GetReceiptsPacket)
if err != nil {
return err
}
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
ReceiptsRLPPacket: receiptsList,
})
if err != nil {
return fmt.Errorf("encode header response: %w", err)
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_RECEIPTS_66,
Data: b,
},
}
_, err = sentryClient.SendMessageById(ctx, &outreq, &grpc.OnFinishCallOption{})
if err != nil {
if isPeerNotFoundErr(err) {
return nil
}
return fmt.Errorf("send receipts response: %w", err)
}
//println(fmt.Sprintf("[%s] GetReceipts responseLen %d", sentry.ConvertH512ToPeerID(inreq.PeerId), len(b)))
return nil
}

func MakeInboundMessage() *proto_sentry.InboundMessage {
Expand All @@ -747,7 +770,6 @@ func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_
err = fmt.Errorf("%+v, msgID=%s, trace: %s", rec, message.Id.String(), dbg.Stack())
}
}() // avoid crash because Erigon's core does many things

err = cs.handleInboundMessage(ctx, message, sentry)

if (err != nil) && rlp.IsInvalidRLPError(err) {
Expand Down
2 changes: 1 addition & 1 deletion turbo/engineapi/engine_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (e *EngineServer) Start(
txPool txpool.TxpoolClient,
mining txpool.MiningClient,
) {
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader)

ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.Feecap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)

Expand Down
Loading

0 comments on commit e0f065b

Please sign in to comment.