From 814fea5ef3cc961bfa175e361a14fe4a280dbf6e Mon Sep 17 00:00:00 2001 From: kant Date: Mon, 29 Jul 2024 23:25:58 -0700 Subject: [PATCH] feat: indexer indexes blocks and txs of mev-commit chain to any pluggable storage --- indexer/README.md | 1 + indexer/cmd/indexer/main.go | 121 ++++ indexer/cmd/main.go | 101 +++ indexer/find_missing_block_num.py | 49 ++ indexer/go.mod | 56 ++ indexer/go.sum | 169 +++++ indexer/internal/indexer/indexer.go | 293 +++++++++ indexer/pkg/ethclient/client.go | 15 + indexer/pkg/ethclient/w3client.go | 73 +++ indexer/pkg/indexer/indexer.go | 578 ++++++++++++++++++ indexer/pkg/logutil/logutil.go | 33 + .../pkg/store/elasticsearch/elasticsearch.go | 200 ++++++ indexer/pkg/store/store.go | 16 + indexer/pkg/types/types.go | 43 ++ 14 files changed, 1748 insertions(+) create mode 100644 indexer/README.md create mode 100644 indexer/cmd/indexer/main.go create mode 100644 indexer/cmd/main.go create mode 100644 indexer/find_missing_block_num.py create mode 100644 indexer/go.mod create mode 100644 indexer/go.sum create mode 100644 indexer/internal/indexer/indexer.go create mode 100644 indexer/pkg/ethclient/client.go create mode 100644 indexer/pkg/ethclient/w3client.go create mode 100644 indexer/pkg/indexer/indexer.go create mode 100644 indexer/pkg/logutil/logutil.go create mode 100644 indexer/pkg/store/elasticsearch/elasticsearch.go create mode 100644 indexer/pkg/store/store.go create mode 100644 indexer/pkg/types/types.go diff --git a/indexer/README.md b/indexer/README.md new file mode 100644 index 000000000..34b40e3fd --- /dev/null +++ b/indexer/README.md @@ -0,0 +1 @@ +# indexer diff --git a/indexer/cmd/indexer/main.go b/indexer/cmd/indexer/main.go new file mode 100644 index 000000000..ab61d6657 --- /dev/null +++ b/indexer/cmd/indexer/main.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/primev/mev-commit/indexer/internal/indexer" + 
"github.com/primev/mev-commit/indexer/pkg/ethclient" + "github.com/primev/mev-commit/indexer/pkg/logutil" + "github.com/primev/mev-commit/indexer/pkg/store" + "github.com/primev/mev-commit/indexer/pkg/store/elasticsearch" + + "log/slog" + + "github.com/urfave/cli/v2" +) + +func main() { + app := &cli.App{ + Name: "blockchain-indexer", + Usage: "Index blockchain data into Elasticsearch", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "ethereum-endpoint", + EnvVars: []string{"ETHEREUM_ENDPOINT"}, + Value: "http://localhost:8545", + Usage: "Ethereum node endpoint", + }, + &cli.StringFlag{ + Name: "elasticsearch-endpoint", + EnvVars: []string{"ELASTICSEARCH_ENDPOINT"}, + Value: "http://localhost:9200", + Usage: "Elasticsearch endpoint", + }, + &cli.StringFlag{ + Name: "es-username", + EnvVars: []string{"ES_USERNAME"}, + Value: "", + Usage: "Elasticsearch username", + }, + &cli.StringFlag{ + Name: "es-password", + EnvVars: []string{"ES_PASSWORD"}, + Value: "", + Usage: "Elasticsearch password", + }, + &cli.DurationFlag{ + Name: "index-interval", + EnvVars: []string{"INDEX_INTERVAL"}, + Value: 15 * time.Second, + Usage: "Interval between indexing operations", + }, + &cli.StringFlag{ + Name: "log-level", + EnvVars: []string{"LOG_LEVEL"}, + Value: "info", + Usage: "Log level (debug, info, warn, error)", + }, + }, + Action: run, + } + + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func run(cliCtx *cli.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ethClient, err := ethclient.NewW3EthereumClient(cliCtx.String("ethereum-endpoint")) + if err != nil { + slog.Error("Failed to create Ethereum client", "error", err) + return err + } + + esClient, err := elasticsearch.NewESClient(cliCtx.String("elasticsearch-endpoint"), cliCtx.String("es-username"), cliCtx.String("es-password")) + if err != nil { + slog.Error("Failed to create Elasticsearch client", "error", err) + 
return err + } + defer func() { + if err := esClient.Close(ctx); err != nil { + slog.Error("Failed to close Elasticsearch client", "error", err) + } + }() + + var esStorage store.Storage = esClient + + blockchainIndexer := indexer.NewBlockchainIndexer( + ethClient, + esStorage, + cliCtx.Duration("index-interval"), + ) + + // Set log level + logutil.SetLogLevel(cliCtx.String("log-level")) + + if err := blockchainIndexer.Start(ctx); err != nil { + slog.Error("Failed to start blockchain indexer", "error", err) + return err + } + + // Set up graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + // Wait for interrupt signal + <-c + slog.Info("Shutting down gracefully...") + cancel() + // Wait for some time to allow ongoing operations to complete + time.Sleep(5 * time.Second) + return nil +} diff --git a/indexer/cmd/main.go b/indexer/cmd/main.go new file mode 100644 index 000000000..badfd246e --- /dev/null +++ b/indexer/cmd/main.go @@ -0,0 +1,101 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/primev/mev-commit/indexer/pkg/indexer" + "github.com/urfave/cli/v2" + "log/slog" +) + +func main() { + app := &cli.App{ + Name: "blockchain-indexer", + Usage: "Index blockchain data into Elasticsearch", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "ethereum-endpoint", + EnvVars: []string{"ETHEREUM_ENDPOINT"}, + Value: "http://localhost:8545", + Usage: "Ethereum node endpoint", + }, + &cli.StringFlag{ + Name: "elasticsearch-endpoint", + EnvVars: []string{"ELASTICSEARCH_ENDPOINT"}, + Value: "http://localhost:9200", + Usage: "Elasticsearch endpoint", + }, + &cli.DurationFlag{ + Name: "index-interval", + EnvVars: []string{"INDEX_INTERVAL"}, + Value: 15 * time.Second, + Usage: "Interval between indexing operations", + }, + &cli.StringFlag{ + Name: "log-level", + EnvVars: []string{"LOG_LEVEL"}, + Value: "info", + Usage: "Log level (debug, info, warn, error)", + }, + }, + Action: 
run, + } + + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func run(cliCtx *cli.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ethClient, err := indexer.NewW3EthereumClient(cliCtx.String("ethereum-endpoint")) + if err != nil { + slog.Error("Failed to create Ethereum client", "error", err) + return err + } + + esClient, err := indexer.NewESClient(cliCtx.String("elasticsearch-endpoint")) + if err != nil { + slog.Error("Failed to create Elasticsearch client", "error", err) + return err + } + defer func() { + if err := esClient.Close(ctx); err != nil { + slog.Error("Failed to close Elasticsearch client", "error", err) + } + }() + + blockchainIndexer := indexer.NewBlockchainIndexer( + ethClient, + esClient, + cliCtx.Duration("index-interval"), + ) + + // Set log level + indexer.SetLogLevel(cliCtx.String("log-level")) + + if err := blockchainIndexer.Start(ctx); err != nil { + slog.Error("Failed to start blockchain indexer", "error", err) + return err + } + + // Set up graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + // Wait for interrupt signal + <-c + slog.Info("Shutting down gracefully...") + cancel() + // Wait for some time to allow ongoing operations to complete + time.Sleep(5 * time.Second) + return nil +} diff --git a/indexer/find_missing_block_num.py b/indexer/find_missing_block_num.py new file mode 100644 index 000000000..61d879b08 --- /dev/null +++ b/indexer/find_missing_block_num.py @@ -0,0 +1,49 @@ +from elasticsearch import Elasticsearch, helpers + +# Initialize Elasticsearch client with authentication +es = Elasticsearch( + ["http://localhost:9200"], # Replace with your Elasticsearch host if different + basic_auth=("elastic", "mev-commit") +) + +# Function to get all numbers using scroll API +def get_all_numbers(): + numbers = [] + scroll_size = 10000 + + # Initial search request + response = 
es.search( + index="blocks", + body={ + "size": scroll_size, + "_source": ["number"], + "sort": [{"number": "asc"}] + }, + scroll='2m' + ) + + # Get the scroll ID + scroll_id = response['_scroll_id'] + + # Get the first batch of numbers + numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']]) + + # Continue scrolling until no more hits + while len(response['hits']['hits']): + response = es.scroll(scroll_id=scroll_id, scroll='2m') + numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']]) + + return numbers + +# Get all numbers +all_numbers = get_all_numbers() + +# Find missing numbers +missing_numbers = [] +for i in range(len(all_numbers) - 1): + current_number = all_numbers[i] + next_number = all_numbers[i + 1] + if next_number != current_number + 1: + missing_numbers.extend(range(current_number + 1, next_number)) + +print("Missing numbers:", missing_numbers) diff --git a/indexer/go.mod b/indexer/go.mod new file mode 100644 index 000000000..130fe2632 --- /dev/null +++ b/indexer/go.mod @@ -0,0 +1,56 @@ +module github.com/primev/mev-commit/indexer + +go 1.22 + +require ( + github.com/elastic/go-elasticsearch/v8 v8.14.0 + github.com/ethereum/go-ethereum v1.14.6 + github.com/lmittmann/tint v1.0.5 + github.com/lmittmann/w3 v0.16.7 + github.com/urfave/cli/v2 v2.27.1 +) + +require ( + github.com/DataDog/zstd v1.5.2 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/StackExchange/wmi v1.2.1 // indirect + github.com/bits-and-blooms/bitset v1.10.0 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/consensys/bavard v0.1.13 // indirect + github.com/consensys/gnark-crypto v0.12.1 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect + github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect 
+ github.com/ethereum/c-kzg-4844 v1.0.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/holiman/uint256 v1.2.4 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/mmcloughlin/addchain v0.4.0 // indirect + github.com/prometheus/client_golang v1.19.1 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/rivo/uniseg v0.4.2 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/supranational/blst v0.3.11 // indirect + github.com/tklauser/go-sysconf v0.3.13 // indirect + github.com/tklauser/numcpus v0.7.0 // indirect + github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect + go.opentelemetry.io/otel/sdk v1.22.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + rsc.io/tmplfunc v0.0.3 // indirect +) diff --git a/indexer/go.sum b/indexer/go.sum new file mode 100644 index 000000000..91c3dde9f --- /dev/null +++ b/indexer/go.sum @@ -0,0 +1,169 @@ +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod 
h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= +github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= +github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= 
+github.com/cockroachdb/pebble v1.1.1 h1:XnKU22oiCLy2Xn8vp1re67cXg4SAasg/WDt1NtcRFaw= +github.com/cockroachdb/pebble v1.1.1/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= +github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= +github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= +github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= +github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c h1:uQYC5Z1mdLRPrZhHjHxufI8+2UG/i25QG92j0Er9p6I= +github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= +github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= +github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/decred/dcrd/crypto/blake256 v1.0.1 
h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.14.0 h1:1ywU8WFReLLcxE1WJqii3hTtbPUE2hc38ZK/j4mMFow= +github.com/elastic/go-elasticsearch/v8 v8.14.0/go.mod h1:WRvnlGkSuZyp83M2U8El/LGXpCjYLrvlkSgkAH4O5I4= +github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA= +github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= +github.com/ethereum/go-ethereum v1.14.6 h1:ZTxnErSopkDyxdvB8zW/KcK+/AVrdil/TzoWXVKaaC8= +github.com/ethereum/go-ethereum v1.14.6/go.mod h1:hglUZo/5pVIYXNyYjWzsAUDpT/zI+WbWo/Nih7ot+G0= +github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 h1:KrE8I4reeVvf7C1tm8elRjj4BdscTYzz/WAbYyf/JI4= +github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0/go.mod h1:D9AJLVXSyZQXJQVk8oh1EwjISE+sJTn2duYIZC0dy3w= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole 
v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= +github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= +github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw= +github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/lmittmann/w3 v0.16.7 h1:1xlsXQ5xTbW1Rfa7ClH+KUTfscmuVgl0bzNkFfzSoz8= +github.com/lmittmann/w3 v0.16.7/go.mod h1:30EWzDfQAvqdSdTDEtNvOV4Ad6qpEX0WP2fcLKzQE5I= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= +github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= +github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.6.1 
h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= +github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= +github.com/supranational/blst v0.3.11/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= 
+github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= +github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= +github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= +github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY= +github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho= +github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= +github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e h1:+SOyEddqYF09QP7vr7CgJ1eti3pY9Fn3LHO1M1r/0sI= +github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= +go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU= +rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA= diff --git a/indexer/internal/indexer/indexer.go b/indexer/internal/indexer/indexer.go new file mode 100644 index 000000000..d1036a18e --- /dev/null +++ b/indexer/internal/indexer/indexer.go @@ -0,0 +1,293 @@ +package indexer + +import ( + "context" + "encoding/hex" + "fmt" + "log/slog" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/primev/mev-commit/indexer/pkg/ethclient" + "github.com/primev/mev-commit/indexer/pkg/store" + types2 "github.com/primev/mev-commit/indexer/pkg/types" +) + +type BlockchainIndexer struct { + ethClient ethclient.EthereumClient + storage store.Storage + blockChan chan *types.Block + txChan chan *types.Transaction + indexInterval 
time.Duration + lastForwardIndexedBlock *big.Int + lastBackwardIndexedBlock *big.Int + logger *slog.Logger +} + +func NewBlockchainIndexer(ethClient ethclient.EthereumClient, storage store.Storage, indexInterval time.Duration) *BlockchainIndexer { + return &BlockchainIndexer{ + ethClient: ethClient, + storage: storage, + blockChan: make(chan *types.Block, 100), + txChan: make(chan *types.Transaction, 100), + indexInterval: indexInterval, + logger: slog.Default(), + } +} + +func (bi *BlockchainIndexer) Start(ctx context.Context) error { + if err := bi.storage.CreateIndices(ctx); err != nil { + return fmt.Errorf("failed to create indices: %w", err) + } + + latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) + bi.logger.Info("latest block number", "block number", latestBlockNumber) + if err != nil { + return fmt.Errorf("failed to get latest block number: %w", err) + } + + if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { + return err + } + + if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { + return err + } + + go bi.fetchForwardBlocks(ctx) + go bi.fetchBackwardBlocks(ctx) + go bi.processBlocks(ctx) + + // Block the main function indefinitely + select {} +} + +func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error { + lastForwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "forward") + if err != nil { + return fmt.Errorf("failed to get last forward indexed block: %w", err) + } + + bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward") + + if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { + bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1) + } else { + bi.lastForwardIndexedBlock = lastForwardIndexedBlock + } + + return nil +} + +func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error { 
+ lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward") + if err != nil { + return fmt.Errorf("failed to get last backward indexed block: %w", err) + } + + bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward") + + if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { + bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber) + } else { + bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock + } + + return nil +} + +func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) { + ticker := time.NewTicker(bi.indexInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) + if err != nil { + bi.logger.Error("Failed to get latest block number", "error", err) + continue + } + + for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) { + endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4)) + if endBlockNum.Cmp(latestBlockNumber) > 0 { + endBlockNum.Set(latestBlockNumber) + } + + blockNums := []*big.Int{} + for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) { + blockNums = append(blockNums, new(big.Int).Set(bn)) + } + + blocks, err := bi.fetchBlocks(ctx, blockNums) + if err != nil { + bi.logger.Error("Failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) + continue + } + + for _, block := range blocks { + bi.blockChan <- block + bi.lastForwardIndexedBlock.Set(block.Number()) + } + } + } + } +} + +func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) { + ticker := time.NewTicker(bi.indexInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if bi.lastBackwardIndexedBlock.Sign() <= 0 { + return + } + zeroBigNum := 
big.NewInt(0) + blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1)) + + for i := 0; i < 5 && blockNum.Cmp(zeroBigNum) >= 0; i++ { + endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4)) + if endBlockNum.Cmp(zeroBigNum) < 0 { + endBlockNum.Set(zeroBigNum) + } + + blockNums := []*big.Int{} + for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) { + blockNums = append(blockNums, new(big.Int).Set(bn)) + } + + blocks, err := bi.fetchBlocks(ctx, blockNums) + if err != nil { + bi.logger.Error("Failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) + break + } + + for _, block := range blocks { + bi.blockChan <- block + bi.lastBackwardIndexedBlock.Set(block.Number()) + } + + blockNum.Sub(endBlockNum, big.NewInt(1)) + } + } + } +} + +func (bi *BlockchainIndexer) processBlocks(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case block := <-bi.blockChan: + if err := bi.indexBlock(ctx, block); err != nil { + bi.logger.Error("Failed to index block", "error", err) + } + if err := bi.indexTransactions(ctx, block); err != nil { + bi.logger.Error("Failed to index transactions", "error", err) + } + } + } +} + +func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error { + timestamp := time.UnixMilli(int64(block.Time())).UTC().Format("2006-01-02T15:04:05.000Z") + indexBlock := &types2.IndexBlock{ + Number: block.NumberU64(), + Hash: block.Hash().Hex(), + ParentHash: block.ParentHash().Hex(), + Root: block.Root().Hex(), + Nonce: block.Nonce(), + Timestamp: timestamp, + Transactions: len(block.Transactions()), + BaseFee: block.BaseFee().Uint64(), + GasLimit: block.GasLimit(), + GasUsed: block.GasUsed(), + Difficulty: block.Difficulty().Uint64(), + ExtraData: hex.EncodeToString(block.Extra()), + } + + return bi.storage.IndexBlock(ctx, indexBlock) +} + +func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error { + var 
transactions []*types2.IndexTransaction + var txHashes []string + + for _, tx := range block.Transactions() { + from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx) + if err != nil { + return fmt.Errorf("failed to derive sender: %w", err) + } + + v, r, s := tx.RawSignatureValues() + timestamp := tx.Time().UTC().Format("2006-01-02T15:04:05.000Z") + transaction := &types2.IndexTransaction{ + Hash: tx.Hash().Hex(), + From: from.Hex(), + Gas: tx.Gas(), + Nonce: tx.Nonce(), + BlockHash: block.Hash().Hex(), + BlockNumber: block.NumberU64(), + ChainId: tx.ChainId().String(), + V: v.String(), + R: r.String(), + S: s.String(), + Input: hex.EncodeToString(tx.Data()), + Timestamp: timestamp, + } + + if tx.To() != nil { + transaction.To = tx.To().Hex() + } + if tx.GasPrice() != nil { + transaction.GasPrice = tx.GasPrice().Uint64() + } + if tx.GasTipCap() != nil { + transaction.GasTipCap = tx.GasTipCap().Uint64() + } + if tx.GasFeeCap() != nil { + transaction.GasFeeCap = tx.GasFeeCap().Uint64() + } + if tx.Value() != nil { + transaction.Value = tx.Value().Uint64() + } + + transactions = append(transactions, transaction) + txHashes = append(txHashes, tx.Hash().Hex()) + } + + receipts, err := bi.fetchReceipts(ctx, txHashes) + if err != nil { + return fmt.Errorf("failed to fetch transaction receipts: %w", err) + } + + for _, tx := range transactions { + if receipt, ok := receipts[tx.Hash]; ok { + tx.Status = receipt.Status + tx.GasUsed = receipt.GasUsed + tx.CumulativeGasUsed = receipt.CumulativeGasUsed + tx.ContractAddress = receipt.ContractAddress.Hex() + tx.TransactionIndex = receipt.TransactionIndex + tx.ReceiptBlockHash = receipt.BlockHash.Hex() + tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64() + } + } + + return bi.storage.IndexTransactions(ctx, transactions) +} + +func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { + return bi.ethClient.TxReceipts(ctx, txHashes) +} + +func (bi 
*BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { + return bi.ethClient.GetBlocks(ctx, blockNums) +} diff --git a/indexer/pkg/ethclient/client.go b/indexer/pkg/ethclient/client.go new file mode 100644 index 000000000..6ea2742c4 --- /dev/null +++ b/indexer/pkg/ethclient/client.go @@ -0,0 +1,15 @@ +package ethclient + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" +) + +type EthereumClient interface { + GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + BlockNumber(ctx context.Context) (*big.Int, error) + TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) +} diff --git a/indexer/pkg/ethclient/w3client.go b/indexer/pkg/ethclient/w3client.go new file mode 100644 index 000000000..3b4576c2a --- /dev/null +++ b/indexer/pkg/ethclient/w3client.go @@ -0,0 +1,73 @@ +package ethclient + +import ( + "context" + "fmt" + "github.com/ethereum/go-ethereum/core/types" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" + "github.com/lmittmann/w3/w3types" + "math/big" +) + +type W3EvmClient struct { + client *w3.Client +} + +func NewW3EthereumClient(endpoint string) (*W3EvmClient, error) { + client, err := w3.Dial(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err) + } + return &W3EvmClient{client: client}, nil +} + +func (c *W3EvmClient) GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { + batchBlocksCaller := make([]w3types.RPCCaller, len(blockNums)) + blocks := make([]types.Block, len(blockNums)) + for i, blockNum := range blockNums { + batchBlocksCaller[i] = eth.BlockByNumber(blockNum).Returns(&blocks[i]) + } + err := c.client.Call(batchBlocksCaller...) 
+ if err != nil { + return nil, err + } + var b []*types.Block + for i := range blocks { + b = append(b, &blocks[i]) // address the slice element: no struct copy, no shared loop variable + } + return b, nil +} + +func (c *W3EvmClient) TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { + batchTxReceiptCaller := make([]w3types.RPCCaller, len(txHashes)) + txReceipts := make([]types.Receipt, len(txHashes)) + for i, txHash := range txHashes { + batchTxReceiptCaller[i] = eth.TxReceipt(w3.H(txHash)).Returns(&txReceipts[i]) + } + err := c.client.Call(batchTxReceiptCaller...) + if err != nil { + return nil, err // propagate the RPC failure; an empty map with a nil error hid it from callers + } + txHashToReceipt := make(map[string]*types.Receipt, len(txReceipts)) + for i := range txReceipts { + txHashToReceipt[txReceipts[i].TxHash.Hex()] = &txReceipts[i] // each entry points at its own receipt + } + return txHashToReceipt, nil +} + +func (c *W3EvmClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + var block types.Block + if err := c.client.Call(eth.BlockByNumber(number).Returns(&block)); err != nil { + return nil, err + } + return &block, nil +} + +func (c *W3EvmClient) BlockNumber(ctx context.Context) (*big.Int, error) { + var blockNumber big.Int + if err := c.client.Call(eth.BlockNumber().Returns(&blockNumber)); err != nil { + return nil, err + } + return &blockNumber, nil +} diff --git a/indexer/pkg/indexer/indexer.go b/indexer/pkg/indexer/indexer.go new file mode 100644 index 000000000..0d1b33200 --- /dev/null +++ b/indexer/pkg/indexer/indexer.go @@ -0,0 +1,578 @@ +package indexer + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/lmittmann/tint" + "github.com/lmittmann/w3/w3types" + "log/slog" + "math/big" + "os" + "strings" + "sync/atomic" + "time" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" + estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" +
"github.com/ethereum/go-ethereum/core/types" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" +) + +type BlockchainIndexer struct { + ethClient EthereumClient + esClient ElasticsearchClient + blockChan chan *types.Block + txChan chan *types.Transaction + indexInterval time.Duration + lastForwardIndexedBlock *big.Int + lastBackwardIndexedBlock *big.Int + logger *slog.Logger +} + +type EthereumClient interface { + GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + BlockNumber(ctx context.Context) (*big.Int, error) + TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) +} + +type ElasticsearchClient interface { + Index(ctx context.Context, index string, document interface{}) error + Search(ctx context.Context, index string, query *estypes.Query) (*search.Response, error) + GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) + CreateIndices(ctx context.Context) error + Bulk(ctx context.Context, indexName string, docs []interface{}) error +} + +func NewBlockchainIndexer(ethClient EthereumClient, esClient ElasticsearchClient, indexInterval time.Duration) *BlockchainIndexer { + return &BlockchainIndexer{ + ethClient: ethClient, + esClient: esClient, + blockChan: make(chan *types.Block, 100), + txChan: make(chan *types.Transaction, 100), + indexInterval: indexInterval, + logger: slog.Default(), + } +} + +func (bi *BlockchainIndexer) Start(ctx context.Context) error { + if err := bi.esClient.CreateIndices(ctx); err != nil { + return fmt.Errorf("failed to create indices: %w", err) + } + + latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) + if err != nil { + return fmt.Errorf("failed to get latest block number: %w", err) + } + + if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { + return err + } + + if err = bi.initializeBackwardIndex(ctx, 
latestBlockNumber.Uint64()); err != nil { + return err + } + + go bi.fetchForwardBlocks(ctx) + go bi.fetchBackwardBlocks(ctx) + go bi.processBlocks(ctx) + + // Block the main function indefinitely + select {} +} + +func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error { + lastForwardIndexedBlock, err := bi.esClient.GetLastIndexedBlock(ctx, "forward") + if err != nil { + return fmt.Errorf("failed to get last forward indexed block: %w", err) + } + bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward") + + if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { + bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1) + } else { + bi.lastForwardIndexedBlock = lastForwardIndexedBlock + } + + return nil +} + +func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error { + lastBackwardIndexedBlock, err := bi.esClient.GetLastIndexedBlock(ctx, "backward") + if err != nil { + return fmt.Errorf("failed to get last backward indexed block: %w", err) + } + bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward") + + if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Cmp(big.NewInt(0)) == 0 { + bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1) + } else { + bi.lastBackwardIndexedBlock = new(big.Int).Sub(lastBackwardIndexedBlock, big.NewInt(1)) + } + + return nil +} + +func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) { + ticker := time.NewTicker(bi.indexInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) + if err != nil { + bi.logger.Error("Failed to get latest block number", "error", err) + continue + } + + for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); 
blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) { + endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4)) + if endBlockNum.Cmp(latestBlockNumber) > 0 { + endBlockNum.Set(latestBlockNumber) + } + + blockNums := []*big.Int{} + for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) { + blockNums = append(blockNums, new(big.Int).Set(bn)) + } + + blocks, err := bi.fetchBlocks(ctx, blockNums) + if err != nil { + bi.logger.Error("Failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) + break // retry from lastForwardIndexedBlock+1 on the next tick; continuing would skip this batch for good + } + + for _, block := range blocks { + bi.blockChan <- block + bi.lastForwardIndexedBlock.Set(block.Number()) + } + } + } + } +} + +func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) { + ticker := time.NewTicker(bi.indexInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if bi.lastBackwardIndexedBlock.Sign() < 0 { + return + } + block, err := bi.ethClient.BlockByNumber(ctx, bi.lastBackwardIndexedBlock) + if err != nil { + bi.logger.Error("Failed to get block", "number", bi.lastBackwardIndexedBlock, "error", err) + continue + } + bi.blockChan <- block + bi.lastBackwardIndexedBlock.Set(new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1))) + } + } +} + +func (bi *BlockchainIndexer) processBlocks(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case block := <-bi.blockChan: + if err := bi.indexBlock(ctx, block); err != nil { + bi.logger.Error("Failed to index block", "error", err) + } + if err := bi.indexTransactions(ctx, block); err != nil { + bi.logger.Error("Failed to index transactions", "error", err) + } + } + } +} + +func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error { + timestamp := time.UnixMilli(int64(block.Time())).UTC().Format("2006-01-02T15:04:05.000Z") // NOTE(review): treats block.Time() as milliseconds; upstream go-ethereum uses seconds — confirm for this chain + blockDoc := map[string]interface{}{ + "number": block.NumberU64(), + "hash":
block.Hash().Hex(), + "parentHash": block.ParentHash().Hex(), + "root": block.Root().Hex(), + "nonce": block.Nonce(), + "timestamp": timestamp, + "transactions": len(block.Transactions()), + "baseFee": block.BaseFee().Uint64(), + "gasLimit": block.GasLimit(), + "gasUsed": block.GasUsed(), + "difficulty": block.Difficulty().Uint64(), + "extraData": hex.EncodeToString(block.Extra()), + } + + if err := bi.esClient.Index(ctx, "blocks", blockDoc); err != nil { + return fmt.Errorf("failed to index block: %w", err) + } + return nil +} + +func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error { + txDocs := make([]interface{}, 0, len(block.Transactions())) + + var txHashes []string + for _, tx := range block.Transactions() { + from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx) + if err != nil { + return fmt.Errorf("failed to derive sender: %w", err) + } + + v, r, s := tx.RawSignatureValues() + timestamp := tx.Time().UTC().Format("2006-01-02T15:04:05.000Z") + txDoc := map[string]interface{}{ + "hash": tx.Hash().Hex(), + "from": from.Hex(), + "gas": tx.Gas(), + "nonce": tx.Nonce(), + "blockHash": block.Hash().Hex(), + "blockNumber": block.NumberU64(), + "chainId": tx.ChainId().String(), + "v": v.String(), + "r": r.String(), + "s": s.String(), + "input": hex.EncodeToString(tx.Data()), + "timestamp": timestamp, + } + + if tx.To() != nil { + txDoc["to"] = tx.To().Hex() + } else { + txDoc["to"] = "" + } + + if tx.GasPrice() != nil { + txDoc["gasPrice"] = tx.GasPrice().Uint64() + } + + if tx.GasTipCap() != nil { + txDoc["gasTipCap"] = tx.GasTipCap().Uint64() + } + + if tx.GasFeeCap() != nil { + txDoc["gasFeeCap"] = tx.GasFeeCap().Uint64() + } + + if tx.Value() != nil { + txDoc["value"] = tx.Value().Uint64() + } + txDocs = append(txDocs, txDoc) + txHashes = append(txHashes, tx.Hash().Hex()) + } + + receipts, err := bi.fetchReceipts(ctx, txHashes) + if err != nil { + return fmt.Errorf("failed to fetch transaction receipts: %w", 
err) + } + // Add receipt information to transaction documents + for _, txDoc := range txDocs { + txD := txDoc.(map[string]interface{}) + txHash := txD["hash"].(string) + if receipt, ok := receipts[txHash]; ok { + txD["status"] = receipt.Status + txD["gasUsed"] = receipt.GasUsed + txD["cumulativeGasUsed"] = receipt.CumulativeGasUsed + txD["receiptContractAddress"] = receipt.ContractAddress.Hex() + txD["transactionIndex"] = receipt.TransactionIndex + txD["receiptBlockHash"] = receipt.BlockHash + txD["receiptBlockNumber"] = receipt.BlockNumber.Uint64() + txD["logs"] = receipt.Logs + } + } + + if err := bi.esClient.Bulk(ctx, "transactions", txDocs); err != nil { + return fmt.Errorf("bulk indexing of transactions failed: %w", err) + } + return nil +} + +func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { + return bi.ethClient.TxReceipts(ctx, txHashes) +} + +func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { + return bi.ethClient.GetBlocks(ctx, blockNums) +} + +type W3EvmClient struct { + client *w3.Client +} + +func NewW3EthereumClient(endpoint string) (*W3EvmClient, error) { + client, err := w3.Dial(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err) + } + return &W3EvmClient{client: client}, nil +} + +func (c *W3EvmClient) GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { + batchBlocksCaller := make([]w3types.RPCCaller, len(blockNums)) + blocks := make([]types.Block, len(blockNums)) + for i, blockNum := range blockNums { + batchBlocksCaller[i] = eth.BlockByNumber(blockNum).Returns(&blocks[i]) + } + err := c.client.Call(batchBlocksCaller...) 
+ if err != nil { + return nil, err + } + var b []*types.Block + for i := range blocks { + b = append(b, &blocks[i]) // address the slice element: no struct copy, no shared loop variable + } + return b, nil +} + +func (c *W3EvmClient) TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { + batchTxReceiptCaller := make([]w3types.RPCCaller, len(txHashes)) + txReceipts := make([]types.Receipt, len(txHashes)) + for i, txHash := range txHashes { + batchTxReceiptCaller[i] = eth.TxReceipt(w3.H(txHash)).Returns(&txReceipts[i]) + } + err := c.client.Call(batchTxReceiptCaller...) + if err != nil { + return nil, err // propagate the RPC failure; an empty map with a nil error hid it from callers + } + txHashToReceipt := make(map[string]*types.Receipt, len(txReceipts)) + for i := range txReceipts { + txHashToReceipt[txReceipts[i].TxHash.Hex()] = &txReceipts[i] // each entry points at its own receipt + } + return txHashToReceipt, nil +} + +func (c *W3EvmClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + var block types.Block + if err := c.client.Call(eth.BlockByNumber(number).Returns(&block)); err != nil { + return nil, err + } + return &block, nil +} + +func (c *W3EvmClient) BlockNumber(ctx context.Context) (*big.Int, error) { + var blockNumber big.Int + if err := c.client.Call(eth.BlockNumber().Returns(&blockNumber)); err != nil { + return nil, err + } + return &blockNumber, nil +} + +type ESClient struct { + client *elasticsearch.TypedClient + bulkIndexer esutil.BulkIndexer +} + +func NewESClient(endpoint string) (*ESClient, error) { + client, err := elasticsearch.NewTypedClient(elasticsearch.Config{ + Addresses: []string{endpoint}, + Username: "elastic", + Password: "mev-commit", // NOTE(review): hard-coded credential committed to source — consider sourcing it from configuration + }) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: &elasticsearch.Client{ + BaseClient: client.BaseClient, + API: esapi.New(client.Transport), + }, + NumWorkers: 4, + FlushBytes: 5e+6, // 5MB + }) + if err != nil { + return nil, fmt.Errorf("failed to create bulk indexer: %w", err) +
} + + return &ESClient{client: client, bulkIndexer: bi}, nil +} + +func (c *ESClient) Index(ctx context.Context, index string, document interface{}) error { + _, err := c.client.Index(index).Document(document).Do(ctx) + return err +} + +func (c *ESClient) Search(ctx context.Context, index string, query *estypes.Query) (*search.Response, error) { + return c.client.Search().Index(index).Query(query).Do(ctx) +} + +func (c *ESClient) GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) { + // Check if the index exists + exists, err := c.client.Indices.Exists("blocks").Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to check if index exists: %w", err) + } + + if !exists { + return big.NewInt(0), nil + } + + // Check if the index contains any documents + countRes, err := c.client.Count().Index("blocks").Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to count documents in index: %w", err) + } + + if countRes.Count == 0 { + return big.NewInt(0), nil + } + + var sortOrder string + if direction == "forward" { + sortOrder = "desc" + } else if direction == "backward" { + sortOrder = "asc" + } else { + return nil, fmt.Errorf("invalid direction: %s", direction) + } + + // Perform the search query + res, err := c.client.Search(). + Index("blocks"). + Sort(map[string]interface{}{ + "number": map[string]interface{}{ + "order": sortOrder, + }, + }). + Size(1). 
+ Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to execute search query: %w", err) + } + + // Check if there are no hits (index exists but no documents) + if res.Hits.Total.Value == 0 { + return big.NewInt(0), nil + } + + var block struct { + Number uint64 `json:"number"` + } + + if err := json.Unmarshal(res.Hits.Hits[0].Source_, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal search result: %w", err) + } + blockNumber := new(big.Int).SetUint64(block.Number) + return blockNumber, nil +} + +func (c *ESClient) CreateIndices(ctx context.Context) error { + indices := []string{"blocks", "transactions"} + for _, index := range indices { + res, err := c.client.Indices.Exists(index).Do(ctx) + if err != nil { + return fmt.Errorf("failed to check if index %s exists: %w", index, err) + } + + if !res { + indexSettings := esapi.IndicesCreateRequest{ + Index: index, + Body: strings.NewReader(`{ + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + } + } + } + }`), + } + + createRes, err := indexSettings.Do(ctx, c.client) + if err != nil { + return fmt.Errorf("failed to create index %s: %w", index, err) + } + defer createRes.Body.Close() + + if createRes.IsError() { + return fmt.Errorf("error creating index %s: %s", index, createRes.String()) + } + } + } + return nil +} + +func (c *ESClient) Bulk(ctx context.Context, indexName string, docs []interface{}) error { + var ( + countSuccessful uint64 + countFailed uint64 + ) + + for _, doc := range docs { + data, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("failed to marshal document: %w", err) + } + + err = c.bulkIndexer.Add( + ctx, + esutil.BulkIndexerItem{ + Action: "index", + Index: indexName, + Body: bytes.NewReader(data), + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + 
atomic.AddUint64(&countSuccessful, 1) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + atomic.AddUint64(&countFailed, 1) + if err != nil { + slog.Error("Bulk indexing error", "error", err) + } else { + slog.Error("Bulk indexing error", "type", res.Error.Type, "reason", res.Error.Reason) + } + }, + }, + ) + + if err != nil { + return fmt.Errorf("failed to add document to bulk indexer: %w", err) + } + } + return nil +} + +func (c *ESClient) Close(ctx context.Context) error { + return c.bulkIndexer.Close(ctx) +} + +func SetLogLevel(level string) { + var logLevel slog.Level + switch level { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + default: + logLevel = slog.LevelInfo + } + opts := &tint.Options{ + Level: logLevel, + TimeFormat: time.Kitchen, // Optional: Customize the time format + } + handler := tint.NewHandler(os.Stdout, opts) + logger := slog.New(handler) + slog.SetDefault(logger) +} diff --git a/indexer/pkg/logutil/logutil.go b/indexer/pkg/logutil/logutil.go new file mode 100644 index 000000000..28677bcb5 --- /dev/null +++ b/indexer/pkg/logutil/logutil.go @@ -0,0 +1,33 @@ +package logutil + +import ( + "os" + "time" + + "github.com/lmittmann/tint" + "log/slog" +) + +func SetLogLevel(level string) { + var logLevel slog.Level + switch level { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + default: + logLevel = slog.LevelInfo + } + opts := &tint.Options{ + Level: logLevel, + TimeFormat: time.Kitchen, // Optional: Customize the time format + AddSource: true, + } + handler := tint.NewHandler(os.Stdout, opts) + logger := slog.New(handler) + slog.SetDefault(logger) +} diff --git a/indexer/pkg/store/elasticsearch/elasticsearch.go 
b/indexer/pkg/store/elasticsearch/elasticsearch.go new file mode 100644 index 000000000..f47dc9793 --- /dev/null +++ b/indexer/pkg/store/elasticsearch/elasticsearch.go @@ -0,0 +1,200 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "math/big" + "strings" + "sync/atomic" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" + estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" + types2 "github.com/primev/mev-commit/indexer/pkg/types" +) + +type ESClient struct { + client *elasticsearch.TypedClient + bulkIndexer esutil.BulkIndexer +} + +func NewESClient(endpoint string, user, pass string) (*ESClient, error) { + client, err := elasticsearch.NewTypedClient(elasticsearch.Config{ + Addresses: []string{endpoint}, + Username: user, + Password: pass, + }) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: &elasticsearch.Client{BaseClient: client.BaseClient, API: esapi.New(client.Transport)}, + NumWorkers: 4, + FlushBytes: 5e+6, // 5MB + }) + if err != nil { + return nil, fmt.Errorf("failed to create bulk indexer: %w", err) + } + return &ESClient{client: client, bulkIndexer: bi}, nil +} + +func (c *ESClient) Index(ctx context.Context, index string, document interface{}) error { + _, err := c.client.Index(index).Document(document).Do(ctx) + return err +} + +func (c *ESClient) Search(ctx context.Context, index string, query *estypes.Query) (*search.Response, error) { + return c.client.Search().Index(index).Query(query).Do(ctx) +} + +func (c *ESClient) GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) { + // Check if the index exists + exists, err := c.client.Indices.Exists("blocks").Do(ctx) + if err != nil { + return nil, 
fmt.Errorf("failed to check if index exists: %w", err) + } + if !exists { + return big.NewInt(0), nil + } + + // Check if the index contains any documents + countRes, err := c.client.Count().Index("blocks").Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to count documents in index: %w", err) + } + if countRes.Count == 0 { + return big.NewInt(0), nil + } + + var sortOrder string + if direction == "forward" { + sortOrder = "desc" + } else if direction == "backward" { + sortOrder = "asc" + } else { + return nil, fmt.Errorf("invalid direction: %s", direction) + } + + // Perform the search query + res, err := c.client.Search(). + Index("blocks"). + Sort(map[string]interface{}{ + "number": map[string]interface{}{ + "order": sortOrder, + }, + }). + Size(1). + Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to execute search query: %w", err) + } + + // Check if there are no hits (index exists but no documents) + if res.Hits.Total.Value == 0 { + return big.NewInt(0), nil + } + + var block struct { + Number uint64 `json:"number"` + } + if err := json.Unmarshal(res.Hits.Hits[0].Source_, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal search result: %w", err) + } + blockNumber := new(big.Int).SetUint64(block.Number) + return blockNumber, nil +} + +func (c *ESClient) IndexBlock(ctx context.Context, block *types2.IndexBlock) error { + return c.Bulk(ctx, "blocks", []interface{}{block}) +} + +func (c *ESClient) IndexTransactions(ctx context.Context, transactions []*types2.IndexTransaction) error { + docs := make([]interface{}, len(transactions)) + for i, tx := range transactions { + docs[i] = tx + } + return c.Bulk(ctx, "transactions", docs) +} + +func (c *ESClient) CreateIndices(ctx context.Context) error { + indices := []string{"blocks", "transactions"} + for _, index := range indices { + res, err := c.client.Indices.Exists(index).Do(ctx) + if err != nil { + return fmt.Errorf("failed to check if index %s exists: %w", index, err) + } + 
if !res { + indexSettings := esapi.IndicesCreateRequest{ + Index: index, + Body: strings.NewReader(`{ + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + } + } + } + }`), + } + createRes, err := indexSettings.Do(ctx, c.client) + if err != nil { + return fmt.Errorf("failed to create index %s: %w", index, err) + } + defer createRes.Body.Close() + if createRes.IsError() { + return fmt.Errorf("error creating index %s: %s", index, createRes.String()) + } + } + } + return nil +} + +func (c *ESClient) Bulk(ctx context.Context, indexName string, docs []interface{}) error { + var ( + countSuccessful uint64 + countFailed uint64 + ) + for _, doc := range docs { + data, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("failed to marshal document: %w", err) + } + err = c.bulkIndexer.Add( + ctx, + esutil.BulkIndexerItem{ + Action: "index", + Index: indexName, + Body: bytes.NewReader(data), + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + atomic.AddUint64(&countSuccessful, 1) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + atomic.AddUint64(&countFailed, 1) + if err != nil { + slog.Error("Bulk indexing error", "error", err) + } else { + slog.Error("Bulk indexing error", "type", res.Error.Type, "reason", res.Error.Reason) + } + }, + }, + ) + if err != nil { + return fmt.Errorf("failed to add document to bulk indexer: %w", err) + } + } + return nil +} + +func (c *ESClient) Close(ctx context.Context) error { + return c.bulkIndexer.Close(ctx) +} diff --git a/indexer/pkg/store/store.go b/indexer/pkg/store/store.go new file mode 100644 index 000000000..4cd16ffce --- /dev/null +++ b/indexer/pkg/store/store.go @@ -0,0 +1,16 @@ +package store + +import ( + "context" + "math/big" + + types2 
"github.com/primev/mev-commit/indexer/pkg/types" +) + +type Storage interface { + IndexBlock(ctx context.Context, block *types2.IndexBlock) error + IndexTransactions(ctx context.Context, transactions []*types2.IndexTransaction) error + GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) + CreateIndices(ctx context.Context) error + Close(ctx context.Context) error +} diff --git a/indexer/pkg/types/types.go b/indexer/pkg/types/types.go new file mode 100644 index 000000000..e948b0d5e --- /dev/null +++ b/indexer/pkg/types/types.go @@ -0,0 +1,43 @@ +package types + +type IndexBlock struct { + Number uint64 `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parentHash"` + Root string `json:"root"` + Nonce uint64 `json:"nonce"` + Timestamp string `json:"timestamp"` + Transactions int `json:"transactions"` + BaseFee uint64 `json:"baseFee"` + GasLimit uint64 `json:"gasLimit"` + GasUsed uint64 `json:"gasUsed"` + Difficulty uint64 `json:"difficulty"` + ExtraData string `json:"extraData"` +} + +type IndexTransaction struct { + Hash string `json:"hash"` + From string `json:"from"` + To string `json:"to"` + Gas uint64 `json:"gas"` + GasPrice uint64 `json:"gasPrice"` + GasTipCap uint64 `json:"gasTipCap"` + GasFeeCap uint64 `json:"gasFeeCap"` + Value uint64 `json:"value"` + Nonce uint64 `json:"nonce"` + BlockHash string `json:"blockHash"` + BlockNumber uint64 `json:"blockNumber"` + ChainId string `json:"chainId"` + V string `json:"v"` + R string `json:"r"` + S string `json:"s"` + Input string `json:"input"` + Timestamp string `json:"timestamp"` + Status uint64 `json:"status"` + GasUsed uint64 `json:"gasUsed"` + CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` + ContractAddress string `json:"contractAddress"` + TransactionIndex uint `json:"transactionIndex"` + ReceiptBlockHash string `json:"receiptBlockHash"` + ReceiptBlockNumber uint64 `json:"receiptBlockNumber"` +}