Skip to content

Commit

Permalink
fix: Delete incorrect coin balances on reorg (blockscout#10879)
Browse files Browse the repository at this point in the history
  • Loading branch information
Qwerty5Uiop authored Oct 18, 2024
1 parent 92e7ce5 commit a23e03e
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 4 deletions.
2 changes: 1 addition & 1 deletion apps/explorer/lib/explorer/chain/import/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Explorer.Chain.Import.Runner do
@type changes_list :: [changes]

@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all | Ecto.Query.t()
@type on_conflict :: :nothing | :replace_all | {:replace, [atom()]} | Ecto.Query.t()

@typedoc """
Runner-specific options under `c:option_key/0` in all options passed to `c:run/3`.
Expand Down
4 changes: 2 additions & 2 deletions apps/explorer/lib/explorer/chain/import/runner/addresses.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Address.t()]}
defp insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(ordered_changes_list) do
def insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(ordered_changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)

Import.insert_changes_list(
Expand Down
79 changes: 78 additions & 1 deletion apps/explorer/lib/explorer/chain/import/runner/blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Chain.Import.Runner.{TokenInstances, Tokens}
alias Explorer.Chain.Import.Runner.{Addresses, TokenInstances, Tokens}
alias Explorer.Prometheus.Instrumenter
alias Explorer.Utility.MissingRangesManipulator

Expand Down Expand Up @@ -160,6 +160,23 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
:derive_transaction_forks
)
end)
|> Multi.run(:delete_address_coin_balances, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner(
fn -> delete_address_coin_balances(repo, non_consensus_blocks, insert_options) end,
:address_referencing,
:blocks,
:delete_address_coin_balances
)
end)
|> Multi.run(:derive_address_fetched_coin_balances, fn repo,
%{delete_address_coin_balances: delete_address_coin_balances} ->
Instrumenter.block_import_stage_runner(
fn -> derive_address_fetched_coin_balances(repo, delete_address_coin_balances, insert_options) end,
:address_referencing,
:blocks,
:derive_address_fetched_coin_balances
)
end)
|> Multi.run(:delete_address_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner(
fn -> delete_address_token_balances(repo, non_consensus_blocks, insert_options) end,
Expand Down Expand Up @@ -464,6 +481,66 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end

defp delete_address_coin_balances(_repo, [], _options), do: {:ok, []}

defp delete_address_coin_balances(repo, non_consensus_blocks, %{timeout: timeout}) do
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end)

ordered_query =
from(cb in Address.CoinBalance,
where: cb.block_number in ^non_consensus_block_numbers,
select: map(cb, [:address_hash, :block_number]),
# Enforce TokenBalance ShareLocks order (see docs: sharelocks.md)
order_by: [cb.address_hash, cb.block_number],
lock: "FOR UPDATE"
)

query =
from(cb in Address.CoinBalance,
select: cb.address_hash,
inner_join: ordered_address_coin_balance in subquery(ordered_query),
on:
ordered_address_coin_balance.address_hash == cb.address_hash and
ordered_address_coin_balance.block_number == cb.block_number
)

try do
{_count, deleted_coin_balances_address_hashes} = repo.delete_all(query, timeout: timeout)

{:ok, deleted_coin_balances_address_hashes}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: non_consensus_block_numbers}}
end
end

defp derive_address_fetched_coin_balances(_repo, [], _options), do: {:ok, []}

defp derive_address_fetched_coin_balances(repo, deleted_balances_address_hashes, options) do
last_balances_query =
from(cb in Address.CoinBalance,
where: cb.address_hash in ^deleted_balances_address_hashes,
where: not is_nil(cb.value),
distinct: cb.address_hash,
order_by: [asc: cb.address_hash, desc: cb.block_number],
select: %{
hash: cb.address_hash,
fetched_coin_balance: cb.value,
fetched_coin_balance_block_number: cb.block_number
}
)

addresses_params =
last_balances_query
|> repo.all()
|> Enum.sort_by(& &1.hash)

addresses_options =
Map.put(options, :on_conflict, {:replace, [:fetched_coin_balance, :fetched_coin_balance_block_number]})

Addresses.insert(repo, addresses_params, addresses_options)
end

defp delete_address_token_balances(_, [], _), do: {:ok, []}

defp delete_address_token_balances(repo, non_consensus_blocks, %{timeout: timeout}) do
Expand Down
28 changes: 28 additions & 0 deletions apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,34 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end)
end

test "coin balances are deleted and new balances are derived if some blocks lost consensus",
%{consensus_block: %{number: block_number} = block, options: options} do
%{hash: address_hash} = address = insert(:address)

prev_block_number = block_number - 1

insert(:address_coin_balance, address: address, block_number: block_number)
%{value: prev_value} = insert(:address_coin_balance, address: address, block_number: prev_block_number)

assert count(Address.CoinBalance) == 2

insert(:block, number: block_number, consensus: true)

assert {:ok,
%{
delete_address_coin_balances: [^address_hash],
derive_address_fetched_coin_balances: [
%{
hash: ^address_hash,
fetched_coin_balance: ^prev_value,
fetched_coin_balance_block_number: ^prev_block_number
}
]
}} = run_block_consensus_change(block, true, options)

assert %{value: ^prev_value, block_number: ^prev_block_number} = Repo.one(Address.CoinBalance)
end

test "delete_address_current_token_balances deletes rows with matching block number when consensus is true",
%{consensus_block: %{number: block_number} = block, options: options} do
%Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} =
Expand Down

0 comments on commit a23e03e

Please sign in to comment.