From 544e42369a3fd30cb944ba7963bac6fb534d4858 Mon Sep 17 00:00:00 2001 From: Gabe Hamilton Date: Thu, 25 May 2023 14:06:31 -0600 Subject: [PATCH] IndexerRule parsing of registration function calls, minor logging cleanup. (#78) --- indexer/Cargo.lock | 7 +++ indexer/queryapi_coordinator/Cargo.toml | 3 +- .../src/indexer_registry.rs | 44 ++++++++++++++----- indexer/queryapi_coordinator/src/main.rs | 12 ++++- indexer/queryapi_coordinator/src/opts.rs | 4 -- 5 files changed, 53 insertions(+), 17 deletions(-) diff --git a/indexer/Cargo.lock b/indexer/Cargo.lock index d894ebc0c..b0e37ad68 100644 --- a/indexer/Cargo.lock +++ b/indexer/Cargo.lock @@ -3370,6 +3370,7 @@ dependencies = [ "tokio-stream", "tracing", "tracing-subscriber 0.2.25", + "unescape", ] [[package]] @@ -4540,6 +4541,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "unescape" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccb97dac3243214f8d8507998906ca3e2e0b900bf9bf4870477f125b82e68f6e" + [[package]] name = "unicode-bidi" version = "0.3.13" diff --git a/indexer/queryapi_coordinator/Cargo.toml b/indexer/queryapi_coordinator/Cargo.toml index 55b407cb3..c3b526627 100644 --- a/indexer/queryapi_coordinator/Cargo.toml +++ b/indexer/queryapi_coordinator/Cargo.toml @@ -36,4 +36,5 @@ aws-types = "0.53.0" aws-credential-types = "0.53.0" aws-sdk-s3 = "0.23.0" aws-sdk-sqs = "0.23.0" -tracing-subscriber = "0.2.4" \ No newline at end of file +tracing-subscriber = "0.2.4" +unescape = "0.1.0" diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs index 291291da1..868613917 100644 --- a/indexer/queryapi_coordinator/src/indexer_registry.rs +++ b/indexer/queryapi_coordinator/src/indexer_registry.rs @@ -10,6 +10,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use tokio::sync::MutexGuard; use tokio::task::JoinHandle; +use unescape::unescape; use crate::indexer_reducer; use crate::indexer_reducer::FunctionCallInfo; @@ -130,7 +131,6 @@ fn index_and_process_register_calls( let new_indexer_function = build_indexer_function_from_args( parse_indexer_function_args(&update), update.signer_id, - ®istry_calls_rule, ); match new_indexer_function { @@ -260,7 +260,6 @@ fn build_function_invocation_from_args( fn build_indexer_function_from_args( args: Option, signer_id: String, - indexer_rule: &IndexerRule, ) -> Option { match args { None => None, @@ -272,18 +271,44 @@ fn build_indexer_function_from_args( let function_name = args["function_name"].as_str(); match function_name { None => { - tracing::error!( + tracing::warn!( "Unable to parse function_name from indexer function: {:?}", &args ); return None; } - Some(function_name) => build_indexer_function( - &args, - function_name.to_string(), - account_id, - indexer_rule, - ), + Some(function_name) => { + match unescape(&args["filter_json"].to_string()) { + Some(filter_string) => { + let filter_json_strip_quotes = &filter_string[1..filter_string.len() - 1]; + match serde_json::from_str(&filter_json_strip_quotes) { + Ok(filter_json) => match serde_json::from_value(filter_json) { + Ok(indexer_rule) => build_indexer_function( + &args, + function_name.to_string(), + account_id, + &indexer_rule, + ), + Err(e) => { + tracing::warn!("Error parsing filter into indexer_rule for account {} function {}: {}, {}", account_id, function_name, e, filter_string); + None + } + }, + Err(e) => { + tracing::warn!("Error parsing indexer_rule filter for account {} function {}: {}, {}", account_id, function_name, e, filter_string); + None + } + } + }, + None => { + tracing::warn!( + "Unable to unescape filter_json from registration args: {:?}", + &args + ); + None + } + } + } } } } @@ -291,7 +316,6 @@ fn build_indexer_function_from_args( fn parse_indexer_function_args(update: &FunctionCallInfo) -> Option { if let Ok(mut args_json) = serde_json::from_str(&update.args) { - escape_json(&mut args_json); return Some(args_json); } else { tracing::error!( diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index d4b2e3da1..328598ba9 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -169,6 +169,14 @@ async fn handle_streamer_message( let mut indexer_function_messages: Vec = Vec::new(); for indexer_rule_match in indexer_rule_matches.iter() { + tracing::debug!( + target: INDEXER, + "Matched filter {:?} for function {} {}", + indexer_function.indexer_rule.matching_rule, + indexer_function.account_id, + indexer_function.function_name, + ); + let msg = IndexerQueueMessage { chain_id: indexer_rule_match.chain_id.clone(), indexer_rule_id: indexer_rule_match.indexer_rule_id.unwrap_or(0), @@ -189,11 +197,11 @@ async fn handle_streamer_message( stream::iter(indexer_function_messages.into_iter()) .chunks(10) - .for_each(|alert_queue_messages_batch| async { + .for_each(|indexer_queue_messages_batch| async { match opts::send_to_indexer_queue( context.queue_client, context.queue_url.to_string(), - alert_queue_messages_batch, + indexer_queue_messages_batch, ) .await { diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs index f61ff941a..84c59c630 100644 --- a/indexer/queryapi_coordinator/src/opts.rs +++ b/indexer/queryapi_coordinator/src/opts.rs @@ -227,10 +227,6 @@ pub async fn send_to_indexer_queue( queue_url: String, indexer_queue_messages: Vec, ) -> anyhow::Result<()> { - tracing::info!( - target: "queryapi_coordinator", - "Sending indexer tasks to the queue: {queue_url}", - ); let message_bodies: Vec = indexer_queue_messages .into_iter()