From e7181097a92d26ad3f86b1cb91a5239405bec212 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 27 Jan 2022 18:35:26 -0300 Subject: [PATCH 1/2] refactor: Streamline access shared utilities --- src/bin/oura/daemon.rs | 50 ++++++++------- src/bin/oura/dump.rs | 44 ++++++------- src/bin/oura/watch.rs | 35 +++++----- src/lib.rs | 2 +- src/mapper/crawl.rs | 3 +- src/mapper/map.rs | 3 +- src/mapper/prelude.rs | 98 +++------------------------- src/sources/common.rs | 8 ++- src/sources/n2c/setup.rs | 25 ++++---- src/sources/n2n/setup.rs | 25 ++++---- src/utils/bech32.rs | 9 +++ src/utils/mod.rs | 134 ++++++++++++++++++++++++++++++++++++++- src/utils/time.rs | 12 +++- 13 files changed, 264 insertions(+), 184 deletions(-) diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index 3e714565..0297e173 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -1,22 +1,29 @@ -use std::thread::JoinHandle; +use std::{sync::Arc, thread::JoinHandle}; use clap::ArgMatches; use config::{Config, ConfigError, Environment, File}; use log::debug; -use oura::pipelining::{ - BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider, - StageReceiver, +use serde::Deserialize; + +use oura::{ + pipelining::{ + BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider, + StageReceiver, + }, + utils::{ChainWellKnownInfo, Utils, WithUtils}, + Error, }; + +use oura::filters::noop::Config as NoopFilterConfig; +use oura::filters::selection::Config as SelectionConfig; use oura::sinks::stdout::Config as StdoutConfig; use oura::sinks::terminal::Config as TerminalConfig; +use oura::sources::n2c::Config as N2CConfig; +use oura::sources::n2n::Config as N2NConfig; #[cfg(feature = "logs")] use oura::sinks::logs::Config as WriterConfig; -use oura::sources::n2c::Config as N2CConfig; -use oura::sources::n2n::Config as N2NConfig; -use serde::Deserialize; - #[cfg(feature = "webhook")] use oura::sinks::webhook::Config as WebhookConfig; @@ -26,14 +33,9 @@ use oura::sinks::kafka::Config as KafkaConfig; #[cfg(feature = "elasticsink")] use oura::sinks::elastic::Config as ElasticConfig; -use oura::filters::noop::Config as NoopFilterConfig; -use oura::filters::selection::Config as SelectionConfig; - #[cfg(feature = "fingerprint")] use oura::filters::fingerprint::Config as FingerprintConfig; -use crate::Error; - #[derive(Debug, Deserialize)] #[serde(tag = "type")] enum Source { @@ -41,12 +43,10 @@ enum Source { N2N(N2NConfig), } -impl SourceProvider for Source { - fn bootstrap(&self) -> PartialBootstrapResult { - match self { - Source::N2C(c) => c.bootstrap(), - Source::N2N(c) => c.bootstrap(), - } +fn bootstrap_source(config: Source, utils: Arc) -> PartialBootstrapResult { + match config { + Source::N2C(config) => WithUtils::new(config, utils).bootstrap(), + Source::N2N(config) => WithUtils::new(config, utils).bootstrap(), } } @@ -120,6 +120,8 @@ struct ConfigRoot { filters: Vec, sink: Sink, + + chain: Option, } impl ConfigRoot { @@ -145,10 +147,14 @@ impl ConfigRoot { } /// Sets up the whole pipeline from configuration -fn bootstrap(config: &ConfigRoot) -> Result>, Error> { +fn bootstrap(config: ConfigRoot) -> Result>, Error> { + let well_known = config.chain.unwrap_or_default(); + + let utils = Arc::new(Utils::new(well_known)); + let mut threads = Vec::with_capacity(10); - let (source_handle, source_rx) = config.source.bootstrap()?; + let (source_handle, source_rx) = bootstrap_source(config.source, utils)?; threads.push(source_handle); let mut last_rx = source_rx; @@ -177,7 +183,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { debug!("daemon starting with this config: {:?}", root); - let threads = bootstrap(&root)?; + let threads = bootstrap(root)?; // TODO: refactor into new loop that monitors thread health for handle in threads { diff --git a/src/bin/oura/dump.rs b/src/bin/oura/dump.rs index 88899a93..77478072 100644 --- a/src/bin/oura/dump.rs +++ b/src/bin/oura/dump.rs @@ -1,24 +1,22 @@ -use std::str::FromStr; +use serde::Deserialize; +use std::{str::FromStr, sync::Arc}; use clap::ArgMatches; + use oura::{ mapper::Config as MapperConfig, - pipelining::{ - BootstrapResult, PartialBootstrapResult, SinkProvider, SourceProvider, StageReceiver, - }, - sources::{AddressArg, BearerKind}, + pipelining::{BootstrapResult, SinkProvider, SourceProvider, StageReceiver}, + sources::{AddressArg, BearerKind, MagicArg}, + utils::{ChainWellKnownInfo, Utils, WithUtils}, }; -use serde::Deserialize; - +use oura::sinks::stdout::Config as StdoutConfig; use oura::sources::n2c::Config as N2CConfig; use oura::sources::n2n::Config as N2NConfig; #[cfg(feature = "logs")] use oura::sinks::logs::Config as LogsConfig; -use oura::sinks::stdout::Config as StdoutConfig; - use crate::Error; #[derive(Clone, Debug, Deserialize)] @@ -44,15 +42,6 @@ enum DumpSource { N2N(N2NConfig), } -impl SourceProvider for DumpSource { - fn bootstrap(&self) -> PartialBootstrapResult { - match self { - DumpSource::N2C(c) => c.bootstrap(), - DumpSource::N2N(c) => c.bootstrap(), - } - } -} - enum DumpSink { Stdout(StdoutConfig), #[cfg(feature = "logs")] @@ -85,8 +74,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { }; let magic = match args.is_present("magic") { - true => Some(args.value_of_t("magic")?), - false => None, + true => args.value_of_t("magic")?, + false => MagicArg::default(), }; let since = match args.is_present("since") { @@ -114,17 +103,22 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { ..Default::default() }; + let well_known = ChainWellKnownInfo::try_from_magic(*magic)?; + + let utils = Arc::new(Utils::new(well_known)); + + #[allow(deprecated)] let source_setup = match mode { PeerMode::AsNode => DumpSource::N2N(N2NConfig { address: AddressArg(bearer, socket), - magic, + magic: Some(magic), well_known: None, mapper, since, }), PeerMode::AsClient => DumpSource::N2C(N2CConfig { address: AddressArg(bearer, socket), - magic, + magic: Some(magic), well_known: None, mapper, since, @@ -142,7 +136,11 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { }), }; - let (source_handle, source_output) = source_setup.bootstrap()?; + let (source_handle, source_output) = match source_setup { + DumpSource::N2C(c) => WithUtils::new(c, utils).bootstrap()?, + DumpSource::N2N(c) => WithUtils::new(c, utils).bootstrap()?, + }; + let sink_handle = sink_setup.bootstrap(source_output)?; log::info!( diff --git a/src/bin/oura/watch.rs b/src/bin/oura/watch.rs index 26692476..07eafc98 100644 --- a/src/bin/oura/watch.rs +++ b/src/bin/oura/watch.rs @@ -1,10 +1,11 @@ -use std::str::FromStr; +use std::{str::FromStr, sync::Arc}; use clap::ArgMatches; use oura::{ mapper::Config as MapperConfig, - pipelining::{PartialBootstrapResult, SinkProvider, SourceProvider}, - sources::{AddressArg, BearerKind}, + pipelining::{SinkProvider, SourceProvider}, + sources::{AddressArg, BearerKind, MagicArg}, + utils::{ChainWellKnownInfo, Utils, WithUtils}, }; use serde::Deserialize; @@ -37,15 +38,6 @@ enum WatchSource { N2N(N2NConfig), } -impl SourceProvider for WatchSource { - fn bootstrap(&self) -> PartialBootstrapResult { - match self { - WatchSource::N2C(c) => c.bootstrap(), - WatchSource::N2N(c) => c.bootstrap(), - } - } -} - pub fn run(args: &ArgMatches) -> Result<(), Error> { env_logger::builder() .filter_level(log::LevelFilter::Error) @@ -62,8 +54,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { }; let magic = match args.is_present("magic") { - true => Some(args.value_of_t("magic")?), - false => None, + true => args.value_of_t("magic")?, + false => MagicArg::default(), }; let since = match args.is_present("since") { @@ -90,17 +82,22 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { ..Default::default() }; + let well_known = ChainWellKnownInfo::try_from_magic(*magic)?; + + let utils = Arc::new(Utils::new(well_known)); + + #[allow(deprecated)] let source_setup = match mode { PeerMode::AsNode => WatchSource::N2N(N2NConfig { address: AddressArg(bearer, socket), - magic, + magic: Some(magic), well_known: None, mapper, since, }), PeerMode::AsClient => WatchSource::N2C(N2CConfig { address: AddressArg(bearer, socket), - magic, + magic: Some(magic), well_known: None, mapper, since, @@ -111,7 +108,11 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { throttle_min_span_millis: throttle, }; - let (source_handle, source_output) = source_setup.bootstrap()?; + let (source_handle, source_output) = match source_setup { + WatchSource::N2C(c) => WithUtils::new(c, utils).bootstrap()?, + WatchSource::N2N(c) => WithUtils::new(c, utils).bootstrap()?, + }; + let sink_handle = sink_setup.bootstrap(source_output)?; sink_handle.join().map_err(|_| "error in sink thread")?; diff --git a/src/lib.rs b/src/lib.rs index 0d256e7e..6b30e2bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ mod framework; -mod utils; +pub mod utils; pub mod filters; pub mod mapper; diff --git a/src/mapper/crawl.rs b/src/mapper/crawl.rs index d5a4f0c8..ddb6c94b 100644 --- a/src/mapper/crawl.rs +++ b/src/mapper/crawl.rs @@ -82,7 +82,8 @@ impl EventWriter { let child = &self.child_writer(EventContext { output_address: self - .bech32_provider + .utils + .bech32 .encode_address(output.address.as_slice())? .into(), ..EventContext::default() diff --git a/src/mapper/map.rs b/src/mapper/map.rs index 5d79ef13..2002b286 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -145,7 +145,8 @@ impl EventWriter { ) -> Result { Ok(TxOutputRecord { address: self - .bech32_provider + .utils + .bech32 .encode_address(output.address.as_slice())?, amount: get_tx_output_coin_value(&output.amount), assets: self.collect_asset_records(&output.amount).into(), diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index b6bceb2d..4af76530 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -1,86 +1,17 @@ +use std::sync::Arc; + use crate::{ framework::{Event, EventContext, EventData}, pipelining::StageSender, - utils::{ - bech32::{Bech32Config, Bech32Provider}, - time::{NaiveConfig as TimeConfig, NaiveProvider as NaiveTime, TimeProvider}, - }, + utils::{time::TimeProvider, Utils}, }; use merge::Merge; use serde::Deserialize; -use pallas::ouroboros::network::{ - handshake::{MAINNET_MAGIC, TESTNET_MAGIC}, - machines::primitives::Point, -}; -use serde::Serialize; - use crate::Error; -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ChainWellKnownInfo { - pub shelley_slot_length: u32, - pub shelley_known_slot: u64, - pub shelley_known_hash: String, - pub shelley_known_time: u64, - pub address_hrp: String, -} - -impl ChainWellKnownInfo { - pub fn try_from_magic(magic: u64) -> Result { - match magic { - MAINNET_MAGIC => Ok(ChainWellKnownInfo { - shelley_slot_length: 1, - shelley_known_slot: 4492800, - shelley_known_hash: - "aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de".to_string(), - shelley_known_time: 1596059091, - address_hrp: "addr".to_string(), - }), - TESTNET_MAGIC => Ok(ChainWellKnownInfo { - shelley_slot_length: 1, - shelley_known_slot: 1598400, - shelley_known_hash: - "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f".to_string(), - shelley_known_time: 1595967616, - address_hrp: "addr_test".to_string(), - }), - _ => Err("can't infer well-known chain infro from specified magic".into()), - } - } -} - -// HACK: to glue together legacy config with new time provider -impl From for TimeConfig { - fn from(other: ChainWellKnownInfo) -> Self { - TimeConfig { - slot_length: other.shelley_slot_length, - start_slot: other.shelley_known_slot, - start_timestamp: other.shelley_known_time, - } - } -} - -impl TryFrom for Point { - type Error = crate::Error; - - fn try_from(other: ChainWellKnownInfo) -> Result { - let out = Point( - other.shelley_known_slot, - hex::decode(other.shelley_known_hash)?, - ); - - Ok(out) - } -} - -impl From for Bech32Config { - fn from(other: ChainWellKnownInfo) -> Self { - Bech32Config { - address_hrp: other.address_hrp, - } - } -} +#[deprecated] +pub use crate::utils::ChainWellKnownInfo; #[derive(Deserialize, Clone, Debug, Default)] pub struct Config { @@ -98,23 +29,16 @@ pub struct Config { pub(crate) struct EventWriter { context: EventContext, output: StageSender, - time_provider: Option, - pub(crate) bech32_provider: Bech32Provider, pub(crate) config: Config, + pub(crate) utils: Arc, } impl EventWriter { - pub fn new( - output: StageSender, - well_known: Option, - config: Config, - ) -> Self { + pub fn new(output: StageSender, utils: Arc, config: Config) -> Self { EventWriter { context: EventContext::default(), output, - time_provider: well_known.clone().map(|x| NaiveTime::new(x.into())), - bech32_provider: well_known - .map_or_else(Bech32Provider::default, |x| Bech32Provider::new(x.into())), + utils, config, } } @@ -143,18 +67,16 @@ impl EventWriter { pub fn child_writer(&self, mut extra_context: EventContext) -> EventWriter { extra_context.merge(self.context.clone()); - // TODO: too much cloning, lets move to lifecycles here EventWriter { context: extra_context, output: self.output.clone(), - time_provider: self.time_provider.clone(), - bech32_provider: self.bech32_provider.clone(), + utils: self.utils.clone(), config: self.config.clone(), } } pub fn compute_timestamp(&self, slot: u64) -> Option { - match &self.time_provider { + match &self.utils.time { Some(provider) => provider.slot_to_wallclock(slot).ok(), _ => None, } diff --git a/src/sources/common.rs b/src/sources/common.rs index c685f499..3e1ebbbd 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -11,7 +11,7 @@ use pallas::ouroboros::network::{ use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; -use crate::{mapper::ChainWellKnownInfo, Error}; +use crate::{utils::ChainWellKnownInfo, Error}; #[derive(Debug, Deserialize)] pub enum BearerKind { @@ -95,6 +95,12 @@ impl FromStr for MagicArg { } } +impl Default for MagicArg { + fn default() -> Self { + Self(MAINNET_MAGIC) + } +} + pub(crate) fn deserialize_magic_arg<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index d8a91697..8f9a5962 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -15,9 +15,10 @@ use pallas::ouroboros::network::{ use serde::Deserialize; use crate::{ - mapper::{ChainWellKnownInfo, Config as MapperConfig, EventWriter}, + mapper::{Config as MapperConfig, EventWriter}, pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::common::{find_end_of_chain, AddressArg, BearerKind, MagicArg, PointArg}, + utils::{ChainWellKnownInfo, WithUtils}, Error, }; @@ -32,6 +33,7 @@ pub struct Config { pub since: Option, + #[deprecated(note = "chain info is now pipeline-wide, use utils")] pub well_known: Option, #[serde(default)] @@ -64,36 +66,31 @@ fn setup_tcp_multiplexer(address: &str) -> Result { Multiplexer::setup(tcp, &[0, 5]) } -impl SourceProvider for Config { +impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut muxer = match self.address.0 { - BearerKind::Tcp => setup_tcp_multiplexer(&self.address.1)?, + let mut muxer = match self.inner.address.0 { + BearerKind::Tcp => setup_tcp_multiplexer(&self.inner.address.1)?, #[cfg(target_family = "unix")] - BearerKind::Unix => setup_unix_multiplexer(&self.address.1)?, + BearerKind::Unix => setup_unix_multiplexer(&self.inner.address.1)?, }; - let magic = match &self.magic { + let magic = match &self.inner.magic { Some(m) => *m.deref(), None => MAINNET_MAGIC, }; - let well_known = match &self.well_known { - Some(info) => info.clone(), - None => ChainWellKnownInfo::try_from_magic(magic)?, - }; - - let writer = EventWriter::new(output_tx, well_known.clone().into(), self.mapper.clone()); + let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; let mut cs_channel = muxer.use_channel(5); - let since: Point = match &self.since { + let since: Point = match &self.inner.since { Some(arg) => arg.try_into()?, - None => find_end_of_chain(&mut cs_channel, &well_known)?, + None => find_end_of_chain(&mut cs_channel, &self.utils.well_known)?, }; info!("starting from chain point: {:?}", &since); diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index 3f008bde..2ea53afc 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -15,9 +15,10 @@ use pallas::ouroboros::network::{ use serde::Deserialize; use crate::{ - mapper::{ChainWellKnownInfo, Config as MapperConfig, EventWriter}, + mapper::{Config as MapperConfig, EventWriter}, pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::common::{find_end_of_chain, AddressArg, BearerKind, MagicArg, PointArg}, + utils::{ChainWellKnownInfo, WithUtils}, Error, }; @@ -32,6 +33,7 @@ pub struct Config { pub since: Option, + #[deprecated(note = "chain info is now pipeline-wide, use utils")] pub well_known: Option, #[serde(default)] @@ -64,36 +66,31 @@ fn setup_tcp_multiplexer(address: &str) -> Result { Multiplexer::setup(tcp, &[0, 2, 3]) } -impl SourceProvider for Config { +impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut muxer = match self.address.0 { - BearerKind::Tcp => setup_tcp_multiplexer(&self.address.1)?, + let mut muxer = match self.inner.address.0 { + BearerKind::Tcp => setup_tcp_multiplexer(&self.inner.address.1)?, #[cfg(target_family = "unix")] - BearerKind::Unix => setup_unix_multiplexer(&self.address.1)?, + BearerKind::Unix => setup_unix_multiplexer(&self.inner.address.1)?, }; - let magic = match &self.magic { + let magic = match &self.inner.magic { Some(m) => *m.deref(), None => MAINNET_MAGIC, }; - let well_known = match &self.well_known { - Some(info) => info.clone(), - None => ChainWellKnownInfo::try_from_magic(magic)?, - }; - - let writer = EventWriter::new(output_tx, well_known.clone().into(), self.mapper.clone()); + let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; let mut cs_channel = muxer.use_channel(2); - let since: Point = match &self.since { + let since: Point = match &self.inner.since { Some(arg) => arg.try_into()?, - None => find_end_of_chain(&mut cs_channel, &well_known)?, + None => find_end_of_chain(&mut cs_channel, &self.utils.well_known)?, }; info!("starting from chain point: {:?}", &since); diff --git a/src/utils/bech32.rs b/src/utils/bech32.rs index c6013a5a..30e1bdff 100644 --- a/src/utils/bech32.rs +++ b/src/utils/bech32.rs @@ -6,6 +6,7 @@ use bech32::{self, ToBase32}; use serde::Deserialize; +use super::ChainWellKnownInfo; use crate::Error; #[derive(Clone, Deserialize)] @@ -13,6 +14,14 @@ pub struct Bech32Config { pub address_hrp: String, } +impl Bech32Config { + pub(crate) fn from_well_known(info: &ChainWellKnownInfo) -> Self { + Self { + address_hrp: info.address_hrp.to_owned(), + } + } +} + impl Default for Bech32Config { fn default() -> Self { Self { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d34cb5f3..41049161 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,4 +1,25 @@ -use crate::framework::Error; +//! Pipeline-wide utilities +//! +//! This module includes general-purpose utilities that could potentially be +//! used by more than a single stage. The entry point to this utilities is +//! desgined as singelton [`Utils`] instance shared by all stages through an Arc +//! pointer. + +use std::sync::Arc; + +use pallas::ouroboros::network::{ + handshake::{MAINNET_MAGIC, TESTNET_MAGIC}, + machines::primitives::Point, +}; +use serde::{Deserialize, Serialize}; + +use crate::{ + framework::Error, + utils::{ + bech32::{Bech32Config, Bech32Provider}, + time::{NaiveConfig as TimeConfig, NaiveProvider as NaiveTime}, + }, +}; pub mod throttle; @@ -17,3 +38,114 @@ impl SwallowResult for Result<(), Error> { } } } + +/// Well-known information about the blockhain network +/// +/// Some of the logic in Oura depends on particular characteristic of the +/// network that it's consuming from. For example: time calculation and bech32 +/// encoding. This struct groups all of these blockchain network specific +/// values. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ChainWellKnownInfo { + pub shelley_slot_length: u32, + pub shelley_known_slot: u64, + pub shelley_known_hash: String, + pub shelley_known_time: u64, + pub address_hrp: String, +} + +impl ChainWellKnownInfo { + /// Hardcoded values for mainnet + pub fn mainnet() -> Self { + ChainWellKnownInfo { + shelley_slot_length: 1, + shelley_known_slot: 4492800, + shelley_known_hash: "aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de" + .to_string(), + shelley_known_time: 1596059091, + address_hrp: "addr".to_string(), + } + } + + /// Hardcoded values for testnet + pub fn testnet() -> Self { + ChainWellKnownInfo { + shelley_slot_length: 1, + shelley_known_slot: 1598400, + shelley_known_hash: "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f" + .to_string(), + shelley_known_time: 1595967616, + address_hrp: "addr_test".to_string(), + } + } + + /// Uses the value of the magic to return either mainnet or testnet + /// hardcoded values. + pub fn try_from_magic(magic: u64) -> Result { + match magic { + MAINNET_MAGIC => Ok(Self::mainnet()), + TESTNET_MAGIC => Ok(Self::testnet()), + _ => Err("can't infer well-known chain infro from specified magic".into()), + } + } +} + +impl Default for ChainWellKnownInfo { + fn default() -> Self { + Self::mainnet() + } +} + +/// Entry point for all shared utilities +pub struct Utils { + pub(crate) well_known: ChainWellKnownInfo, + pub(crate) time: Option, + pub(crate) bech32: Bech32Provider, +} + +impl Utils { + pub fn new(well_known: ChainWellKnownInfo) -> Self { + Self { + time: NaiveTime::new(TimeConfig::from_well_known(&well_known)).into(), + bech32: Bech32Provider::new(Bech32Config::from_well_known(&well_known)), + well_known, + } + } +} + +/// Wraps a struct with pipeline-wide utilities +/// +/// Most of the stage bootstrapping processes will require a custom config value +/// and a reference to the shared utilities singelton. This is a quality-of-life +/// artifact to wrap other structs (usually configs) and attach the utilities +/// singelton entrypoint. +pub struct WithUtils { + pub utils: Arc, + pub inner: C, +} + +impl WithUtils { + pub fn new(inner: C, utils: Arc) -> Self { + WithUtils { inner, utils } + } + + pub fn attach_utils_to(&self, target: T) -> WithUtils { + WithUtils { + inner: target, + utils: self.utils.clone(), + } + } +} + +impl TryFrom for Point { + type Error = crate::Error; + + fn try_from(other: ChainWellKnownInfo) -> Result { + let out = Point( + other.shelley_known_slot, + hex::decode(other.shelley_known_hash)?, + ); + + Ok(out) + } +} diff --git a/src/utils/time.rs b/src/utils/time.rs index bb741f07..2a5187a1 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -4,7 +4,7 @@ use serde::Deserialize; -use crate::Error; +use crate::{utils::ChainWellKnownInfo, Error}; /// Abstraction available to stages to deal with blockchain time conversions pub(crate) trait TimeProvider { @@ -19,6 +19,16 @@ pub struct NaiveConfig { pub start_timestamp: u64, } +impl NaiveConfig { + pub(crate) fn from_well_known(info: &ChainWellKnownInfo) -> Self { + Self { + slot_length: info.shelley_slot_length, + start_slot: info.shelley_known_slot, + start_timestamp: info.shelley_known_time, + } + } +} + /// A naive, standalone implementation of a time provider /// /// This time provider doesn't require any external resources other than an From 8c115ac4f6581a1deeade37ed3136bff6caf8231 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 28 Jan 2022 08:30:41 -0300 Subject: [PATCH 2/2] Include standalone mapper constructor --- src/mapper/prelude.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 4af76530..a081e2c8 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -43,6 +43,17 @@ impl EventWriter { } } + #[allow(unused)] + pub fn standalone( + output: StageSender, + well_known: Option, + config: Config, + ) -> Self { + let utils = Arc::new(Utils::new(well_known.unwrap_or_default())); + + Self::new(output, utils, config) + } + pub fn append(&self, data: EventData) -> Result<(), Error> { let evt = Event { context: self.context.clone(),