Skip to content

Commit

Permalink
Simplify address resolution, spawn reseed as a Tokio task
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez committed Aug 23, 2024
1 parent 8354172 commit 2f64bea
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 50 deletions.
2 changes: 1 addition & 1 deletion elasticsearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ dyn-clone = "1"
lazy_static = "1"
percent-encoding = "2"
reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] }
regex="1"
url = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -49,6 +48,7 @@ http = "1"
axum = "0.7"
hyper = { version = "1", features = ["server", "http1"] }
os_type = "2"
regex="1"
#sysinfo = "0.31"
textwrap = "0.16"
xml-rs = "0.8"
Expand Down
90 changes: 41 additions & 49 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::{
use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine};
use bytes::BytesMut;
use lazy_static::lazy_static;
use regex::Regex;
use serde::Serialize;
use serde_json::Value;
use std::{
Expand All @@ -49,7 +48,6 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::spawn,
time::{Duration, Instant},
};
use url::Url;
Expand Down Expand Up @@ -104,10 +102,6 @@ impl fmt::Display for BuildError {

/// Default address to Elasticsearch running on `https://localhost:9200`
pub static DEFAULT_ADDRESS: &str = "https://localhost:9200";
lazy_static! {
static ref ADDRESS_REGEX: Regex =
Regex::new(r"((?P<fqdn>[^/]+)/)?(?P<ip>[^:]+|\[[\da-fA-F:\.]+\]):(?P<port>\d+)$").unwrap();
}

lazy_static! {
/// Client metadata header: service, language, transport, followed by additional information
Expand Down Expand Up @@ -459,6 +453,7 @@ impl Transport {
Ok(transport)
}

#[allow(clippy::too_many_arguments)]
fn request_builder<B, Q>(
&self,
connection: &Connection,
Expand Down Expand Up @@ -546,17 +541,18 @@ impl Transport {
return Err(crate::error::lib("Bound Address is empty"));
}

let matches = ADDRESS_REGEX
.captures(address)
.ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?;
let mut host_port = None;
if let Some((host, tail)) = address.split_once('/') {
if let Some((_, port)) = tail.rsplit_once(':') {
host_port = Some((host, port));
}
} else {
host_port = address.rsplit_once(':');
}

let host = matches
.name("fqdn")
.or_else(|| Some(matches.name("ip").unwrap()))
.unwrap()
.as_str()
.trim();
let port = matches.name("port").unwrap().as_str().trim();
let (host, port) = host_port.ok_or_else(|| {
crate::error::lib(format!("error parsing address into url: {}", address))
})?;

Ok(Url::parse(
format!("{}://{}:{}", scheme, host, port).as_str(),
Expand All @@ -577,10 +573,10 @@ impl Transport {
B: Body,
Q: Serialize + ?Sized,
{
// Threads will execute against old connection pool during reseed
// Requests will execute against old connection pool during reseed
if self.conn_pool.reseedable() {
let local_conn_pool = self.conn_pool.clone();
let connection = local_conn_pool.next();
let conn_pool = self.conn_pool.clone();
let connection = conn_pool.next();

// Build node info request
let node_request = self.request_builder(
Expand All @@ -593,34 +589,30 @@ impl Transport {
timeout,
)?;

spawn(move || {
// TODO: Log reseed failures
let rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime");
rt.block_on(async {
let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|h| {
let address = h.1["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
h.1["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();
local_conn_pool.reseed(connections);
})
tokio::spawn(async move {
let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|(_, node)| {
let address = node["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
node["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();
conn_pool.reseed(connections);
});
}

Expand Down Expand Up @@ -858,7 +850,7 @@ where
.map(|last_update| last_update.elapsed() > reseed_frequency);
let reseedable = last_update_is_stale.unwrap_or(true);

return if !reseedable {
if !reseedable {
false
} else {
// Check if refreshing is false if so, sets to true atomically and returns old value (false) meaning refreshable is true
Expand All @@ -869,7 +861,7 @@ where
// This can be replaced with `.into_ok_or_err` once stable.
// https://doc.rust-lang.org/std/result/enum.Result.html#method.into_ok_or_err
.unwrap_or(true)
};
}
}

fn reseed(&self, mut connection: Vec<Connection>) {
Expand Down

0 comments on commit 2f64bea

Please sign in to comment.