Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor get block generator #16335

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions chia/consensus/block_body_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from chia.consensus.coinbase import create_farmer_coin, create_pool_coin
from chia.consensus.constants import ConsensusConstants
from chia.consensus.cost_calculator import NPCResult
from chia.consensus.find_fork_point import find_fork_point_in_chain
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions, mempool_check_time_locks
Expand Down Expand Up @@ -43,8 +42,8 @@ async def validate_block_body(
block: Union[FullBlock, UnfinishedBlock],
height: uint32,
npc_result: Optional[NPCResult],
fork_point_with_peak: Optional[uint32],
get_block_generator: Callable[[BlockInfo], Awaitable[Optional[BlockGenerator]]],
fork_point_with_peak: int,
get_block_generator: Callable[[BlockInfo, uint32], Awaitable[Optional[BlockGenerator]]],
*,
validate_signature: bool = True,
) -> Tuple[Optional[Err], Optional[NPCResult]]:
Expand Down Expand Up @@ -279,12 +278,6 @@ async def validate_block_body(

# 15. Check if removals exist and were not previously spent. (unspent_db + diff_store + this_block)
# The fork point is the last block in common between the peak chain and the chain of `block`
if peak is None or height == 0:
fork_h: int = -1
elif fork_point_with_peak is not None:
fork_h = fork_point_with_peak
else:
fork_h = find_fork_point_in_chain(blocks, peak, blocks.block_record(block.prev_header_hash))

# Get additions and removals since (after) fork_h but not including this block
# The values include: the coin that was added, the height of the block in which it was confirmed, and the
Expand All @@ -299,22 +292,24 @@ async def validate_block_body(
reorg_blocks: Dict[uint32, FullBlock] = {}
curr: Optional[FullBlock] = prev_block
assert curr is not None
while curr.height > fork_h:
while curr.height > fork_point_with_peak:
if curr.height == 0:
break
curr = await block_store.get_full_block(curr.prev_header_hash)
assert curr is not None
reorg_blocks[curr.height] = curr
if fork_h != -1:
assert len(reorg_blocks) == height - fork_h - 1
if fork_point_with_peak != -1:
assert len(reorg_blocks) == height - fork_point_with_peak - 1

curr = prev_block
assert curr is not None
while curr.height > fork_h:
while curr.height > fork_point_with_peak:
# Coin store doesn't contain coins from fork, we have to run generator for each block in fork
if curr.transactions_generator is not None:
# These blocks are in the past and therefore assumed to be valid, so get_block_generator won't raise
curr_block_generator: Optional[BlockGenerator] = await get_block_generator(curr)
curr_block_generator: Optional[BlockGenerator] = await get_block_generator(
curr, uint32(0 if fork_point_with_peak is None else fork_point_with_peak)
)
assert curr_block_generator is not None and curr.transactions_info is not None
curr_npc_result = get_name_puzzle_conditions(
curr_block_generator,
Expand Down Expand Up @@ -381,10 +376,10 @@ async def validate_block_body(
# can't find in the DB, but also coins that were spent after the fork point
look_in_fork: List[bytes32] = []
for unspent in unspent_records:
if unspent.confirmed_block_index <= fork_h:
if unspent.confirmed_block_index <= fork_point_with_peak:
# Spending something in the current chain, confirmed before fork
# (We ignore all coins confirmed after fork)
if unspent.spent == 1 and unspent.spent_block_index <= fork_h:
if unspent.spent == 1 and unspent.spent_block_index <= fork_point_with_peak:
# Check for coins spent in an ancestor block
return Err.DOUBLE_SPEND, None
removal_coin_records[unspent.name] = unspent
Expand Down
87 changes: 32 additions & 55 deletions chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def add_block(
block,
block.height,
npc_result,
fork_point_with_peak,
block.height - 1 if fork_point_with_peak is None else fork_point_with_peak,
self.get_block_generator,
# If we did not already validate the signature, validate it now
validate_signature=not pre_validation_result.validated_signature,
Expand Down Expand Up @@ -404,10 +404,12 @@ async def _reconsider_peak(
# We need to recompute the additions and removals, since they are not stored on DB (only generator is).
if fetched_block_record.header_hash == block_record.header_hash:
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(
fetched_full_block, npc_result
fetched_full_block, fork_height, npc_result
)
else:
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(fetched_full_block, None)
tx_removals, tx_additions, npc_res = await self.get_tx_removals_and_additions(
fetched_full_block, fork_height
)

# Collect the NPC results for later post-processing
if npc_res is not None:
Expand Down Expand Up @@ -438,7 +440,7 @@ async def _reconsider_peak(
)

async def get_tx_removals_and_additions(
self, block: FullBlock, npc_result: Optional[NPCResult] = None
self, block: FullBlock, fork_height: int, npc_result: Optional[NPCResult] = None
) -> Tuple[List[bytes32], List[Coin], Optional[NPCResult]]:
if not block.is_transaction_block():
return [], [], None
Expand All @@ -447,7 +449,7 @@ async def get_tx_removals_and_additions(
return [], [], None

if npc_result is None:
block_generator: Optional[BlockGenerator] = await self.get_block_generator(block)
block_generator: Optional[BlockGenerator] = await self.get_block_generator(block, fork_height)
assert block_generator is not None
npc_result = get_name_puzzle_conditions(
block_generator,
Expand Down Expand Up @@ -629,7 +631,7 @@ async def validate_unfinished_block(
block,
uint32(prev_height + 1),
npc_result,
None,
uint32(prev_height),
self.get_block_generator,
validate_signature=False, # Signature was already validated before calling this method, no need to validate
)
Expand All @@ -647,6 +649,7 @@ async def pre_validate_blocks_multiprocessing(
wp_summaries: Optional[List[SubEpochSummary]] = None,
*,
validate_signatures: bool,
fork_height: Optional[uint32] = None,
) -> List[PreValidationResult]:
return await pre_validate_blocks_multiprocessing(
self.constants,
Expand All @@ -657,6 +660,7 @@ async def pre_validate_blocks_multiprocessing(
npc_results,
self.get_block_generator,
batch_size,
fork_height,
wp_summaries,
validate_signatures=validate_signatures,
)
Expand Down Expand Up @@ -882,7 +886,10 @@ def seen_compact_proofs(self, vdf_info: VDFInfo, height: uint32) -> bool:
return False

async def get_block_generator(
self, block: BlockInfo, additional_blocks: Optional[Dict[bytes32, FullBlock]] = None
self,
block: BlockInfo,
fork: Optional[int] = None,
additional_blocks: Optional[Dict[bytes32, FullBlock]] = None,
) -> Optional[BlockGenerator]:
if additional_blocks is None:
additional_blocks = {}
Expand All @@ -905,56 +912,26 @@ async def get_block_generator(
# (as long as we're in the main chain)
result = await self.block_store.get_generators_at(block.transactions_generator_ref_list)
else:
# First tries to find the blocks in additional_blocks
reorg_chain: Dict[uint32, FullBlock] = {}
curr = block
additional_height_dict = {}
while curr.prev_header_hash in additional_blocks:
prev: FullBlock = additional_blocks[curr.prev_header_hash]
additional_height_dict[prev.height] = prev
if isinstance(curr, FullBlock):
assert curr.height == prev.height + 1
reorg_chain[prev.height] = prev
curr = prev

peak: Optional[BlockRecord] = self.get_peak()
if self.contains_block(curr.prev_header_hash) and peak is not None:
# Then we look up blocks up to fork point one at a time, backtracking
previous_block_hash = curr.prev_header_hash
prev_block_record = await self.block_store.get_block_record(previous_block_hash)
prev_block = await self.block_store.get_full_block(previous_block_hash)
assert prev_block is not None
assert prev_block_record is not None
fork = find_fork_point_in_chain(self, peak, prev_block_record)
curr_2: Optional[FullBlock] = prev_block
assert curr_2 is not None and isinstance(curr_2, FullBlock)
reorg_chain[curr_2.height] = curr_2
while curr_2.height > fork and curr_2.height > 0:
curr_2 = await self.block_store.get_full_block(curr_2.prev_header_hash)
assert curr_2 is not None
reorg_chain[curr_2.height] = curr_2
Comment on lines -921 to -935
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still need, at least, the height-to-hash mapping that reorg_chain provides. I don't think you can remove this whole section.

Copy link
Contributor Author

@almogdepaz almogdepaz Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think I see what's wrong here.

We traverse all the heights at the end anyway and fetch the missing blocks from the DB, but we do that by height, so we might fetch the wrong one. So I just need the mapping to get the hashes. This code is still redundant, though: we don't need all the full blocks up to the fork, only the referenced ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what confused me is the if-statement: do we assume curr would be in the current chain?
After that, I get that we would fetch the blocks from reorg_chain — but why would we even enter the if? We don't expect block_records to contain blocks from the reorg.

Copy link
Contributor

@arvidn arvidn Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by block_records, you mean the Blockchain object, right? The self. It has access to all blocks, including orphaned blocks and other forks. If we're in a reorg, those blocks are in the Blockchain object as well.

I think the intention of that if-statement, if self.contains_block(curr.prev_header_hash) was to ask if we know of that block, not if we happen to have it in the cache right now. If we assume the intention was the latter, there's another block of code missing here to do the same thing but pull the block hashes from the database.

the additional_blocks is just the current batch of block's we're validating. It does not contain the whole fork. The fork is still only stored in the database at this point, that's where we need to get the generators from.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I don't get the point of this check; I'm pretty sure the initial intention was to check whether it's in the chain.

Otherwise we are just checking that the store contains something we expect to be there — in that case I would have expected an assert or an exception being thrown. What is the case, then, when the block is not in blockchain.block_records?


assert fork is not None
almogdepaz marked this conversation as resolved.
Show resolved Hide resolved
prev_full_block = await self.block_store.get_full_block(block.prev_header_hash)
assert prev_full_block is not None
reorg_chain_height_to_hash = {}
block_recs = await self.block_store.get_block_records_in_range(fork, prev_full_block.height)
curr = block_recs[prev_full_block.header_hash]
while curr.height > fork and curr.height > 0:
reorg_chain_height_to_hash[curr.height] = curr.header_hash

# todo aggregate heights and hashes to one query
for ref_height in block.transactions_generator_ref_list:
if ref_height in reorg_chain:
ref_block = reorg_chain[ref_height]
assert ref_block is not None
if ref_block.transactions_generator is None:
if ref_height > fork:
gen = await self.block_store.get_generator(reorg_chain_height_to_hash[ref_height])
if gen is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
result.append(ref_block.transactions_generator)
result.append(gen)
else:
if ref_height in additional_height_dict:
ref_block = additional_height_dict[ref_height]
assert ref_block is not None
if ref_block.transactions_generator is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
result.append(ref_block.transactions_generator)
else:
header_hash = self.height_to_hash(ref_height)
if header_hash is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
gen = await self.block_store.get_generator(header_hash)
if gen is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
result.append(gen)
[gen] = await self.block_store.get_generators_at([ref_height])
if gen is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
result.append(gen)
assert len(result) == len(ref_list)
return BlockGenerator(block.transactions_generator, result, [])
6 changes: 3 additions & 3 deletions chia/consensus/find_fork_point.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

from typing import Union
from typing import Union, Optional

from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain_interface import BlockchainInterface
from chia.types.header_block import HeaderBlock
from chia.util.ints import uint32


def find_fork_point_in_chain(
Expand All @@ -14,7 +15,7 @@ def find_fork_point_in_chain(
) -> int:
"""Tries to find height where new chain (block_2) diverged from block_1 (assuming prev blocks
are all included in chain)
Returns -1 if chains have no common ancestor
Returns 0 if chains have no common ancestor
* assumes the fork point is loaded in blocks
"""
while block_2.height > 0 or block_1.height > 0:
Expand All @@ -30,6 +31,5 @@ def find_fork_point_in_chain(
if block_2 != block_1:
# All blocks are different
return -1

# First block is the same
return 0
9 changes: 7 additions & 2 deletions chia/consensus/multiprocess_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ async def pre_validate_blocks_multiprocessing(
pool: Executor,
check_filter: bool,
npc_results: Dict[uint32, NPCResult],
get_block_generator: Callable[[BlockInfo, Dict[bytes32, FullBlock]], Awaitable[Optional[BlockGenerator]]],
get_block_generator: Callable[
[BlockInfo, Optional[uint32], Dict[bytes32, FullBlock]], Awaitable[Optional[BlockGenerator]]
],
batch_size: int,
fork_height: Optional[uint32],
wp_summaries: Optional[List[SubEpochSummary]] = None,
*,
validate_signatures: bool = True,
Expand Down Expand Up @@ -313,7 +316,9 @@ async def pre_validate_blocks_multiprocessing(
b_pickled = []
b_pickled.append(bytes(block))
try:
block_generator: Optional[BlockGenerator] = await get_block_generator(block, prev_blocks_dict)
block_generator: Optional[BlockGenerator] = await get_block_generator(
block, fork_height, prev_blocks_dict
)
except ValueError:
return [
PreValidationResult(
Expand Down
8 changes: 6 additions & 2 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}")
async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
state_change_summary: Optional[StateChangeSummary]
success, state_change_summary, _ = await self.add_block_batch(response.blocks, peer, None)
success, state_change_summary, _ = await self.add_block_batch(response.blocks, peer, start_height)
if not success:
raise ValueError(f"Error short batch syncing, failed to validate blocks {height}-{end_height}")
if state_change_summary is not None:
Expand Down Expand Up @@ -1202,7 +1202,11 @@ async def add_block_batch(
# for these blocks (unlike during normal operation where we validate one at a time)
pre_validate_start = time.monotonic()
pre_validation_results: List[PreValidationResult] = await self.blockchain.pre_validate_blocks_multiprocessing(
blocks_to_validate, {}, wp_summaries=wp_summaries, validate_signatures=True
blocks_to_validate,
{},
wp_summaries=wp_summaries,
validate_signatures=True,
fork_height=fork_point,
)
pre_validate_end = time.monotonic()
pre_validate_time = pre_validate_end - pre_validate_start
Expand Down
5 changes: 4 additions & 1 deletion tests/blockchain/blockchain_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ async def _validate_and_add_block(
else:
# Do not change this, validate_signatures must be False
pre_validation_results: List[PreValidationResult] = await blockchain.pre_validate_blocks_multiprocessing(
[block], {}, validate_signatures=False
[block],
{},
validate_signatures=False,
fork_height=fork_point_with_peak,
)
assert pre_validation_results is not None
results = pre_validation_results[0]
Expand Down
11 changes: 7 additions & 4 deletions tests/blockchain/test_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3196,7 +3196,7 @@ async def test_reorg_from_genesis(self, empty_blockchain, bt):
assert b.get_peak().height == 14

# Reorg to alternate chain that is 1 height longer
blocks_reorg_chain = bt.get_consecutive_blocks(16, [], seed=b"2")
blocks_reorg_chain = bt.get_consecutive_blocks(15, [blocks[0]], seed=b"2")
for reorg_block in blocks_reorg_chain:
if reorg_block.height < 15:
await _validate_and_add_block_multi_result(
Expand Down Expand Up @@ -3380,6 +3380,7 @@ async def test_reorg_new_ref(empty_blockchain, bt):
else:
expected = AddBlockResult.NEW_PEAK
fork_point_with_peak = uint32(1)
log.info(f"check block {block.height}")
await _validate_and_add_block(b, block, expected_result=expected, fork_point_with_peak=fork_point_with_peak)
assert b.get_peak().height == 20

Expand Down Expand Up @@ -3430,7 +3431,7 @@ async def test_reorg_stale_fork_height(empty_blockchain, bt):

# fake the fork_height to make every new block look like a reorg
for block in blocks[5:]:
await _validate_and_add_block(b, block, expected_result=AddBlockResult.NEW_PEAK, fork_point_with_peak=2)
await _validate_and_add_block(b, block, expected_result=AddBlockResult.NEW_PEAK, fork_point_with_peak=uint32(2))
assert b.get_peak().height == 13


Expand Down Expand Up @@ -3585,12 +3586,14 @@ async def test_reorg_flip_flop(empty_blockchain, bt):
fork_height = 2 if counter > 3 else None

preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
[block1], {}, validate_signatures=False
[block1], {}, validate_signatures=False, fork_height=fork_height
)
result, err, _ = await b.add_block(block1, preval[0], fork_point_with_peak=fork_height)
assert not err
preval: List[PreValidationResult] = await b.pre_validate_blocks_multiprocessing(
[block2], {}, validate_signatures=False
[block2],
{},
validate_signatures=False,
)
result, err, _ = await b.add_block(block2, preval[0], fork_point_with_peak=fork_height)
assert not err
Expand Down
6 changes: 3 additions & 3 deletions tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ async def check_transaction_confirmed(transaction) -> bool:
else:
assert template is not None
if test_reorgs:
reog_blocks = bt.get_consecutive_blocks(14)
reog_blocks: List[FullBlock] = bt.get_consecutive_blocks(14)
for r in range(0, len(reog_blocks), 3):
for reorg_block in reog_blocks[:r]:
await _validate_and_add_block_no_error(blockchain, reorg_block)
Expand Down Expand Up @@ -495,10 +495,10 @@ async def test_basic_chain(self, wallet_nodes, self_hostname):

assert full_node_1.full_node.blockchain.get_peak().height == 0

for block in bt.get_consecutive_blocks(30):
for block in bt.get_consecutive_blocks(30, block_list_input=blocks):
await full_node_1.full_node.add_block(block, peer)

assert full_node_1.full_node.blockchain.get_peak().height == 29
assert full_node_1.full_node.blockchain.get_peak().height == 30

@pytest.mark.asyncio
async def test_respond_end_of_sub_slot(self, wallet_nodes, self_hostname):
Expand Down
Loading
Loading