Skip to content

Commit

Permalink
IndexerRule parsing of registration function calls, minor logging cle…
Browse files Browse the repository at this point in the history
…anup. (#78)
  • Loading branch information
gabehamilton authored May 25, 2023
1 parent f0971ad commit 544e423
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 17 deletions.
7 changes: 7 additions & 0 deletions indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion indexer/queryapi_coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ aws-types = "0.53.0"
aws-credential-types = "0.53.0"
aws-sdk-s3 = "0.23.0"
aws-sdk-sqs = "0.23.0"
tracing-subscriber = "0.2.4"
tracing-subscriber = "0.2.4"
unescape = "0.1.0"
44 changes: 34 additions & 10 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use tokio::sync::MutexGuard;
use tokio::task::JoinHandle;
use unescape::unescape;

use crate::indexer_reducer;
use crate::indexer_reducer::FunctionCallInfo;
Expand Down Expand Up @@ -130,7 +131,6 @@ fn index_and_process_register_calls(
let new_indexer_function = build_indexer_function_from_args(
parse_indexer_function_args(&update),
update.signer_id,
&registry_calls_rule,
);

match new_indexer_function {
Expand Down Expand Up @@ -260,7 +260,6 @@ fn build_function_invocation_from_args(
fn build_indexer_function_from_args(
args: Option<Value>,
signer_id: String,
indexer_rule: &IndexerRule,
) -> Option<IndexerFunction> {
match args {
None => None,
Expand All @@ -272,26 +271,51 @@ fn build_indexer_function_from_args(
let function_name = args["function_name"].as_str();
match function_name {
None => {
tracing::error!(
tracing::warn!(
"Unable to parse function_name from indexer function: {:?}",
&args
);
return None;
}
Some(function_name) => build_indexer_function(
&args,
function_name.to_string(),
account_id,
indexer_rule,
),
Some(function_name) => {
match unescape(&args["filter_json"].to_string()) {
Some(filter_string) => {
let filter_json_strip_quotes = &filter_string[1..filter_string.len() - 1];
match serde_json::from_str(&filter_json_strip_quotes) {
Ok(filter_json) => match serde_json::from_value(filter_json) {
Ok(indexer_rule) => build_indexer_function(
&args,
function_name.to_string(),
account_id,
&indexer_rule,
),
Err(e) => {
tracing::warn!("Error parsing filter into indexer_rule for account {} function {}: {}, {}", account_id, function_name, e, filter_string);
None
}
},
Err(e) => {
tracing::warn!("Error parsing indexer_rule filter for account {} function {}: {}, {}", account_id, function_name, e, filter_string);
None
}
}
},
None => {
tracing::warn!(
"Unable to unescape filter_json from registration args: {:?}",
&args
);
None
}
}
}
}
}
}
}

fn parse_indexer_function_args(update: &FunctionCallInfo) -> Option<Value> {
if let Ok(mut args_json) = serde_json::from_str(&update.args) {
escape_json(&mut args_json);
return Some(args_json);
} else {
tracing::error!(
Expand Down
12 changes: 10 additions & 2 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ async fn handle_streamer_message(
let mut indexer_function_messages: Vec<IndexerQueueMessage> = Vec::new();

for indexer_rule_match in indexer_rule_matches.iter() {
tracing::debug!(
target: INDEXER,
"Matched filter {:?} for function {} {}",
indexer_function.indexer_rule.matching_rule,
indexer_function.account_id,
indexer_function.function_name,
);

let msg = IndexerQueueMessage {
chain_id: indexer_rule_match.chain_id.clone(),
indexer_rule_id: indexer_rule_match.indexer_rule_id.unwrap_or(0),
Expand All @@ -189,11 +197,11 @@ async fn handle_streamer_message(

stream::iter(indexer_function_messages.into_iter())
.chunks(10)
.for_each(|alert_queue_messages_batch| async {
.for_each(|indexer_queue_messages_batch| async {
match opts::send_to_indexer_queue(
context.queue_client,
context.queue_url.to_string(),
alert_queue_messages_batch,
indexer_queue_messages_batch,
)
.await
{
Expand Down
4 changes: 0 additions & 4 deletions indexer/queryapi_coordinator/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,6 @@ pub async fn send_to_indexer_queue(
queue_url: String,
indexer_queue_messages: Vec<IndexerQueueMessage>,
) -> anyhow::Result<()> {
tracing::info!(
target: "queryapi_coordinator",
"Sending indexer tasks to the queue: {queue_url}",
);

let message_bodies: Vec<SendMessageBatchRequestEntry> = indexer_queue_messages
.into_iter()
Expand Down

0 comments on commit 544e423

Please sign in to comment.