Skip to content

Commit

Permalink
fix: add a configurable max timeout for outgoing predicate payload re…
Browse files Browse the repository at this point in the history
…quests (#642)

This PR adds a new (optional) entry to the config .toml file so outgoing
predicate payload HTTP POST requests can have a configurable timeout
that will cancel the request if hit.

```toml
[predicates]
payload_http_request_timeout_ms = 10000
```

This new setting is useful to public-facing chainhook instances that may
run into unresponsive 3rd party servers that may take forever to respond
to a payload request.

Fixes #643
  • Loading branch information
rafaelcr authored Oct 10, 2024
1 parent 9a6d362 commit 6c1dfa9
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 38 deletions.
6 changes: 6 additions & 0 deletions components/chainhook-cli/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct ConfigFile {
pub storage: StorageConfigFile,
pub pox_config: Option<PoxConfigFile>,
pub http_api: Option<PredicatesApiConfigFile>,
pub predicates: Option<PredicatesConfigFile>,
pub event_source: Option<Vec<EventSourceConfigFile>>,
pub limits: LimitsConfigFile,
pub network: NetworkConfigFile,
Expand Down Expand Up @@ -64,6 +65,11 @@ pub struct NetworkConfigFile {
pub stacks_events_ingestion_port: Option<u16>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct PredicatesConfigFile {
pub payload_http_request_timeout_ms: Option<u64>,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum NetworkConfigMode {
Expand Down
23 changes: 22 additions & 1 deletion components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod generator;

use chainhook_sdk::chainhooks::types::{ChainhookStore, PoxConfig};
pub use chainhook_sdk::indexer::IndexerConfig;
use chainhook_sdk::observer::EventObserverConfig;
use chainhook_sdk::observer::{EventObserverConfig, PredicatesConfig};
use chainhook_sdk::types::{
BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig,
};
Expand All @@ -30,6 +30,7 @@ pub struct Config {
pub storage: StorageConfig,
pub pox_config: PoxConfig,
pub http_api: PredicatesApi,
pub predicates: PredicatesConfig,
pub event_sources: Vec<EventSourceConfig>,
pub limits: LimitsConfig,
pub network: IndexerConfig,
Expand Down Expand Up @@ -117,6 +118,9 @@ impl Config {
EventObserverConfig {
bitcoin_rpc_proxy_enabled: true,
registered_chainhooks: ChainhookStore::new(),
predicates_config: PredicatesConfig {
payload_http_request_timeout_ms: self.predicates.payload_http_request_timeout_ms,
},
bitcoind_rpc_username: self.network.bitcoind_rpc_username.clone(),
bitcoind_rpc_password: self.network.bitcoind_rpc_password.clone(),
bitcoind_rpc_url: self.network.bitcoind_rpc_url.clone(),
Expand Down Expand Up @@ -193,6 +197,14 @@ impl Config {
}),
},
},
predicates: match config_file.predicates {
None => PredicatesConfig {
payload_http_request_timeout_ms: None,
},
Some(predicates) => PredicatesConfig {
payload_http_request_timeout_ms: predicates.payload_http_request_timeout_ms,
},
},
event_sources,
limits: LimitsConfig {
max_number_of_stacks_predicates: config_file
Expand Down Expand Up @@ -357,6 +369,9 @@ impl Config {
},
pox_config: PoxConfig::devnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![],
limits: LimitsConfig {
max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION,
Expand Down Expand Up @@ -390,6 +405,9 @@ impl Config {
},
pox_config: PoxConfig::testnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_TESTNET_STACKS_TSV_ARCHIVE.into(),
})],
Expand Down Expand Up @@ -425,6 +443,9 @@ impl Config {
},
pox_config: PoxConfig::mainnet_default(),
http_api: PredicatesApi::Off,
predicates: PredicatesConfig {
payload_http_request_timeout_ms: None,
},
event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig {
file_url: DEFAULT_MAINNET_STACKS_TSV_ARCHIVE.into(),
})],
Expand Down
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ pub async fn execute_predicates_action<'a>(
gather_proofs(&trigger, &mut proofs, config, ctx);
}
let predicate_uuid = &trigger.chainhook.uuid;
match handle_bitcoin_hook_action(trigger, &proofs) {
match handle_bitcoin_hook_action(trigger, &proofs, &config) {
Err(e) => {
warn!(
ctx.expect_logger(),
Expand Down
26 changes: 19 additions & 7 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ pub async fn get_canonical_fork_from_tsv(
for result in reader_builder.deserialize() {
line += 1;
let record: Record = result.unwrap();
if let RecordKind::StacksBlockReceived = &record.kind { if let Err(_e) = record_tx.send(Some((record, line))) {
break;
} };
if let RecordKind::StacksBlockReceived = &record.kind {
if let Err(_e) = record_tx.send(Some((record, line))) {
break;
}
};
}
let _ = record_tx.send(None);
})
Expand Down Expand Up @@ -338,7 +340,12 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
};
let res = match handle_stacks_hook_action(trigger, &proofs, ctx) {
let res = match handle_stacks_hook_action(
trigger,
&proofs,
&config.get_event_observer_config(),
ctx,
) {
Err(e) => {
warn!(
ctx.expect_logger(),
Expand Down Expand Up @@ -487,7 +494,9 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
Expand Down Expand Up @@ -525,7 +534,8 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate(
apply: hits_per_blocks,
rollback: vec![],
};
match handle_stacks_hook_action(trigger, &proofs, ctx) {
match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx)
{
Err(e) => {
error!(ctx.expect_logger(), "unable to handle action {}", e);
}
Expand Down Expand Up @@ -604,7 +614,9 @@ pub async fn consolidate_local_stacks_chainstate_using_csv(
let mut tsv_line = String::new();
while tsv_current_line < tsv_line_number {
tsv_line.clear();
let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?;
let bytes_read = tsv_reader
.read_line(&mut tsv_line)
.map_err(|e| e.to_string())?;
if bytes_read == 0 {
return Err("Unexpected EOF when reading TSV".to_string());
}
Expand Down
26 changes: 11 additions & 15 deletions components/chainhook-cli/src/service/tests/helpers/mock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::service::{
PredicateStatus, Service,
};
use chainhook_sdk::chainhooks::types::PoxConfig;
use chainhook_sdk::observer::PredicatesConfig;
use chainhook_sdk::{
chainhooks::stacks::StacksChainhookSpecificationNetworkMap,
chainhooks::types::{ChainhookInstance, ChainhookSpecificationNetworkMap},
Expand Down Expand Up @@ -82,12 +83,12 @@ pub async fn filter_predicate_status_from_all_predicates(
match matching_predicate {
Some(predicate) => match predicate.get("status") {
Some(status) => {
return serde_json::from_value(status.clone()).map_err(|e| {
format!("failed to parse status {}", e)
});
return serde_json::from_value(status.clone())
.map_err(|e| format!("failed to parse status {}", e));
}
None => {
return Err("no status field on matching get predicates result".to_string())
return Err("no status field on matching get predicates result"
.to_string())
}
},
None => {
Expand All @@ -98,7 +99,9 @@ pub async fn filter_predicate_status_from_all_predicates(
}
}
None => {
return Err("failed to parse get predicate response's result field".to_string())
return Err(
"failed to parse get predicate response's result field".to_string()
)
}
},
None => {
Expand Down Expand Up @@ -267,10 +270,7 @@ pub fn flush_redis(port: u16) {
let client = redis::Client::open(format!("redis://localhost:{port}/"))
.expect("unable to connect to redis");
let mut predicate_db_conn = client.get_connection().expect("unable to connect to redis");
let db_keys: Vec<String> = predicate_db_conn
.scan_match("*")
.unwrap()
.collect();
let db_keys: Vec<String> = predicate_db_conn.scan_match("*").unwrap().collect();
for k in db_keys {
predicate_db_conn.del::<_, ()>(&k).unwrap();
}
Expand All @@ -293,6 +293,7 @@ pub fn get_chainhook_config(
};
Config {
http_api: PredicatesApi::On(api_config),
predicates: PredicatesConfig::default(),
pox_config: PoxConfig::devnet_default(),
storage: StorageConfig {
working_dir: working_dir.into(),
Expand Down Expand Up @@ -343,12 +344,7 @@ pub async fn start_chainhook_service(
);
let _ = hiro_system_kit::nestable_block_on(future);
})
.map_err(|e| {
format!(
"failed to start chainhook service thread, {}",
e
)
})?;
.map_err(|e| format!("failed to start chainhook service thread, {}", e))?;

// Loop to check if the server is ready
let mut attempts = 0;
Expand Down
3 changes: 2 additions & 1 deletion components/chainhook-cli/src/service/tests/observer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::mpsc::channel, thread::sleep, time::Duration};

use chainhook_sdk::{
chainhooks::types::ChainhookStore,
observer::{start_event_observer, EventObserverConfig},
observer::{start_event_observer, EventObserverConfig, PredicatesConfig},
types::{BitcoinNetwork, StacksNodeConfig},
utils::Context,
};
Expand Down Expand Up @@ -190,6 +190,7 @@ async fn it_responds_200_for_unimplemented_endpoints(
});
let config = EventObserverConfig {
registered_chainhooks: ChainhookStore::new(),
predicates_config: PredicatesConfig::default(),
bitcoin_rpc_proxy_enabled: false,
bitcoind_rpc_username: String::new(),
bitcoind_rpc_password: String::new(),
Expand Down
11 changes: 8 additions & 3 deletions components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::types::{
append_error_context, validate_txid, ChainhookInstance, ExactMatchingRule, HookAction,
MatchingRule, PoxConfig, TxinPredicate,
};
use crate::utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES};
use crate::{observer::EventObserverConfig, utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}};

use bitcoincore_rpc_json::bitcoin::{address::Payload, Address};
use chainhook_types::{
Expand All @@ -21,7 +21,7 @@ use serde::{de, Deserialize, Deserializer};
use serde_json::Value as JsonValue;
use std::{
collections::{BTreeMap, HashMap, HashSet},
str::FromStr,
str::FromStr, time::Duration,
};

use reqwest::RequestBuilder;
Expand Down Expand Up @@ -760,10 +760,15 @@ pub fn serialize_bitcoin_transactions_to_json(
pub fn handle_bitcoin_hook_action<'a>(
trigger: BitcoinTriggerChainhook<'a>,
proofs: &HashMap<&'a TransactionIdentifier, String>,
config: &EventObserverConfig,
) -> Result<BitcoinChainhookOccurrence, String> {
match &trigger.chainhook.action {
HookAction::HttpPost(http) => {
let client = Client::builder()
let mut client_builder = Client::builder();
if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms {
client_builder = client_builder.timeout(Duration::from_millis(timeout));
}
let client = client_builder
.build()
.map_err(|e| format!("unable to build http client: {}", e))?;
let host = http.url.to_string();
Expand Down
9 changes: 8 additions & 1 deletion components/chainhook-sdk/src/chainhooks/stacks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::observer::EventObserverConfig;
use crate::utils::{AbstractStacksBlock, Context, MAX_BLOCK_HEIGHTS_ENTRIES};

use super::types::{
Expand All @@ -22,6 +23,7 @@ use schemars::JsonSchema;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, HashMap};
use std::io::Cursor;
use std::time::Duration;

use reqwest::RequestBuilder;

Expand Down Expand Up @@ -1325,11 +1327,16 @@ pub fn serialize_stacks_payload_to_json<'a>(
pub fn handle_stacks_hook_action<'a>(
trigger: StacksTriggerChainhook<'a>,
proofs: &HashMap<&'a TransactionIdentifier, String>,
config: &EventObserverConfig,
ctx: &Context,
) -> Result<StacksChainhookOccurrence, String> {
match &trigger.chainhook.action {
HookAction::HttpPost(http) => {
let client = Client::builder()
let mut client_builder = Client::builder();
if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms {
client_builder = client_builder.timeout(Duration::from_millis(timeout));
}
let client = client_builder
.build()
.map_err(|e| format!("unable to build http client: {}", e))?;
let host = http.url.to_string();
Expand Down
12 changes: 9 additions & 3 deletions components/chainhook-sdk/src/chainhooks/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use super::{
},
types::{ExactMatchingRule, FileHook},
};
use crate::{chainhooks::stacks::serialize_stacks_payload_to_json, utils::Context};
use crate::{
chainhooks::stacks::serialize_stacks_payload_to_json,
observer::EventObserverConfig,
utils::Context,
};
use crate::{
chainhooks::{
tests::fixtures::{get_expected_occurrence, get_test_event_payload_by_type},
Expand Down Expand Up @@ -735,7 +739,8 @@ fn test_stacks_hook_action_noop() {
logger: None,
tracer: false,
};
let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap();
let occurrence =
handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap();
if let StacksChainhookOccurrence::Data(data) = occurrence {
assert_eq!(data.apply.len(), 1);
assert_eq!(
Expand Down Expand Up @@ -812,7 +817,8 @@ fn test_stacks_hook_action_file_append() {
logger: None,
tracer: false,
};
let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap();
let occurrence =
handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap();
if let StacksChainhookOccurrence::File(path, bytes) = occurrence {
assert_eq!(path, "./".to_string());
let json: JsonValue = serde_json::from_slice(&bytes).unwrap();
Expand Down
Loading

0 comments on commit 6c1dfa9

Please sign in to comment.