Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix OOM error with manual buffer size specification #380

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
bd4ffd9
Added runtime settings calculation struct
fhennig Jan 24, 2023
743e54f
fmt
fhennig Jan 24, 2023
ebcc8c9
Added setting the config settings
fhennig Jan 24, 2023
69aa5e1
moved to operator-rs branch
fhennig Jan 25, 2023
b2fc4e6
moved to quantities
fhennig Jan 25, 2023
d5e3b8a
feat: added memory setting to the reconcile loop
fhennig Jan 25, 2023
6ac8d3d
WIP
fhennig Jan 25, 2023
4cc9cf0
WIP
fhennig Jan 25, 2023
aed1d50
WIP: Added DerivedHistoricalSettings to RoleResource
fhennig Jan 26, 2023
191fabe
modified get_jvm_config to take an optional direct_memory setting
fhennig Jan 26, 2023
492bfcf
JVM config refactored; a version that SHOULD run
fhennig Jan 26, 2023
3f9ea06
fix: JVM config formatting
fhennig Jan 26, 2023
11e0f8d
chore: fixed checks
fhennig Jan 26, 2023
9ee6c7f
minor change
fhennig Jan 26, 2023
069f6f5
fixed some unwraps
fhennig Jan 30, 2023
0c9cb81
fix: fixed test
fhennig Jan 30, 2023
bc24d6c
fix: removed some more unwraps
fhennig Jan 30, 2023
d255efa
fix: fixed more todos
fhennig Jan 30, 2023
5b3a50e
reverted an accidental change in the docs
fhennig Jan 30, 2023
8232b34
Some more docs
fhennig Jan 30, 2023
9a204a0
fixed an unwrap
fhennig Jan 30, 2023
ecb50c4
clippy fix in tests
fhennig Jan 30, 2023
3778f0b
tests: update resource test to test for the bug we're fixing
fhennig Jan 30, 2023
0f5c44b
fix tests
fhennig Jan 30, 2023
143822a
fix tests ... this time really?
fhennig Jan 30, 2023
c0ae45e
fix tests ... this time really, really?
fhennig Jan 30, 2023
aa2ffb5
fix test typo
fhennig Jan 31, 2023
f3e3715
fix: adapt to changes in operator-rs after review
fhennig Jan 31, 2023
5e131b4
Merge remote-tracking branch 'origin/main' into fix/359-memory-alloca…
fhennig Jan 31, 2023
a680682
chore: clippy fixes
fhennig Jan 31, 2023
823f952
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
ccd80c3
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
d16776f
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
0c01471
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
52e2c5e
renamed consts
fhennig Feb 1, 2023
9cfee69
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
0047dad
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
d884dcc
Update rust/crd/src/memory.rs
fhennig Feb 1, 2023
9885c69
refactor: changed Druid formatting from trait to function
fhennig Feb 1, 2023
3628d31
docs: updated outdated docs comment
fhennig Feb 1, 2023
7d82248
refactor: changed an enum spec back to Self
fhennig Feb 1, 2023
257f412
refactor: moved match statement from the controller into RoleResource
fhennig Feb 1, 2023
2891e9c
removed obsolete error definitions
fhennig Feb 1, 2023
f6a28a7
Added dedicated Error in the config.rs file
fhennig Feb 1, 2023
a52e25e
update to tagged operator-rs
fhennig Feb 1, 2023
52d7fb5
better error message
fhennig Feb 1, 2023
02b1dc5
removed unnecessary clippy-allow
fhennig Feb 2, 2023
314d4c0
updated resources test
fhennig Feb 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/crd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ version = "0.9.0-nightly"
publish = false

[dependencies]
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.31.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.33.0" }

semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
5 changes: 5 additions & 0 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod authentication;
pub mod authorization;
pub mod ldap;
pub mod memory;
pub mod resource;
pub mod security;
pub mod storage;
Expand Down Expand Up @@ -88,6 +89,10 @@ pub const MD_ST_USER: &str = "druid.metadata.storage.connector.user";
pub const MD_ST_PASSWORD: &str = "druid.metadata.storage.connector.password";
// indexer properties
pub const INDEXER_JAVA_OPTS: &str = "druid.indexer.runner.javaOptsArray";
// historical settings
pub const PROCESSING_BUFFER_SIZE_BYTES: &str = "druid.processing.buffer.sizeBytes";
pub const PROCESSING_NUM_MERGE_BUFFERS: &str = "druid.processing.numMergeBuffers";
pub const PROCESSING_NUM_THREADS: &str = "druid.processing.numThreads";
// extra
pub const CREDENTIALS_SECRET_PROPERTY: &str = "credentialsSecret";
// metrics
Expand Down
207 changes: 207 additions & 0 deletions rust/crd/src/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::{
commons::resources::{NoRuntimeLimits, Resources},
cpu::CpuQuantity,
memory::{BinaryMultiple, MemoryQuantity},
};
use std::collections::BTreeMap;

use crate::{
storage::HistoricalStorage, PROCESSING_BUFFER_SIZE_BYTES, PROCESSING_NUM_MERGE_BUFFERS,
PROCESSING_NUM_THREADS,
};

static MIN_HEAP_RATIO: f32 = 0.75;
lazy_static! {
pub static ref RESERVED_OS_MEMORY: MemoryQuantity = MemoryQuantity::from_mebi(300.);
/// Max size for direct access buffers. This is defined in Druid to be 2GB:
/// https://druid.apache.org/docs/latest/configuration/index.html#processing-1
static ref MAX_DIRECT_BUFFER_SIZE: MemoryQuantity = MemoryQuantity::from_gibi(2.);
}

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("failed to parse memory limits"))]
ParsingMemoryLimitFailure {
source: stackable_operator::error::Error,
},
#[snafu(display("failed to parse CPU limits"))]
ParsingCpuLimitFailure {
source: stackable_operator::error::Error,
},
#[snafu(display("could not derive memory distribution, no memory limits defined"))]
NoMemoryLimitsDefined,
#[snafu(display("could not derive memory distribution, no CPU limits defined"))]
NoCpuLimitsDefined,
}

/// This struct takes the resource limits of the Pod and derives Druid settings from it.
/// For mentioned Druid properties, consult the
/// [Druid Configuration Reference](https://druid.apache.org/docs/latest/configuration/index.html)
/// for additional information.
/// Also have a look at the documentation for
/// [Basic Cluster Tuning](<https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html>).
pub struct HistoricalDerivedSettings {
total_memory: MemoryQuantity,
cpu_millis: CpuQuantity,
min_heap_ratio: f32,
max_buffer_size: MemoryQuantity,
os_reserved_memory: MemoryQuantity,
}

impl HistoricalDerivedSettings {
pub fn new(total_memory: MemoryQuantity, cpu_millis: CpuQuantity) -> Self {
Self {
total_memory,
cpu_millis,
min_heap_ratio: MIN_HEAP_RATIO,
os_reserved_memory: *RESERVED_OS_MEMORY,
max_buffer_size: *MAX_DIRECT_BUFFER_SIZE,
}
}

/// The total memory we use for druid. This is what's left after we take out the OS reserved memory.
fn allocatable_memory(&self) -> MemoryQuantity {
self.total_memory - self.os_reserved_memory
}

/// How much memory to set for the JVM to use. The minimum ratio can be defined in the struct.
/// Once the direct memory is maxed out, all the remaining allocatable memory will be assigned
/// as heap memory.
pub fn heap_memory(&self) -> MemoryQuantity {
// TODO also implement max limit of 24Gi, as recommended by Druid
self.allocatable_memory() - self.direct_access_memory()
}

/// The memory that is available to allocate for direct access.
fn allocatable_direct_access_memory(&self) -> MemoryQuantity {
self.allocatable_memory() * (1. - self.min_heap_ratio)
}

/// The max memory to allocate to direct access. This is based on the max buffer size of a single buffer.
fn max_direct_access_memory(&self) -> MemoryQuantity {
self.max_buffer_size * self.total_num_buffers() as f32
}

/// How much to allocate (or keep free) for direct access.
/// this is the amount to configure in the JVM as the `MaxDirectMemorySize`.
pub fn direct_access_memory(&self) -> MemoryQuantity {
if self.max_direct_access_memory() < self.allocatable_direct_access_memory() {
self.max_direct_access_memory()
} else {
self.allocatable_direct_access_memory()
}
}

/// The number of threads to use, based on the CPU millis.
/// leaves at least 500m available to core functionalities.
/// Druid Property: `druid.processing.numThreads`
fn num_threads(&self) -> usize {
(self.cpu_millis.as_cpu_count().round() - 1.).max(1.) as usize
}

/// Druid property: `druid.processing.numMergeBuffers`
fn num_merge_buffers(&self) -> usize {
((self.num_threads() as f64 / 4.).floor() as usize).max(2)
}

fn total_num_buffers(&self) -> usize {
self.num_merge_buffers() + self.num_threads() + 1
}

/// The buffer size for intermediate result storage. By setting it ourselves, we can set it up to 2Gi.
/// If we leave it on the `auto` default, we only get up to 1Gi.
/// Druid property: `druid.processing.buffer.sizeBytes`
fn buffer_size(&self) -> MemoryQuantity {
self.direct_access_memory() / self.total_num_buffers() as f32
}

/// Adds derived runtime settings to the given config
pub fn add_settings(&self, config: &mut BTreeMap<String, Option<String>>) {
config.insert(
PROCESSING_NUM_THREADS.to_owned(),
Some(self.num_threads().to_string()),
);
config.insert(
PROCESSING_NUM_MERGE_BUFFERS.to_owned(),
Some(self.num_merge_buffers().to_string()),
);
config.insert(
PROCESSING_BUFFER_SIZE_BYTES.to_owned(),
Some(format_for_druid(&self.buffer_size())),
);
}
}

impl TryFrom<&Resources<HistoricalStorage, NoRuntimeLimits>> for HistoricalDerivedSettings {
type Error = Error;

fn try_from(r: &Resources<HistoricalStorage, NoRuntimeLimits>) -> Result<Self, Self::Error> {
let total_memory = MemoryQuantity::try_from(
r.memory
.limit
.as_ref()
.context(NoMemoryLimitsDefinedSnafu)?,
)
.context(ParsingMemoryLimitFailureSnafu)?;
let cpu_millis =
CpuQuantity::try_from(r.cpu.max.as_ref().context(NoCpuLimitsDefinedSnafu)?)
.context(ParsingCpuLimitFailureSnafu)?;
Ok(HistoricalDerivedSettings::new(total_memory, cpu_millis))
}
}

/// A function to format something as the Druid Byte format:
/// `<https://druid.apache.org/docs/latest/configuration/human-readable-byte.html>`.
/// Only KiB precision is supported. Upd to 1KiB will be rounded away.
fn format_for_druid(memory_quantity: &MemoryQuantity) -> String {
let k = memory_quantity.scale_to(BinaryMultiple::Kibi);
// floor instead of round so we don't accidently make the memory quantity
// bigger than it should be
let v = k.value.floor() as usize;
format!("{v}Ki")
}

#[cfg(test)]
mod tests {
use super::*;
use rstest::*;

#[rstest]
#[case(1000, 1)]
#[case(1400, 1)]
#[case(1600, 1)]
#[case(2000, 1)]
#[case(2400, 1)]
#[case(2600, 2)]
#[case(3000, 2)]
#[case(3400, 2)]
#[case(3600, 3)]
#[case(32_000, 31)]
fn test_num_threads(#[case] cpu_millis: usize, #[case] expected_num_threads: usize) {
let mem = MemoryQuantity::from_gibi(2.);
let cpu = CpuQuantity::from_millis(cpu_millis);
let s = HistoricalDerivedSettings::new(mem, cpu);
assert_eq!(s.num_threads(), expected_num_threads);
}

#[rstest]
#[case(1000, 2)]
#[case(2000, 2)]
#[case(4000, 2)]
#[case(8000, 2)]
#[case(15_000, 3)]
#[case(16_000, 3)]
#[case(17_000, 4)]
#[case(32_000, 7)]
fn test_num_merge_buffers(
#[case] cpu_millis: usize,
#[case] expected_num_merge_buffers: usize,
) {
let mem = MemoryQuantity::from_gibi(2.);
let cpu = CpuQuantity::from_millis(cpu_millis);
let s = HistoricalDerivedSettings::new(mem, cpu);
assert_eq!(s.num_merge_buffers(), expected_num_merge_buffers);
}
}
Loading