Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store Real Time Streamer Messages in Redis #241

Merged
merged 11 commits into from
Oct 5, 2023
4 changes: 2 additions & 2 deletions block-server/handler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const AWS = require('aws-sdk');
const S3= new AWS.S3();
import { S3 } from '@aws-sdk/client-s3';
const S3 = new S3();

const NETWORK = process.env.NETWORK || 'mainnet';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub(crate) async fn process_historical_messages(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
None,
)
.await?;
}
Expand Down
13 changes: 12 additions & 1 deletion indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use tokio::sync::{Mutex, MutexGuard};
use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
use utils::serialize_to_camel_case_json_string;

use crate::indexer_types::IndexerFunction;
use indexer_types::{IndexerQueueMessage, IndexerRegistry};
use opts::{Opts, Parser};
use storage::{self, ConnectionManager};
use storage::{self, generate_real_time_streamer_message_key, ConnectionManager};

mod historical_block_processing;
mod indexer_reducer;
Expand Down Expand Up @@ -146,6 +147,15 @@ async fn handle_streamer_message(

let block_height: BlockHeight = context.streamer_message.block.header.height;

// Cache streamer message block and shards for use in real time processing
storage::set(
context.redis_connection_manager,
generate_real_time_streamer_message_key(block_height),
&serialize_to_camel_case_json_string(&context.streamer_message)?,
Some(60),
)
.await?;

let spawned_indexers = indexer_registry::index_registry_changes(
block_height,
&mut indexer_registry_locked,
Expand Down Expand Up @@ -206,6 +216,7 @@ async fn handle_streamer_message(
context.redis_connection_manager,
storage::generate_real_time_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
None,
)
.await?;
storage::xadd(
Expand Down
51 changes: 51 additions & 0 deletions indexer/queryapi_coordinator/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use serde_json::Value;

pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) {
let interval_secs = 10;
let mut previous_processed_blocks: u64 =
Expand Down Expand Up @@ -49,3 +51,52 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
}

pub(crate) fn serialize_to_camel_case_json_string(
streamer_message: &near_lake_framework::near_indexer_primitives::StreamerMessage,
) -> anyhow::Result<String, serde_json::Error> {
// Serialize the Message object to a JSON string
let json_str = serde_json::to_string(&streamer_message)?;

// Deserialize the JSON string to a Value Object
let mut message_value: Value = serde_json::from_str(&json_str)?;

// Convert keys to Camel Case
to_camel_case_keys(&mut message_value);

return serde_json::to_string(&message_value);
}

fn to_camel_case_keys(message_value: &mut Value) {
// Only process if subfield contains objects
match message_value {
Value::Object(map) => {
for key in map.keys().cloned().collect::<Vec<String>>() {
// Generate Camel Case Key
let new_key = key
.split("_")
.enumerate()
.map(|(i, str)| {
if i > 0 {
return str[..1].to_uppercase() + &str[1..];
}
return str.to_owned();
})
.collect::<Vec<String>>()
.join("");

// Recursively process inner fields and update map with new key
if let Some(mut val) = map.remove(&key) {
to_camel_case_keys(&mut val);
map.insert(new_key, val);
}
}
}
Value::Array(vec) => {
for val in vec {
to_camel_case_keys(val);
}
}
_ => {}
}
}
31 changes: 24 additions & 7 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
Expand All @@ -12,6 +13,10 @@ pub fn generate_real_time_stream_key(prefix: &str) -> String {
format!("{}:real_time:stream", prefix)
}

pub fn generate_real_time_streamer_message_key(block_height: u64) -> String {
format!("streamer:message:{}", block_height)
}

pub fn generate_real_time_storage_key(prefix: &str) -> String {
format!("{}:real_time:stream:storage", prefix)
}
Expand Down Expand Up @@ -47,13 +52,19 @@ pub async fn set(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
value: impl ToRedisArgs + std::fmt::Debug,
expiration_seconds: Option<usize>,
) -> anyhow::Result<()> {
redis::cmd("SET")
.arg(&key)
.arg(&value)
.query_async(&mut redis_connection_manager.clone())
let mut cmd = redis::cmd("SET");
cmd.arg(&key).arg(&value);

// Add expiration arguments if present
if let Some(expiration_seconds) = expiration_seconds {
cmd.arg("EX").arg(expiration_seconds);
}

cmd.query_async(&mut redis_connection_manager.clone())
.await?;
tracing::debug!(target: STORAGE, "SET: {:?}: {:?}", key, value,);
tracing::debug!(target: STORAGE, "SET: {:?}: {:?} Ex: {:?}", key, value, expiration_seconds);
Ok(())
}

Expand Down Expand Up @@ -113,7 +124,7 @@ pub async fn push_receipt_to_watching_list(
receipt_id: &str,
cache_value: &[u8],
) -> anyhow::Result<()> {
set(redis_connection_manager, receipt_id, cache_value).await?;
set(redis_connection_manager, receipt_id, cache_value, None).await?;
// redis::cmd("INCR")
// .arg(format!("receipts_{}", transaction_hash))
// .query_async(&mut redis_connection_manager.clone())
Expand Down Expand Up @@ -161,7 +172,13 @@ pub async fn update_last_indexed_block(
redis_connection_manager: &ConnectionManager,
block_height: u64,
) -> anyhow::Result<()> {
set(redis_connection_manager, "last_indexed_block", block_height).await?;
set(
redis_connection_manager,
"last_indexed_block",
block_height,
None,
)
.await?;
redis::cmd("INCR")
.arg("blocks_processed")
.query_async(&mut redis_connection_manager.clone())
Expand Down
2 changes: 1 addition & 1 deletion runner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"typescript": "^5.1.6"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.414.0",
darunrs marked this conversation as resolved.
Show resolved Hide resolved
"@near-lake/primitives": "^0.1.0",
"aws-sdk": "^2.1402.0",
"express": "^4.18.2",
"node-fetch": "^2.6.11",
"node-sql-parser": "^4.10.0",
Expand Down
Loading