Skip to content

Commit

Permalink
WIP: add a block write task and channels to the state
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Sep 12, 2022
1 parent 67135ee commit 6ba58f8
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 31 deletions.
83 changes: 67 additions & 16 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
convert,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -98,19 +99,34 @@ pub(crate) struct StateService {
/// The configured Zcash network.
network: Network,

// Queued Blocks
//
/// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks
/// before they can do contextual verification.
queued_blocks: QueuedBlocks,

// Exclusively Writeable State
//
/// The non-finalized chain state, including its in-memory chain forks.
mem: NonFinalizedState,

/// The finalized chain state, including its on-disk database.
pub(crate) disk: FinalizedState,

/// The non-finalized chain state, including its in-memory chain forks.
mem: NonFinalizedState,
/// A channel to send blocks to the `block_write_task`,
/// so they can be written to the [`NonFinalizedState`].
//
// TODO: actually send blocks on this channel
_block_write_sender: tokio::sync::mpsc::UnboundedSender<PreparedBlock>,

// Queued Blocks
/// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
/// once the queues have received all their parent blocks.
///
/// Used to check for panics when writing blocks.
//
/// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks
/// before they can do contextual verification.
queued_blocks: QueuedBlocks,
// TODO: actually check for panics
// work out if we need to shut it down when the service is shutting down
_block_write_task: Arc<std::thread::JoinHandle<()>>,

// Pending UTXO Request Tracking
//
Expand Down Expand Up @@ -155,6 +171,12 @@ pub struct ReadStateService {

// Shared Concurrently Readable State
//
/// A watch channel for a recent [`NonFinalizedState`].
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

/// The shared inner on-disk database for the finalized state.
///
/// RocksDB allows reads and writes via a shared reference,
Expand All @@ -164,11 +186,13 @@ pub struct ReadStateService {
/// so it might include some block data that is also in `best_mem`.
db: ZebraDb,

/// A watch channel for a recent [`NonFinalizedState`].
/// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
/// once the queues have received all their parent blocks.
///
/// This state is only updated between requests,
/// so it might include some block data that is also on `disk`.
non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
/// Used to check for panics when writing blocks.
//
// TODO: actually check for panics
_block_write_task: Arc<std::thread::JoinHandle<()>>,
}

impl StateService {
Expand All @@ -182,7 +206,8 @@ impl StateService {
network: Network,
) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
let timer = CodeTimer::start();
let disk = FinalizedState::new(&config, network);

let (disk, mut finalized_block_write_receiver) = FinalizedState::new(&config, network);
timer.finish(module_path!(), line!(), "opening finalized state database");

let timer = CodeTimer::start();
Expand All @@ -199,16 +224,37 @@ impl StateService {

let mem = NonFinalizedState::new(network);

let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk);
// Security: The number of blocks in this channel is limited by
// the syncer and inbound lookahead limits.
let (non_finalized_block_write_sender, mut non_finalized_block_write_receiver) =
tokio::sync::mpsc::unbounded_channel();

// TODO: actually write blocks here
// move this into a state service method?
let block_write_task = std::thread::spawn(move || {
while let Some(_block) = finalized_block_write_receiver.blocking_recv() {
unimplemented!("handle finalized block writes here")
}

while let Some(_block) = non_finalized_block_write_receiver.blocking_recv() {
unimplemented!("handle non-finalized block writes here")
}
});
let block_write_task = Arc::new(block_write_task);

let (read_service, non_finalized_state_sender) =
ReadStateService::new(&disk, block_write_task.clone());

let queued_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();

let state = Self {
network,
disk,
mem,
queued_blocks,
mem,
disk,
_block_write_sender: non_finalized_block_write_sender,
_block_write_task: block_write_task,
pending_utxos,
last_prune: Instant::now(),
chain_tip_sender,
Expand Down Expand Up @@ -513,18 +559,23 @@ impl StateService {
}

impl ReadStateService {
/// Creates a new read-only state service, using the provided finalized state.
/// Creates a new read-only state service, using the provided finalized state and
/// block write task handle.
///
/// Returns the newly created service,
/// and a watch channel for updating the shared recent non-finalized chain.
pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender<NonFinalizedState>) {
pub(crate) fn new(
disk: &FinalizedState,
block_write_task: Arc<std::thread::JoinHandle<()>>,
) -> (Self, watch::Sender<NonFinalizedState>) {
let (non_finalized_state_sender, non_finalized_state_receiver) =
watch::channel(NonFinalizedState::new(disk.network()));

let read_service = Self {
network: disk.network(),
db: disk.db().clone(),
non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
_block_write_task: block_write_task,
};

tracing::info!("created new read-only state service");
Expand Down
65 changes: 50 additions & 15 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,72 @@ pub(super) use zebra_db::ZebraDb;
/// The finalized part of the chain state, stored in the db.
#[derive(Debug)]
pub struct FinalizedState {
/// The underlying database.
db: ZebraDb,
// Configuration
//
/// The configured network.
network: Network,

/// The configured stop height.
///
/// Commit blocks to the finalized state up to this height, then exit Zebra.
debug_stop_at_height: Option<block::Height>,

// Queued Blocks
//
/// Queued blocks that arrived out of order, indexed by their parent block hash.
queued_by_prev_hash: HashMap<block::Hash, QueuedFinalized>,

// Owned State
//
/// The underlying database.
///
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
//
// TODO: get rid of this struct member, and just let the [`ReadStateService`]
// and block write task share ownership of the database.
db: ZebraDb,

/// A channel to send blocks to the `block_write_task`,
/// so they can be written to the finalized state's [`ZebraDb`].
//
// TODO: actually send blocks on this channel
_block_write_sender: tokio::sync::mpsc::UnboundedSender<FinalizedBlock>,

// Metrics
//
/// A metric tracking the maximum height that's currently in `queued_by_prev_hash`
///
/// Set to `f64::NAN` if `queued_by_prev_hash` is empty, because grafana shows NaNs
/// as a break in the graph.
max_queued_height: f64,

/// The configured stop height.
///
/// Commit blocks to the finalized state up to this height, then exit Zebra.
debug_stop_at_height: Option<block::Height>,

/// The configured network.
network: Network,
}

impl FinalizedState {
pub fn new(config: &Config, network: Network) -> Self {
/// Returns an on-disk database instance for `config` and `network`.
/// If there is no existing database, creates a new database on disk.
///
/// Also returns a receiver channel for queued blocks that are ready to
/// be written to the database by the block write task.
pub fn new(
config: &Config,
network: Network,
) -> (Self, tokio::sync::mpsc::UnboundedReceiver<FinalizedBlock>) {
let db = ZebraDb::new(config, network);

// Security: The number of blocks in this channel is limited by
// the syncer and inbound lookahead limits.
let (finalized_block_write_sender, finalized_block_write_receiver) =
tokio::sync::mpsc::unbounded_channel();

let new_state = Self {
network,
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
queued_by_prev_hash: HashMap::new(),
max_queued_height: f64::NAN,
db,
debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
network,
_block_write_sender: finalized_block_write_sender,
max_queued_height: f64::NAN,
};

// TODO: move debug_stop_at_height into a task in the start command (#3442)
Expand Down Expand Up @@ -119,7 +154,7 @@ impl FinalizedState {

tracing::info!(tip = ?new_state.db.tip(), "loaded Zebra state cache");

new_state
(new_state, finalized_block_write_receiver)
}

/// Returns the configured network for this database.
Expand Down

0 comments on commit 6ba58f8

Please sign in to comment.