Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Harden GCP Retries #3253

Merged
merged 9 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 233 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -600,20 +600,35 @@ class GCSConfig:
credentials: str | None
token: str | None
anonymous: bool
max_connections: int
retry_initial_backoff_ms: int
connect_timeout_ms: int
read_timeout_ms: int
num_tries: int

def __init__(
self,
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
): ...
def replace(
self,
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
) -> GCSConfig:
"""Replaces values if provided, returning a new GCSConfig"""
...
Expand Down
1 change: 1 addition & 0 deletions src/common/io-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ aws-credential-types = {version = "0.55.3"}
chrono = {workspace = true}
common-error = {path = "../error", default-features = false}
common-py-serde = {path = "../py-serde", default-features = false}
derive_more = {workspace = true}
pyo3 = {workspace = true, optional = true}
secrecy = {version = "0.8.0", features = ["alloc"], default-features = false}
serde = {workspace = true}
Expand Down
59 changes: 44 additions & 15 deletions src/common/io-config/src/gcs.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
use std::fmt::{Display, Formatter};

use derive_more::Display;
use serde::{Deserialize, Serialize};

use crate::ObfuscatedString;

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Display)]
#[display(
"GCSConfig
project_id: {project_id:?}
anonymous: {anonymous}
max_connections_per_io_thread: {max_connections_per_io_thread}
retry_initial_backoff_ms: {retry_initial_backoff_ms}
connect_timeout_ms: {connect_timeout_ms}
read_timeout_ms: {read_timeout_ms}
num_tries: {num_tries}"
)]
/// Settings for accessing Google Cloud Storage: authentication inputs plus
/// connection, timeout and retry tuning.
pub struct GCSConfig {
/// Presumably the Google Cloud project ID; the describing doc line is not visible here.
pub project_id: Option<String>,
/// Path to a credentials file or a JSON string with credentials, held as an `ObfuscatedString`.
pub credentials: Option<ObfuscatedString>,
/// OAuth2 token to use for authentication.
pub token: Option<String>,
/// Whether to use "anonymous mode", i.e. access Google Storage without any credentials.
pub anonymous: bool,
/// Maximum number of connections to GCS at any time, per IO thread.
pub max_connections_per_io_thread: u32,
/// Initial backoff duration, in milliseconds, for a GCS retry.
pub retry_initial_backoff_ms: u64,
/// Timeout, in milliseconds, to wait to make a connection to GCS.
pub connect_timeout_ms: u64,
/// Timeout, in milliseconds, to wait to read the first byte from GCS.
pub read_timeout_ms: u64,
/// Number of attempts to make a connection.
pub num_tries: u32,
}

impl Default for GCSConfig {
    /// Builds the baseline GCS configuration: no project, credentials or token,
    /// authenticated (non-anonymous) access, and the stock connection/retry tuning.
    fn default() -> Self {
        // Connection and retry tuning, named so each value's role is explicit.
        const CONNECTIONS_PER_IO_THREAD: u32 = 8;
        const INITIAL_BACKOFF_MS: u64 = 1000;
        // Connect and read timeouts share the same 30-second budget.
        const TIMEOUT_MS: u64 = 30_000;
        const TRIES: u32 = 5;

        Self {
            // Authentication-related fields start out unset.
            project_id: None,
            credentials: None,
            token: None,
            anonymous: false,
            // Tuning knobs.
            max_connections_per_io_thread: CONNECTIONS_PER_IO_THREAD,
            retry_initial_backoff_ms: INITIAL_BACKOFF_MS,
            connect_timeout_ms: TIMEOUT_MS,
            read_timeout_ms: TIMEOUT_MS,
            num_tries: TRIES,
        }
    }
}

impl GCSConfig {
Expand All @@ -20,18 +50,17 @@ impl GCSConfig {
res.push(format!("Project ID = {project_id}"));
}
res.push(format!("Anonymous = {}", self.anonymous));
res.push(format!(
"Max connections = {}",
self.max_connections_per_io_thread
));
res.push(format!(
"Retry initial backoff ms = {}",
self.retry_initial_backoff_ms
));
res.push(format!("Connect timeout ms = {}", self.connect_timeout_ms));
res.push(format!("Read timeout ms = {}", self.read_timeout_ms));
res.push(format!("Max retries = {}", self.num_tries));
res
}
}

impl Display for GCSConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(
f,
"GCSConfig
project_id: {:?}
anonymous: {:?}",
self.project_id, self.anonymous
)
}
}
64 changes: 59 additions & 5 deletions src/common/io-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use crate::{config, s3::S3CredentialsProvider};
/// access_key (str, optional): AWS Secret Access Key, defaults to auto-detection from the current environment
/// credentials_provider (Callable[[], S3Credentials], optional): Custom credentials provider function, should return a `S3Credentials` object
/// buffer_time (int, optional): Amount of time in seconds before the actual credential expiration time where credentials given by `credentials_provider` are considered expired, defaults to 10s
/// max_connections (int, optional): Maximum number of connections to S3 at any time, defaults to 64
/// max_connections (int, optional): Maximum number of connections to S3 at any time per io thread, defaults to 8
/// session_token (str, optional): AWS Session Token, required only if `key_id` and `access_key` are temporary credentials
/// retry_initial_backoff_ms (int, optional): Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 10 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 10 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 5
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 30 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 30 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 25
/// retry_mode (str, optional): Retry Mode when a request fails, current supported values are `standard` and `adaptive`, defaults to `adaptive`
/// anonymous (bool, optional): Whether or not to use "anonymous mode", which will access S3 without any credentials
/// use_ssl (bool, optional): Whether or not to use SSL, which require accessing S3 over HTTPS rather than HTTP, defaults to True
Expand Down Expand Up @@ -107,6 +107,11 @@ pub struct AzureConfig {
/// credentials (str, optional): Path to credentials file or JSON string with credentials
/// token (str, optional): OAuth2 token to use for authentication. You likely want to use `credentials` instead, since it can be used to refresh the token. This value is used when vended by a data catalog.
/// anonymous (bool, optional): Whether or not to use "anonymous mode", which will access Google Storage without any credentials. Defaults to false
/// max_connections (int, optional): Maximum number of connections to GCS at any time per io thread, defaults to 8
/// retry_initial_backoff_ms (int, optional): Initial backoff duration in milliseconds for a GCS retry, defaults to 1000ms
/// connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to GCS in milliseconds, defaults to 30 seconds
/// read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from GCS in milliseconds, defaults to 30 seconds
/// num_tries (int, optional): Number of attempts to make a connection, defaults to 5
///
/// Example:
/// >>> io_config = IOConfig(gcs=GCSConfig(anonymous=True))
Expand Down Expand Up @@ -848,6 +853,11 @@ impl GCSConfig {
credentials: Option<String>,
token: Option<String>,
anonymous: Option<bool>,
max_connections: Option<u32>,
retry_initial_backoff_ms: Option<u64>,
connect_timeout_ms: Option<u64>,
read_timeout_ms: Option<u64>,
num_tries: Option<u32>,
) -> Self {
let def = crate::GCSConfig::default();
Self {
Expand All @@ -858,17 +868,29 @@ impl GCSConfig {
.or(def.credentials),
token: token.or(def.token),
anonymous: anonymous.unwrap_or(def.anonymous),
max_connections_per_io_thread: max_connections
.unwrap_or(def.max_connections_per_io_thread),
retry_initial_backoff_ms: retry_initial_backoff_ms
.unwrap_or(def.retry_initial_backoff_ms),
connect_timeout_ms: connect_timeout_ms.unwrap_or(def.connect_timeout_ms),
read_timeout_ms: read_timeout_ms.unwrap_or(def.read_timeout_ms),
num_tries: num_tries.unwrap_or(def.num_tries),
},
}
}

#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn replace(
&self,
project_id: Option<String>,
credentials: Option<String>,
token: Option<String>,
anonymous: Option<bool>,
max_connections: Option<u32>,
retry_initial_backoff_ms: Option<u64>,
connect_timeout_ms: Option<u64>,
read_timeout_ms: Option<u64>,
num_tries: Option<u32>,
) -> Self {
Self {
config: crate::GCSConfig {
Expand All @@ -878,6 +900,13 @@ impl GCSConfig {
.or_else(|| self.config.credentials.clone()),
token: token.or_else(|| self.config.token.clone()),
anonymous: anonymous.unwrap_or(self.config.anonymous),
max_connections_per_io_thread: max_connections
.unwrap_or(self.config.max_connections_per_io_thread),
retry_initial_backoff_ms: retry_initial_backoff_ms
.unwrap_or(self.config.retry_initial_backoff_ms),
connect_timeout_ms: connect_timeout_ms.unwrap_or(self.config.connect_timeout_ms),
read_timeout_ms: read_timeout_ms.unwrap_or(self.config.read_timeout_ms),
num_tries: num_tries.unwrap_or(self.config.num_tries),
},
}
}
Expand Down Expand Up @@ -913,6 +942,31 @@ impl GCSConfig {
pub fn anonymous(&self) -> PyResult<bool> {
Ok(self.config.anonymous)
}

/// Maximum number of connections to GCS per IO thread.
///
/// Exposed as `max_connections`; reads the `max_connections_per_io_thread` field of the wrapped config.
#[getter]
pub fn max_connections(&self) -> PyResult<u32> {
Ok(self.config.max_connections_per_io_thread)
}

/// Initial backoff duration, in milliseconds, for a GCS retry, as stored in the wrapped config.
#[getter]
pub fn retry_initial_backoff_ms(&self) -> PyResult<u64> {
Ok(self.config.retry_initial_backoff_ms)
}

/// Timeout, in milliseconds, to wait to make a connection to GCS, as stored in the wrapped config.
#[getter]
pub fn connect_timeout_ms(&self) -> PyResult<u64> {
Ok(self.config.connect_timeout_ms)
}

/// Timeout, in milliseconds, to wait to read the first byte from GCS, as stored in the wrapped config.
#[getter]
pub fn read_timeout_ms(&self) -> PyResult<u64> {
Ok(self.config.read_timeout_ms)
}

/// Number of attempts to make a connection, as stored in the wrapped config.
#[getter]
pub fn num_tries(&self) -> PyResult<u32> {
Ok(self.config.num_tries)
}
}

impl From<config::IOConfig> for IOConfig {
Expand Down
6 changes: 5 additions & 1 deletion src/daft-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ common-runtime = {path = "../common/runtime", default-features = false}
derive_builder = {workspace = true}
futures = {workspace = true}
globset = "0.4"
google-cloud-storage = {version = "0.15.0", default-features = false, features = ["default-tls", "auth"]}
google-cloud-storage = {version = "0.22.1", default-features = false, features = ["default-tls", "auth"]}
google-cloud-token = {version = "0.1.2"}
home = "0.5.9"
hyper = "0.14.27"
Expand All @@ -32,10 +32,14 @@ openssl-sys = {version = "0.9.102", features = ["vendored"]}
pyo3 = {workspace = true, optional = true}
rand = "0.8.5"
regex = {version = "1.10.4"}
reqwest-middleware = "0.3.3"
reqwest-retry = "0.6.1"
retry-policies = "0.4.0"
serde = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tracing = {workspace = true}
url = {workspace = true}

[dependencies.reqwest]
Expand Down
Loading
Loading