diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 25dc9fc49..e308f1613 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -198,7 +198,7 @@ async fn synchronise_block_streams( indexer_config.start_block_height { start_block_height - } else if let Ok(last_published_block) = redis_client + } else if let Ok(Some(last_published_block)) = redis_client .get::(format!( "{}:last_published_block", indexer_config.get_full_name() @@ -476,7 +476,7 @@ mod tests { let mut redis_client = RedisClient::default(); redis_client .expect_get::() - .returning(|_| Ok(500)); + .returning(|_| Ok(Some(500))); let mut block_stream_handler = BlockStreamsHandler::default(); block_stream_handler.expect_list().returning(|| Ok(vec![])); @@ -530,7 +530,7 @@ mod tests { let mut redis_client = RedisClient::default(); redis_client .expect_get::() - .returning(|_| Ok(500)); + .returning(|_| Ok(Some(500))); let mut block_stream_handler = BlockStreamsHandler::default(); block_stream_handler.expect_list().returning(|| { diff --git a/coordinator/src/migration.rs b/coordinator/src/migration.rs index d45763f02..38575b23b 100644 --- a/coordinator/src/migration.rs +++ b/coordinator/src/migration.rs @@ -14,12 +14,17 @@ pub struct AllowlistEntry { v1_ack: bool, migrated: bool, failed: bool, + v2_control: bool, } pub type Allowlist = Vec; pub async fn fetch_allowlist(redis_client: &RedisClient) -> anyhow::Result { - let raw_allowlist: String = redis_client.get(RedisClient::ALLOWLIST).await?; + let raw_allowlist: String = redis_client + .get(RedisClient::ALLOWLIST) + .await? + .ok_or(anyhow::anyhow!("Allowlist doesn't exist"))?; + serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist") } @@ -31,7 +36,11 @@ pub async fn filter_registry_by_allowlist( .into_iter() .filter(|(account_id, _)| { allowlist.iter().any(|entry| { - entry.account_id == *account_id && entry.v1_ack && entry.migrated && !entry.failed + entry.account_id == *account_id + && entry.v1_ack + && entry.migrated + && !entry.failed + && entry.v2_control }) }) .collect(); @@ -104,8 +113,7 @@ async fn migrate_account( .context("Failed to merge streams")?; } - // TODO Uncomment when V2 correctly continues from V1 stop point - // set_migrated_flag(redis_client, account_id)?; + set_migrated_flag(redis_client, account_id)?; tracing::info!("Finished migrating {}", account_id); @@ -125,6 +133,9 @@ async fn remove_from_streams_set( ) .await? .is_some() + && redis_client + .exists(indexer_config.get_historical_redis_stream()) + .await? { result.push(indexer_config.get_historical_redis_stream()); } @@ -136,6 +147,9 @@ async fn remove_from_streams_set( ) .await? .is_some() + && redis_client + .exists(indexer_config.get_real_time_redis_stream()) + .await? { result.push(indexer_config.get_real_time_redis_stream()); }; @@ -303,6 +317,7 @@ mod tests { v1_ack: true, migrated: true, failed: false, + v2_control: false, }]; let redis_client = RedisClient::default(); @@ -327,6 +342,7 @@ mod tests { v1_ack: true, migrated: true, failed: false, + v2_control: false, }]; let redis_client = RedisClient::default(); @@ -374,6 +390,7 @@ mod tests { v1_ack: true, migrated: false, failed: false, + v2_control: false, }]; let mut redis_client = RedisClient::default(); @@ -393,6 +410,20 @@ mod tests { ) .returning(|_, _| Ok(Some(()))) .once(); + redis_client + .expect_exists::() + .with(predicate::eq(String::from( + "morgs.near/test:historical:stream", + ))) + .returning(|_| Ok(true)) + .once(); + redis_client + .expect_exists::() + .with(predicate::eq(String::from( + "morgs.near/test:real_time:stream", + ))) + .returning(|_| Ok(true)) + .once(); redis_client .expect_rename::() .with( diff --git a/coordinator/src/redis.rs b/coordinator/src/redis.rs index c2df6d826..7780b582e 100644 --- a/coordinator/src/redis.rs +++ b/coordinator/src/redis.rs @@ -34,16 +34,17 @@ impl RedisClientImpl { }) } - pub async fn get(&self, key: T) -> anyhow::Result + pub async fn get(&self, key: T) -> anyhow::Result> where - T: ToRedisArgs + Debug + 'static, + T: ToRedisArgs + Debug + Send + Sync + 'static, U: FromRedisValue + Debug + 'static, { - let value = redis::cmd("GET") - .arg(&key) - .query_async(&mut self.connection.clone()) + let value: Option = self + .connection + .clone() + .get(&key) .await - .map_err(|e| anyhow::format_err!(e))?; + .context(format!("GET: {key:?}"))?; tracing::debug!("GET: {:?}={:?}", key, value); @@ -57,7 +58,11 @@ impl RedisClientImpl { { tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key); - self.connection.clone().rename(old_key, new_key).await?; + self.connection + .clone() + .rename(&old_key, &new_key) + .await + .context(format!("RENAME: {old_key:?} {new_key:?}"))?; Ok(()) } @@ -69,11 +74,12 @@ impl RedisClientImpl { { tracing::debug!("SREM: {:?}={:?}", key, value); - match self.connection.clone().srem(key, value).await { + match self.connection.clone().srem(&key, &value).await { Ok(1) => Ok(Some(())), Ok(_) => Ok(None), Err(e) => Err(anyhow::format_err!(e)), } + .context(format!("SREM: {key:?} {value:?}")) } pub async fn xread( @@ -92,11 +98,12 @@ impl RedisClientImpl { .connection .clone() .xread_options( - &[key], - &[start_id], + &[&key], + &[&start_id], &streams::StreamReadOptions::default().count(count), ) - .await?; + .await + .context(format!("XREAD {key:?} {start_id:?} {count:?}"))?; if results.keys.is_empty() { return Ok([].to_vec()); @@ -112,7 +119,11 @@ impl RedisClientImpl { { tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields); - self.connection.clone().xadd(key, "*", fields).await?; + self.connection + .clone() + .xadd(&key, "*", fields) + .await + .context(format!("XADD {key:?} {fields:?}"))?; Ok(()) } @@ -124,11 +135,29 @@ impl RedisClientImpl { { tracing::debug!("XDEL: {:?} {:?}", key, id); - self.connection.clone().xdel(key, &[id]).await?; + self.connection + .clone() + .xdel(&key, &[&id]) + .await + .context(format!("XDEL {key:?} {id:?}"))?; Ok(()) } + pub async fn exists(&self, key: K) -> anyhow::Result + where + K: ToRedisArgs + Debug + Send + Sync + 'static, + { + tracing::debug!("EXISTS {key:?}"); + + self.connection + .clone() + .exists(&key) + .await + .map_err(|e| anyhow::format_err!(e)) + .context(format!("EXISTS {key:?}")) + } + // `redis::transaction`s currently don't work with async connections, so we have to create a _new_ // blocking connection to atmoically update a value. pub fn atomic_update(&self, key: K, update_fn: F) -> anyhow::Result<()> diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs index 96511f544..dbb08ba85 100644 --- a/indexer/queryapi_coordinator/src/main.rs +++ b/indexer/queryapi_coordinator/src/main.rs @@ -47,6 +47,9 @@ pub(crate) struct QueryApiContext<'a> { struct DenylistEntry { account_id: AccountId, v1_ack: bool, + migrated: bool, + failed: bool, + v2_control: bool, } type Denylist = Vec;