Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set backend config from admin rpc #304

Merged
merged 7 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 88 additions & 49 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use {

const CONNECTION_TIMEOUT_S: u64 = 10;
const CONNECTION_BACKOFF_S: u64 = 5;
const CONFIG_BACKOFF_S: u64 = 30;
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Default)]
struct BlockEngineStageStats {
Expand All @@ -72,13 +73,10 @@ pub struct BlockBuilderFeeInfo {
pub block_builder_commission: u64,
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BlockEngineConfig {
/// Address to the external auth-service responsible for generating access tokens.
pub auth_service_endpoint: Endpoint,

/// Primary backend endpoint.
pub backend_endpoint: Endpoint,
/// Block Engine URL
pub block_engine_url: String,
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

/// If set then it will be assumed the backend verified packets so signature verification will be bypassed in the validator.
pub trust_packets: bool,
Expand All @@ -90,7 +88,7 @@ pub struct BlockEngineStage {

impl BlockEngineStage {
pub fn new(
block_engine_config: BlockEngineConfig,
block_engine_config: Arc<Mutex<BlockEngineConfig>>,
// Channel that bundles get piped through.
bundle_tx: Sender<Vec<PacketBundle>>,
// The keypair stored here is used to sign auth challenges.
Expand Down Expand Up @@ -137,7 +135,7 @@ impl BlockEngineStage {

#[allow(clippy::too_many_arguments)]
async fn start(
block_engine_config: BlockEngineConfig,
block_engine_config: Arc<Mutex<BlockEngineConfig>>,
cluster_info: Arc<ClusterInfo>,
bundle_tx: Sender<Vec<PacketBundle>>,
packet_tx: Sender<PacketBatch>,
Expand All @@ -147,10 +145,15 @@ impl BlockEngineStage {
) {
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(CONNECTION_TIMEOUT_S);
const CONNECTION_BACKOFF: Duration = Duration::from_secs(CONNECTION_BACKOFF_S);
const CONFIG_BACKOFF: Duration = Duration::from_secs(CONFIG_BACKOFF_S);
let mut error_count: u64 = 0;

while !exit.load(Ordering::Relaxed) {
if let Err(e) = Self::connect_auth_and_stream(
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONFIG_BACKOFF wait on successful termination
if !Self::validate_block_engine_config(&block_engine_config.lock().unwrap()) {
sleep(CONFIG_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
&block_engine_config,
&cluster_info,
&bundle_tx,
Expand Down Expand Up @@ -183,7 +186,7 @@ impl BlockEngineStage {
}

async fn connect_auth_and_stream(
block_engine_config: &BlockEngineConfig,
block_engine_config: &Arc<Mutex<BlockEngineConfig>>,
cluster_info: &Arc<ClusterInfo>,
bundle_tx: &Sender<Vec<PacketBundle>>,
packet_tx: &Sender<PacketBatch>,
Expand All @@ -194,18 +197,48 @@ impl BlockEngineStage {
) -> crate::proxy::Result<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();
let local_config = &block_engine_config.lock().unwrap().clone();
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

let mut auth_service_endpoint =
Endpoint::from_shared(local_config.block_engine_url.clone()).map_err(|_| {
ProxyError::AuthenticationConnectionError(
"invalid block engine url value".parse().unwrap(),
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
)
})?;
let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(
"invalid block engine url value".parse().unwrap(),
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
)
})?
.tcp_keepalive(Some(Duration::from_secs(60)));
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

if local_config.block_engine_url.contains("https") {
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
auth_service_endpoint = auth_service_endpoint
.tls_config(tonic::transport::ClientTlsConfig::new())
.map_err(|_| {
ProxyError::AuthenticationConnectionError(
"failed to set tls_config for block engine auth service"
.parse()
.unwrap(),
)
})?;
backend_endpoint = backend_endpoint
.tls_config(tonic::transport::ClientTlsConfig::new())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(
"failed to set tls_config for block engine service"
.parse()
.unwrap(),
)
})?;
}

debug!(
"connecting to auth: {:?}",
block_engine_config.auth_service_endpoint.uri()
);
let auth_channel = timeout(
*connection_timeout,
block_engine_config.auth_service_endpoint.connect(),
)
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
.map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?;
debug!("connecting to auth: {:?}", &local_config.block_engine_url);
let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect())
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
.map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?;

let mut auth_client = AuthServiceClient::new(auth_channel);

Expand All @@ -219,25 +252,18 @@ impl BlockEngineStage {

datapoint_info!(
"block_engine_stage-tokens_generated",
(
"url",
block_engine_config.auth_service_endpoint.uri().to_string(),
String
),
("url", &local_config.block_engine_url, String),
("count", 1, i64),
);

debug!(
"connecting to block engine: {:?}",
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
block_engine_config.backend_endpoint.uri()
&local_config.block_engine_url
);
let block_engine_channel = timeout(
*connection_timeout,
block_engine_config.backend_endpoint.connect(),
)
.await
.map_err(|_| ProxyError::BlockEngineConnectionTimeout)?
.map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?;
let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::BlockEngineConnectionTimeout)?
.map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?;

let access_token = Arc::new(Mutex::new(access_token));
let block_engine_client = BlockEngineValidatorClient::with_interceptor(
Expand All @@ -246,13 +272,14 @@ impl BlockEngineStage {
);

Self::start_consuming_block_engine_bundles_and_packets(
&bundle_tx,
bundle_tx,
block_engine_client,
&packet_tx,
packet_tx,
local_config,
block_engine_config,
&verified_packet_tx,
&exit,
&block_builder_fee_info,
verified_packet_tx,
exit,
block_builder_fee_info,
auth_client,
access_token,
refresh_token,
Expand All @@ -263,11 +290,13 @@ impl BlockEngineStage {
.await
}

#[allow(clippy::too_many_arguments)]
async fn start_consuming_block_engine_bundles_and_packets(
bundle_tx: &Sender<Vec<PacketBundle>>,
mut client: BlockEngineValidatorClient<InterceptedService<Channel, AuthInterceptor>>,
packet_tx: &Sender<PacketBatch>,
block_engine_config: &BlockEngineConfig,
local_config: &BlockEngineConfig,
global_config: &Arc<Mutex<BlockEngineConfig>>,
verified_packet_tx: &Sender<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>,
exit: &Arc<AtomicBool>,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
Expand Down Expand Up @@ -317,7 +346,8 @@ impl BlockEngineStage {
(subscribe_bundles_stream, subscribe_packets_stream),
bundle_tx,
packet_tx,
block_engine_config,
local_config,
global_config,
verified_packet_tx,
exit,
block_builder_fee_info,
Expand All @@ -331,6 +361,7 @@ impl BlockEngineStage {
.await
}

#[allow(clippy::too_many_arguments)]
async fn consume_bundle_and_packet_stream(
mut client: BlockEngineValidatorClient<InterceptedService<Channel, AuthInterceptor>>,
(mut bundle_stream, mut packet_stream): (
Expand All @@ -339,7 +370,8 @@ impl BlockEngineStage {
),
bundle_tx: &Sender<Vec<PacketBundle>>,
packet_tx: &Sender<PacketBatch>,
block_engine_config: &BlockEngineConfig,
local_config: &BlockEngineConfig,
global_config: &Arc<Mutex<BlockEngineConfig>>,
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
verified_packet_tx: &Sender<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>,
exit: &Arc<AtomicBool>,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
Expand All @@ -357,33 +389,36 @@ impl BlockEngineStage {
let mut num_full_refreshes: u64 = 1;
let mut num_refresh_access_token: u64 = 0;
let mut block_engine_stats = BlockEngineStageStats::default();
let mut metrics_tick = interval(METRICS_TICK);
let mut metrics_and_auth_tick = interval(METRICS_TICK);
let mut maintenance_tick = interval(MAINTENANCE_TICK);
let auth_uri_string = block_engine_config.auth_service_endpoint.uri().to_string();

info!("connected to packet and bundle stream");

while !exit.load(Ordering::Relaxed) {
tokio::select! {
maybe_msg = packet_stream.message() => {
let resp = maybe_msg?.ok_or(ProxyError::GrpcStreamDisconnected)?;
Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, block_engine_config.trust_packets, &mut block_engine_stats)?;
Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, local_config.trust_packets, &mut block_engine_stats)?;
}
maybe_bundles = bundle_stream.message() => {
Self::handle_block_engine_maybe_bundles(maybe_bundles, bundle_tx, &mut block_engine_stats)?;
}
_ = metrics_tick.tick() => {
_ = metrics_and_auth_tick.tick() => {
block_engine_stats.report();
block_engine_stats = BlockEngineStageStats::default();

if cluster_info.id() != keypair.pubkey() {
return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
return Err(ProxyError::AuthenticationConnectionError("Block Engine Config Changed".to_string()));
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
}

let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client,
&access_token,
&refresh_token,
&cluster_info,
cluster_info,
connection_timeout,
refresh_within_s,
).await?;
Expand All @@ -392,7 +427,7 @@ impl BlockEngineStage {
num_refresh_access_token += 1;
datapoint_info!(
"block_engine_stage-refresh_access_token",
("url", auth_uri_string, String),
("url", &local_config.block_engine_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;
Expand All @@ -401,7 +436,7 @@ impl BlockEngineStage {
num_full_refreshes += 1;
datapoint_info!(
"block_engine_stage-tokens_generated",
("url", auth_uri_string, String),
("url", &local_config.block_engine_url, String),
("count", num_full_refreshes, i64),
);
refresh_token = new_token;
Expand Down Expand Up @@ -500,4 +535,8 @@ impl BlockEngineStage {

Ok(())
}

fn validate_block_engine_config(config: &BlockEngineConfig) -> bool {
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
!config.block_engine_url.is_empty()
}
}
Loading