DPLT-929 historical filtering #81

Merged: gabehamilton merged 12 commits into main from DPLT-929-historical-filtering on Jun 2, 2023
Conversation

gabehamilton (Collaborator) commented May 30, 2023

Previous historical filtering added SQS messages for all blocks since start_block_height, up to 3600 blocks in the past.

This PR handles historical filtering for IndexerFunctions with an IndexerRule that matches Actions via an affected_account_id filter. First, matching index files are retrieved from S3 and messages for the blocks they list are added to the SQS queue. Second, blocks after the last block found in an index file are filtered manually: each block is fetched from S3 and processed for matches.

Because historical processing occurs in a spawned thread, the task's state can no longer be held across an await point once the block is fetched (it is no longer Send), so the remaining processing must be synchronous (outcomes_reducer_sync).

More detailed error handling to come in https://pagodaplatform.atlassian.net/browse/DPLT-1012, after reviewing runtime errors and deciding what handling is appropriate.
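
For illustration, a minimal sketch of the two-phase flow described above; the helpers (fetch_indexed_block_heights, fetch_block, block_matches, enqueue_block) are stand-in stubs, not the PR's actual functions:

// Illustrative sketch of the two-phase backfill described above; the helpers
// below are stubs, not the PR's actual functions.
async fn fetch_indexed_block_heights(_start: u64) -> Vec<u64> { vec![] } // reads S3 index files
async fn fetch_block(_height: u64) -> String { String::new() }          // pulls a raw block from S3
fn block_matches(_block: &str, _account_id: &str) -> bool { false }     // affected_account_id check
async fn enqueue_block(_height: u64) {}                                 // sends one SQS message

async fn process_historical_messages(start_block_height: u64, latest_block: u64, account_id: &str) {
    // Phase 1: block heights already listed in S3 index files go straight to SQS.
    let indexed = fetch_indexed_block_heights(start_block_height).await;
    for height in &indexed {
        enqueue_block(*height).await;
    }

    // Phase 2: blocks newer than the last indexed height have no index entry yet,
    // so each one is fetched from S3 and checked for matching actions.
    let last_indexed = indexed.last().copied().unwrap_or(start_block_height);
    for height in (last_indexed + 1)..=latest_block {
        let block = fetch_block(height).await;
        if block_matches(&block, account_id) {
            enqueue_block(height).await;
        }
    }
}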

gabehamilton marked this pull request as draft on May 30, 2023 15:37
gabehamilton marked this pull request as ready for review on May 30, 2023 21:28
None
}
},
Some(function_name) => match unescape(&args["filter_json"].to_string()) {
gabehamilton (Collaborator, Author) commented May 31, 2023:
This block is cargo fmt changes.

}
}

// #[tokio::test]
gabehamilton (Collaborator, Author):

Working on some new tests in another PR.

indexer_function: indexer_function.clone(),
};

match opts::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await {
Collaborator:

This could be quite slow, sending all these requests/messages individually. I wonder if there is a batch API?

Collaborator:

Also, send_to_indexer_queue feels out of place in opts; why does it live there?

gabehamilton (Collaborator, Author):

Batch is a good idea; it looks like it accepts up to 10 messages at once.
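
For reference, a sketch of what chunked batch sending might look like with a 2023-era aws-sdk-sqs 0.x; the plain-string message bodies and minimal error handling are simplifying assumptions, not the PR's actual code:

use aws_sdk_sqs::{model::SendMessageBatchRequestEntry, Client};

// Sketch: send message bodies in chunks of up to 10, the SendMessageBatch
// per-call limit. Plain strings stand in for serialized queue messages.
async fn send_batched(client: &Client, queue_url: &str, bodies: Vec<String>) {
    for chunk in bodies.chunks(10) {
        let entries = chunk
            .iter()
            .enumerate()
            .map(|(i, body)| {
                SendMessageBatchRequestEntry::builder()
                    .id(i.to_string()) // ids only need to be unique within the batch
                    .message_body(body.clone())
                    .build()
            })
            .collect::<Vec<_>>();

        if let Err(err) = client
            .send_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(entries))
            .send()
            .await
        {
            tracing::error!("Failed to send SQS batch: {:?}", err);
        }
    }
}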

gabehamilton (Collaborator, Author):

I agree that send_to_indexer_queue should be separated. It's an artifact of Opts originally being shared code between alertexer queue handlers. It is likely to be shared again when indexer-js-queue-handler moves to Rust; at that point, send_to_indexer_queue can move into a shared AWS-related module.

@@ -0,0 +1,115 @@
use futures::future::try_join_all;
Collaborator:

Can we just add reduce_indexer_rule_matches_from_outcomes_sync and build_indexer_rule_match_sync to outcomes_reducer.rs rather than creating an entirely new file? They are basically the same.


pub fn spawn_historical_message_thread(
block_height: BlockHeight,
new_indexer_function: &mut IndexerFunction,
Collaborator:

Suggested change:
-    new_indexer_function: &mut IndexerFunction,
+    new_indexer_function: &IndexerFunction,

I don't think this needs to be mutable?

gabehamilton (Collaborator, Author):

It's for the provisioning flag.

gabehamilton (Collaborator, Author):

Ah, you're right, I was thinking of a different spot.

}
}

async fn fetch_text_file_from_s3(s3_bucket: &str, key: String, s3_client: S3Client) -> String {
Collaborator:

Nit: the multiple levels of match make things really hard to read here. We can flatten this by using early returns like so:

async fn fetch_text_file_from_s3(s3_bucket: &str, key: String, s3_client: S3Client) -> String {
    let get_object_output = s3_client
        .get_object()
        .bucket(s3_bucket)
        .key(key.clone())
        .send()
        .await;

    if get_object_output.is_err() {
        tracing::error!(target: crate::INDEXER, "Error fetching S3 file {}: {:?}", key.clone(), get_object_output.err());
        return "".to_string();
    }

    let get_object_output = get_object_output.unwrap();

    let bytes = get_object_output.body.collect().await;
    if bytes.is_err() {
        tracing::error!(target: crate::INDEXER, "Error fetching index file {}: {:?}", key.clone(), bytes.err());
        return "".to_string();
    }

    let bytes = bytes.unwrap();

    let file_contents = String::from_utf8(bytes.to_vec());
    if file_contents.is_err() {
        tracing::error!(target: crate::INDEXER, "Error parsing index file {}: {:?}", key.clone(), file_contents.err());
        return "".to_string();
    }

    let file_contents = file_contents.unwrap();

    tracing::debug!(target: crate::INDEXER, "Fetched S3 file {}", key.clone(),);

    file_contents
}

Not asking you to refactor things in this PR, but let's keep it in mind for future PRs please 🙏🏽
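
As a possible future shape (an assumption on my part, not something this PR implements), the same fetch written to return a Result pushes error handling to the caller and removes even the early-return boilerplate; this sketch assumes anyhow as a dependency and S3Client aliasing aws_sdk_s3::Client:

use anyhow::Context;
use aws_sdk_s3::Client as S3Client;

// Sketch: same fetch, but the caller decides how to handle a missing or
// unreadable file instead of receiving an empty string.
async fn fetch_text_file_from_s3(
    s3_bucket: &str,
    key: &str,
    s3_client: &S3Client,
) -> anyhow::Result<String> {
    let output = s3_client
        .get_object()
        .bucket(s3_bucket)
        .key(key)
        .send()
        .await
        .with_context(|| format!("Error fetching S3 file {key}"))?;

    let bytes = output
        .body
        .collect()
        .await
        .with_context(|| format!("Error reading S3 file {key}"))?;

    let file_contents = String::from_utf8(bytes.to_vec())
        .with_context(|| format!("Error parsing S3 file {key} as UTF-8"))?;

    tracing::debug!(target: crate::INDEXER, "Fetched S3 file {}", key);
    Ok(file_contents)
}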

.collect::<Vec<u64>>()
}

async fn filter_matching_blocks_manually(
Collaborator:

I wonder if we could add additional metadata to the index files, i.e. the method names, to avoid us having to pull down every block to inspect it. This seems very expensive to do here.

gabehamilton (Collaborator, Author):

The index files do have method names; that filter type just isn't implemented in this PR. However, the "manual" filtering handles blocks between what has been indexed and the latest block (as of when processing was spawned). We just added a new metadata file that records the latest indexed block, and an upcoming PR will use it; that will reduce the maximum manual-filtering window to an hour.

It's still an expensive operation.

We also repeat it on the execution side, pulling the block down again. Once we add data extraction to the indexing (pulling out the matching action, for instance), we can put that data in the SQS message, avoiding the additional fetch for many IndexerFunctions.
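
To make that idea concrete, a hypothetical payload shape that carries the extracted action so the execution side can skip refetching the block; the type and field names are assumptions, not the PR's actual queue message:

use serde::{Deserialize, Serialize};

// Hypothetical SQS payload carrying the matched action extracted at filter
// time. Field names are illustrative, not the PR's actual message struct.
#[derive(Serialize, Deserialize)]
struct HistoricalQueueMessage {
    block_height: u64,
    indexer_function_name: String,
    // Populated during filtering; None falls back to fetching the block again.
    matched_action: Option<serde_json::Value>,
}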

gabehamilton (Collaborator, Author):

Maybe this needs a better method name though, e.g. filter_unindexed_blocks.

gabehamilton merged commit 45633ba into main on Jun 2, 2023
gabehamilton deleted the DPLT-929-historical-filtering branch on Jun 2, 2023 17:44
roshaans mentioned this pull request on Jun 6, 2023
gabehamilton added a commit that referenced this pull request on Jun 26, 2023