Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Create a tower Service in zebra-scan #8185

Merged
merged 7 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5862,6 +5862,7 @@ dependencies = [
"chrono",
"color-eyre",
"ff",
"futures",
"group",
"indexmap 2.1.0",
"insta",
Expand All @@ -5879,6 +5880,7 @@ dependencies = [
"zcash_note_encryption",
"zcash_primitives",
"zebra-chain",
"zebra-node-services",
"zebra-state",
"zebra-test",
]
Expand Down
2 changes: 2 additions & 0 deletions zebra-node-services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ pub mod rpc_client;
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

pub mod scan_service;
4 changes: 4 additions & 0 deletions zebra-node-services/src/scan_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Request and response types for zebra-scan tower service.

pub mod request;
pub mod response;
23 changes: 23 additions & 0 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! `zebra_scan::service::ScanService` request types.

#[derive(Debug)]
/// Request types for `zebra_scan::service::ScanService`
pub enum Request {
/// TODO: Accept `KeyHash`es and return key hashes that are registered
CheckKeyHashes(Vec<()>),

/// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
RegisterKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
DeleteKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),

/// TODO: Accept `KeyHash`es and return a channel receiver
SubscribeResults(Vec<()>),

/// TODO: Accept `KeyHash`es and return transaction ids
ClearResults(Vec<()>),
}
15 changes: 15 additions & 0 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! `zebra_scan::service::ScanService` response types.

use std::sync::{mpsc, Arc};

use zebra_chain::transaction::Transaction;

#[derive(Debug)]
/// Response types for `zebra_scan::service::ScanService`
pub enum Response {
/// Response to Results request
Results(Vec<Transaction>),

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
}
2 changes: 2 additions & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ serde = { version = "1.0.193", features = ["serde_derive"] }
tokio = { version = "1.35.1", features = ["time"] }
tower = "0.4.13"
tracing = "0.1.39"
futures = "0.3.30"

zcash_client_backend = "0.10.0-rc.1"
zcash_primitives = "0.13.0-rc.1"

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.33" }

chrono = { version = "0.4.31", default-features = false, features = ["clock", "std", "serde"] }

Expand Down
67 changes: 65 additions & 2 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,77 @@
//! Initializing the scanner.

use std::sync::{mpsc, Arc};

use color_eyre::Report;
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::Instrument;

use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Transaction};
use zebra_state::ChainTipChange;

use crate::{scan, storage::Storage, Config};

#[derive(Debug)]
/// Commands that can be sent to [`ScanTask`]
pub enum ScanTaskCommand {
/// Start scanning for new viewing keys
RegisterKeys(Vec<()>), // TODO: send `ViewingKeyWithHash`es

/// Stop scanning for deleted viewing keys
RemoveKeys {
/// Notify the caller once the key is removed (so the caller can wait before clearing results)
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
key_hashes: Vec<()>,
},

/// Start sending results for key hashes to `result_sender`
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,

/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
},
}

#[derive(Debug)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: JoinHandle<Result<(), Report>>,

/// Task command channel sender
cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`].
pub fn spawn(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
let (cmd_sender, _cmd_receiver) = mpsc::channel();

Self {
handle: spawn_init(config, network, state, chain_tip_change),
cmd_sender,
}
}

/// Sends a command to the scan task
pub fn send(
&mut self,
command: ScanTaskCommand,
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
}

/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
Expand Down
3 changes: 3 additions & 0 deletions zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub mod init;
pub mod scan;
pub mod storage;

use zebra_node_services::scan_service::{request::Request, response::Response};

mod service;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;

Expand Down
89 changes: 89 additions & 0 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! [`tower::Service`] for zebra-scan.

use std::{future::Future, pin::Pin, task::Poll};

use futures::future::FutureExt;
use tower::Service;

use zebra_chain::parameters::Network;
use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
/// On-disk storage
db: Storage,

/// Handle to scan task that's responsible for writing results
scan_task: ScanTask,
}

impl ScanService {
/// Create a new [`ScanService`].
pub fn _new(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::spawn(config, network, state, chain_tip_change),
}
}
}

impl Service<Request> for ScanService {
type Response = Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// TODO: If scan task returns an error, add error to the panic message
assert!(
!self.scan_task.handle.is_finished(),
"scan task finished unexpectedly"
);

self.db.check_for_panics();

Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CheckKeyHashes(_key_hashes) => {
// TODO: check that these entries exist in db
}

Request::RegisterKeys(_viewing_key_with_hashes) => {
// TODO:
// - add these keys as entries in db
// - send new keys to scan task
}

Request::DeleteKeys(_key_hashes) => {
// TODO:
// - delete these keys and their results from db
// - send deleted keys to scan task
}

Request::Results(_key_hashes) => {
// TODO: read results from db
}

Request::SubscribeResults(_key_hashes) => {
// TODO: send key_hashes and mpsc::Sender to scanner task, return mpsc::Receiver to caller
}

Request::ClearResults(_key_hashes) => {
// TODO: clear results for these keys from db
}
}

async move { Ok(Response::Results(vec![])) }.boxed()
}
}
Loading