Skip to content

Commit

Permalink
get txindex on inserting tx draft
Browse files Browse the repository at this point in the history
  • Loading branch information
pgebal committed Sep 24, 2020
1 parent 89a71aa commit 3d9b6a8
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 28 deletions.
1 change: 1 addition & 0 deletions apps/engine/lib/engine/callbacks/in_flight_exit_started.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule Engine.Callbacks.InFlightExitStarted do

defp do_callback(multi, positions, []) do
query = where(Output.usable(), [output], output.position in ^positions)
# how precise do we want to be in tracking output state? Should we have statuses exited, challenged?
Multi.update_all(multi, :exiting_outputs, query, set: [state: "exiting", updated_at: NaiveDateTime.utc_now()])
end
end
3 changes: 3 additions & 0 deletions apps/engine/lib/engine/callbacks/piggyback.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ defmodule Engine.Callbacks.Piggyback do

transaction ->
case transaction |> Map.get(type) |> Enum.at(index) do
# transaction inputs status is set to exiting when ife starts
# which means only transaction outputs can have piggybacked status
# is that intentional?
%Output{state: "confirmed"} = output ->
changeset = change(output, state: "piggybacked")
position = get_field(changeset, :position)
Expand Down
72 changes: 57 additions & 15 deletions apps/engine/lib/engine/db/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ defmodule Engine.DB.Block do

require Logger

@max_transaction_in_block 65_000

@optional_fields [:hash, :tx_hash, :formed_at_ethereum_height, :submitted_at_ethereum_height, :gas, :attempts_counter]
@required_fields [:nonce, :blknum]
@required_fields [:nonce, :blknum, :txcount, :state]

@type t() :: %{
hash: binary(),
state: :forming | :pending_submission | :submitted | :confirmed,
nonce: pos_integer(),
blknum: pos_integer() | nil,
txcount: non_neg_integer(),
tx_hash: binary() | nil,
formed_at_ethereum_height: pos_integer() | nil,
id: pos_integer(),
Expand All @@ -42,12 +46,13 @@ defmodule Engine.DB.Block do
@timestamps_opts [inserted_at: :node_inserted_at, updated_at: :node_updated_at]

schema "blocks" do
# Extracted from `output_id`
field(:hash, :binary)
field(:state, Ecto.Atom)
# nonce = max(nonce) + 1
field(:nonce, :integer)
# blknum = nonce * 1000
field(:blknum, :integer)
field(:txcount, :integer)
field(:tx_hash, :binary)
field(:formed_at_ethereum_height, :integer)
field(:submitted_at_ethereum_height, :integer)
Expand Down Expand Up @@ -91,8 +96,7 @@ defmodule Engine.DB.Block do
@decorate trace(service: :ecto, type: :backend)
def form() do
Multi.new()
|> Multi.run("new-block", &insert_block/2)
|> Multi.run("form-block", &attach_transactions_to_block/2)
|> Multi.run("forming-block", &get_forming_or_insert_block/2)
|> Multi.run("hash-block", &generate_block_hash/2)
|> Repo.transaction()
end
Expand All @@ -111,6 +115,36 @@ defmodule Engine.DB.Block do
end
end

@doc """
Get a forming block or inserts a new one if current forming block hit it's transaction limit
"""
def get_forming_or_insert_block(repo, params) do
{:ok, forming_block} = get_forming_block_for_update(repo, params)

case forming_block do
nil -> insert_block(repo)
block -> insert_block_if_transaction_limit_exceeded(repo, block)
end
end

@doc """
Increases and returns number of transaction in block
"""
def increase_txcount(repo, %{"block" => block}) do
{1, [{txcount}]} =
repo.update_all(
from(b in __MODULE__, where: b.id == ^block.id, select: {b.txcount}),
inc: [txcount: 1]
)

{:ok, txcount}
end

defp get_forming_block_for_update(repo, _params) do
block = repo.one(from(block in __MODULE__, where: block.state == ^:forming, lock: "FOR UPDATE"))
{:ok, block}
end

defp get_all(repo, _changeset, new_height, mined_child_block) do
query =
from(p in __MODULE__,
Expand Down Expand Up @@ -156,7 +190,22 @@ defmodule Engine.DB.Block do
end
end

defp insert_block(repo, _params) do
defp insert_block_if_transaction_limit_exceeded(_repo, %{txcount: txcount} = block)
when txcount < @max_transaction_in_block do
{:ok, block}
end

defp insert_block_if_transaction_limit_exceeded(repo, block) do
{:ok, _} = finalize_block(repo, block)
insert_block(repo)
end

defp finalize_block(repo, block) do
changeset = change(block, state: :pending_submission)
repo.update(changeset)
end

defp insert_block(repo) do
nonce =
query_max_nonce()
|> Repo.one()
Expand All @@ -167,7 +216,7 @@ defmodule Engine.DB.Block do

blknum = nonce * Configuration.child_block_interval()

params = %{nonce: nonce, blknum: blknum}
params = %{state: :forming, nonce: nonce, blknum: blknum, txcount: 0}

%__MODULE__{}
|> changeset(params)
Expand All @@ -176,20 +225,13 @@ defmodule Engine.DB.Block do

defp query_max_nonce(), do: from(block in __MODULE__, select: max(block.nonce))

defp attach_transactions_to_block(repo, %{"new-block" => block}) do
updates = [block_id: block.id, updated_at: NaiveDateTime.utc_now()]
{total, _} = repo.update_all(Transaction.pending(), set: updates)

{:ok, total}
end

defp generate_block_hash(repo, %{"new-block" => block}) do
defp generate_block_hash(repo, %{"forming-block" => block}) do
hash =
block.id
|> fetch_tx_bytes_in_block()
|> Merkle.root_hash()

changeset = change(block, hash: hash)
changeset = change(block, hash: hash, state: :pending_submission)
repo.update(changeset)
end

Expand Down
5 changes: 4 additions & 1 deletion apps/engine/lib/engine/db/output.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ defmodule Engine.DB.Output do
position: pos_integer() | nil,
spending_transaction: Transaction.t() | nil,
spending_transaction_id: pos_integer() | nil,
state: String.t(),
state: :forming | :pending_submission | :submitted | :confirmed,
updated_at: DateTime.t()
}

@timestamps_opts [inserted_at: :node_inserted_at, updated_at: :node_updated_at]

schema "outputs" do
# Extracted from `output_id`
# it depends on type if output_id stores a position
field(:position, :integer)

field(:output_type, :integer)
Expand Down Expand Up @@ -69,6 +70,8 @@ defmodule Engine.DB.Output do
"""
def usable() do
from(o in __MODULE__,
# what does "confirmed" status means?
# Block in which this output was created is published on root chain?
where: is_nil(o.spending_transaction_id) and o.state == "confirmed"
)
end
Expand Down
17 changes: 16 additions & 1 deletion apps/engine/lib/engine/db/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Engine.DB.Transaction do
import Ecto.Changeset, only: [cast: 3, cast_assoc: 2, validate_required: 2]
import Ecto.Query, only: [from: 2]

alias Ecto.Multi
alias Engine.DB.Block
alias Engine.DB.Output
alias Engine.DB.Transaction.Validator
Expand All @@ -26,6 +27,7 @@ defmodule Engine.DB.Transaction do
@type t() :: %{
block: Block.t(),
block_id: pos_integer(),
tx_index: non_neg_integer(),
id: pos_integer(),
inputs: list(Output.t()),
inserted_at: DateTime.t(),
Expand Down Expand Up @@ -53,6 +55,7 @@ defmodule Engine.DB.Transaction do
field(:tx_bytes, :binary)
field(:tx_hash, :binary)
field(:tx_type, :integer)
field(:tx_index, :integer)
field(:kind, Ecto.Atom)

# Virtual fields used for convenience and validation
Expand Down Expand Up @@ -122,7 +125,19 @@ defmodule Engine.DB.Transaction do
|> Validator.validate_statefully(params)
end

def insert(changeset), do: Repo.insert(changeset)
def insert(changeset) do
Multi.new()
|> Multi.run("block", &Block.get_forming_or_insert_block/2)
|> Multi.run("block-txcount", &Block.increase_txcount/2)
|> Multi.run("new-transaction", fn repo, params -> insert_transaction(repo, params, changeset) end)
|> Repo.transaction()
end

defp insert_transaction(repo, %{"block-txcount" => block_txcount, "block" => block}, changeset) do
changeset
|> Ecto.Changeset.change(%{tx_index: block_txcount - 1, block: block})
|> repo.insert()
end

defp recovered_to_map(transaction) do
inputs = Enum.map(transaction.inputs, &Map.from_struct/1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ defmodule Engine.Repo.Migrations.CreateBlocksTable do
create table(:blocks) do
# keccak hash of transactions
add(:hash, :binary)
# blocks state: forming, pending_submission, submitted, confirmed
add(:state, :string, null: false)
# transaction order!
add(:nonce, :integer, null: false)
# plasma block number
add(:blknum, :integer, null: false)
# number of transactions in block
add(:txcount, :integer, null: false)
# submitted transaction hash (gets updated with submitted_at_ethereum_height)
add(:tx_hash, :binary)
# at which height did we form the block
Expand All @@ -26,6 +30,7 @@ defmodule Engine.Repo.Migrations.CreateBlocksTable do

create(unique_index(:blocks, :blknum))
create(unique_index(:blocks, :hash))
create(unique_index(:blocks, [:state], where: "state = 'forming'", name: :single_forming_block_index))

create(
constraint(
Expand All @@ -35,6 +40,14 @@ defmodule Engine.Repo.Migrations.CreateBlocksTable do
)
)

create(
constraint(
:blocks,
:txcount,
check: "txcount <= 65000"
)
)

execute("SELECT ecto_manage_updated_at('blocks');")
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ defmodule Engine.Repo.Migrations.CreateTransactions do

def change do
create table(:transactions) do
add(:tx_bytes, :binary)
add(:tx_hash, :binary)
add(:tx_type, :integer)
add(:kind, :string)
add(:tx_bytes, :binary, null: false)
add(:tx_hash, :binary, null: false)
add(:tx_type, :integer, null: false)
add(:tx_index, :integer)
add(:kind, :string, null: false)

add(:block_id, references(:blocks))
add(:inserted_at, :utc_datetime, null: false, default: fragment("now_utc()"))
Expand Down
36 changes: 32 additions & 4 deletions apps/engine/test/engine/db/block_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ defmodule Engine.DB.BlockTest do
describe "form/0" do
test "forms a block from the existing pending transactions" do
_ = insert(:deposit_transaction)
_ = insert(:payment_v1_transaction)
_ = insert_transaction(:payment_v1_transaction)
{:ok, %{"new-block" => block}} = Block.form()
transactions = Repo.all(from(t in Transaction, where: t.block_id == ^block.id))

assert length(transactions) == 1
assert block.state == :pending_submission
end

test "generates the block hash" do
_ = insert(:deposit_transaction)
txn1 = insert(:payment_v1_transaction)
txn1 = insert_transaction(:payment_v1_transaction)
hash = Merkle.root_hash([Transaction.encode_unsigned(txn1)])

assert {:ok, %{"hash-block" => block}} = Block.form()
Expand Down Expand Up @@ -122,7 +123,8 @@ defmodule Engine.DB.BlockTest do
assert block1.nonce == 1
assert block1.blknum == 1_000

_ = insert(:payment_v1_transaction)
tx = build(:payment_v1_transaction)
_ = Transaction.insert(tx)

assert {:ok, %{"hash-block" => block2}} = Block.form()
assert block2.nonce == 2
Expand Down Expand Up @@ -193,6 +195,12 @@ defmodule Engine.DB.BlockTest do
insert(:block, %{
nonce: nonce,
blknum: blknum,
state:
if index < 8 do
:confirmed
else
:pending_submission
end,
submitted_at_ethereum_height: index,
formed_at_ethereum_height: index,
attempts_counter: 1
Expand Down Expand Up @@ -244,6 +252,12 @@ defmodule Engine.DB.BlockTest do
insert(:block, %{
nonce: nonce,
blknum: blknum,
state:
if index < 8 do
:confirmed
else
:pending_submission
end,
submitted_at_ethereum_height: index,
formed_at_ethereum_height: index,
attempts_counter: 1
Expand Down Expand Up @@ -304,6 +318,12 @@ defmodule Engine.DB.BlockTest do
insert(:block, %{
nonce: nonce,
blknum: blknum,
state:
if index < 8 do
:confirmed
else
:pending_submission
end,
submitted_at_ethereum_height: index,
formed_at_ethereum_height: index
})
Expand Down Expand Up @@ -337,11 +357,12 @@ defmodule Engine.DB.BlockTest do
blknum = 2000
my_current_eth_height = 6

# just insert 10 blocks that were created over 10 eth blocks
# just insert a block

insert(:block, %{
nonce: nonce,
blknum: blknum,
state: :submitted,
submitted_at_ethereum_height: my_current_eth_height,
formed_at_ethereum_height: my_current_eth_height
})
Expand Down Expand Up @@ -377,6 +398,7 @@ defmodule Engine.DB.BlockTest do
insert(:block, %{
nonce: nonce,
blknum: blknum,
state: :confirmed,
submitted_at_ethereum_height: index,
formed_at_ethereum_height: index,
attempts_counter: 1
Expand Down Expand Up @@ -432,4 +454,10 @@ defmodule Engine.DB.BlockTest do
Enum.reverse(acc)
end
end

defp insert_transaction(kind) do
tx = build(kind)
{:ok, %{"new-transaction" => tx}} = Transaction.insert(tx)
tx
end
end
Loading

0 comments on commit 3d9b6a8

Please sign in to comment.