diff --git a/cmd/misc/main.go b/cmd/misc/main.go index a8a9731d31..ac265938cc 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -359,6 +359,8 @@ func main() { } case "export-stats-totals": exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) + case "fix-exec-transactions-count": + err = fixExecTransactionsCount() default: utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0) } @@ -370,6 +372,88 @@ func main() { } } +func fixExecTransactionsCount() error { + startBlockNumber := uint64(opts.StartBlock) + endBlockNumber := uint64(opts.EndBlock) + + logrus.WithFields(logrus.Fields{"startBlockNumber": startBlockNumber, "endBlockNumber": endBlockNumber}).Infof("fixExecTransactionsCount") + + batchSize := int64(1000) + + dbUpdates := []struct { + BlockNumber uint64 + ExecTxsCount uint64 + }{} + + for i := startBlockNumber; i <= endBlockNumber; i += uint64(batchSize) { + firstBlock := int64(i) + lastBlock := firstBlock + batchSize - 1 + if lastBlock > int64(endBlockNumber) { + lastBlock = int64(endBlockNumber) + } + blocksChan := make(chan *types.Eth1Block, batchSize) + go func(stream chan *types.Eth1Block) { + high := lastBlock + low := lastBlock - batchSize + 1 + if int64(firstBlock) > low { + low = firstBlock + } + + err := db.BigtableClient.GetFullBlocksDescending(stream, uint64(high), uint64(low)) + if err != nil { + logrus.Errorf("error getting blocks descending high: %v low: %v err: %v", high, low, err) + } + close(stream) + }(blocksChan) + totalTxsCount := 0 + for b := range blocksChan { + if len(b.Transactions) > 0 { + totalTxsCount += len(b.Transactions) + dbUpdates = append(dbUpdates, struct { + BlockNumber uint64 + ExecTxsCount uint64 + }{b.Number, uint64(len(b.Transactions))}) + } + } + logrus.Infof("%v-%v: totalTxsCount: %v", firstBlock, lastBlock, totalTxsCount) + } + + logrus.Infof("dbUpdates: %v", len(dbUpdates)) + + tx, err := db.WriterDb.Begin() + if err != nil { + return fmt.Errorf("error starting db transactions: %w", err) + } + defer tx.Rollback() + + for b := 0; b < len(dbUpdates); b += int(batchSize) { + start := b + end := b + int(batchSize) + if len(dbUpdates) < end { + end = len(dbUpdates) + } + + valueStrings := []string{} + for _, v := range dbUpdates[start:end] { + valueStrings = append(valueStrings, fmt.Sprintf("(%v,%v)", v.BlockNumber, v.ExecTxsCount)) + } + + stmt := fmt.Sprintf(` + update blocks as a set exec_transactions_count = b.exec_transactions_count + from (values %s) as b(exec_block_number, exec_transactions_count) + where a.exec_block_number = b.exec_block_number`, strings.Join(valueStrings, ",")) + + _, err = tx.Exec(stmt) + if err != nil { + return err + } + + logrus.Infof("updated %v-%v / %v", start, end, len(dbUpdates)) + } + + return tx.Commit() +} + func UpdateBlockFinalizationSequentially() error { var err error @@ -429,29 +513,57 @@ func UpdateBlockFinalizationSequentially() error { } func DebugBlocks() error { + elClient, err := rpc.NewErigonClient(utils.Config.Eth1ErigonEndpoint) + if err != nil { + return err + } - client, err := rpc.NewErigonClient(utils.Config.Eth1ErigonEndpoint) + clClient, err := rpc.NewLighthouseClient(fmt.Sprintf("http://%v:%v", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), new(big.Int).SetUint64(utils.Config.Chain.ClConfig.DepositChainID)) if err != nil { return err } for i := opts.StartBlock; i <= opts.EndBlock; i++ { - b, err := db.BigtableClient.GetBlockFromBlocksTable(i) + btBlock, err := db.BigtableClient.GetBlockFromBlocksTable(i) if err != nil { return err } // logrus.WithFields(logrus.Fields{"block": i, "data": fmt.Sprintf("%+v", b)}).Infof("block from bt") - cb, _, err := client.GetBlock(int64(i), "parity/geth") + elBlock, _, err := elClient.GetBlock(int64(i), "parity/geth") + if err != nil { + return err + } + + slot := utils.TimeToSlot(uint64(elBlock.Time.Seconds)) + clBlock, err := clClient.GetBlockBySlot(slot) if err != nil { return err } + logFields := logrus.Fields{ + "block": i, + "bt.hash": fmt.Sprintf("%#x", btBlock.Hash), + "bt.BlobGasUsed": btBlock.BlobGasUsed, + "bt.ExcessBlobGas": btBlock.ExcessBlobGas, + "bt.txs": len(btBlock.Transactions), + "el.BlobGasUsed": elBlock.BlobGasUsed, + "el.hash": fmt.Sprintf("%#x", elBlock.Hash), + "el.ExcessBlobGas": elBlock.ExcessBlobGas, + "el.txs": len(elBlock.Transactions), + } + if !bytes.Equal(clBlock.ExecutionPayload.BlockHash, elBlock.Hash) { + logrus.Warnf("clBlock.ExecutionPayload.BlockHash != i: %x != %x", clBlock.ExecutionPayload.BlockHash, elBlock.Hash) + } else if clBlock.ExecutionPayload.BlockNumber != i { + logrus.Warnf("clBlock.ExecutionPayload.BlockNumber != i: %v != %v", clBlock.ExecutionPayload.BlockNumber, i) + } else { + logFields["cl.txs"] = len(clBlock.ExecutionPayload.Transactions) + } - logrus.WithFields(logrus.Fields{"block": i, "bt.hash": fmt.Sprintf("%#x", b.Hash), "bt.BlobGasUsed": b.BlobGasUsed, "bt.ExcessBlobGas": b.ExcessBlobGas, "bt.txs": len(b.Transactions), "c.BlobGasUsed": cb.BlobGasUsed, "c.hash": fmt.Sprintf("%#x", cb.Hash), "c.ExcessBlobGas": cb.ExcessBlobGas, "c.txs": len(cb.Transactions)}).Infof("debug block") + logrus.WithFields(logFields).Infof("debug block") - for i := range b.Transactions { - btx := b.Transactions[i] - ctx := cb.Transactions[i] + for i := range elBlock.Transactions { + btx := elBlock.Transactions[i] + ctx := elBlock.Transactions[i] btxH := []string{} ctxH := []string{} for _, h := range btx.BlobVersionedHashes { @@ -462,16 +574,16 @@ func DebugBlocks() error { } logrus.WithFields(logrus.Fields{ - "b.hash": fmt.Sprintf("%#x", btx.Hash), - "c.hash": fmt.Sprintf("%#x", ctx.Hash), - "b.BlobVersionedHashes": fmt.Sprintf("%+v", btxH), - "c.BlobVersionedHashes": fmt.Sprintf("%+v", ctxH), - "b.maxFeePerBlobGas": btx.MaxFeePerBlobGas, - "c.maxFeePerBlobGas": ctx.MaxFeePerBlobGas, - "b.BlobGasPrice": btx.BlobGasPrice, - "c.BlobGasPrice": ctx.BlobGasPrice, - "b.BlobGasUsed": btx.BlobGasUsed, - "c.BlobGasUsed": ctx.BlobGasUsed, + "b.hash": fmt.Sprintf("%#x", btx.Hash), + "el.hash": fmt.Sprintf("%#x", ctx.Hash), + "b.BlobVersionedHashes": fmt.Sprintf("%+v", btxH), + "el.BlobVersionedHashes": fmt.Sprintf("%+v", ctxH), + "b.maxFeePerBlobGas": btx.MaxFeePerBlobGas, + "el.maxFeePerBlobGas": ctx.MaxFeePerBlobGas, + "b.BlobGasPrice": btx.BlobGasPrice, + "el.BlobGasPrice": ctx.BlobGasPrice, + "b.BlobGasUsed": btx.BlobGasUsed, + "el.BlobGasUsed": ctx.BlobGasUsed, }).Infof("debug tx") } } diff --git a/rpc/lighthouse.go b/rpc/lighthouse.go index 08d6f5492b..07b8638a22 100644 --- a/rpc/lighthouse.go +++ b/rpc/lighthouse.go @@ -994,6 +994,7 @@ func (lc *LighthouseClient) blockFromResponse(parsedHeaders *StandardBeaconHeade tx.BlobVersionedHashes = append(tx.BlobVersionedHashes, h.Bytes()) } } + txs = append(txs, tx) } withdrawals := make([]*types.Withdrawals, 0, len(payload.Withdrawals)) for _, w := range payload.Withdrawals {