Skip to content

Commit

Permalink
refactor: Streamline access shared utilities (#108)
Browse files Browse the repository at this point in the history
* refactor: Streamline access shared utilities
* Include standalone mapper constructor
  • Loading branch information
scarmuega authored Jan 28, 2022
1 parent 691baeb commit 96d7a87
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 183 deletions.
50 changes: 28 additions & 22 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
use std::thread::JoinHandle;
use std::{sync::Arc, thread::JoinHandle};

use clap::ArgMatches;
use config::{Config, ConfigError, Environment, File};
use log::debug;
use oura::pipelining::{
BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider,
StageReceiver,
use serde::Deserialize;

use oura::{
pipelining::{
BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider,
StageReceiver,
},
utils::{ChainWellKnownInfo, Utils, WithUtils},
Error,
};

use oura::filters::noop::Config as NoopFilterConfig;
use oura::filters::selection::Config as SelectionConfig;
use oura::sinks::stdout::Config as StdoutConfig;
use oura::sinks::terminal::Config as TerminalConfig;
use oura::sources::n2c::Config as N2CConfig;
use oura::sources::n2n::Config as N2NConfig;

#[cfg(feature = "logs")]
use oura::sinks::logs::Config as WriterConfig;

use oura::sources::n2c::Config as N2CConfig;
use oura::sources::n2n::Config as N2NConfig;
use serde::Deserialize;

#[cfg(feature = "webhook")]
use oura::sinks::webhook::Config as WebhookConfig;

Expand All @@ -26,27 +33,20 @@ use oura::sinks::kafka::Config as KafkaConfig;
#[cfg(feature = "elasticsink")]
use oura::sinks::elastic::Config as ElasticConfig;

use oura::filters::noop::Config as NoopFilterConfig;
use oura::filters::selection::Config as SelectionConfig;

#[cfg(feature = "fingerprint")]
use oura::filters::fingerprint::Config as FingerprintConfig;

use crate::Error;

#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum Source {
N2C(N2CConfig),
N2N(N2NConfig),
}

impl SourceProvider for Source {
fn bootstrap(&self) -> PartialBootstrapResult {
match self {
Source::N2C(c) => c.bootstrap(),
Source::N2N(c) => c.bootstrap(),
}
fn bootstrap_source(config: Source, utils: Arc<Utils>) -> PartialBootstrapResult {
match config {
Source::N2C(config) => WithUtils::new(config, utils).bootstrap(),
Source::N2N(config) => WithUtils::new(config, utils).bootstrap(),
}
}

Expand Down Expand Up @@ -120,6 +120,8 @@ struct ConfigRoot {
filters: Vec<Filter>,

sink: Sink,

chain: Option<ChainWellKnownInfo>,
}

impl ConfigRoot {
Expand All @@ -145,10 +147,14 @@ impl ConfigRoot {
}

/// Sets up the whole pipeline from configuration
fn bootstrap(config: &ConfigRoot) -> Result<Vec<JoinHandle<()>>, Error> {
fn bootstrap(config: ConfigRoot) -> Result<Vec<JoinHandle<()>>, Error> {
let well_known = config.chain.unwrap_or_default();

let utils = Arc::new(Utils::new(well_known));

let mut threads = Vec::with_capacity(10);

let (source_handle, source_rx) = config.source.bootstrap()?;
let (source_handle, source_rx) = bootstrap_source(config.source, utils)?;
threads.push(source_handle);

let mut last_rx = source_rx;
Expand Down Expand Up @@ -177,7 +183,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {

debug!("daemon starting with this config: {:?}", root);

let threads = bootstrap(&root)?;
let threads = bootstrap(root)?;

// TODO: refactor into new loop that monitors thread health
for handle in threads {
Expand Down
44 changes: 21 additions & 23 deletions src/bin/oura/dump.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
use std::str::FromStr;
use serde::Deserialize;
use std::{str::FromStr, sync::Arc};

use clap::ArgMatches;

use oura::{
mapper::Config as MapperConfig,
pipelining::{
BootstrapResult, PartialBootstrapResult, SinkProvider, SourceProvider, StageReceiver,
},
sources::{AddressArg, BearerKind},
pipelining::{BootstrapResult, SinkProvider, SourceProvider, StageReceiver},
sources::{AddressArg, BearerKind, MagicArg},
utils::{ChainWellKnownInfo, Utils, WithUtils},
};

use serde::Deserialize;

use oura::sinks::stdout::Config as StdoutConfig;
use oura::sources::n2c::Config as N2CConfig;
use oura::sources::n2n::Config as N2NConfig;

#[cfg(feature = "logs")]
use oura::sinks::logs::Config as LogsConfig;

use oura::sinks::stdout::Config as StdoutConfig;

use crate::Error;

#[derive(Clone, Debug, Deserialize)]
Expand All @@ -44,15 +42,6 @@ enum DumpSource {
N2N(N2NConfig),
}

impl SourceProvider for DumpSource {
fn bootstrap(&self) -> PartialBootstrapResult {
match self {
DumpSource::N2C(c) => c.bootstrap(),
DumpSource::N2N(c) => c.bootstrap(),
}
}
}

enum DumpSink {
Stdout(StdoutConfig),
#[cfg(feature = "logs")]
Expand Down Expand Up @@ -85,8 +74,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
};

let magic = match args.is_present("magic") {
true => Some(args.value_of_t("magic")?),
false => None,
true => args.value_of_t("magic")?,
false => MagicArg::default(),
};

let since = match args.is_present("since") {
Expand Down Expand Up @@ -114,17 +103,22 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
..Default::default()
};

let well_known = ChainWellKnownInfo::try_from_magic(*magic)?;

let utils = Arc::new(Utils::new(well_known));

#[allow(deprecated)]
let source_setup = match mode {
PeerMode::AsNode => DumpSource::N2N(N2NConfig {
address: AddressArg(bearer, socket),
magic,
magic: Some(magic),
well_known: None,
mapper,
since,
}),
PeerMode::AsClient => DumpSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
magic,
magic: Some(magic),
well_known: None,
mapper,
since,
Expand All @@ -142,7 +136,11 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
}),
};

let (source_handle, source_output) = source_setup.bootstrap()?;
let (source_handle, source_output) = match source_setup {
DumpSource::N2C(c) => WithUtils::new(c, utils).bootstrap()?,
DumpSource::N2N(c) => WithUtils::new(c, utils).bootstrap()?,
};

let sink_handle = sink_setup.bootstrap(source_output)?;

log::info!(
Expand Down
35 changes: 18 additions & 17 deletions src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::str::FromStr;
use std::{str::FromStr, sync::Arc};

use clap::ArgMatches;
use oura::{
mapper::Config as MapperConfig,
pipelining::{PartialBootstrapResult, SinkProvider, SourceProvider},
sources::{AddressArg, BearerKind},
pipelining::{SinkProvider, SourceProvider},
sources::{AddressArg, BearerKind, MagicArg},
utils::{ChainWellKnownInfo, Utils, WithUtils},
};

use serde::Deserialize;
Expand Down Expand Up @@ -37,15 +38,6 @@ enum WatchSource {
N2N(N2NConfig),
}

impl SourceProvider for WatchSource {
fn bootstrap(&self) -> PartialBootstrapResult {
match self {
WatchSource::N2C(c) => c.bootstrap(),
WatchSource::N2N(c) => c.bootstrap(),
}
}
}

pub fn run(args: &ArgMatches) -> Result<(), Error> {
env_logger::builder()
.filter_level(log::LevelFilter::Error)
Expand All @@ -62,8 +54,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
};

let magic = match args.is_present("magic") {
true => Some(args.value_of_t("magic")?),
false => None,
true => args.value_of_t("magic")?,
false => MagicArg::default(),
};

let since = match args.is_present("since") {
Expand All @@ -90,17 +82,22 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
..Default::default()
};

let well_known = ChainWellKnownInfo::try_from_magic(*magic)?;

let utils = Arc::new(Utils::new(well_known));

#[allow(deprecated)]
let source_setup = match mode {
PeerMode::AsNode => WatchSource::N2N(N2NConfig {
address: AddressArg(bearer, socket),
magic,
magic: Some(magic),
well_known: None,
mapper,
since,
}),
PeerMode::AsClient => WatchSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
magic,
magic: Some(magic),
well_known: None,
mapper,
since,
Expand All @@ -111,7 +108,11 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
throttle_min_span_millis: throttle,
};

let (source_handle, source_output) = source_setup.bootstrap()?;
let (source_handle, source_output) = match source_setup {
WatchSource::N2C(c) => WithUtils::new(c, utils).bootstrap()?,
WatchSource::N2N(c) => WithUtils::new(c, utils).bootstrap()?,
};

let sink_handle = sink_setup.bootstrap(source_output)?;

sink_handle.join().map_err(|_| "error in sink thread")?;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod model;
mod utils;
pub mod utils;

pub mod filters;
pub mod mapper;
Expand Down
3 changes: 2 additions & 1 deletion src/mapper/crawl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ impl EventWriter {

let child = &self.child_writer(EventContext {
output_address: self
.bech32_provider
.utils
.bech32
.encode_address(output.address.as_slice())?
.into(),
..EventContext::default()
Expand Down
3 changes: 2 additions & 1 deletion src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ impl EventWriter {
) -> Result<TxOutputRecord, Error> {
Ok(TxOutputRecord {
address: self
.bech32_provider
.utils
.bech32
.encode_address(output.address.as_slice())?,
amount: get_tx_output_coin_value(&output.amount),
assets: self.collect_asset_records(&output.amount).into(),
Expand Down
Loading

0 comments on commit 96d7a87

Please sign in to comment.