Skip to content

Commit

Permalink
Merge pull request #2679 from gobitfly/BIDS-2654/fix-exec-transaction…
Browse files Browse the repository at this point in the history
…s-count

Bids 2654/fix exec transactions count
  • Loading branch information
guybrush authored Nov 8, 2023
2 parents 453c1dc + 9fb953e commit 6ff245c
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 17 deletions.
146 changes: 129 additions & 17 deletions cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ func main() {
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
case "fix-exec-transactions-count":
err = fixExecTransactionsCount()
default:
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand All @@ -370,6 +372,88 @@ func main() {
}
}

func fixExecTransactionsCount() error {
startBlockNumber := uint64(opts.StartBlock)
endBlockNumber := uint64(opts.EndBlock)

logrus.WithFields(logrus.Fields{"startBlockNumber": startBlockNumber, "endBlockNumber": endBlockNumber}).Infof("fixExecTransactionsCount")

batchSize := int64(1000)

dbUpdates := []struct {
BlockNumber uint64
ExecTxsCount uint64
}{}

for i := startBlockNumber; i <= endBlockNumber; i += uint64(batchSize) {
firstBlock := int64(i)
lastBlock := firstBlock + batchSize - 1
if lastBlock > int64(endBlockNumber) {
lastBlock = int64(endBlockNumber)
}
blocksChan := make(chan *types.Eth1Block, batchSize)
go func(stream chan *types.Eth1Block) {
high := lastBlock
low := lastBlock - batchSize + 1
if int64(firstBlock) > low {
low = firstBlock
}

err := db.BigtableClient.GetFullBlocksDescending(stream, uint64(high), uint64(low))
if err != nil {
logrus.Errorf("error getting blocks descending high: %v low: %v err: %v", high, low, err)
}
close(stream)
}(blocksChan)
totalTxsCount := 0
for b := range blocksChan {
if len(b.Transactions) > 0 {
totalTxsCount += len(b.Transactions)
dbUpdates = append(dbUpdates, struct {
BlockNumber uint64
ExecTxsCount uint64
}{b.Number, uint64(len(b.Transactions))})
}
}
logrus.Infof("%v-%v: totalTxsCount: %v", firstBlock, lastBlock, totalTxsCount)
}

logrus.Infof("dbUpdates: %v", len(dbUpdates))

tx, err := db.WriterDb.Begin()
if err != nil {
return fmt.Errorf("error starting db transactions: %w", err)
}
defer tx.Rollback()

for b := 0; b < len(dbUpdates); b += int(batchSize) {
start := b
end := b + int(batchSize)
if len(dbUpdates) < end {
end = len(dbUpdates)
}

valueStrings := []string{}
for _, v := range dbUpdates[start:end] {
valueStrings = append(valueStrings, fmt.Sprintf("(%v,%v)", v.BlockNumber, v.ExecTxsCount))
}

stmt := fmt.Sprintf(`
update blocks as a set exec_transactions_count = b.exec_transactions_count
from (values %s) as b(exec_block_number, exec_transactions_count)
where a.exec_block_number = b.exec_block_number`, strings.Join(valueStrings, ","))

_, err = tx.Exec(stmt)
if err != nil {
return err
}

logrus.Infof("updated %v-%v / %v", start, end, len(dbUpdates))
}

return tx.Commit()
}

func UpdateBlockFinalizationSequentially() error {
var err error

Expand Down Expand Up @@ -429,29 +513,57 @@ func UpdateBlockFinalizationSequentially() error {
}

func DebugBlocks() error {
elClient, err := rpc.NewErigonClient(utils.Config.Eth1ErigonEndpoint)
if err != nil {
return err
}

client, err := rpc.NewErigonClient(utils.Config.Eth1ErigonEndpoint)
clClient, err := rpc.NewLighthouseClient(fmt.Sprintf("http://%v:%v", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port), new(big.Int).SetUint64(utils.Config.Chain.ClConfig.DepositChainID))
if err != nil {
return err
}

for i := opts.StartBlock; i <= opts.EndBlock; i++ {
b, err := db.BigtableClient.GetBlockFromBlocksTable(i)
btBlock, err := db.BigtableClient.GetBlockFromBlocksTable(i)
if err != nil {
return err
}
// logrus.WithFields(logrus.Fields{"block": i, "data": fmt.Sprintf("%+v", b)}).Infof("block from bt")

cb, _, err := client.GetBlock(int64(i), "parity/geth")
elBlock, _, err := elClient.GetBlock(int64(i), "parity/geth")
if err != nil {
return err
}

slot := utils.TimeToSlot(uint64(elBlock.Time.Seconds))
clBlock, err := clClient.GetBlockBySlot(slot)
if err != nil {
return err
}
logFields := logrus.Fields{
"block": i,
"bt.hash": fmt.Sprintf("%#x", btBlock.Hash),
"bt.BlobGasUsed": btBlock.BlobGasUsed,
"bt.ExcessBlobGas": btBlock.ExcessBlobGas,
"bt.txs": len(btBlock.Transactions),
"el.BlobGasUsed": elBlock.BlobGasUsed,
"el.hash": fmt.Sprintf("%#x", elBlock.Hash),
"el.ExcessBlobGas": elBlock.ExcessBlobGas,
"el.txs": len(elBlock.Transactions),
}
if !bytes.Equal(clBlock.ExecutionPayload.BlockHash, elBlock.Hash) {
logrus.Warnf("clBlock.ExecutionPayload.BlockHash != i: %x != %x", clBlock.ExecutionPayload.BlockHash, elBlock.Hash)
} else if clBlock.ExecutionPayload.BlockNumber != i {
logrus.Warnf("clBlock.ExecutionPayload.BlockNumber != i: %v != %v", clBlock.ExecutionPayload.BlockNumber, i)
} else {
logFields["cl.txs"] = len(clBlock.ExecutionPayload.Transactions)
}

logrus.WithFields(logrus.Fields{"block": i, "bt.hash": fmt.Sprintf("%#x", b.Hash), "bt.BlobGasUsed": b.BlobGasUsed, "bt.ExcessBlobGas": b.ExcessBlobGas, "bt.txs": len(b.Transactions), "c.BlobGasUsed": cb.BlobGasUsed, "c.hash": fmt.Sprintf("%#x", cb.Hash), "c.ExcessBlobGas": cb.ExcessBlobGas, "c.txs": len(cb.Transactions)}).Infof("debug block")
logrus.WithFields(logFields).Infof("debug block")

for i := range b.Transactions {
btx := b.Transactions[i]
ctx := cb.Transactions[i]
for i := range elBlock.Transactions {
btx := elBlock.Transactions[i]
ctx := elBlock.Transactions[i]
btxH := []string{}
ctxH := []string{}
for _, h := range btx.BlobVersionedHashes {
Expand All @@ -462,16 +574,16 @@ func DebugBlocks() error {
}

logrus.WithFields(logrus.Fields{
"b.hash": fmt.Sprintf("%#x", btx.Hash),
"c.hash": fmt.Sprintf("%#x", ctx.Hash),
"b.BlobVersionedHashes": fmt.Sprintf("%+v", btxH),
"c.BlobVersionedHashes": fmt.Sprintf("%+v", ctxH),
"b.maxFeePerBlobGas": btx.MaxFeePerBlobGas,
"c.maxFeePerBlobGas": ctx.MaxFeePerBlobGas,
"b.BlobGasPrice": btx.BlobGasPrice,
"c.BlobGasPrice": ctx.BlobGasPrice,
"b.BlobGasUsed": btx.BlobGasUsed,
"c.BlobGasUsed": ctx.BlobGasUsed,
"b.hash": fmt.Sprintf("%#x", btx.Hash),
"el.hash": fmt.Sprintf("%#x", ctx.Hash),
"b.BlobVersionedHashes": fmt.Sprintf("%+v", btxH),
"el.BlobVersionedHashes": fmt.Sprintf("%+v", ctxH),
"b.maxFeePerBlobGas": btx.MaxFeePerBlobGas,
"el.maxFeePerBlobGas": ctx.MaxFeePerBlobGas,
"b.BlobGasPrice": btx.BlobGasPrice,
"el.BlobGasPrice": ctx.BlobGasPrice,
"b.BlobGasUsed": btx.BlobGasUsed,
"el.BlobGasUsed": ctx.BlobGasUsed,
}).Infof("debug tx")
}
}
Expand Down
1 change: 1 addition & 0 deletions rpc/lighthouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ func (lc *LighthouseClient) blockFromResponse(parsedHeaders *StandardBeaconHeade
tx.BlobVersionedHashes = append(tx.BlobVersionedHashes, h.Bytes())
}
}
txs = append(txs, tx)
}
withdrawals := make([]*types.Withdrawals, 0, len(payload.Withdrawals))
for _, w := range payload.Withdrawals {
Expand Down

0 comments on commit 6ff245c

Please sign in to comment.