Skip to content

Commit

Permalink
add(scan): Create a tower Service in zebra-scan (#8185)
Browse files Browse the repository at this point in the history
* Adds ScanService and ScanTask

* renames ScannerCommand to ScanTaskCommand

* fixes doc errors

* fixes clippy lints

* panic if the scan task finishes unexpectedly

* updates TODOs

---------

Co-authored-by: Marek <[email protected]>
  • Loading branch information
arya2 and upbqdn authored Jan 24, 2024
1 parent d231b3b commit 513ace2
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5862,6 +5862,7 @@ dependencies = [
"chrono",
"color-eyre",
"ff",
"futures",
"group",
"indexmap 2.1.0",
"insta",
Expand All @@ -5879,6 +5880,7 @@ dependencies = [
"zcash_note_encryption",
"zcash_primitives",
"zebra-chain",
"zebra-node-services",
"zebra-state",
"zebra-test",
]
Expand Down
2 changes: 2 additions & 0 deletions zebra-node-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ pub mod rpc_client;
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

pub mod scan_service;
4 changes: 4 additions & 0 deletions zebra-node-services/src/scan_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Request and response types for zebra-scan tower service.
pub mod request;
pub mod response;
23 changes: 23 additions & 0 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! `zebra_scan::service::ScanService` request types.
#[derive(Debug)]
/// Request types for `zebra_scan::service::ScanService`
pub enum Request {
/// TODO: Accept `KeyHash`es and return key hashes that are registered
CheckKeyHashes(Vec<()>),

/// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
RegisterKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
DeleteKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),

/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),

/// TODO: Accept `KeyHash`es and return transaction ids
ClearResults(Vec<()>),
}
15 changes: 15 additions & 0 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! `zebra_scan::service::ScanService` response types.
use std::sync::{mpsc, Arc};

use zebra_chain::transaction::Transaction;

#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
pub enum Response {
/// Response to Results request
Results(Vec<Transaction>),

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
}
2 changes: 2 additions & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = { version = "1.35.1", features = ["time"] }
tower = "0.4.13"
tracing = "0.1.39"
futures = "0.3.30"

zcash_client_backend = "0.10.0-rc.1"
zcash_primitives = "0.13.0-rc.1"

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" }

chrono = { version = "0.4.32", default-features = false, features = ["clock", "std", "serde"] }

Expand Down
67 changes: 65 additions & 2 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,77 @@
//! Initializing the scanner.
use std::sync::{mpsc, Arc};

use color_eyre::Report;
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::Instrument;

use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Transaction};
use zebra_state::ChainTipChange;

use crate::{scan, storage::Storage, Config};

#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es

/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
key_hashes: Vec<()>,
},

/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,

/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}

#[derive(Debug)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: JoinHandle<Result<(), Report>>,

/// Task command channel sender
cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
let (cmd_sender, _cmd_receiver) = mpsc::channel();

Self {
handle: spawn_init(config, network, state, chain_tip_change),
cmd_sender,
}
}

/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
}

/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
Expand Down
3 changes: 3 additions & 0 deletions zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub mod init;
pub mod scan;
pub mod storage;

use zebra_node_services::scan_service::{request::Request, response::Response};

mod service;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;

Expand Down
89 changes: 89 additions & 0 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! [`tower::Service`] for zebra-scan.
use std::{future::Future, pin::Pin, task::Poll};

use futures::future::FutureExt;
use tower::Service;

use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
/// On-disk storage
db: Storage,

/// Handle to scan task that's responsible for writing results
scan_task: ScanTask,
}

impl ScanService {
/// Create a new [`ScanService`].
pub fn _new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
}
}
}

impl Service<Request> for ScanService {
type Response = Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// TODO: If scan task returns an error, add error to the panic message
assert!(
!self.scan_task.handle.is_finished(),
"scan task finished unexpectedly"
);

self.db.check_for_panics();

Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CheckKeyHashes(_key_hashes) => {
// TODO: check that these entries exist in db
}

Request::RegisterKeys(_viewing_key_with_hashes) => {
// TODO:
// - add these keys as entries in db
// - send new keys to scan task
}

Request::DeleteKeys(_key_hashes) => {
// TODO:
// - delete these keys and their results from db
// - send deleted keys to scan task
}

Request::Results(_key_hashes) => {
// TODO: read results from db
}

Request::SubscribeResults(_key_hashes) => {
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
}

Request::ClearResults(_key_hashes) => {
// TODO: clear results for these keys from db
}
}

async move { Ok(Response::Results(vec![])) }.boxed()
}
}

0 comments on commit 513ace2

Please sign in to comment.