diff --git a/CHANGELOG.md b/CHANGELOG.md index 79998902dc..4daab18199 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,9 @@ ## 24.11.1 **Breaking Changes**: -- Flatten Linux distribution fields into `os.context`([#4292](https://github.com/getsentry/relay/pull/4292)) + +- Remove `spool.envelopes.{min_connections,max_connections,unspool_interval,max_memory_size}` config options. ([#4303](https://github.com/getsentry/relay/pull/4303)) +- Flatten Linux distribution fields into `os.context`. ([#4292](https://github.com/getsentry/relay/pull/4292)) **Bug Fixes**: @@ -13,6 +15,7 @@ **Features**: +- Remove old disk spooling logic, default to new version. ([#4303](https://github.com/getsentry/relay/pull/4303)) - Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266)) - Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283)) - Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291)) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 54305d2460..ad7beacc59 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -873,21 +873,6 @@ fn spool_envelopes_max_disk_size() -> ByteSize { ByteSize::mebibytes(500) } -/// Default for min connections to keep open in the pool. -fn spool_envelopes_min_connections() -> u32 { - 1 -} - -/// Default for max connections to keep open in the pool. -fn spool_envelopes_max_connections() -> u32 { - 1 -} - -/// Default interval to unspool buffered envelopes, 100ms. -fn spool_envelopes_unspool_interval() -> u64 { - 100 -} - /// Default number of encoded envelope bytes to cache before writing to disk. fn spool_envelopes_batch_size_bytes() -> ByteSize { ByteSize::kibibytes(10) @@ -924,43 +909,34 @@ pub struct EnvelopeSpool { /// /// If set, this will enable the buffering for incoming envelopes. pub path: Option<PathBuf>, - /// Maximum number of connections, which will be maintained by the pool. - #[serde(default = "spool_envelopes_max_connections")] - max_connections: u32, - /// Minimal number of connections, which will be maintained by the pool. - #[serde(default = "spool_envelopes_min_connections")] - min_connections: u32, /// The maximum size of the buffer to keep, in bytes. /// /// If not set the default is 524288000 bytes (500MB). #[serde(default = "spool_envelopes_max_disk_size")] - max_disk_size: ByteSize, + pub max_disk_size: ByteSize, /// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes. /// /// This is a hard upper bound and defaults to 524288000 bytes (500MB). #[serde(default = "spool_envelopes_max_memory_size")] - max_memory_size: ByteSize, - /// The interval in milliseconds to trigger unspool. - #[serde(default = "spool_envelopes_unspool_interval")] - unspool_interval: u64, + pub max_memory_size: ByteSize, /// Number of encoded envelope bytes that are spooled to disk at once. /// /// Defaults to 10 KiB. #[serde(default = "spool_envelopes_batch_size_bytes")] - batch_size_bytes: ByteSize, + pub batch_size_bytes: ByteSize, /// Maximum time between receiving the envelope and processing it. /// /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded), /// they are dropped. Defaults to 24h.
#[serde(default = "spool_envelopes_max_envelope_delay_secs")] - max_envelope_delay_secs: u64, + pub max_envelope_delay_secs: u64, /// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite /// internal page stats. #[serde(default = "spool_disk_usage_refresh_frequency_ms")] - disk_usage_refresh_frequency_ms: u64, + pub disk_usage_refresh_frequency_ms: u64, /// The amount of envelopes that the envelope buffer can push to its output queue. #[serde(default = "spool_max_backpressure_envelopes")] - max_backpressure_envelopes: usize, + pub max_backpressure_envelopes: usize, /// The relative memory usage above which the buffer service will stop dequeueing envelopes. /// /// Only applies when [`Self::path`] is set. @@ -971,49 +947,24 @@ pub struct EnvelopeSpool { /// /// Defaults to 90% (5% less than max memory). #[serde(default = "spool_max_backpressure_memory_percent")] - max_backpressure_memory_percent: f32, + pub max_backpressure_memory_percent: f32, /// Number of partitions of the buffer. #[serde(default = "spool_envelopes_partitions")] - partitions: NonZeroU8, - /// Version of the spooler. - #[serde(default)] - version: EnvelopeSpoolVersion, -} - -/// Version of the envelope buffering mechanism. -#[derive(Debug, Default, Deserialize, Serialize)] -pub enum EnvelopeSpoolVersion { - /// Use the spooler service, which only buffers envelopes for unloaded projects and - /// switches between an in-memory mode and a disk mode on-demand. - /// - /// This mode will be removed soon. - #[default] - #[serde(rename = "1")] - V1, - /// Use the envelope buffer, through which all envelopes pass before getting unspooled. - /// Can be either disk based or memory based. - /// - /// This mode has not yet been stress-tested, do not use in production environments. - #[serde(rename = "experimental")] - V2, + pub partitions: NonZeroU8, } impl Default for EnvelopeSpool { fn default() -> Self { Self { path: None, - max_connections: spool_envelopes_max_connections(), - min_connections: spool_envelopes_min_connections(), max_disk_size: spool_envelopes_max_disk_size(), max_memory_size: spool_envelopes_max_memory_size(), - unspool_interval: spool_envelopes_unspool_interval(), batch_size_bytes: spool_envelopes_batch_size_bytes(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), max_backpressure_envelopes: spool_max_backpressure_envelopes(), max_backpressure_memory_percent: spool_max_backpressure_memory_percent(), partitions: spool_envelopes_partitions(), - version: EnvelopeSpoolVersion::default(), } } } @@ -1021,8 +972,9 @@ impl Default for EnvelopeSpool { /// Persistent buffering configuration. #[derive(Debug, Serialize, Deserialize, Default)] pub struct Spool { + /// Configuration for envelope spooling. #[serde(default)] - envelopes: EnvelopeSpool, + pub envelopes: EnvelopeSpool, } /// Controls internal caching behavior. @@ -2175,45 +2127,17 @@ impl Config { Some(path) } - /// Maximum number of connections to create to buffer file. - pub fn spool_envelopes_max_connections(&self) -> u32 { - self.values.spool.envelopes.max_connections - } - - /// Minimum number of connections to create to buffer file. - pub fn spool_envelopes_min_connections(&self) -> u32 { - self.values.spool.envelopes.min_connections - } - - /// Unspool interval in milliseconds. 
- pub fn spool_envelopes_unspool_interval(&self) -> Duration { - Duration::from_millis(self.values.spool.envelopes.unspool_interval) - } - /// The maximum size of the buffer, in bytes. pub fn spool_envelopes_max_disk_size(&self) -> usize { self.values.spool.envelopes.max_disk_size.as_bytes() } - /// The maximum size of the memory buffer, in bytes. - pub fn spool_envelopes_max_memory_size(&self) -> usize { - self.values.spool.envelopes.max_memory_size.as_bytes() - } - /// Number of encoded envelope bytes that need to be accumulated before /// flushing one batch to disk. pub fn spool_envelopes_batch_size_bytes(&self) -> usize { self.values.spool.envelopes.batch_size_bytes.as_bytes() } - /// Returns `true` if version 2 of the spooling mechanism is used. - pub fn spool_v2(&self) -> bool { - matches!( - &self.values.spool.envelopes.version, - EnvelopeSpoolVersion::V2 - ) - } - /// Returns the time after which we drop envelopes as a [`Duration`] object. pub fn spool_envelopes_max_age(&self) -> Duration { Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs) @@ -2654,16 +2578,4 @@ cache: fn test_emit_outcomes_invalid() { assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err()); } - - #[test] - fn test_spool_defaults_to_v1() { - let config: ConfigValues = serde_json::from_str("{}").unwrap(); - assert!(matches!( - config.spool.envelopes.version, - EnvelopeSpoolVersion::V1 - )); - - let config = Config::from_json_value(serde_json::json!({})).unwrap(); - assert!(!config.spool_v2()); - } } diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index a2b01eff6d..beee35b563 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -13,7 +13,6 @@ use crate::service::ServiceState; use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair}; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup}; -use crate::services::projects::cache::legacy::ValidateEnvelope; use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope}; @@ -294,27 +293,18 @@ fn queue_envelope( envelope.scope(scoping); let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope()); - match state.envelope_buffer(project_key_pair) { - Some(buffer) => { - if !buffer.has_capacity() { - return Err(BadStoreRequest::QueueFailed); - } - - // NOTE: This assumes that a `prefetch` has already been scheduled for both the - // envelope's projects. See `handle_check_envelope`. - relay_log::trace!("Pushing envelope to V2 buffer"); - - buffer - .addr() - .send(EnvelopeBuffer::Push(envelope.into_envelope())); - } - None => { - relay_log::trace!("Sending envelope to project cache for V1 buffer"); - state - .legacy_project_cache() - .send(ValidateEnvelope::new(envelope)); - } + let buffer = state.envelope_buffer(project_key_pair); + if !buffer.has_capacity() { + return Err(BadStoreRequest::QueueFailed); } + + // NOTE: This assumes that a `prefetch` has already been scheduled for both the + // envelope's projects. See `handle_check_envelope`. + relay_log::trace!("Pushing envelope to V2 buffer"); + + buffer + .addr() + .send(EnvelopeBuffer::Push(envelope.into_envelope())); } // The entire envelope is taken for a split above, and it's empty at this point, we can just // accept it without additional checks.
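With the `version` switch and the spool-v1 options gone, enabling disk spooling now comes down to setting `spool.envelopes.path` alone. A minimal sketch of constructing such a configuration, modeled on the `Config::from_json_value` calls used in the tests in this diff (the path value is illustrative, not part of the change):

```rust
use relay_config::Config;

fn main() {
    // After this change there is no `version` field and no connection-pool
    // options; the presence of `path` alone selects disk-backed spooling.
    let config = Config::from_json_value(serde_json::json!({
        "spool": {
            "envelopes": {
                "path": "/var/lib/relay/spool.db", // illustrative path
                "max_envelope_delay_secs": 1
            }
        }
    }))
    .unwrap();

    // `spool_envelopes_max_age` converts the configured delay into a Duration.
    assert_eq!(config.spool_envelopes_max_age().as_secs(), 1);
}
```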
diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 43b0619403..259d12edd4 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -270,7 +270,6 @@ pub use self::envelope::Envelope; // pub for benchmarks pub use self::services::buffer::{ EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, }; // pub for benchmarks -pub use self::services::spooler::spool_utils; pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks #[cfg(test)] diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 94d43ba08a..8a208ea7a8 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -285,14 +285,11 @@ impl ServiceState { aggregator: aggregator.clone(), envelope_processor: processor.clone(), outcome_aggregator: outcome_aggregator.clone(), - project_cache: legacy_project_cache.clone(), test_store: test_store.clone(), }; runner.start_with( legacy::ProjectCacheService::new( - config.clone(), - MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_handle.clone(), project_cache_services, global_config_rx, @@ -363,10 +360,7 @@ impl ServiceState { } /// Returns the V2 envelope buffer, if present. - pub fn envelope_buffer( - &self, - project_key_pair: ProjectKeyPair, - ) -> Option<&ObservableEnvelopeBuffer> { + pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer { self.inner.registry.envelope_buffer.buffer(project_key_pair) } diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 63739f0545..4fe70b4b62 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -353,8 +353,8 @@ impl SqliteEnvelopeStore { .shared_cache(true); let db = SqlitePoolOptions::new() - .max_connections(config.spool_envelopes_max_connections()) - .min_connections(config.spool_envelopes_min_connections()) + .max_connections(1) + .min_connections(1) .connect_with(options) .await .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?; diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 4378c000e3..2553e71146 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -94,14 +94,6 @@ pub struct PartitionedEnvelopeBuffer { } impl PartitionedEnvelopeBuffer { - /// Creates a [`PartitionedEnvelopeBuffer`] with no partitions. - #[cfg(test)] - pub fn empty() -> Self { - Self { - buffers: Arc::new(Vec::new()), - } - } - /// Creates a new [`PartitionedEnvelopeBuffer`] by instantiating inside all the necessary /// [`ObservableEnvelopeBuffer`]s. #[allow(clippy::too_many_arguments)] @@ -130,11 +122,9 @@ impl PartitionedEnvelopeBuffer { test_store: test_store.clone(), }, ) - .map(|b| b.start_in(runner)); + .start_in(runner); - if let Some(envelope_buffer) = envelope_buffer { - envelope_buffers.push(envelope_buffer); - } + envelope_buffers.push(envelope_buffer); } Self { @@ -147,16 +137,13 @@ impl PartitionedEnvelopeBuffer { /// /// The rationale of using this partitioning strategy is to reduce memory usage across buffers /// since each individual buffer will only take care of a subset of projects. 
- pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> Option<&ObservableEnvelopeBuffer> { - if self.buffers.is_empty() { - return None; - } - + pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer { let mut hasher = FnvHasher::default(); project_key_pair.own_key.hash(&mut hasher); let buffer_index = (hasher.finish() % self.buffers.len() as u64) as usize; - let buffer = self.buffers.get(buffer_index); - buffer + self.buffers + .get(buffer_index) + .expect("buffers should not be empty") } /// Returns `true` if all [`ObservableEnvelopeBuffer`]s have capacity to get new [`Envelope`]s. @@ -227,17 +214,14 @@ const DEFAULT_SLEEP: Duration = Duration::from_secs(1); impl EnvelopeBufferService { /// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config. - /// - /// NOTE: until the V1 spooler implementation is removed, this function returns `None` - /// if V2 spooling is not configured. pub fn new( partition_id: u8, config: Arc<Config>, memory_stat: MemoryStat, global_config_rx: watch::Receiver<global_config::Status>, services: Services, - ) -> Option<Self> { - config.spool_v2().then(|| Self { + ) -> Self { + Self { partition_id, config, memory_stat, @@ -245,7 +229,7 @@ impl EnvelopeBufferService { services, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, - }) + } } /// Returns both the [`Addr`] to this service, and a reference to the capacity flag. @@ -633,14 +617,9 @@ mod tests { ) -> EnvelopeBufferServiceResult { relay_log::init_test!(); - let config_json = config_json.unwrap_or(serde_json::json!({ - "spool": { - "envelopes": { - "version": "experimental" - } - } - })); - let config = Arc::new(Config::from_json_value(config_json).unwrap()); + let config = Arc::new( + config_json.map_or_else(Config::default, |c| Config::from_json_value(c).unwrap()), + ); let memory_stat = MemoryStat::default(); let (global_tx, global_rx) = watch::channel(global_config_status); @@ -659,8 +638,7 @@ mod tests { outcome_aggregator, test_store: Addr::dummy(), }, - ) - .unwrap(); + ); EnvelopeBufferServiceResult { service: envelope_buffer_service, @@ -741,7 +719,6 @@ mod tests { Some(serde_json::json!({ "spool": { "envelopes": { - "version": "experimental", "path": std::env::temp_dir().join(Uuid::new_v4().to_string()), } }, @@ -776,7 +753,6 @@ mod tests { Some(serde_json::json!({ "spool": { "envelopes": { - "version": "experimental", "max_envelope_delay_secs": 1, } } @@ -908,16 +884,7 @@ mod tests { }; // Create two buffer services - let config = Arc::new( - Config::from_json_value(serde_json::json!({ - "spool": { - "envelopes": { - "version": "experimental" - } - } - })) - .unwrap(), - ); + let config = Arc::new(Config::default()); let buffer1 = EnvelopeBufferService::new( 0, @@ -925,8 +892,7 @@ mod tests { MemoryStat::default(), global_rx.clone(), services.clone(), - ) - .unwrap(); + ); let buffer2 = EnvelopeBufferService::new( 1, @@ -934,8 +900,7 @@ mod tests { MemoryStat::default(), global_rx, services, - ) - .unwrap(); + ); // Start both services and create partitioned buffer let observable1 = buffer1.start_in(&mut runner); diff --git a/relay-server/src/services/mod.rs b/relay-server/src/services/mod.rs index 8c06c1d0af..4284bab6e2 100644 --- a/relay-server/src/services/mod.rs +++ b/relay-server/src/services/mod.rs @@ -38,7 +38,6 @@ pub mod processor; pub mod projects; pub mod relays; pub mod server; -pub mod spooler; pub mod stats; pub mod test_store; pub mod upstream; diff --git a/relay-server/src/services/projects/cache/legacy.rs
b/relay-server/src/services/projects/cache/legacy.rs index 8dfd88cddb..d419e028de 100644 --- a/relay-server/src/services/projects/cache/legacy.rs +++ b/relay-server/src/services/projects/cache/legacy.rs @@ -1,7 +1,5 @@ -use std::collections::{BTreeMap, BTreeSet}; -use std::error::Error; +use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; use crate::services::buffer::{ EnvelopeBuffer, EnvelopeBufferError, PartitionedEnvelopeBuffer, ProjectKeyPair, @@ -10,11 +8,8 @@ use crate::services::global_config; use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; -use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle, ProjectChange}; +use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle}; use crate::Envelope; -use hashbrown::HashSet; -use relay_base_schema::project::ProjectKey; -use relay_config::Config; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Service}; use tokio::sync::{mpsc, watch}; @@ -22,54 +17,10 @@ use tokio::sync::{mpsc, watch}; use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::projects::project::ProjectState; -use crate::services::spooler::{ - self, Buffer, BufferService, DequeueMany, Enqueue, QueueKey, RemoveMany, RestoreIndex, - UnspooledEnvelope, BATCH_KEY_COUNT, -}; use crate::services::test_store::TestStore; -use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; -use crate::utils::{ManagedEnvelope, MemoryChecker, RetryBackoff, SleepHandle}; - -/// Validates the envelope against project configuration and rate limits. -/// -/// This ensures internally that the project state is up to date. -/// Once the envelope has been validated, remaining items are forwarded to the next stage: -/// -/// - If the envelope needs dynamic sampling, and the project state is not cached or out of -/// date, the envelope is spooled and we continue when the state is fetched. -/// - Otherwise, the envelope is directly submitted to the [`EnvelopeProcessor`]. -#[derive(Debug)] -pub struct ValidateEnvelope { - envelope: ManagedEnvelope, -} - -impl ValidateEnvelope { - pub fn new(envelope: ManagedEnvelope) -> Self { - Self { envelope } - } -} - -/// Updates the buffer index for [`ProjectKey`] with the [`QueueKey`] keys. -/// -/// This message is sent from the project buffer in case of an error while fetching the data from -/// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the -/// persistent storage. -#[derive(Debug)] -pub struct UpdateSpoolIndex(pub HashSet<QueueKey>); - -impl UpdateSpoolIndex { - pub fn new(keys: HashSet<QueueKey>) -> Self { - Self(keys) - } -} - -/// The current envelopes index fetched from the underlying buffer spool. -/// -/// This index will be received only once shortly after startup and will trigger refresh for the -/// project states for the project keys returned in the message. -#[derive(Debug)] -pub struct RefreshIndexCache(pub HashSet<QueueKey>); +use crate::statsd::{RelayCounters, RelayTimers}; +use crate::utils::ManagedEnvelope; /// Handle an envelope that was popped from the envelope buffer. #[derive(Debug)] pub struct DequeuedEnvelope(pub Box<Envelope>); /// /// It manages spool v1 and some remaining messages which handle project state.
#[derive(Debug)] pub enum ProjectCache { - ValidateEnvelope(ValidateEnvelope), FlushBuckets(FlushBuckets), - UpdateSpoolIndex(UpdateSpoolIndex), - RefreshIndexCache(RefreshIndexCache), } impl ProjectCache { pub fn variant(&self) -> &'static str { match self { - Self::ValidateEnvelope(_) => "ValidateEnvelope", Self::FlushBuckets(_) => "FlushBuckets", - Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", - Self::RefreshIndexCache(_) => "RefreshIndexCache", } } } impl Interface for ProjectCache {} -impl FromMessage<UpdateSpoolIndex> for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: UpdateSpoolIndex, _: ()) -> Self { - Self::UpdateSpoolIndex(message) - } -} - -impl FromMessage<RefreshIndexCache> for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: RefreshIndexCache, _: ()) -> Self { - Self::RefreshIndexCache(message) - } -} - -impl FromMessage<ValidateEnvelope> for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: ValidateEnvelope, _: ()) -> Self { - Self::ValidateEnvelope(message) - } -} - impl FromMessage<FlushBuckets> for ProjectCache { type Response = relay_system::NoResponse; @@ -138,7 +59,6 @@ pub struct Services { pub aggregator: Addr<Aggregator>, pub envelope_processor: Addr<EnvelopeProcessor>, pub outcome_aggregator: Addr<TrackOutcome>, - pub project_cache: Addr<ProjectCache>, pub test_store: Addr<TestStore>, } @@ -148,31 +68,13 @@ pub struct Services { /// cache of project states. #[derive(Debug)] struct ProjectCacheBroker { - config: Arc<Config>, - memory_checker: MemoryChecker, services: Services, projects: ProjectCacheHandle, - /// Handle to schedule periodic unspooling of buffered envelopes (spool V1). - spool_v1_unspool_handle: SleepHandle, - spool_v1: Option<SpoolV1>, /// Status of the global configuration, used to determine readiness for processing. global_config: GlobalConfigStatus, } -#[derive(Debug)] -struct SpoolV1 { - /// Tx channel used by the [`BufferService`] to send back the requested dequeued elements. - buffer_tx: mpsc::UnboundedSender<UnspooledEnvelope>, - /// Index containing all the [`QueueKey`] that have been enqueued in the [`BufferService`]. - index: HashSet<QueueKey>, - /// Backoff strategy for retrying unspool attempts. - buffer_unspool_backoff: RetryBackoff, - /// Address of the [`BufferService`] used for enqueuing and dequeuing envelopes that can't be - /// immediately processed. - buffer: Addr<Buffer>, -} - /// Describes the current status of the `GlobalConfig`. /// /// Either it's ready to be used, or it contains the list of in-flight project keys, @@ -185,185 +87,11 @@ enum GlobalConfigStatus { Pending, } -impl GlobalConfigStatus { - fn is_ready(&self) -> bool { - matches!(self, GlobalConfigStatus::Ready) - } -} - impl ProjectCacheBroker { fn set_global_config_ready(&mut self) { self.global_config = GlobalConfigStatus::Ready; } - /// Adds the value to the queue for the provided key. - fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) { - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - spool_v1.index.insert(key); - spool_v1.buffer.send(Enqueue::new(key, value)); - } - - /// Sends the message to the buffer service to dequeue the envelopes. - /// - /// All the found envelopes will be sent back through the `buffer_tx` channel and directly - /// forwarded to `handle_processing`.
- fn dequeue(&self, keys: HashSet<QueueKey>) { - let spool_v1 = self.spool_v1.as_ref().expect("no V1 spool configured"); - spool_v1 - .buffer - .send(DequeueMany::new(keys, spool_v1.buffer_tx.clone())) - } - - fn evict_project(&mut self, project_key: ProjectKey) { - let Some(ref mut spool_v1) = self.spool_v1 else { - return; - }; - - let keys = spool_v1 - .index - .extract_if(|key| key.own_key == project_key || key.sampling_key == project_key) - .collect::<BTreeSet<_>>(); - - if !keys.is_empty() { - spool_v1.buffer.send(RemoveMany::new(project_key, keys)) - } - } - - fn handle_project_change(&mut self, event: ProjectChange) { - match event { - ProjectChange::Ready(_) => self.schedule_unspool(), - ProjectChange::Evicted(project_key) => self.evict_project(project_key), - } - } - - /// Handles the processing of the provided envelope. - fn handle_processing(&mut self, mut managed_envelope: ManagedEnvelope) { - let project_key = managed_envelope.envelope().meta().public_key(); - - let project = self.projects.get(project_key); - - let project_info = match project.state() { - ProjectState::Enabled(info) => info, - ProjectState::Disabled => { - managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); - return; - } - ProjectState::Pending => { - relay_log::error!( - tags.project_key = %project_key, - "project has no valid cached state", - ); - return; - } - }; - - // The `Envelope` and `EnvelopeContext` will be dropped if the `Project::check_envelope()` - // function returns any error, which will also be ignored here. - // TODO(jjbayer): check_envelope also makes sure the envelope has proper scoping. - // If we don't call check_envelope in the same message handler as handle_processing, - // there is a chance that the project is not ready yet and events are published with - // `organization_id: 0`. We should eliminate this footgun by introducing a `ScopedEnvelope` - // type which guarantees that the envelope has complete scoping. - if let Ok(CheckedEnvelope { - envelope: Some(managed_envelope), - .. - }) = project.check_envelope(managed_envelope) - { - let rate_limits = project.rate_limits().current_limits(); - let reservoir_counters = project.reservoir_counters().clone(); - - let sampling_project_info = managed_envelope - .envelope() - .sampling_key() - .map(|key| self.projects.get(key)) - .and_then(|p| p.state().clone().enabled()) - .filter(|info| info.organization_id == project_info.organization_id); - - let process = ProcessEnvelope { - envelope: managed_envelope, - project_info: Arc::clone(project_info), - rate_limits, - sampling_project_info, - reservoir_counters, - }; - - self.services.envelope_processor.send(process); - } - } - - /// Checks an incoming envelope and decides whether to process it immediately or buffer it. - /// - /// A few conditions are checked here: - /// - If there is no dynamic sampling key and the project is already cached, we go straight to - /// processing, otherwise buffer the envelopes. - /// - If the dynamic sampling key is provided and if the root and sampling projects - /// are cached - process the envelope, buffer otherwise. - /// - /// This means if the caches are hot we always process all the incoming envelopes without any - /// delay. But in case the project state cannot be fetched, we keep buffering till the state - /// is eventually updated. - /// - /// The flushing of the buffered envelopes happens in `update_state`.
- fn handle_validate_envelope(&mut self, message: ValidateEnvelope) { - let ValidateEnvelope { - envelope: mut managed_envelope, - } = message; - - let envelope = managed_envelope.envelope(); - - // Fetch the project state for our key and make sure it's not invalid. - let own_key = envelope.meta().public_key(); - let project = self.projects.get(own_key); - - let project_state = match project.state() { - ProjectState::Enabled(state) => Some(state), - ProjectState::Disabled => { - managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); - return; - } - ProjectState::Pending => None, - }; - - // Also, fetch the project state for sampling key and make sure it's not invalid. - let sampling_key = envelope.sampling_key(); - let mut requires_sampling_state = sampling_key.is_some(); - let sampling_info = if let Some(sampling_key) = sampling_key { - let sampling_project = self.projects.get(sampling_key); - match sampling_project.state() { - ProjectState::Enabled(info) => Some(Arc::clone(info)), - ProjectState::Disabled => { - relay_log::trace!("Sampling state is disabled ({sampling_key})"); - // We accept events even if its root project has been disabled. - requires_sampling_state = false; - None - } - ProjectState::Pending => { - relay_log::trace!("Sampling state is pending ({sampling_key})"); - None - } - } - } else { - None - }; - - let key = QueueKey::new(own_key, sampling_key.unwrap_or(own_key)); - - // Trigger processing once we have a project state and we either have a sampling project - // state or we do not need one. - if project_state.is_some() - && (sampling_info.is_some() || !requires_sampling_state) - && self.memory_checker.check_memory().has_capacity() - && self.global_config.is_ready() - { - // TODO: Add ready project infos to the processing message. - relay_log::trace!("Sending envelope to processor"); - return self.handle_processing(managed_envelope); - } - - relay_log::trace!("Enqueueing envelope"); - self.enqueue(key, managed_envelope); - } - fn handle_flush_buckets(&mut self, message: FlushBuckets) { let aggregator = self.services.aggregator.clone(); @@ -425,25 +153,6 @@ impl ProjectCacheBroker { ); } - fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - spool_v1.index.extend(message.0); - } - - fn handle_refresh_index_cache(&mut self, message: RefreshIndexCache) { - let RefreshIndexCache(index) = message; - - for key in index { - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - spool_v1.index.insert(key); - - self.projects.fetch(key.own_key); - if key.own_key != key.sampling_key { - self.projects.fetch(key.sampling_key); - } - } - } - fn handle_dequeued_envelope( &mut self, envelope: Box<Envelope>, @@ -527,89 +236,6 @@ impl ProjectCacheBroker { Ok(()) } - /// Returns backoff timeout for an unspool attempt. - fn next_unspool_attempt(&mut self) -> Duration { - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - self.config.spool_envelopes_unspool_interval() - + spool_v1.buffer_unspool_backoff.next_backoff() - } - - fn schedule_unspool(&mut self) { - if self.spool_v1.is_none() { - return; - } - - if self.spool_v1_unspool_handle.is_idle() { - // Set the time for the next attempt. - let wait = self.next_unspool_attempt(); - self.spool_v1_unspool_handle.set(wait); - } - } - - /// Returns `true` if the project state is valid for the [`QueueKey`]. - /// - /// Which includes the own key and the sampling key for the project.
- /// Note: this function will trigger [`ProjectState`] refresh if it's already expired. - fn is_state_cached(&mut self, key: &QueueKey) -> bool { - key.unique_keys() - .iter() - .all(|key| !self.projects.get(*key).state().is_pending()) - } - - /// Iterates the buffer index and tries to unspool the envelopes for projects with a valid - /// state. - /// - /// This makes sure we always keep moving the unspool forward, even if we do not fetch the - /// project state updates, but still can process data based on the existing cache. - fn handle_periodic_unspool(&mut self) { - relay_log::trace!("handle_periodic_unspool"); - let (num_keys, reason) = self.handle_periodic_unspool_inner(); - relay_statsd::metric!( - gauge(RelayGauges::BufferPeriodicUnspool) = num_keys as u64, - reason = reason - ); - } - - fn handle_periodic_unspool_inner(&mut self) -> (usize, &str) { - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - self.spool_v1_unspool_handle.reset(); - - // If we don't yet have the global config, we will defer dequeuing until we do. - if let GlobalConfigStatus::Pending = self.global_config { - spool_v1.buffer_unspool_backoff.reset(); - self.schedule_unspool(); - return (0, "no_global_config"); - } - // If there is nothing spooled, schedule the next check a little bit later. - if spool_v1.index.is_empty() { - self.schedule_unspool(); - return (0, "index_empty"); - } - - let mut index = std::mem::take(&mut spool_v1.index); - let keys = index - .extract_if(|key| self.is_state_cached(key)) - .take(BATCH_KEY_COUNT) - .collect::<HashSet<_>>(); - let num_keys = keys.len(); - - if !keys.is_empty() { - self.dequeue(keys); - } - - // Return all the un-used items to the index. - let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); - if !index.is_empty() { - spool_v1.index.extend(index); - } - - // Schedule unspool once we are done. - spool_v1.buffer_unspool_backoff.reset(); - self.schedule_unspool(); - - (num_keys, "found_keys") - } - fn handle_message(&mut self, message: ProjectCache) { let ty = message.variant(); metric!( @@ -617,14 +243,7 @@ impl ProjectCacheBroker { message = ty, { match message { - ProjectCache::ValidateEnvelope(message) => { - self.handle_validate_envelope(message) - } ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), - ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), - ProjectCache::RefreshIndexCache(message) => { - self.handle_refresh_index_cache(message) - } } } ) @@ -637,8 +256,7 @@ impl ProjectCacheBroker { .envelope_buffer .clone() .buffer(project_key_pair) - .map(|b| b.addr()) - .expect("Called HandleDequeuedEnvelope without an envelope buffer"); + .addr(); if let Err(e) = self.handle_dequeued_envelope(dequeued_envelope.0, envelope_buffer) { relay_log::error!( @@ -652,8 +270,6 @@ impl ProjectCacheBroker { /// Service implementing the [`ProjectCache`] interface. #[derive(Debug)] pub struct ProjectCacheService { - config: Arc<Config>, - memory_checker: MemoryChecker, project_cache_handle: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver<global_config::Status>, @@ -664,16 +280,12 @@ impl ProjectCacheService { /// Creates a new `ProjectCacheService`.
pub fn new( - config: Arc<Config>, - memory_checker: MemoryChecker, project_cache_handle: ProjectCacheHandle, services: Services, global_config_rx: watch::Receiver<global_config::Status>, envelopes_rx: mpsc::Receiver<DequeuedEnvelope>, ) -> Self { Self { - config, - memory_checker, project_cache_handle, services, global_config_rx, @@ -687,18 +299,11 @@ impl Service for ProjectCacheService { async fn run(self, mut rx: relay_system::Receiver<ProjectCache>) { let Self { - config, - memory_checker, project_cache_handle, services, mut global_config_rx, mut envelopes_rx, } = self; - let mut project_changes = project_cache_handle.changes(); - let project_cache = services.project_cache.clone(); - let outcome_aggregator = services.outcome_aggregator.clone(); - let test_store = services.test_store.clone(); - relay_log::info!("project cache started"); let global_config = match global_config_rx.borrow().clone() { @@ -712,57 +317,9 @@ impl Service for ProjectCacheService { } }; - let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); - let spool_v1 = match config.spool_v2() { - true => None, - false => Some({ - // Channel for envelope buffering. - let buffer_services = spooler::Services { - outcome_aggregator, - project_cache, - test_store, - }; - let buffer = match BufferService::create( - memory_checker.clone(), - buffer_services, - config.clone(), - ) - .await - { - Ok(buffer) => { - // NOTE: This service is not monitored by the service runner. - buffer.start_detached() - } - Err(err) => { - relay_log::error!( - error = &err as &dyn Error, - "failed to start buffer service", - ); - // NOTE: The process will exit with error if the buffer file could not be - // opened or the migrations could not be run. - std::process::exit(1); - } - }; - - // Request the existing index from the spooler. - buffer.send(RestoreIndex); - - SpoolV1 { - buffer_tx, - index: HashSet::new(), - buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), - buffer, - } - }), - }; - let mut broker = ProjectCacheBroker { - config: config.clone(), - memory_checker, projects: project_cache_handle, services, - spool_v1_unspool_handle: SleepHandle::idle(), - spool_v1, global_config, }; @@ -780,25 +337,6 @@ } }) }, - project_change = project_changes.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_project_change", { - if let Ok(project_change) = project_change { - broker.handle_project_change(project_change); - } - }) - } - // Buffer will not dequeue the envelopes from the spool if there are not enough - // permits in `BufferGuard` available. Currently this is 50%. - Some(UnspooledEnvelope { managed_envelope, ..
}) = buffer_rx.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_processing", { - broker.handle_processing(managed_envelope) - }) - }, - () = &mut broker.spool_v1_unspool_handle => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "periodic_unspool", { - broker.handle_periodic_unspool() - }) - } Some(message) = rx.recv() => { metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_message", { broker.handle_message(message) @@ -816,164 +354,3 @@ impl Service for ProjectCacheService { relay_log::info!("project cache stopped"); } } - -#[cfg(test)] -mod tests { - use relay_test::mock_service; - use tokio::select; - use uuid::Uuid; - - use crate::services::processor::ProcessingGroup; - use crate::testutils::empty_envelope_with_dsn; - use crate::utils::MemoryStat; - - use super::*; - - fn mocked_services() -> Services { - let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); - let (envelope_processor, _) = mock_service("envelope_processor", (), |&mut (), _| {}); - let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); - let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); - let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); - - Services { - envelope_buffer: PartitionedEnvelopeBuffer::empty(), - aggregator, - envelope_processor, - project_cache, - outcome_aggregator, - test_store, - } - } - - async fn project_cache_broker_setup( - services: Services, - buffer_tx: mpsc::UnboundedSender<UnspooledEnvelope>, - ) -> (ProjectCacheBroker, Addr<Buffer>) { - let config: Arc<_> = Config::from_json_value(serde_json::json!({ - "spool": { - "envelopes": { - "path": std::env::temp_dir().join(Uuid::new_v4().to_string()), - "max_memory_size": 0, // 0 bytes, to force spooling all the envelopes to disk. - }, - "health": { - "max_memory_percent": 0.0 - } - } - })) - .unwrap() - .into(); - let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let buffer_services = spooler::Services { - outcome_aggregator: services.outcome_aggregator.clone(), - project_cache: services.project_cache.clone(), - test_store: services.test_store.clone(), - }; - let buffer = - match BufferService::create(memory_checker.clone(), buffer_services, config.clone()) - .await - { - Ok(buffer) => buffer.start_detached(), - Err(err) => { - relay_log::error!(error = &err as &dyn Error, "failed to start buffer service"); - // NOTE: The process will exit with error if the buffer file could not be - // opened or the migrations could not be run.
- std::process::exit(1); - } - }; - - ( - ProjectCacheBroker { - config: config.clone(), - memory_checker, - projects: ProjectCacheHandle::for_test(), - services, - spool_v1_unspool_handle: SleepHandle::idle(), - spool_v1: Some(SpoolV1 { - buffer_tx, - index: HashSet::new(), - buffer: buffer.clone(), - buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), - }), - global_config: GlobalConfigStatus::Pending, - }, - buffer, - ) - } - - #[tokio::test] - async fn periodic_unspool() { - relay_log::init_test!(); - - let services = mocked_services(); - let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); - let (mut broker, _buffer_svc) = - project_cache_broker_setup(services.clone(), buffer_tx).await; - let projects = broker.projects.clone(); - let mut project_events = projects.changes(); - - broker.global_config = GlobalConfigStatus::Ready; - let (tx_assert, mut rx_assert) = mpsc::unbounded_channel(); - - let dsn1 = "111d836b15bb49d7bbf99e64295d995b"; - let dsn2 = "eeed836b15bb49d7bbf99e64295d995b"; - - // Send and spool some envelopes. - for dsn in [dsn1, dsn2] { - let envelope = ManagedEnvelope::new( - empty_envelope_with_dsn(dsn), - services.outcome_aggregator.clone(), - services.test_store.clone(), - ProcessingGroup::Ungrouped, - ); - - let message = ValidateEnvelope { envelope }; - - broker.handle_validate_envelope(message); - tokio::time::sleep(Duration::from_millis(200)).await; - // Nothing will be dequeued. - assert!(buffer_rx.try_recv().is_err()) - } - - // Emulate the project cache service loop. - relay_system::spawn!(async move { - loop { - select! { - Ok(project_event) = project_events.recv() => { - broker.handle_project_change(project_event); - } - Some(assert) = rx_assert.recv() => { - assert_eq!(broker.spool_v1.as_ref().unwrap().index.len(), assert); - }, - () = &mut broker.spool_v1_unspool_handle => broker.handle_periodic_unspool(), - } - } - }); - - // Before updating any project states. - tx_assert.send(2).unwrap(); - - projects.test_set_project_state( - ProjectKey::parse(dsn1).unwrap(), - ProjectState::new_allowed(), - ); - - assert!(buffer_rx.recv().await.is_some()); - // One of the projects should be unspooled. - tx_assert.send(1).unwrap(); - - // Schedule some work... - tokio::time::sleep(Duration::from_secs(2)).await; - - projects.test_set_project_state( - ProjectKey::parse(dsn2).unwrap(), - ProjectState::new_allowed(), - ); - - assert!(buffer_rx.recv().await.is_some()); - // The last project should be unspooled. - tx_assert.send(0).unwrap(); - // Make sure the last assert is tested. - tokio::time::sleep(Duration::from_millis(100)).await; - } -} diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs deleted file mode 100644 index 956e346289..0000000000 --- a/relay-server/src/services/spooler/mod.rs +++ /dev/null @@ -1,1759 +0,0 @@ -//! This module contains the [`BufferService`], which is responsible for spooling of the incoming -//! envelopes, either to in-memory or to persistent storage. -//! -//! The main entry point for the [`BufferService`] is the [`Buffer`] interface, which currently -//! supports: -//! - [`Enqueue`] - enqueueing a message into the backend storage -//! - [`DequeueMany`] - dequeueing all the requested [`QueueKey`] keys -//! - [`RemoveMany`] - removing and dropping the requested [`QueueKey`] keys. -//! - [`Health`] - checking the health of the [`BufferService`] -//! -//! To make sure the [`BufferService`] is fast and responsive, especially in the normal working -//!
conditions, it keeps the internal [`BufferState`] state, which defines where the spooling will -//! be happening. -//! -//! The initial state is always [`InMemory`], and if the Relay can properly fetch all the -//! [`crate::services::projects::project::ProjectState`] it continues to use the memory as a temporary spool. -//! -//! Keeping the envelopes in memory as long as we can, we ensure fast unspool operations and -//! fast processing times. -//! -//! In case of an incident when the in-memory spool gets full (see the `spool.envelopes.max_memory_size` config option) -//! or if the processing pipeline gets too many messages in-flight, configured by -//! `cache.envelope_buffer_size` and once it reaches 80% of the defined amount, the internal state will be -//! switched to [`OnDisk`] and the service will continue spooling all the incoming envelopes onto the disk. -//! This will also happen only when the disk spool is configured. -//! -//! The state can be changed to [`InMemory`] again only if all the on-disk spooled envelopes are -//! read out again and the disk is empty. -//! -//! The current on-disk spool implementation uses SQLite as storage. - -use chrono::{DateTime, Utc}; -use futures::stream::{self, StreamExt}; -use hashbrown::HashSet; -use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; -use relay_config::Config; -use relay_statsd::metric; -use relay_system::{Addr, Controller, FromMessage, Interface, Sender, Service}; -use smallvec::{smallvec, SmallVec}; -use sqlx::migrate::MigrateError; -use sqlx::sqlite::{ - SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow, - SqliteSynchronous, -}; -use sqlx::{Pool, Row, Sqlite}; -use std::collections::{BTreeMap, BTreeSet}; -use std::error::Error; -use std::path::Path; -use std::pin::pin; -use std::sync::Arc; -use tokio::fs::DirBuilder; -use tokio::sync::mpsc; - -use crate::envelope::{Envelope, EnvelopeError}; -use crate::services::outcome::TrackOutcome; -use crate::services::processor::ProcessingGroup; -use crate::services::projects::cache::legacy::{ProjectCache, RefreshIndexCache, UpdateSpoolIndex}; -use crate::services::test_store::TestStore; -use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; -use crate::utils::{ManagedEnvelope, MemoryChecker}; - -pub mod spool_utils; -mod sql; - -/// The number of keys to take from the [`ProjectCache`] index in one unspool operation. -pub const BATCH_KEY_COUNT: usize = 2000; - -/// The predefined batch size for the SQL queries, when fetching anything from the on-disk spool. -const BATCH_SIZE: i64 = 200; - -/// The low memory watermark for spool. -/// -/// This number is used to calculate how much memory should be taken by the on-disk spool if all -/// the spooled envelopes will be moved to in-memory buffer. -const LOW_SPOOL_MEMORY_WATERMARK: f64 = 0.3; - -/// The set of errors which can happen while working with the buffer.
-#[derive(Debug, thiserror::Error)] -pub enum BufferError { - #[error("failed to move envelope from disk to memory")] - CapacityExceeded, - - #[error("failed to get the size of the buffer on the filesystem")] - DatabaseFileError(#[from] std::io::Error), - - #[error("failed to insert data into database: {0}")] - InsertFailed(sqlx::Error), - - #[error("failed to delete data from the database: {0}")] - DeleteFailed(sqlx::Error), - - #[error("failed to fetch data from the database: {0}")] - FetchFailed(sqlx::Error), - - #[error("failed to get database file size: {0}")] - FileSizeReadFailed(sqlx::Error), - - #[error("failed to setup the database: {0}")] - SqlxSetupFailed(sqlx::Error), - - #[error("failed to create the spool file: {0}")] - FileSetupError(std::io::Error), - - #[error(transparent)] - EnvelopeError(#[from] EnvelopeError), - - #[error("failed to run migrations")] - MigrationFailed(#[from] MigrateError), - - #[error("failed to extract project key from the row")] - ParseProjectKeyFailed(#[from] ParseProjectKeyError), - - #[error("on-disk spool is full")] - SpoolIsFull, -} - -/// This key represents the index element in the queue. -/// -/// It consists of two parts: the own key of the project and the sampling key which points to the -/// sampling project. The sampling key can be the same as the own key if the own and sampling -/// projects are the same. -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct QueueKey { - pub own_key: ProjectKey, - pub sampling_key: ProjectKey, -} - -impl QueueKey { - pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { - Self { - own_key, - sampling_key, - } - } - - pub fn from_envelope(envelope: &Envelope) -> Self { - let meta = envelope.meta(); - Self { - own_key: meta.public_key(), - sampling_key: envelope.sampling_key().unwrap_or(meta.public_key()), - } - } - - /// Returns both keys, but omits duplicates. - pub fn unique_keys(&self) -> SmallVec<[ProjectKey; 2]> { - let mut keys = smallvec![self.own_key]; - if self.sampling_key != self.own_key { - keys.push(self.sampling_key); - } - keys - } -} - -/// The envelope with its key sent to project cache for processing. -/// -/// It's sent in response to the [`DequeueMany`] message from the [`ProjectCache`]. -#[derive(Debug)] -pub struct UnspooledEnvelope { - pub managed_envelope: ManagedEnvelope, -} - -/// Adds the envelope and the managed envelope to the internal buffer. -#[derive(Debug)] -pub struct Enqueue { - key: QueueKey, - value: ManagedEnvelope, -} - -impl Enqueue { - pub fn new(key: QueueKey, value: ManagedEnvelope) -> Self { - Self { key, value } - } -} - -/// Removes messages from the internal buffer and streams them to the sender. -#[derive(Debug)] -pub struct DequeueMany { - keys: HashSet<QueueKey>, - sender: mpsc::UnboundedSender<UnspooledEnvelope>, -} - -impl DequeueMany { - pub fn new(keys: HashSet<QueueKey>, sender: mpsc::UnboundedSender<UnspooledEnvelope>) -> Self { - Self { keys, sender } - } -} - -/// Removes the provided keys from the internal buffer. -/// -/// If any of the provided keys still have envelopes, an error will be logged with the -/// number of envelopes dropped for the specific project key. -#[derive(Debug)] -pub struct RemoveMany { - project_key: ProjectKey, - keys: BTreeSet<QueueKey>, -} - -impl RemoveMany { - pub fn new(project_key: ProjectKey, keys: BTreeSet<QueueKey>) -> Self { - Self { project_key, keys } - } -} - -/// Checks the health of the spooler. -#[derive(Debug)] -pub struct Health(pub Sender<bool>); - -/// Requests the index [`ProjectKey`] -> [`QueueKey`] of the data currently residing in the spool.
-/// -/// This is a one time request, which is sent on startup. -/// Upon receiving this message the buffer internally will check the existing keys in the -/// on-disk spool and compile the index which will be returned to [`ProjectCache`]. -/// -/// The returned message will initiate the project state refresh for all the returned project keys. -#[derive(Debug)] -pub struct RestoreIndex; - -/// The interface for [`BufferService`]. -/// -/// The buffer maintains internal storage (an internal buffer) of the envelopes, which keep -/// accumulating until the request to dequeue them comes in. -/// -/// The envelopes will first be kept in the in-memory buffer, and if that hits the configured limit the -/// envelopes will be buffered to disk. -/// -/// To add the envelopes to the buffer use [`Enqueue`] which will persist the envelope in the -/// internal storage. To retrieve the envelopes one can use [`DequeueMany`], where one is expected -/// to provide the list of [`QueueKey`]s and the [`mpsc::UnboundedSender`] - all the found envelopes -/// will be streamed back to this sender. -/// -/// There is also a [`RemoveMany`] operation, which, when requested, removes the found keys from -/// the queue and drops them. If any of the keys still have envelopes, an error will be logged. -#[derive(Debug)] -pub enum Buffer { - Enqueue(Enqueue), - DequeueMany(DequeueMany), - RemoveMany(RemoveMany), - Health(Health), - RestoreIndex(RestoreIndex), -} - -impl Buffer { - pub fn variant(&self) -> &'static str { - match self { - Buffer::Enqueue(_) => "Enqueue", - Buffer::DequeueMany(_) => "DequeueMany", - Buffer::RemoveMany(_) => "RemoveMany", - Buffer::Health(_) => "Health", - Buffer::RestoreIndex(_) => "RestoreIndex", - } - } -} - -impl Interface for Buffer {} - -impl FromMessage<Enqueue> for Buffer { - type Response = relay_system::NoResponse; - - fn from_message(message: Enqueue, _: ()) -> Self { - Self::Enqueue(message) - } -} - -impl FromMessage<DequeueMany> for Buffer { - type Response = relay_system::NoResponse; - - fn from_message(message: DequeueMany, _: ()) -> Self { - Self::DequeueMany(message) - } -} - -impl FromMessage<RemoveMany> for Buffer { - type Response = relay_system::NoResponse; - - fn from_message(message: RemoveMany, _: ()) -> Self { - Self::RemoveMany(message) - } -} - -impl FromMessage<Health> for Buffer { - type Response = relay_system::NoResponse; - - fn from_message(message: Health, _: ()) -> Self { - Self::Health(message) - } -} - -impl FromMessage<RestoreIndex> for Buffer { - type Response = relay_system::NoResponse; - - fn from_message(message: RestoreIndex, _: ()) -> Self { - Self::RestoreIndex(message) - } -} - -/// The configuration which describes the in-memory [`BufferState`]. -#[derive(Debug)] -struct InMemory { - buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>, - max_memory_size: usize, - used_memory: usize, - envelope_count: usize, -} - -impl InMemory { - /// Create a new [`InMemory`] state. - fn new(max_memory_size: usize) -> Self { - Self { - max_memory_size, - buffer: BTreeMap::new(), - used_memory: 0, - envelope_count: 0, - } - } - - /// Creates a new [`InMemory`] state using already provided buffer of the envelopes.
- fn new_with_buffer( - max_memory_size: usize, - buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>, - ) -> Self { - let (envelope_count, used_memory) = - buffer - .values() - .fold((0, 0), |(envelope_count, used_memory), envelopes| { - ( - envelope_count + envelopes.len(), - used_memory + envelopes.iter().map(|e| e.estimated_size()).sum::<usize>(), - ) - }); - - Self { - max_memory_size, - buffer, - used_memory, - envelope_count, - } - } - - /// Returns the number of envelopes in the memory buffer. - fn count(&self) -> usize { - self.buffer.values().map(|v| v.len()).sum() - } - - /// Removes envelopes from the in-memory buffer. - fn remove(&mut self, keys: &BTreeSet<QueueKey>) -> usize { - let mut count = 0; - for key in keys { - let (current_count, current_size) = self.buffer.remove(key).map_or((0, 0), |k| { - (k.len(), k.into_iter().map(|k| k.estimated_size()).sum()) - }); - count += current_count; - self.used_memory -= current_size; - } - self.envelope_count = self.envelope_count.saturating_sub(count); - relay_statsd::metric!( - histogram(RelayHistograms::BufferEnvelopesMemoryBytes) = self.used_memory as f64 - ); - relay_statsd::metric!( - gauge(RelayGauges::BufferEnvelopesMemoryCount) = self.envelope_count as u64 - ); - - count - } - - /// Dequeues the envelopes from the in-memory buffer and send them to provided `sender`. - fn dequeue( - &mut self, - keys: HashSet<QueueKey>, - sender: mpsc::UnboundedSender<UnspooledEnvelope>, - ) { - for key in keys { - for envelope in self.buffer.remove(&key).unwrap_or_default() { - self.used_memory -= envelope.estimated_size(); - self.envelope_count = self.envelope_count.saturating_sub(1); - sender - .send(UnspooledEnvelope { - managed_envelope: envelope, - }) - .ok(); - } - } - relay_statsd::metric!( - histogram(RelayHistograms::BufferEnvelopesMemoryBytes) = self.used_memory as f64 - ); - relay_statsd::metric!( - gauge(RelayGauges::BufferEnvelopesMemoryCount) = self.envelope_count as u64 - ); - } - - /// Enqueues the envelope into the in-memory buffer. - fn enqueue(&mut self, key: QueueKey, managed_envelope: ManagedEnvelope) { - self.envelope_count += 1; - self.used_memory += managed_envelope.estimated_size(); - self.buffer.entry(key).or_default().push(managed_envelope); - relay_statsd::metric!( - histogram(RelayHistograms::BufferEnvelopesMemoryBytes) = self.used_memory as f64 - ); - relay_statsd::metric!( - gauge(RelayGauges::BufferEnvelopesMemoryCount) = self.envelope_count as u64 - ); - } - - /// Returns `true` if the in-memory buffer is full, `false` otherwise. - fn is_full(&self) -> bool { - self.max_memory_size == 0 || self.used_memory >= self.max_memory_size - } -} - -/// The configuration which describes the on-disk [`BufferState`]. -#[derive(Debug)] -struct OnDisk { - dequeue_attempts: usize, - db: Pool<Sqlite>, - memory_checker: MemoryChecker, - max_disk_size: usize, - /// The number of items currently on disk. - /// - /// We do not track the count when we encounter envelopes in the database on startup, - /// because counting those envelopes would risk locking the db for multiple seconds. - count: Option<u64>, -} - -impl OnDisk { - /// Saves the provided buffer to the disk. - /// - /// Returns an error if the spooling failed, and the number of spooled envelopes on success.
- async fn spool( - &mut self, - buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>, - ) -> Result<(), BufferError> { - relay_statsd::metric!(histogram(RelayHistograms::BufferEnvelopesMemoryBytes) = 0); - let envelopes = buffer - .into_iter() - .flat_map(|(key, values)| { - values - .into_iter() - .map(move |value| (key, value.received_at().timestamp_millis(), value)) - }) - .filter_map( - |(key, received_at, managed)| match managed.into_envelope().to_vec() { - Ok(vec) => Some((key, vec, received_at)), - Err(err) => { - relay_log::error!( - error = &err as &dyn Error, - "failed to serialize the envelope" - ); - None - } - }, - ); - - let inserted = sql::do_insert(stream::iter(envelopes), &self.db) - .await - .map_err(BufferError::InsertFailed)?; - - self.track_count(inserted as i64); - - Ok(()) - } - - /// Removes the envelopes from the on-disk spool. - /// - /// Returns the count of removed envelopes. - async fn remove(&mut self, keys: &BTreeSet<QueueKey>) -> Result<usize, BufferError> { - let mut count = 0; - for key in keys { - let result = sql::delete(*key) - .execute(&self.db) - .await - .map_err(BufferError::DeleteFailed)?; - count += result.rows_affected(); - } - - self.track_count(-(count as i64)); - - Ok(count as usize) - } - - /// Extracts the envelope from the `SqliteRow`. - /// - /// Reads the bytes and tries to parse them into `Envelope`. - fn extract_envelope( - &self, - row: SqliteRow, - services: &Services, - ) -> Result<(QueueKey, Vec<ManagedEnvelope>), BufferError> { - let envelope_row: Vec<u8> = row.try_get("envelope").map_err(BufferError::FetchFailed)?; - let envelope_bytes = bytes::Bytes::from(envelope_row); - let mut envelope = Envelope::parse_bytes(envelope_bytes)?; - - let received_at: i64 = row - .try_get("received_at") - .map_err(BufferError::FetchFailed)?; - let received_at = DateTime::from_timestamp_millis(received_at).unwrap_or(Utc::now()); - let own_key: &str = row.try_get("own_key").map_err(BufferError::FetchFailed)?; - let sampling_key: &str = row - .try_get("sampling_key") - .map_err(BufferError::FetchFailed)?; - let queue_key = QueueKey { - own_key: ProjectKey::parse(own_key).map_err(BufferError::ParseProjectKeyFailed)?, - sampling_key: ProjectKey::parse(sampling_key) - .map_err(BufferError::ParseProjectKeyFailed)?, - }; - - envelope.set_received_at(received_at); - - let envelopes: Result<Vec<_>, BufferError> = ProcessingGroup::split_envelope(*envelope) - .into_iter() - .map(|(group, envelope)| { - let managed_envelope = ManagedEnvelope::new( - envelope, - services.outcome_aggregator.clone(), - services.test_store.clone(), - group, - ); - - Ok(managed_envelope) - }) - .collect(); - Ok((queue_key, envelopes?)) - } - - /// Returns the size of the batch to unspool. - fn unspool_batch(&self) -> i64 { - BATCH_SIZE - } - - /// Tries to delete the envelopes from the persistent buffer in batches, - /// extract and convert them to managed envelopes and send back into - /// processing pipeline. - /// - /// If the error happens in the deletion/fetching phase, a key is returned - /// to allow retrying later. - /// - /// Returns the amount of envelopes deleted from disk.
-    /// Tries to delete the envelopes from the persistent buffer in batches,
-    /// extracts and converts them to managed envelopes, and sends them back into
-    /// the processing pipeline.
-    ///
-    /// If an error happens in the deletion/fetching phase, a key is returned
-    /// to allow retrying later.
-    ///
-    /// Returns the amount of envelopes deleted from disk.
-    async fn delete_and_fetch(
-        &mut self,
-        mut keys: Vec<QueueKey>,
-        sender: mpsc::UnboundedSender<UnspooledEnvelope>,
-        services: &Services,
-    ) -> Result<(), Vec<QueueKey>> {
-        loop {
-            // Before querying the db, make sure that the buffer guard has enough availability:
-            self.dequeue_attempts += 1;
-            if self.memory_checker.check_memory().is_exceeded() {
-                break;
-            }
-            relay_statsd::metric!(
-                histogram(RelayHistograms::BufferDequeueAttempts) = self.dequeue_attempts as u64
-            );
-            self.dequeue_attempts = 0;
-
-            // Removing envelopes from the on-disk buffer in batches has the following implications:
-            // 1. It is faster to delete from the DB in batches.
-            // 2. If we panic and the deleted envelopes cannot be read out fully, we do not lose
-            //    all of them, but only one batch; the rest stay on disk for the next iteration
-            //    to pick up.
-            let query = sql::prepare_delete_query(std::mem::take(&mut keys));
-            let envelopes = sql::delete_and_fetch(&query, self.unspool_batch())
-                .fetch(&self.db)
-                .peekable();
-            let mut envelopes = pin!(envelopes);
-            relay_statsd::metric!(counter(RelayCounters::BufferReadsDisk) += 1);
-
-            // The stream is empty, so we can break the loop: we have read everything by now.
-            if envelopes.as_mut().peek().await.is_none() {
-                break;
-            }
-
-            let mut count: i64 = 0;
-            while let Some(envelope) = envelopes.as_mut().next().await {
-                count += 1;
-                let envelope = match envelope {
-                    Ok(envelope) => envelope,
-                    Err(err) => {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed to read the buffer stream from the disk",
-                        );
-                        self.track_count(-count);
-                        continue;
-                    }
-                };
-
-                match self.extract_envelope(envelope, services) {
-                    Ok((_, managed_envelopes)) => {
-                        for managed_envelope in managed_envelopes {
-                            sender.send(UnspooledEnvelope { managed_envelope }).ok();
-                        }
-                    }
-                    Err(err) => relay_log::error!(
-                        error = &err as &dyn Error,
-                        "failed to extract envelope from the buffer",
-                    ),
-                }
-            }
-
-            self.track_count(-count);
-        }
-
-        if !keys.is_empty() {
-            return Err(keys);
-        }
-
-        Ok(())
-    }
-    /// Unspools the entire contents of the on-disk spool.
-    async fn delete_and_fetch_all(
-        &mut self,
-        services: &Services,
-    ) -> Result<BTreeMap<QueueKey, Vec<ManagedEnvelope>>, BufferError> {
-        let mut result: BTreeMap<QueueKey, Vec<ManagedEnvelope>> = BTreeMap::new();
-
-        loop {
-            // On each iteration make sure we are still below the lower limit of available
-            // guard permits.
-            if self.memory_checker.check_memory().is_exceeded() {
-                return Ok(result);
-            }
-            let envelopes = sql::delete_and_fetch_all(self.unspool_batch())
-                .fetch(&self.db)
-                .peekable();
-            let mut envelopes = pin!(envelopes);
-            relay_statsd::metric!(counter(RelayCounters::BufferReadsDisk) += 1);
-            // The stream is empty, so we can break the loop: we have read everything by now.
-            if envelopes.as_mut().peek().await.is_none() {
-                break;
-            }
-
-            let mut count: i64 = 0;
-            while let Some(envelope) = envelopes.as_mut().next().await {
-                count += 1;
-                let envelope = match envelope {
-                    Ok(envelope) => envelope,
-
-                    // Bail if there are errors in the stream.
-                    Err(err) => {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed to read the buffer stream from the disk",
-                        );
-                        continue;
-                    }
-                };
-
-                match self.extract_envelope(envelope, services) {
-                    Ok((key, managed_envelopes)) => {
-                        for managed_envelope in managed_envelopes {
-                            result.entry(key).or_default().push(managed_envelope);
-                        }
-                    }
-                    Err(err) => relay_log::error!(
-                        error = &err as &dyn Error,
-                        "failed to extract envelope from the buffer",
-                    ),
-                }
-            }
-            self.track_count(-count);
-        }
-
-        Ok(result)
-    }
-
-    /// Dequeues the envelopes from the on-disk spool and sends them to the provided `sender`.
-    ///
-    /// The keys for which the envelopes could not be fetched are sent back to `ProjectCache`
-    /// to merge back into the index.
-    async fn dequeue(
-        &mut self,
-        mut keys: HashSet<QueueKey>,
-        sender: mpsc::UnboundedSender<UnspooledEnvelope>,
-        services: &Services,
-    ) {
-        if keys.is_empty() {
-            return;
-        }
-
-        let mut unused_keys = HashSet::new();
-        loop {
-            if keys.is_empty() {
-                break;
-            }
-
-            let chunk = keys
-                .extract_if(|_| true)
-                .take(BATCH_SIZE as usize)
-                .collect();
-
-            // If an error with a key is returned, we must save it for the next iteration.
-            if let Err(failed_keys) = self.delete_and_fetch(chunk, sender.clone(), services).await {
-                unused_keys.extend(failed_keys);
-            };
-        }
-
-        if !unused_keys.is_empty() {
-            services
-                .project_cache
-                .send(UpdateSpoolIndex::new(unused_keys))
-        }
-    }
-
-    /// Estimates the db size by multiplying `used_page_count * page_size`.
-    async fn estimate_spool_size(&self) -> Result<i64, BufferError> {
-        let size: i64 = sql::estimate_size()
-            .fetch_one(&self.db)
-            .await
-            .and_then(|r| r.try_get(0))
-            .map_err(BufferError::FileSizeReadFailed)?;
-
-        relay_statsd::metric!(histogram(RelayHistograms::BufferDiskSize) = size as u64);
-        Ok(size)
-    }
-
-    /// Returns `true` if the maximum size is reached, `false` otherwise.
-    async fn is_full(&self) -> Result<bool, BufferError> {
-        Ok(self.max_disk_size == 0
-            || (self.estimate_spool_size().await? as usize) >= self.max_disk_size)
-    }
-
-    /// Returns `true` if the spool is empty, `false` otherwise.
-    async fn is_empty(&self) -> Result<bool, BufferError> {
-        let is_empty = sql::select_one()
-            .fetch_optional(&self.db)
-            .await
-            .map_err(BufferError::FetchFailed)?
-            .is_none();
-
-        Ok(is_empty)
-    }
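`estimate_spool_size` relies on SQLite's pragma tables rather than on stat-ing the file: the estimate is `(page_count - freelist_count) * page_size`, as implemented by `sql::estimate_size` further down. A small illustration of that arithmetic (the pragma values here are made up, but the product matches the `buffer.disk_size:24576|h` captures in the `metrics_work` test below):

    fn estimated_db_size(page_count: i64, freelist_count: i64, page_size: i64) -> i64 {
        (page_count - freelist_count) * page_size
    }

    fn main() {
        // E.g. six used pages of 4096 bytes each => 24576 bytes.
        assert_eq!(estimated_db_size(6, 0, 4096), 24576);
    }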
-    /// Enqueues data into the on-disk spool.
-    async fn enqueue(
-        &mut self,
-        key: QueueKey,
-        managed_envelope: ManagedEnvelope,
-    ) -> Result<(), BufferError> {
-        let received_at = managed_envelope.received_at().timestamp_millis();
-        sql::insert(
-            key,
-            managed_envelope.into_envelope().to_vec().unwrap(),
-            received_at,
-        )
-        .execute(&self.db)
-        .await
-        .map_err(BufferError::InsertFailed)?;
-
-        self.track_count(1);
-        relay_statsd::metric!(counter(RelayCounters::BufferWritesDisk) += 1);
-        Ok(())
-    }
-
-    fn track_count(&mut self, increment: i64) {
-        // Track the number of envelopes read/written:
-        let metric = if increment < 0 {
-            RelayCounters::BufferEnvelopesRead
-        } else {
-            RelayCounters::BufferEnvelopesWritten
-        };
-        relay_statsd::metric!(counter(metric) += increment);
-
-        if let Some(count) = &mut self.count {
-            *count = count.saturating_add_signed(increment);
-            relay_statsd::metric!(gauge(RelayGauges::BufferEnvelopesDiskCount) = *count);
-        }
-    }
-
-    fn extract_key(row: SqliteRow) -> Option<QueueKey> {
-        let own_key = row
-            .try_get("own_key")
-            .map_err(BufferError::FetchFailed)
-            .and_then(|key| ProjectKey::parse(key).map_err(BufferError::ParseProjectKeyFailed));
-        let sampling_key = row
-            .try_get("sampling_key")
-            .map_err(BufferError::FetchFailed)
-            .and_then(|key| ProjectKey::parse(key).map_err(BufferError::ParseProjectKeyFailed));
-
-        match (own_key, sampling_key) {
-            (Ok(own_key), Ok(sampling_key)) => Some(QueueKey {
-                own_key,
-                sampling_key,
-            }),
-            // Report the first error found.
-            (Err(err), _) | (_, Err(err)) => {
-                relay_log::error!("failed to extract a queue key from the spool record: {err}");
-                None
-            }
-        }
-    }
-
-    /// Returns the index from the on-disk spool.
-    async fn get_spooled_index(db: &Pool<Sqlite>) -> Result<HashSet<QueueKey>, BufferError> {
-        let keys = sql::get_keys()
-            .fetch_all(db)
-            .await
-            .map_err(BufferError::FetchFailed)?;
-
-        let index = keys
-            .into_iter()
-            // Collect only keys we could extract.
-            .filter_map(Self::extract_key)
-            .collect();
-
-        Ok(index)
-    }
-}
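`track_count` funnels both directions through one signed increment: negative values count as reads, positive ones as writes, and the on-disk gauge uses `saturating_add_signed` so a surplus of reads can never underflow the unsigned counter. A tiny self-contained demonstration of that saturation behavior:

    fn main() {
        let mut count: u64 = 0;
        for increment in [3i64, 2, -5, -1] {
            count = count.saturating_add_signed(increment);
        }
        // The final -1 on an already-empty count saturates at 0 instead of wrapping.
        assert_eq!(count, 0);
    }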
-/// The state which defines the [`BufferService`] behaviour.
-#[derive(Debug)]
-enum BufferState {
-    /// Only the memory buffer is enabled.
-    Memory(InMemory),
-
-    /// The memory buffer is used, but a disk spool is also configured.
-    ///
-    /// The disk will be used once the memory limit is hit.
-    MemoryFileStandby { ram: InMemory, disk: OnDisk },
-
-    /// Only the disk is used for read/write operations.
-    Disk(OnDisk),
-}
-
-impl BufferState {
-    fn label(&self) -> &'static str {
-        match self {
-            BufferState::Memory(_) => "memory",
-            BufferState::MemoryFileStandby { .. } => "memory_file_standby",
-            BufferState::Disk(_) => "disk",
-        }
-    }
-
-    /// Creates the initial [`BufferState`] depending on the provided config.
-    ///
-    /// If the on-disk spool is configured and empty, `BufferState::MemoryFileStandby` is
-    /// returned; if the spool already contains data, `BufferState::Disk`; and
-    /// `BufferState::Memory` if no spool is configured.
-    async fn new(max_memory_size: usize, disk: Option<OnDisk>) -> Self {
-        let ram = InMemory {
-            buffer: BTreeMap::new(),
-            max_memory_size,
-            used_memory: 0,
-            envelope_count: 0,
-        };
-        let state = match disk {
-            Some(disk) => {
-                // When the old db file is picked up and it is not empty, we can also try to
-                // read from it on dequeue requests.
-                if disk.is_empty().await.unwrap_or_default() {
-                    Self::MemoryFileStandby { ram, disk }
-                } else {
-                    Self::Disk(disk)
-                }
-            }
-            None => Self::Memory(ram),
-        };
-
-        relay_statsd::metric!(
-            counter(RelayCounters::BufferStateTransition) += 1,
-            state_in = "none",
-            state_out = state.label(),
-            reason = "init"
-        );
-
-        state
-    }
-
-    /// Transitions to a different state, depending on the current state and the current
-    /// conditions of the underlying spool.
-    async fn transition(self, config: &Config, services: &Services) -> Self {
-        let state_in = self.label();
-        let mut reason = None;
-        let state = match self {
-            Self::MemoryFileStandby { ram, mut disk } => {
-                if ram.is_full() {
-                    if let Err(err) = disk.spool(ram.buffer).await {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed to spool the in-memory buffer to disk",
-                        );
-                    }
-                    relay_log::trace!(
-                        "transition to disk spool: # of envelopes = {}",
-                        disk.count.unwrap_or_default()
-                    );
-
-                    reason = Some("ram_full");
-                    Self::Disk(disk)
-                } else {
-                    Self::MemoryFileStandby { ram, disk }
-                }
-            }
-            Self::Disk(mut disk) if Self::is_below_low_mem_watermark(config, &disk).await => {
-                match disk.delete_and_fetch_all(services).await {
-                    Ok(buffer) => {
-                        let ram = InMemory::new_with_buffer(
-                            config.spool_envelopes_max_memory_size(),
-                            buffer,
-                        );
-                        relay_log::trace!(
-                            "transition to memory spool: # of envelopes = {}",
-                            ram.envelope_count
-                        );
-
-                        reason = Some("below_low_watermark");
-                        Self::MemoryFileStandby { ram, disk }
-                    }
-                    Err(err) => {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed to move data from disk to memory, keep using on-disk spool",
-                        );
-
-                        reason = Some("failed");
-                        Self::Disk(disk)
-                    }
-                }
-            }
-            Self::Memory(_) | Self::Disk(_) => self,
-        };
-
-        if let Some(reason) = reason {
-            relay_statsd::metric!(
-                counter(RelayCounters::BufferStateTransition) += 1,
-                state_in = state_in,
-                state_out = state.label(),
-                reason = reason
-            );
-        }
-
-        state
-    }
-
-    /// Returns `true` if the on-disk spooled data can fit into memory.
-    ///
-    /// The spooled envelopes must:
-    /// * fit into memory and take up no more than 30% of the configured space, and
-    /// * the used buffer guards must be under the low watermark.
-    async fn is_below_low_mem_watermark(config: &Config, disk: &OnDisk) -> bool {
-        ((config.spool_envelopes_max_memory_size() as f64 * LOW_SPOOL_MEMORY_WATERMARK) as i64)
-            > disk.estimate_spool_size().await.unwrap_or(i64::MAX)
-            && disk.memory_checker.check_memory().has_capacity()
-    }
-}
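The watermark comparison in `is_below_low_mem_watermark` boils down to simple arithmetic: the estimated spool size must be below a fixed fraction of the configured memory budget. A sketch under the assumption that `LOW_SPOOL_MEMORY_WATERMARK` is the 30% figure quoted in the doc comment (the real constant is defined elsewhere in this module):

    const LOW_SPOOL_MEMORY_WATERMARK: f64 = 0.3; // assumed value

    fn fits_back_into_memory(max_memory_size: usize, disk_size: i64) -> bool {
        ((max_memory_size as f64 * LOW_SPOOL_MEMORY_WATERMARK) as i64) > disk_size
    }

    fn main() {
        // With a 500 MB memory budget, the disk spool must shrink below ~150 MB
        // before the service transitions back to the in-memory state.
        let max_memory_size = 500 * 1024 * 1024;
        assert!(fits_back_into_memory(max_memory_size, 100 * 1024 * 1024));
        assert!(!fits_back_into_memory(max_memory_size, 200 * 1024 * 1024));
    }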
-impl Default for BufferState {
-    fn default() -> Self {
-        // Just use all the memory we can for now.
-        Self::Memory(InMemory::new(usize::MAX))
-    }
-}
-
-/// The services which rely on the state of the envelopes in this buffer.
-#[derive(Clone, Debug)]
-pub struct Services {
-    pub outcome_aggregator: Addr<TrackOutcome>,
-    pub project_cache: Addr<ProjectCache>,
-    pub test_store: Addr<TestStore>,
-}
-
-/// [`Buffer`] interface implementation backed by SQLite.
-#[derive(Debug)]
-pub struct BufferService {
-    services: Services,
-    state: BufferState,
-    config: Arc<Config>,
-}
-
-impl BufferService {
-    /// Sets up the database and runs the migrations.
-    ///
-    /// The directories and the spool file will be created if they don't already
-    /// exist.
-    async fn setup(path: &Path) -> Result<(), BufferError> {
-        BufferService::create_spool_directory(path).await?;
-
-        let options = SqliteConnectOptions::new()
-            .filename(path)
-            .journal_mode(SqliteJournalMode::Wal)
-            .create_if_missing(true);
-
-        let db = SqlitePoolOptions::new()
-            .connect_with(options)
-            .await
-            .map_err(BufferError::SqlxSetupFailed)?;
-
-        sqlx::migrate!("../migrations").run(&db).await?;
-        Ok(())
-    }
-
-    /// Creates the directories for the spool file.
-    async fn create_spool_directory(path: &Path) -> Result<(), BufferError> {
-        let Some(parent) = path.parent() else {
-            return Ok(());
-        };
-        if !parent.as_os_str().is_empty() && !parent.exists() {
-            relay_log::debug!("creating directory for spooling file: {}", parent.display());
-            DirBuilder::new()
-                .recursive(true)
-                .create(&parent)
-                .await
-                .map_err(BufferError::FileSetupError)?;
-        }
-        Ok(())
-    }
-
-    /// Prepares the disk state.
-    async fn prepare_disk_state(
-        config: Arc<Config>,
-        memory_checker: MemoryChecker,
-    ) -> Result<Option<OnDisk>, BufferError> {
-        // Only create the pool and set the config if a persistent envelope buffer file path
-        // is provided.
-        let Some(path) = config.spool_envelopes_path(0) else {
-            return Ok(None);
-        };
-
-        relay_log::info!("buffer file {}", path.to_string_lossy());
-        relay_log::info!(
-            "max memory size {}",
-            config.spool_envelopes_max_memory_size()
-        );
-        relay_log::info!("max disk size {}", config.spool_envelopes_max_disk_size());
-
-        Self::setup(&path).await?;
-
-        let options = SqliteConnectOptions::new()
-            .filename(&path)
-            // The WAL journaling mode uses a write-ahead log instead of a rollback journal to
-            // implement transactions. The WAL journaling mode is persistent; after being set
-            // it stays in effect across multiple database connections and after closing and
-            // reopening the database.
-            //
-            // 1. WAL is significantly faster in most scenarios.
-            // 2. WAL provides more concurrency, as readers do not block writers and a writer
-            //    does not block readers. Reading and writing can proceed concurrently.
-            // 3. Disk I/O operations tend to be more sequential using WAL.
-            // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems
-            //    on systems where the fsync() system call is broken.
-            .journal_mode(SqliteJournalMode::Wal)
-            // WAL mode is safe from corruption with synchronous=NORMAL.
-            // When synchronous is NORMAL, the SQLite database engine will still sync at the most
-            // critical moments, but less often than in FULL mode. This guarantees a good balance
-            // between safety and speed.
-            .synchronous(SqliteSynchronous::Normal)
-            // The freelist pages are moved to the end of the database file and the database file
-            // is truncated to remove the freelist pages at every transaction commit. Note,
-            // however, that auto-vacuum only truncates the freelist pages from the file.
-            // Auto-vacuum does not defragment the database nor repack individual database pages
-            // the way that the VACUUM command does.
-            //
-            // This helps us keep the file size under some control.
-            .auto_vacuum(SqliteAutoVacuum::Full)
-            // If shared-cache mode is enabled and a thread establishes multiple
-            // connections to the same database, the connections share a single data and schema
-            // cache. This can significantly reduce the quantity of memory and IO required by
-            // the system.
-            .shared_cache(true);
-
-        let db = SqlitePoolOptions::new()
-            .max_connections(config.spool_envelopes_max_connections())
-            .min_connections(config.spool_envelopes_min_connections())
-            .connect_with(options)
-            .await
-            .map_err(BufferError::SqlxSetupFailed)?;
-
-        let mut on_disk = OnDisk {
-            dequeue_attempts: 0,
-            db,
-            memory_checker,
-            max_disk_size: config.spool_envelopes_max_disk_size(),
-            count: None,
-        };
-
-        if on_disk.is_empty().await? {
-            // Only start live-tracking the count if there's nothing in the db yet.
-            on_disk.count = Some(0);
-        }
-
-        Ok(Some(on_disk))
-    }
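Since WAL mode is persistent, it can be verified after the fact on any later connection. A standalone sketch (not Relay code) that opens a database with the same sqlx APIs used above and asserts the journal mode took effect; it assumes a `tokio` runtime and the `sqlx` crate with the `sqlite` feature, and the `/tmp/spool-example.db` path is arbitrary:

    use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
    use sqlx::{Connection, Row, SqliteConnection};

    #[tokio::main]
    async fn main() -> Result<(), sqlx::Error> {
        let options = SqliteConnectOptions::new()
            .filename("/tmp/spool-example.db")
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .create_if_missing(true);

        let mut conn = SqliteConnection::connect_with(&options).await?;

        // WAL persists across connections, so this reports "wal" from now on.
        let mode: String = sqlx::query("PRAGMA journal_mode;")
            .fetch_one(&mut conn)
            .await?
            .get(0);
        assert_eq!(mode, "wal");
        Ok(())
    }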
-    /// Creates a new [`BufferService`] from the provided path to the SQLite database file.
-    pub async fn create(
-        memory_checker: MemoryChecker,
-        services: Services,
-        config: Arc<Config>,
-    ) -> Result<Self, BufferError> {
-        let on_disk_state = Self::prepare_disk_state(config.clone(), memory_checker).await?;
-        let state = BufferState::new(config.spool_envelopes_max_memory_size(), on_disk_state).await;
-        Ok(Self {
-            services,
-            state,
-            config,
-        })
-    }
-
-    /// Handles the enqueueing of messages into the internal buffer.
-    async fn handle_enqueue(&mut self, message: Enqueue) -> Result<(), BufferError> {
-        let Enqueue {
-            key,
-            value: managed_envelope,
-        } = message;
-
-        match self.state {
-            BufferState::Memory(ref mut ram)
-            | BufferState::MemoryFileStandby { ref mut ram, .. } => {
-                ram.enqueue(key, managed_envelope);
-            }
-            BufferState::Disk(ref mut disk) => {
-                // The disk is full, drop the incoming envelopes.
-                if disk.is_full().await? {
-                    return Err(BufferError::SpoolIsFull);
-                }
-                disk.enqueue(key, managed_envelope).await?;
-            }
-        }
-
-        let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config, &self.services).await;
-        Ok(())
-    }
-
-    /// Handles the dequeueing of messages from the internal buffer.
-    ///
-    /// This method removes the envelopes from the buffer and streams them to the sender.
-    async fn handle_dequeue(&mut self, message: DequeueMany) -> Result<(), BufferError> {
-        let DequeueMany { keys, sender } = message;
-
-        match self.state {
-            BufferState::Memory(ref mut ram)
-            | BufferState::MemoryFileStandby { ref mut ram, .. } => {
-                ram.dequeue(keys, sender);
-            }
-            BufferState::Disk(ref mut disk) => {
-                disk.dequeue(keys, sender, &self.services).await;
-            }
-        }
-        let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config, &self.services).await;
-
-        Ok(())
-    }
-
-    /// Handles the remove request.
-    ///
-    /// This removes all the envelopes from the internal buffer for the provided keys.
-    /// If any of the provided keys still have envelopes, an error will be logged with the
-    /// number of envelopes dropped for the specific project key.
-    async fn handle_remove(&mut self, message: RemoveMany) -> Result<(), BufferError> {
-        let RemoveMany { project_key, keys } = message;
-        let mut count: usize = 0;
-
-        match self.state {
-            BufferState::Memory(ref mut ram)
-            | BufferState::MemoryFileStandby { ref mut ram, .. } => {
-                count += ram.remove(&keys);
-            }
-            BufferState::Disk(ref mut disk) => {
-                count += disk.remove(&keys).await?;
-            }
-        }
-
-        let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config, &self.services).await;
-
-        if count > 0 {
-            relay_log::with_scope(
-                |scope| scope.set_tag("project_key", project_key),
-                || relay_log::error!(count, "evicted project with envelopes"),
-            );
-        }
-
-        Ok(())
-    }
-
-    /// Handles the health requests.
-    async fn handle_health(&mut self, health: Health) -> Result<(), BufferError> {
-        match self.state {
-            BufferState::Memory(ref ram) => health.0.send(!ram.is_full()),
-            BufferState::MemoryFileStandby { ref ram, ref disk } => health
-                .0
-                .send(!ram.is_full() || !disk.is_full().await.unwrap_or_default()),
-
-            BufferState::Disk(ref disk) => health.0.send(!disk.is_full().await.unwrap_or_default()),
-        }
-
-        Ok(())
-    }
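`handle_health` reports healthy as long as some active backend still has capacity (note that an errored disk check defaults to not-full via `unwrap_or_default`). A condensed sketch of the decision table for the memory-only and memory-with-disk-standby cases; the disk-only case reduces to `!disk_full`:

    fn healthy(ram_full: bool, disk_full: Option<bool>) -> bool {
        match disk_full {
            None => !ram_full,                          // memory-only state
            Some(disk_full) => !ram_full || !disk_full, // memory with disk standby
        }
    }

    fn main() {
        assert!(healthy(false, None));
        assert!(healthy(true, Some(false))); // RAM full, but disk still has room
        assert!(!healthy(true, Some(true)));
    }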
-    /// Handles retrieving the index from the underlying spool.
-    ///
-    /// To process this request we spawn a tokio task, and once it's done the result is sent
-    /// to [`ProjectCache`].
-    ///
-    /// If the spool is memory based, we ignore this request - the index in the
-    /// [`ProjectCache`] should be full and correct.
-    /// If the spool is located on disk, we read the keys and compile the index of the
-    /// spooled envelopes for [`ProjectCache`].
-    async fn handle_restore_index(&mut self, _: RestoreIndex) -> Result<(), BufferError> {
-        match self.state {
-            BufferState::Memory(_) | BufferState::MemoryFileStandby { .. } => (),
-            BufferState::Disk(ref disk) => {
-                let db = disk.db.clone();
-                let project_cache = self.services.project_cache.clone();
-                relay_system::spawn!(async move {
-                    match OnDisk::get_spooled_index(&db).await {
-                        Ok(index) => {
-                            relay_log::trace!(
-                                "recover index from disk with {} unique project keys",
-                                index.len()
-                            );
-                            project_cache.send(RefreshIndexCache(index))
-                        }
-                        Err(err) => {
-                            relay_log::error!("failed to retrieve the index from the disk: {err}")
-                        }
-                    }
-                });
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Handles all the incoming messages from the [`Buffer`] interface.
-    async fn handle_message(&mut self, message: Buffer) -> Result<(), BufferError> {
-        let ty = message.variant();
-        metric!(
-            timer(RelayTimers::BufferMessageProcessDuration),
-            message = ty,
-            {
-                match message {
-                    Buffer::Enqueue(message) => self.handle_enqueue(message).await,
-                    Buffer::DequeueMany(message) => self.handle_dequeue(message).await,
-                    Buffer::RemoveMany(message) => self.handle_remove(message).await,
-                    Buffer::Health(message) => self.handle_health(message).await,
-                    Buffer::RestoreIndex(message) => self.handle_restore_index(message).await,
-                }
-            }
-        )
-    }
-
-    /// Handles the shutdown notification.
-    ///
-    /// Tries to spool to disk if the current buffer state is `BufferState::MemoryFileStandby`,
-    /// which means the in-memory buffer is active and the disk is still free or unused so far.
-    async fn handle_shutdown(&mut self) -> Result<(), BufferError> {
-        let BufferState::MemoryFileStandby {
-            ref mut ram,
-            ref mut disk,
-        } = self.state
-        else {
-            return Ok(());
-        };
-
-        let count: usize = ram.count();
-        if count == 0 {
-            return Ok(());
-        }
-
-        relay_log::info!(
-            count,
-            "received shutdown signal, spooling envelopes to disk"
-        );
-
-        let buffer = std::mem::take(&mut ram.buffer);
-        disk.spool(buffer).await?;
-        Ok(())
-    }
-}
-
-impl Service for BufferService {
-    type Interface = Buffer;
-
-    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        let mut shutdown = Controller::shutdown_handle();
-
-        loop {
-            tokio::select! {
-                biased;
-
-                Some(message) = rx.recv() => {
-                    if let Err(err) = self.handle_message(message).await {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed to handle an incoming message",
-                        );
-                    }
-                }
-                _ = shutdown.notified() => {
-                    if let Err(err) = self.handle_shutdown().await {
-                        relay_log::error!(
-                            error = &err as &dyn Error,
-                            "failed while shutting down the service",
-                        );
-                    }
-                }
-                else => break,
-            }
-        }
-    }
-}
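The `biased;` keyword in the loop above makes `tokio::select!` poll its branches in declaration order instead of randomly, so pending inbox messages are always drained before the shutdown branch is considered. A minimal, runnable demonstration (not Relay code; assumes the `tokio` crate with default features):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        tx.send("message").unwrap();

        tokio::select! {
            biased;

            // Both branches are ready, but with `biased` the first one wins.
            Some(msg) = rx.recv() => println!("handled {msg} first"),
            _ = std::future::ready(()) => println!("never reached here"),
        }
    }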
-impl Drop for BufferService {
-    fn drop(&mut self) {
-        // Count only envelopes from the in-memory buffer.
-        match &self.state {
-            BufferState::Memory(ram) | BufferState::MemoryFileStandby { ram, .. } => {
-                let count = ram.count();
-                if count > 0 {
-                    relay_log::error!("dropped {count} envelopes");
-                }
-            }
-            BufferState::Disk(_) => (),
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use chrono::Utc;
-    use insta::assert_debug_snapshot;
-    use rand::Rng;
-    use relay_test::mock_service;
-    use sqlx::ConnectOptions;
-    use std::str::FromStr;
-    use std::sync::Mutex;
-    use std::time::{Duration, Instant};
-    use uuid::Uuid;
-
-    use crate::testutils::empty_envelope;
-    use crate::utils::MemoryStat;
-
-    use super::*;
-
-    fn services() -> Services {
-        let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
-        let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {});
-        let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
-
-        Services {
-            project_cache,
-            outcome_aggregator,
-            test_store,
-        }
-    }
-
-    fn empty_managed_envelope() -> ManagedEnvelope {
-        let envelope = empty_envelope();
-        let Services {
-            outcome_aggregator,
-            test_store,
-            ..
-        } = services();
-        ManagedEnvelope::untracked(envelope, outcome_aggregator, test_store)
-    }
-
-    #[tokio::test]
-    async fn create_spool_directory_deep_path() {
-        let parent_dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
-        let target_dir = parent_dir
-            .join("subdir1")
-            .join("subdir2")
-            .join("spool-file");
-        let config: Arc<_> = Config::from_json_value(serde_json::json!({
-            "spool": {
-                "envelopes": {
-                    "path": target_dir,
-                }
-            },
-            "health": {
-                "max_memory_percent": 1.0
-            }
-        }))
-        .unwrap()
-        .into();
-        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-        BufferService::create(memory_checker, services(), config)
-            .await
-            .unwrap();
-        assert!(target_dir.exists());
-    }
-
-    #[tokio::test]
-    async fn ensure_start_time_restore() {
-        let config: Arc<_> = Config::from_json_value(serde_json::json!({
-            "spool": {
-                "envelopes": {
-                    "path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
-                    "max_memory_size": 0, // 0 bytes, to force all envelopes to spool to disk.
-                },
-                "health": {
-                    "max_memory_percent": 0.0
-                }
-            }
-        }))
-        .unwrap()
-        .into();
-        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-        let service = BufferService::create(memory_checker, services(), config)
-            .await
-            .unwrap();
-        let addr = service.start_detached();
-        let (tx, mut rx) = mpsc::unbounded_channel();
-
-        // Test cases:
-        let test_cases = [
-            "a94ae32be2584e0bbd7a4cbb95971fee",
-            "aaaae32be2584e0bbd7a4cbb95971fff",
-        ];
-        for pub_key in test_cases {
-            let project_key = ProjectKey::parse(pub_key).unwrap();
-            let key = QueueKey {
-                own_key: project_key,
-                sampling_key: project_key,
-            };
-
-            let envelope = empty_managed_envelope();
-            let received_at_sent = envelope.received_at();
-            addr.send(Enqueue {
-                key,
-                value: envelope,
-            });
-
-            // How long to wait before dequeueing the message from the spool.
-            // This also ensures that the start time has to be restored to the time
-            // when the request first came in.
-            tokio::time::sleep(Duration::from_millis(
-                1000 * rand::thread_rng().gen_range(1..3),
-            ))
-            .await;
-
-            addr.send(DequeueMany {
-                keys: [key].into(),
-                sender: tx.clone(),
-            });
-
-            let UnspooledEnvelope { managed_envelope } = rx.recv().await.unwrap();
-            let received_at_received = managed_envelope.received_at();
-
-            // Check that the original start time elapsed to the same second as the restored one.
-            assert_eq!(
-                received_at_received.timestamp_millis(),
-                received_at_sent.timestamp_millis()
-            );
-        }
-    }
-
-    #[test]
-    fn metrics_work() {
-        relay_log::init_test!();
-
-        let config: Arc<_> = Config::from_json_value(serde_json::json!({
-            "spool": {
-                "envelopes": {
-                    "path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
-                    "max_memory_size": "4KB",
-                    "max_disk_size": "40KB",
-                },
-                "health": {
-                    "max_memory_percent": 1.0
-                }
-            }
-        }))
-        .unwrap()
-        .into();
-        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
-        let key = QueueKey {
-            own_key: project_key,
-            sampling_key: project_key,
-        };
-
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_time()
-            .build()
-            .unwrap();
-        let _guard = rt.enter();
-
-        let captures = relay_statsd::with_capturing_test_client(|| {
-            rt.block_on(async {
-                let mut service = BufferService::create(memory_checker, services(), config)
-                    .await
-                    .unwrap();
-
-                // Send 5 envelopes.
-                for _ in 0..5 {
-                    service
-                        .handle_enqueue(Enqueue {
-                            key,
-                            value: empty_managed_envelope(),
-                        })
-                        .await
-                        .unwrap();
-                }
-
-                // Dequeue everything.
-                let (tx, mut rx) = mpsc::unbounded_channel();
-                service
-                    .handle_dequeue(DequeueMany {
-                        keys: [key].into(),
-                        sender: tx,
-                    })
-                    .await
-                    .unwrap();
-
-                // Make sure that not only the metrics are correct, but also the number of
-                // envelopes flushed.
-                let mut count = 0;
-                while rx.recv().await.is_some() {
-                    count += 1;
-                }
-                assert_eq!(count, 5);
-            })
-        });
-
-        // Collect only the buffer metrics.
-        let captures: Vec<_> = captures
-            .into_iter()
-            .filter(|name| name.contains("buffer."))
-            .collect();
-
-        assert_debug_snapshot!(captures, @r###"
-        [
-            "buffer.state.transition:1|c|#state_in:none,state_out:memory_file_standby,reason:init",
-            "buffer.envelopes_mem:2000|h",
-            "buffer.envelopes_mem_count:1|g",
-            "buffer.envelopes_mem:4000|h",
-            "buffer.envelopes_mem_count:2|g",
-            "buffer.envelopes_mem:6000|h",
-            "buffer.envelopes_mem_count:3|g",
-            "buffer.envelopes_mem:0|h",
-            "buffer.writes:1|c",
-            "buffer.envelopes_written:3|c",
-            "buffer.envelopes_disk_count:3|g",
-            "buffer.state.transition:1|c|#state_in:memory_file_standby,state_out:disk,reason:ram_full",
-            "buffer.disk_size:24576|h",
-            "buffer.envelopes_written:1|c",
-            "buffer.envelopes_disk_count:4|g",
-            "buffer.writes:1|c",
-            "buffer.disk_size:24576|h",
-            "buffer.disk_size:24576|h",
-            "buffer.envelopes_written:1|c",
-            "buffer.envelopes_disk_count:5|g",
-            "buffer.writes:1|c",
-            "buffer.disk_size:24576|h",
-            "buffer.dequeue_attempts:1|h",
-            "buffer.reads:1|c",
-            "buffer.envelopes_read:-5|c",
-            "buffer.envelopes_disk_count:0|g",
-            "buffer.dequeue_attempts:1|h",
-            "buffer.reads:1|c",
-            "buffer.disk_size:24576|h",
-        ]
-        "###);
-    }
-
-    #[tokio::test]
-    async fn index_restore() {
-        relay_log::init_test!();
-
-        let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
-
-        let config: Arc<_> = Config::from_json_value(serde_json::json!({
-            "spool": {
-                "envelopes": {
-                    "path": db_path,
-                    "max_memory_size": 0, // 0 bytes, to force all envelopes to spool to disk.
-                    "max_disk_size": "100KB",
-                },
-                "health": {
-                    "max_memory_percent": 1.0
-                }
-            }
-        }))
-        .unwrap()
-        .into();
-        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-
-        // Set up the spool file and run migrations.
-        BufferService::setup(&db_path).await.unwrap();
-
-        // Set up the db and insert a few records for the test.
-        let mut db =
-            SqliteConnectOptions::from_str(&format!("sqlite://{}", db_path.to_str().unwrap()))
-                .unwrap()
-                .connect()
-                .await
-                .unwrap();
-
-        let now = Utc::now().timestamp_millis();
-        let result = sqlx::query(&format!(
-            r#"INSERT INTO
-                envelopes (received_at, own_key, sampling_key, envelope)
-            VALUES
-                ("{}", "a94ae32be2584e0bbd7a4cbb95971fee", "a94ae32be2584e0bbd7a4cbb95971fee", ""),
-                ("{}", "a94ae32be2584e0bbd7a4cbb95971f00", "a94ae32be2584e0bbd7a4cbb95971f11", "")
-            "#, now, now),
-        ).execute(&mut db).await.unwrap();
-
-        assert_eq!(result.rows_affected(), 2);
-
-        let index = Arc::new(Mutex::new(HashSet::new()));
-        let mut services = services();
-        let index_in = index.clone();
-        let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| {
-            let ProjectCache::RefreshIndexCache(new_index) = msg else {
-                return;
-            };
-            index_in.lock().unwrap().extend(new_index.0);
-        });
-
-        services.project_cache = project_cache;
-
-        let buffer = BufferService::create(memory_checker, services, config)
-            .await
-            .unwrap();
-        let addr = buffer.start_detached();
-        addr.send(RestoreIndex);
-        // Give it some time to process the message.
-        tokio::time::sleep(Duration::from_millis(500)).await;
-
-        // Get the index out of the mutex.
-        let index = index.lock().unwrap().clone();
-
-        let key1 = QueueKey {
-            own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
-            sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
-        };
-        let key2 = QueueKey {
-            own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f00").unwrap(),
-            sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f11").unwrap(),
-        };
-
-        assert_eq!(index.len(), 2);
-        assert!(index.contains(&key1));
-        assert!(index.contains(&key2));
-    }
-
-    #[tokio::test]
-    async fn chunked_unspool() {
-        let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
-        let config: Arc<_> = Config::from_json_value(serde_json::json!({
-            "spool": {
-                "envelopes": {
-                    "path": db_path,
-                    "max_memory_size": "10KB",
-                    "max_disk_size": "20MB",
-                },
-                "health": {
-                    "max_memory_percent": 1.0
-                }
-            }
-        }))
-        .unwrap()
-        .into();
-        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-
-        let services = services();
-        let buffer = BufferService::create(memory_checker, services, config)
-            .await
-            .unwrap();
-        let addr = buffer.start_detached();
-
-        let mut keys = HashSet::new();
-        for _ in 1..=300 {
-            let project_key = uuid::Uuid::new_v4().as_simple().to_string();
-            let key = ProjectKey::parse(&project_key).unwrap();
-            let index_key = QueueKey {
-                own_key: key,
-                sampling_key: key,
-            };
-            keys.insert(index_key);
-            addr.send(Enqueue::new(index_key, empty_managed_envelope()))
-        }
-
-        let (tx, mut rx) = mpsc::unbounded_channel();
-        // Dequeue all the keys at once.
-        addr.send(DequeueMany {
-            keys,
-            sender: tx.clone(),
-        });
-        drop(tx);
-
-        let mut count = 0;
-        while rx.recv().await.is_some() {
-            count += 1;
-        }
-        assert_eq!(count, 300);
-    }
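`OnDisk::dequeue` above drains its key set in bounded chunks: `extract_if(|_| true)` lazily removes elements as they are yielded, and `take` caps how many each pass consumes. A self-contained sketch of that pattern (`BATCH` is an arbitrary stand-in for the module's `BATCH_SIZE` constant; `HashSet::extract_if` requires a recent stable Rust):

    use std::collections::HashSet;

    fn main() {
        const BATCH: usize = 100; // stand-in for BATCH_SIZE
        let mut keys: HashSet<u32> = (0..300).collect();

        while !keys.is_empty() {
            // Each pass removes at most one batch from the set.
            let chunk: Vec<u32> = keys.extract_if(|_| true).take(BATCH).collect();
            println!("processing a chunk of {} keys", chunk.len());
        }
    }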
-    #[ignore] // Slow. Should probably be a criterion benchmark.
-    #[tokio::test]
-    #[allow(clippy::print_stdout, reason = "benchmark test")]
-    async fn compare_counts() {
-        let path = std::env::temp_dir().join(Uuid::new_v4().to_string());
-        let options = SqliteConnectOptions::new()
-            .filename(path)
-            .journal_mode(SqliteJournalMode::Wal)
-            .create_if_missing(true);
-
-        let db = sqlx::SqlitePool::connect_with(options).await.unwrap();
-
-        println!("Migrating...");
-        sqlx::migrate!("../migrations").run(&db).await.unwrap();
-
-        let transaction = db.begin().await.unwrap();
-
-        // println!("Inserting...");
-        let mut query_builder = sqlx::QueryBuilder::new(
-            "INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ",
-        );
-        let n = 10000;
-        for i in 0..n {
-            if (i % 100) == 0 {
-                println!("Batch {i} of {n}");
-            }
-
-            let chunk = (0..5000).map(|_| ("", "", "this is my chunk how do you like it", ""));
-            let query = query_builder
-                .push_values(chunk, |mut b, (key1, key2, value, received_at)| {
-                    b.push_bind(received_at)
-                        .push_bind(key1)
-                        .push_bind(key2)
-                        .push_bind(value);
-                })
-                .build();
-
-            query.execute(&db).await.unwrap();
-
-            query_builder.reset();
-        }
-        transaction.commit().await.unwrap();
-
-        let t = Instant::now();
-        let q = sqlx::query(
-            r#"SELECT SUM(pgsize - unused) FROM dbstat WHERE name="envelopes" AND aggregate = FALSE"#,
-        );
-        let pgsize: i64 = q.fetch_one(&db).await.unwrap().get(0);
-        println!("pgsize: {} {:?}", pgsize, t.elapsed());
-
-        let t = Instant::now();
-        let q = sqlx::query(
-            r#"SELECT SUM(pgsize - unused) FROM dbstat WHERE name="envelopes" AND aggregate = TRUE"#,
-        );
-        let pgsize_agg: i64 = q.fetch_one(&db).await.unwrap().get(0);
-        println!("pgsize_agg: {} {:?}", pgsize_agg, t.elapsed());
-
-        let t = Instant::now();
-        let q = sqlx::query(r#"SELECT SUM(length(envelope)) FROM envelopes"#);
-        let brute_force: i64 = q.fetch_one(&db).await.unwrap().get(0);
-        println!("brute_force: {} {:?}", brute_force, t.elapsed());
-
-        let t = Instant::now();
-        let q = sqlx::query(
-            r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
-        );
-        let pragma: i64 = q.fetch_one(&db).await.unwrap().get(0);
-        println!("pragma: {} {:?}", pragma, t.elapsed());
-
-        // Result:
-        // pgsize = 2'408'464'463 t.elapsed() = 7.007533833s
-        // pgsize_agg = 2'408'464'463 t.elapsed() = 5.010104791s
-        // brute_force = 1'750'000'000 t.elapsed() = 7.893590875s
-        // pragma = 3'036'307'456 t.elapsed() = 213.417µs
-    }
-}
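The commented results at the end of `compare_counts` are the whole point of the benchmark: the pragma-based estimate returns in microseconds while the `dbstat` queries take seconds. A back-of-envelope check of that ratio, using only the timings recorded above:

    fn main() {
        let pragma_micros = 213.417_f64;
        let dbstat_micros = 5_010_104.791_f64; // 5.010104791 s
        // Roughly four orders of magnitude cheaper, which is why
        // `sql::estimate_size` uses the pragma tables.
        println!("speedup: ~{:.0}x", dbstat_micros / pragma_micros);
    }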
diff --git a/relay-server/src/services/spooler/spool_utils.rs b/relay-server/src/services/spooler/spool_utils.rs
deleted file mode 100644
index dffb87d4e3..0000000000
--- a/relay-server/src/services/spooler/spool_utils.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-//! Contains helper utils which help to manage the spooler and the spooled data.
-
-use std::path::PathBuf;
-
-use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
-
-use crate::service::create_runtime;
-use crate::services::spooler::{sql, BufferError};
-
-/// Truncates the spool file, deleting all the persisted on-disk data.
-///
-/// Returns the number of deleted envelopes when run successfully.
-pub fn truncate(path: &PathBuf) -> Result<u64, BufferError> {
-    let options = SqliteConnectOptions::new()
-        .filename(path)
-        .journal_mode(SqliteJournalMode::Wal);
-
-    let rt = create_runtime("truncator", 1);
-
-    let result = rt.block_on(async move {
-        let db = SqlitePoolOptions::new()
-            .connect_with(options)
-            .await
-            .map_err(BufferError::SqlxSetupFailed)?;
-
-        let result = sql::truncate()
-            .execute(&db)
-            .await
-            .map_err(BufferError::DeleteFailed)?;
-
-        Ok::<u64, BufferError>(result.rows_affected())
-    })?;
-
-    Ok(result)
-}
diff --git a/relay-server/src/services/spooler/sql.rs b/relay-server/src/services/spooler/sql.rs
deleted file mode 100644
index 02d0b66d3c..0000000000
--- a/relay-server/src/services/spooler/sql.rs
+++ /dev/null
@@ -1,167 +0,0 @@
-//! This module contains the helper functions wrapping the SQL queries which will be run against
-//! the on-disk spool (currently backed by SQLite).
-
-use futures::stream::{Stream, StreamExt};
-use itertools::Itertools;
-use sqlx::query::Query;
-use sqlx::sqlite::SqliteArguments;
-use sqlx::{Pool, QueryBuilder, Sqlite};
-
-use crate::services::spooler::QueueKey;
-use crate::statsd::RelayCounters;
-
-/// SQLite allocates space to hold all host parameters between 1 and the largest host parameter
-/// number used.
-///
-/// To prevent excessive memory allocations, the maximum value of a host parameter number is
-/// SQLITE_MAX_VARIABLE_NUMBER, which defaults to 999 for SQLite versions prior to 3.32.0
-/// (2020-05-22) or 32766 for SQLite versions after 3.32.0.
-///
-/// Keep it on the lower side for now.
-const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999;
-
-/// Prepares a DELETE query by properly generating IN clauses for the provided keys.
-pub fn prepare_delete_query(keys: Vec<QueueKey>) -> String {
-    let (own_keys, sampling_keys) = keys.iter().fold(
-        (Vec::new(), Vec::new()),
-        |(mut own_keys, mut sampling_keys), key| {
-            own_keys.push(format!(r#""{}""#, key.own_key));
-            sampling_keys.push(format!(r#""{}""#, key.sampling_key));
-            (own_keys, sampling_keys)
-        },
-    );
-
-    let own_keys = own_keys.into_iter().join(",");
-    let sampling_keys = sampling_keys.into_iter().join(",");
-
-    format!(
-        "DELETE FROM
-            envelopes
-         WHERE id IN (SELECT id FROM envelopes WHERE own_key in ({}) AND sampling_key in ({}) LIMIT ?)
-         RETURNING
-            received_at, own_key, sampling_key, envelope", own_keys, sampling_keys
-    )
-}
-
-/// Creates a DELETE query binding to the provided [`QueueKey`] which returns the envelopes and
-/// timestamp.
-///
-/// The query will perform the delete once executed, returning the deleted envelope and the
-/// timestamp when the envelope was received. This creates a prepared statement which is
-/// cached and re-used.
-pub fn delete_and_fetch(query: &str, batch_size: i64) -> Query<'_, Sqlite, SqliteArguments<'_>> {
-    sqlx::query(query).bind(batch_size)
-}
-
-/// Creates a DELETE query which returns the requested batch of the envelopes with the timestamp
-/// and designated keys.
-pub fn delete_and_fetch_all<'a>(batch_size: i64) -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query(
-        "DELETE FROM
-            envelopes
-         WHERE id IN (SELECT id FROM envelopes LIMIT ?)
-         RETURNING
-            received_at, own_key, sampling_key, envelope",
-    )
-    .bind(batch_size)
-}
-
-/// Creates a DELETE query which silently removes the data from the database.
-pub fn delete<'a>(key: QueueKey) -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query("DELETE FROM envelopes where own_key = ? AND sampling_key = ?")
-        .bind(key.own_key.to_string())
-        .bind(key.sampling_key.to_string())
-}
-
-/// Creates a query which fetches the number of used database pages multiplied by the page size.
-///
-/// This info is used to estimate the currently allocated database size.
-pub fn estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query(
-        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
-    )
-}
-
-/// Creates the query to select only one record's `received_at` from the database.
-///
-/// It is useful and very fast for checking if the table is empty.
-pub fn select_one<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query("SELECT received_at FROM envelopes LIMIT 1;")
-}
-
-/// Returns the query to select all the unique combinations of own and sampling keys.
-pub fn get_keys<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
-}
-
-/// Creates the INSERT query.
-pub fn insert<'a>(
-    key: QueueKey,
-    managed_envelope: Vec<u8>,
-    received_at: i64,
-) -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query(
-        "INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) VALUES (?, ?, ?, ?);",
-    )
-    .bind(received_at)
-    .bind(key.own_key.to_string())
-    .bind(key.sampling_key.to_string())
-    .bind(managed_envelope)
-}
-
-/// Describes the chunk item which is handled by the insert statement.
-type ChunkItem = (QueueKey, Vec<u8>, i64);
-
-/// Creates an INSERT query for the chunk of provided data.
-fn build_insert<'a>(
-    builder: &'a mut QueryBuilder<Sqlite>,
-    chunk: Vec<ChunkItem>,
-) -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    builder.push_values(chunk, |mut b, (key, value, received_at)| {
-        b.push_bind(received_at)
-            .push_bind(key.own_key.to_string())
-            .push_bind(key.sampling_key.to_string())
-            .push_bind(value);
-    });
-
-    builder.build()
-}
-
-/// Creates INSERT statements from the stream and executes them on the provided database pool.
-///
-/// Internally, this function splits the provided stream into chunks and prepares an insert
-/// statement for each chunk.
-///
-/// Returns the number of inserted rows on success.
-pub async fn do_insert(
-    stream: impl Stream<Item = ChunkItem> + std::marker::Unpin,
-    db: &Pool<Sqlite>,
-) -> Result<u64, sqlx::Error> {
-    // Since we have 3 variables to bind, we divide the SQLite limit by 3
-    // here to prepare the chunks for the batch inserts.
-    let mut envelopes = stream.chunks(SQLITE_LIMIT_VARIABLE_NUMBER / 3);
-
-    // A builder type for constructing queries at runtime.
-    // By default this creates a prepared SQL statement, which is cached and
-    // re-used for sequential queries.
-    let mut query_builder: QueryBuilder<Sqlite> =
-        QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");
-
-    let mut count = 0;
-    while let Some(chunk) = envelopes.next().await {
-        let result = build_insert(&mut query_builder, chunk).execute(db).await?;
-        count += result.rows_affected();
-        relay_statsd::metric!(counter(RelayCounters::BufferWritesDisk) += 1);
-
-        // Reset the builder to the initial state set by the `QueryBuilder::new` function,
-        // so it can be reused for another chunk.
-        query_builder.reset();
-    }
-
-    Ok(count)
-}
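A quick sanity check (not Relay code) of the chunk sizing in `do_insert`: with SQLite's conservative limit of 999 bound variables per statement and the three binds per row assumed by the comment, each chunk stays within the host-parameter budget:

    fn main() {
        const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999;
        let rows_per_chunk = SQLITE_LIMIT_VARIABLE_NUMBER / 3;
        assert_eq!(rows_per_chunk, 333);
        assert!(rows_per_chunk * 3 <= SQLITE_LIMIT_VARIABLE_NUMBER);
    }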
-
-/// Creates a DELETE statement which truncates the entire `envelopes` table.
-///
-/// When the DELETE statement runs without a WHERE clause, SQLite uses an optimization (the
-/// so-called "truncate" optimization) to erase the entire table content without having to
-/// visit each row.
-pub fn truncate<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query("DELETE FROM envelopes;")
-}
diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs
index 9c33418f40..257f2f4281 100644
--- a/relay-server/src/statsd.rs
+++ b/relay-server/src/statsd.rs
@@ -7,23 +7,6 @@ pub enum RelayGauges {
     /// The state of Relay with respect to the upstream connection.
     /// Possible values are `0` for normal operations and `1` for a network outage.
     NetworkOutage,
-    /// The number of envelopes waiting for project states in memory.
-    ///
-    /// This number is always <= `EnvelopeQueueSize`.
-    ///
-    /// The memory buffer size can be configured with `spool.envelopes.max_memory_size`.
-    BufferEnvelopesMemoryCount,
-    /// The number of envelopes waiting for project states on disk.
-    ///
-    /// Note this metric *will not be logged* when we encounter envelopes in the database on startup,
-    /// because counting those envelopes reliably would risk locking the db for multiple seconds.
-    ///
-    /// The disk buffer size can be configured with `spool.envelopes.max_disk_size`.
-    BufferEnvelopesDiskCount,
-    /// Number of queue keys (project key pairs) unspooled during proactive unspool.
-    /// This metric is tagged with:
-    /// - `reason`: Why keys are / are not unspooled.
-    BufferPeriodicUnspool,
     /// The number of individual stacks in the priority queue.
     ///
     /// Per combination of `(own_key, sampling_key)`, a new stack is created.
@@ -56,9 +39,6 @@ impl GaugeMetric for RelayGauges {
     fn name(&self) -> &'static str {
         match self {
             RelayGauges::NetworkOutage => "upstream.network_outage",
-            RelayGauges::BufferEnvelopesMemoryCount => "buffer.envelopes_mem_count",
-            RelayGauges::BufferEnvelopesDiskCount => "buffer.envelopes_disk_count",
-            RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic",
             RelayGauges::BufferStackCount => "buffer.stack_count",
             RelayGauges::BufferDiskUsed => "buffer.disk_used",
             RelayGauges::SystemMemoryUsed => "health.system_memory.used",
@@ -185,16 +165,7 @@ pub enum RelayHistograms {
     ///
     /// Metric is tagged by the item type.
     EnvelopeItemSize,
-    /// The estimated number of envelope bytes buffered in memory.
-    ///
-    /// The memory buffer size can be configured with `spool.envelopes.max_memory_size`.
-    BufferEnvelopesMemoryBytes,
-    /// The file size of the buffer db on disk, in bytes.
-    ///
-    /// This metric is computed by multiplying `page_count * page_size`.
-    BufferDiskSize,
-    /// Number of attempts needed to dequeue spooled envelopes from disk.
-    BufferDequeueAttempts,
+
     /// Number of elements in the envelope buffer across all the stacks.
     ///
     /// This metric is tagged with:
@@ -333,9 +304,6 @@ impl HistogramMetric for RelayHistograms {
             RelayHistograms::EventSpans => "event.spans",
             RelayHistograms::BatchesPerPartition => "metrics.buckets.batches_per_partition",
             RelayHistograms::BucketsPerBatch => "metrics.buckets.per_batch",
-            RelayHistograms::BufferEnvelopesMemoryBytes => "buffer.envelopes_mem",
-            RelayHistograms::BufferDiskSize => "buffer.disk_size",
-            RelayHistograms::BufferDequeueAttempts => "buffer.dequeue_attempts",
             RelayHistograms::BufferEnvelopesCount => "buffer.envelopes_count",
             RelayHistograms::BufferBackpressureEnvelopesCount => {
                 "buffer.backpressure_envelopes_count"
@@ -525,12 +493,6 @@ pub enum RelayTimers {
     /// This metric is tagged with:
     /// - `task`: The type of the task the project cache does.
     LegacyProjectCacheTaskDuration,
-    /// Timing in milliseconds for processing a message in the buffer service.
-    ///
-    /// This metric is tagged with:
-    ///
-    /// - `message`: The type of message that was processed.
-    BufferMessageProcessDuration,
     /// Timing in milliseconds for handling and responding to a health check request.
     ///
     /// This metric is tagged with:
@@ -629,7 +591,6 @@ impl TimerMetric for RelayTimers {
             RelayTimers::ReplayRecordingProcessing => "replay.recording.process",
             RelayTimers::GlobalConfigRequestDuration => "global_config.requests.duration",
             RelayTimers::ProcessMessageDuration => "processor.message.duration",
-            RelayTimers::BufferMessageProcessDuration => "buffer.message.duration",
             RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration",
             RelayTimers::LegacyProjectCacheMessageDuration => {
                 "legacy_project_cache.message.duration"
@@ -688,20 +649,10 @@ pub enum RelayCounters {
     /// - `handling`: Either `"success"` if the envelope was handled correctly, or `"failure"` if
     ///   there was an error or bug.
     EnvelopeRejected,
-    /// Number of times the envelope buffer spools to disk.
-    BufferWritesDisk,
-    /// Number of times the envelope buffer reads back from disk.
-    BufferReadsDisk,
     /// Number of _envelopes_ the envelope buffer ingests.
     BufferEnvelopesWritten,
     /// Number of _envelopes_ the envelope buffer produces.
     BufferEnvelopesRead,
-    /// Number of state changes in the envelope buffer.
-    /// This metric is tagged with:
-    /// - `state_in`: The previous state. `memory`, `memory_file_standby`, or `disk`.
-    /// - `state_out`: The new state. `memory`, `memory_file_standby`, or `disk`.
-    /// - `reason`: Why a transition was made (or not made).
-    BufferStateTransition,
     /// Number of envelopes that were returned to the envelope buffer by the project cache.
     ///
     /// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded
@@ -901,12 +852,9 @@ impl CounterMetric for RelayCounters {
             RelayCounters::EventCorrupted => "event.corrupted",
             RelayCounters::EnvelopeAccepted => "event.accepted",
             RelayCounters::EnvelopeRejected => "event.rejected",
-            RelayCounters::BufferWritesDisk => "buffer.writes",
-            RelayCounters::BufferReadsDisk => "buffer.reads",
             RelayCounters::BufferEnvelopesWritten => "buffer.envelopes_written",
             RelayCounters::BufferEnvelopesRead => "buffer.envelopes_read",
             RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned",
-            RelayCounters::BufferStateTransition => "buffer.state.transition",
             RelayCounters::BufferEnvelopeStacksPopped => "buffer.envelope_stacks_popped",
             RelayCounters::BufferTryPop => "buffer.try_pop",
             RelayCounters::BufferReadyToPop => "buffer.ready_to_pop",
diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs
index c3b5228623..a492137808 100644
--- a/relay-server/src/testutils.rs
+++ b/relay-server/src/testutils.rs
@@ -13,7 +13,6 @@ use relay_system::Addr;
 use relay_test::mock_service;
 
 use crate::envelope::{Envelope, Item, ItemType};
-use crate::extractors::RequestMeta;
 use crate::metrics::{MetricOutcomes, MetricStats};
 #[cfg(feature = "processing")]
 use crate::service::create_redis_pools;
@@ -104,18 +103,6 @@ pub fn new_envelope<T: Into<String>>(with_dsc: bool, transaction_name: T) -> Box<Envelope> {
     envelope
 }
 
-pub fn empty_envelope() -> Box<Envelope> {
-    empty_envelope_with_dsn("e12d836b15bb49d7bbf99e64295d995b")
-}
-
-pub fn empty_envelope_with_dsn(dsn: &str) -> Box<Envelope> {
-    let dsn = format!("https://{dsn}:@sentry.io/42").parse().unwrap();
-
-    let mut envelope = Envelope::from_request(Some(EventId::new()), RequestMeta::new(dsn));
-    envelope.add_item(Item::new(ItemType::Event));
-    envelope
-}
-
 pub async fn create_test_processor(config: Config) -> EnvelopeProcessorService {
     let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {});
     let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {});
diff --git a/relay/src/cli.rs b/relay/src/cli.rs
index e6852fb612..048807e6eb 100644
--- a/relay/src/cli.rs
+++ b/relay/src/cli.rs
@@ -1,15 +1,13 @@
-use std::io::BufRead;
 use std::path::{Path, PathBuf};
 use std::{env, io};
 
-use anyhow::{anyhow, bail, Context, Result};
+use anyhow::{anyhow, bail, Result};
 use clap::ArgMatches;
 use clap_complete::Shell;
 use dialoguer::{Confirm, Select};
 use relay_config::{
     Config, ConfigError, ConfigErrorKind, Credentials, MinimalConfig, OverridableConfig, RelayMode,
 };
-use relay_server::spool_utils;
 use uuid::Uuid;
 
 use crate::cliapp::make_app;
@@ -65,8 +63,6 @@ pub fn execute() -> Result<()> {
         let arg_config = extract_config_args(matches);
         config.apply_override(arg_config)?;
         run(config, matches)
-    } else if let Some(matches) = matches.subcommand_matches("spool") {
-        manage_spool(&config, matches)
     } else {
         unreachable!();
     }
@@ -322,49 +318,6 @@ pub fn init_config<P: AsRef<Path>>(config_path: P, _matches: &ArgMatches) -> Res
 
     Ok(())
 }
 
-/// Manages the on-disk spool file.
-pub fn manage_spool(config: &Config, matches: &ArgMatches) -> Result<()> {
-    let Some(matches) = matches.subcommand_matches("clear") else {
-        unreachable!()
-    };
-
-    let path = match matches.get_one::<PathBuf>("path") {
-        Some(path) => path.to_owned(),
-        None => config
-            .spool_envelopes_path(0)
-            .context("Config file does not contain the path to the spool file.")?,
-    };
-
-    if !path.is_file() {
-        bail!("Could not find provided file: {}", path.display());
-    }
-
-    let force = matches.get_flag("force");
-
-    if !force {
-        let stdin = std::io::stdin();
-        eprintln!(
-            "Are you sure you want to clear up on-disk spooled data in {} (yes/no): ",
-            path.display()
-        );
-        for line in stdin.lock().lines().map_while(Result::ok) {
-            match line.trim().to_lowercase().as_str() {
-                "yes" => break,
-                "no" => bail!("Canceling the cleaning of the spooled data."),
-                _ => eprintln!("Accepting only YES or NO: "),
-            }
-        }
-    }
-
-    relay_log::info!("Clearing the spool file: {}", path.to_string_lossy());
-
-    let truncated = spool_utils::truncate(&path)?;
-
-    relay_log::info!("On-disk spool emptied. Deleted {truncated} envelopes.");
-
-    Ok(())
-}
-
 pub fn generate_completions(matches: &ArgMatches) -> Result<()> {
     let shell = match matches.get_one::<Shell>("format") {
         Some(shell) => *shell,
diff --git a/tests/integration/fixtures/relay.py b/tests/integration/fixtures/relay.py
index f0da584cb8..bc335f39bd 100644
--- a/tests/integration/fixtures/relay.py
+++ b/tests/integration/fixtures/relay.py
@@ -147,10 +147,6 @@ def inner(
                 "flush_interval": 0,
             },
         },
-        "spool": {
-            # Unspool as quickly as possible
-            "envelopes": {"unspool_interval": 1, "version": "experimental"},
-        },
     }
 
     if static_relays is not None:
diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py
index 1833d1cdff..b488d716e7 100644
--- a/tests/integration/test_basic.py
+++ b/tests/integration/test_basic.py
@@ -25,10 +25,7 @@ def get_project_config():
 
     relay = relay(
         mini_sentry,
-        {
-            "limits": {"shutdown_timeout": 2},
-            "spool": {"envelopes": {"version": "experimental"}},
-        },
+        {"limits": {"shutdown_timeout": 2}},
     )
 
     relay.send_event(project_id)
@@ -61,7 +58,7 @@ def get_project_config():
         mini_sentry,
         {
             "limits": {"shutdown_timeout": 2},
-            "spool": {"envelopes": {"version": "experimental", "path": db_file_path}},
+            "spool": {"envelopes": {"path": db_file_path}},
         },
     )
 
diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py
index 72634ca3f1..2e02e9173b 100644
--- a/tests/integration/test_healthchecks.py
+++ b/tests/integration/test_healthchecks.py
@@ -175,7 +175,6 @@ def test_readiness_disk_spool(mini_sentry, relay):
         "spool": {
             # if the config sets both max_disk_size and max_memory_size to 0, Relay will never pass the readiness check
             "envelopes": {
-                "version": "experimental",
                 "path": dbfile,
                 "max_disk_size": 24577,  # one more than the initial size
                 "batch_size_bytes": 1,