Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(buffer): Remove spool V1 implementation #4303

Merged
merged 13 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
## 24.11.1

**Breaking Changes**:
- Flatten Linux distribution fields into `os.context`([#4292](https://github.com/getsentry/relay/pull/4292))

- Remove `spool.envelopes.{min_connections,max_connections,unspool_interval,max_memory_size}` config options. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Flatten Linux distribution fields into `os.context`. ([#4292](https://github.com/getsentry/relay/pull/4292))

**Bug Fixes**:

Expand All @@ -13,6 +15,7 @@

**Features**:

- Remove old disk spooling logic, default to new version. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))
Expand Down
108 changes: 10 additions & 98 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,21 +873,6 @@ fn spool_envelopes_max_disk_size() -> ByteSize {
ByteSize::mebibytes(500)
}

/// Default for min connections to keep open in the pool.
fn spool_envelopes_min_connections() -> u32 {
1
}

/// Default for max connections to keep open in the pool.
fn spool_envelopes_max_connections() -> u32 {
1
}

/// Default interval to unspool buffered envelopes, 100ms.
fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default number of encoded envelope bytes to cache before writing to disk.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
ByteSize::kibibytes(10)
Expand Down Expand Up @@ -924,43 +909,34 @@ pub struct EnvelopeSpool {
///
/// If set, this will enable the buffering for incoming envelopes.
pub path: Option<PathBuf>,
/// Maximum number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_max_connections")]
max_connections: u32,
/// Minimal number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_min_connections")]
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
/// If not set the default is 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_disk_size")]
max_disk_size: ByteSize,
pub max_disk_size: ByteSize,
/// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes.
///
/// This is a hard upper bound and defaults to 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_memory_size")]
max_memory_size: ByteSize,
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
pub max_memory_size: ByteSize,
/// Number of encoded envelope bytes that are spooled to disk at once.
///
/// Defaults to 10 KiB.
#[serde(default = "spool_envelopes_batch_size_bytes")]
batch_size_bytes: ByteSize,
pub batch_size_bytes: ByteSize,
/// Maximum time between receiving the envelope and processing it.
///
/// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
/// they are dropped. Defaults to 24h.
#[serde(default = "spool_envelopes_max_envelope_delay_secs")]
max_envelope_delay_secs: u64,
pub max_envelope_delay_secs: u64,
/// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
/// internal page stats.
#[serde(default = "spool_disk_usage_refresh_frequency_ms")]
disk_usage_refresh_frequency_ms: u64,
pub disk_usage_refresh_frequency_ms: u64,
/// The amount of envelopes that the envelope buffer can push to its output queue.
#[serde(default = "spool_max_backpressure_envelopes")]
max_backpressure_envelopes: usize,
pub max_backpressure_envelopes: usize,
/// The relative memory usage above which the buffer service will stop dequeueing envelopes.
///
/// Only applies when [`Self::path`] is set.
Expand All @@ -971,58 +947,34 @@ pub struct EnvelopeSpool {
///
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
pub max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
}

/// Version of the envelope buffering mechanism.
#[derive(Debug, Default, Deserialize, Serialize)]
pub enum EnvelopeSpoolVersion {
/// Use the spooler service, which only buffers envelopes for unloaded projects and
/// switches between an in-memory mode and a disk mode on-demand.
///
/// This mode will be removed soon.
#[default]
#[serde(rename = "1")]
V1,
/// Use the envelope buffer, through which all envelopes pass before getting unspooled.
/// Can be either disk based or memory based.
///
/// This mode has not yet been stress-tested, do not use in production environments.
#[serde(rename = "experimental")]
V2,
pub partitions: NonZeroU8,
}

impl Default for EnvelopeSpool {
fn default() -> Self {
Self {
path: None,
max_connections: spool_envelopes_max_connections(),
min_connections: spool_envelopes_min_connections(),
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(),
batch_size_bytes: spool_envelopes_batch_size_bytes(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
}

/// Persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
/// Configuration for envelope spooling.
#[serde(default)]
envelopes: EnvelopeSpool,
pub envelopes: EnvelopeSpool,
}

/// Controls internal caching behavior.
Expand Down Expand Up @@ -2175,45 +2127,17 @@ impl Config {
Some(path)
}

/// Maximum number of connections to create to buffer file.
pub fn spool_envelopes_max_connections(&self) -> u32 {
self.values.spool.envelopes.max_connections
}

/// Minimum number of connections to create to buffer file.
pub fn spool_envelopes_min_connections(&self) -> u32 {
self.values.spool.envelopes.min_connections
}

/// Unspool interval in milliseconds.
pub fn spool_envelopes_unspool_interval(&self) -> Duration {
Duration::from_millis(self.values.spool.envelopes.unspool_interval)
}

/// The maximum size of the buffer, in bytes.
pub fn spool_envelopes_max_disk_size(&self) -> usize {
self.values.spool.envelopes.max_disk_size.as_bytes()
}

/// The maximum size of the memory buffer, in bytes.
pub fn spool_envelopes_max_memory_size(&self) -> usize {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of encoded envelope bytes that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
self.values.spool.envelopes.batch_size_bytes.as_bytes()
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
&self.values.spool.envelopes.version,
EnvelopeSpoolVersion::V2
)
}

/// Returns the time after which we drop envelopes as a [`Duration`] object.
pub fn spool_envelopes_max_age(&self) -> Duration {
Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
Expand Down Expand Up @@ -2654,16 +2578,4 @@ cache:
fn test_emit_outcomes_invalid() {
assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
}

#[test]
fn test_spool_defaults_to_v1() {
let config: ConfigValues = serde_json::from_str("{}").unwrap();
assert!(matches!(
config.spool.envelopes.version,
EnvelopeSpoolVersion::V1
));

let config = Config::from_json_value(serde_json::json!({})).unwrap();
assert!(!config.spool_v2());
}
}
32 changes: 11 additions & 21 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::service::ServiceState;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

Expand Down Expand Up @@ -294,27 +293,18 @@ fn queue_envelope(
envelope.scope(scoping);

let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
state
.legacy_project_cache()
.send(ValidateEnvelope::new(envelope));
}
let buffer = state.envelope_buffer(project_key_pair);
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
// accept it without additional checks.
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ pub use self::envelope::Envelope; // pub for benchmarks
pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
Expand Down
8 changes: 1 addition & 7 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,11 @@ impl ServiceState {
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
project_cache: legacy_project_cache.clone(),
test_store: test_store.clone(),
};

runner.start_with(
legacy::ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_handle.clone(),
project_cache_services,
global_config_rx,
Expand Down Expand Up @@ -363,10 +360,7 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ impl SqliteEnvelopeStore {
.shared_cache(true);

let db = SqlitePoolOptions::new()
.max_connections(config.spool_envelopes_max_connections())
.min_connections(config.spool_envelopes_min_connections())
.max_connections(1)
.min_connections(1)
.connect_with(options)
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
Expand Down
Loading