Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Various Control Plane migration fixes #552

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn synchronise_block_streams(
indexer_config.start_block_height
{
start_block_height
} else if let Ok(last_published_block) = redis_client
} else if let Ok(Some(last_published_block)) = redis_client
.get::<String, u64>(format!(
"{}:last_published_block",
indexer_config.get_full_name()
Expand Down Expand Up @@ -476,7 +476,7 @@ mod tests {
let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(500));
.returning(|_| Ok(Some(500)));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| Ok(vec![]));
Expand Down Expand Up @@ -530,7 +530,7 @@ mod tests {
let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(500));
.returning(|_| Ok(Some(500)));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler.expect_list().returning(|| {
Expand Down
39 changes: 35 additions & 4 deletions coordinator/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ pub struct AllowlistEntry {
v1_ack: bool,
migrated: bool,
failed: bool,
v2_control: bool,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have to set the migrated flag, this allows us to still prevent V2 from taking control. Providing time to sanity check the migration itself.

}

pub type Allowlist = Vec<AllowlistEntry>;

pub async fn fetch_allowlist(redis_client: &RedisClient) -> anyhow::Result<Allowlist> {
let raw_allowlist: String = redis_client.get(RedisClient::ALLOWLIST).await?;
let raw_allowlist: String = redis_client
.get(RedisClient::ALLOWLIST)
.await?
.ok_or(anyhow::anyhow!("Allowlist doesn't exist"))?;

serde_json::from_str(&raw_allowlist).context("Failed to parse allowlist")
}

Expand All @@ -31,7 +36,11 @@ pub async fn filter_registry_by_allowlist(
.into_iter()
.filter(|(account_id, _)| {
allowlist.iter().any(|entry| {
entry.account_id == *account_id && entry.v1_ack && entry.migrated && !entry.failed
entry.account_id == *account_id
&& entry.v1_ack
&& entry.migrated
&& !entry.failed
&& entry.v2_control
})
})
.collect();
Expand Down Expand Up @@ -104,8 +113,7 @@ async fn migrate_account(
.context("Failed to merge streams")?;
}

// TODO Uncomment when V2 correctly continues from V1 stop point
// set_migrated_flag(redis_client, account_id)?;
set_migrated_flag(redis_client, account_id)?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to set the migrated flag to prevent further attempts to migrate


tracing::info!("Finished migrating {}", account_id);

Expand All @@ -125,6 +133,9 @@ async fn remove_from_streams_set(
)
.await?
.is_some()
&& redis_client
.exists(indexer_config.get_historical_redis_stream())
.await?
{
result.push(indexer_config.get_historical_redis_stream());
}
Expand All @@ -136,6 +147,9 @@ async fn remove_from_streams_set(
)
.await?
.is_some()
&& redis_client
.exists(indexer_config.get_real_time_redis_stream())
.await?
{
result.push(indexer_config.get_real_time_redis_stream());
};
Expand Down Expand Up @@ -303,6 +317,7 @@ mod tests {
v1_ack: true,
migrated: true,
failed: false,
v2_control: false,
}];

let redis_client = RedisClient::default();
Expand All @@ -327,6 +342,7 @@ mod tests {
v1_ack: true,
migrated: true,
failed: false,
v2_control: false,
}];

let redis_client = RedisClient::default();
Expand Down Expand Up @@ -374,6 +390,7 @@ mod tests {
v1_ack: true,
migrated: false,
failed: false,
v2_control: false,
}];

let mut redis_client = RedisClient::default();
Expand All @@ -393,6 +410,20 @@ mod tests {
)
.returning(|_, _| Ok(Some(())))
.once();
redis_client
.expect_exists::<String>()
.with(predicate::eq(String::from(
"morgs.near/test:historical:stream",
)))
.returning(|_| Ok(true))
.once();
redis_client
.expect_exists::<String>()
.with(predicate::eq(String::from(
"morgs.near/test:real_time:stream",
)))
.returning(|_| Ok(true))
.once();
redis_client
.expect_rename::<String, String>()
.with(
Expand Down
55 changes: 42 additions & 13 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ impl RedisClientImpl {
})
}

pub async fn get<T, U>(&self, key: T) -> anyhow::Result<U>
pub async fn get<T, U>(&self, key: T) -> anyhow::Result<Option<U>>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not actually directly related, but quite a nice change to have

where
T: ToRedisArgs + Debug + 'static,
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Debug + 'static,
{
let value = redis::cmd("GET")
.arg(&key)
.query_async(&mut self.connection.clone())
let value: Option<U> = self
.connection
.clone()
.get(&key)
.await
.map_err(|e| anyhow::format_err!(e))?;
.context(format!("GET: {key:?}"))?;

tracing::debug!("GET: {:?}={:?}", key, value);

Expand All @@ -57,7 +58,11 @@ impl RedisClientImpl {
{
tracing::debug!("RENAME: {:?} -> {:?}", old_key, new_key);

self.connection.clone().rename(old_key, new_key).await?;
self.connection
.clone()
.rename(&old_key, &new_key)
.await
.context(format!("RENAME: {old_key:?} {new_key:?}"))?;

Ok(())
}
Expand All @@ -69,11 +74,12 @@ impl RedisClientImpl {
{
tracing::debug!("SREM: {:?}={:?}", key, value);

match self.connection.clone().srem(key, value).await {
match self.connection.clone().srem(&key, &value).await {
Ok(1) => Ok(Some(())),
Ok(_) => Ok(None),
Err(e) => Err(anyhow::format_err!(e)),
}
.context(format!("SREM: {key:?} {value:?}"))
}

pub async fn xread<K, V>(
Expand All @@ -92,11 +98,12 @@ impl RedisClientImpl {
.connection
.clone()
.xread_options(
&[key],
&[start_id],
&[&key],
&[&start_id],
&streams::StreamReadOptions::default().count(count),
)
.await?;
.await
.context(format!("XREAD {key:?} {start_id:?} {count:?}"))?;

if results.keys.is_empty() {
return Ok([].to_vec());
Expand All @@ -112,7 +119,11 @@ impl RedisClientImpl {
{
tracing::debug!("XADD: {:?} {:?} {:?}", key, "*", fields);

self.connection.clone().xadd(key, "*", fields).await?;
self.connection
.clone()
.xadd(&key, "*", fields)
.await
.context(format!("XADD {key:?} {fields:?}"))?;

Ok(())
}
Expand All @@ -124,11 +135,29 @@ impl RedisClientImpl {
{
tracing::debug!("XDEL: {:?} {:?}", key, id);

self.connection.clone().xdel(key, &[id]).await?;
self.connection
.clone()
.xdel(&key, &[&id])
.await
.context(format!("XDEL {key:?} {id:?}"))?;

Ok(())
}

pub async fn exists<K>(&self, key: K) -> anyhow::Result<bool>
where
K: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("EXISTS {key:?}");

self.connection
.clone()
.exists(&key)
.await
.map_err(|e| anyhow::format_err!(e))
.context(format!("EXISTS {key:?}"))
}

// `redis::transaction`s currently don't work with async connections, so we have to create a _new_
// blocking connection to atmoically update a value.
pub fn atomic_update<K, O, N, F>(&self, key: K, update_fn: F) -> anyhow::Result<()>
Expand Down
3 changes: 3 additions & 0 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub(crate) struct QueryApiContext<'a> {
struct DenylistEntry {
account_id: AccountId,
v1_ack: bool,
migrated: bool,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can still be deserialized without these fields, but it means they will be removed when serializing again.

failed: bool,
v2_control: bool,
}

type Denylist = Vec<DenylistEntry>;
Expand Down
Loading