Emit shared cache durations in cache hit, miss and error conditions (#1162)

Adds additional duration metrics to the shared cache for cache hits,
misses, and errors.
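
Concretely, each cache operation is timed end-to-end and the elapsed microseconds are recorded into a histogram whose `type` label reflects the outcome. A minimal sketch of that shape using the `metrics` crate (the free-standing `timed_read` helper is illustrative only; in the change itself this logic lives inside the cache's `get_block`):

```rust
use std::time::Instant;

// Illustrative helper: time a read-like operation and record its duration
// under a `type` label of "ok" (hit), "miss", or "error", mirroring the
// labelling this change gives `express_data_cache.read_duration_us`.
fn timed_read<T, E>(read: impl FnOnce() -> Result<Option<T>, E>) -> Result<Option<T>, E> {
    let start = Instant::now();
    let result = read();
    let result_type = match &result {
        Ok(Some(_)) => "ok",  // block found in the cache
        Ok(None) => "miss",   // block not present
        Err(_) => "error",    // cache read failed
    };
    metrics::histogram!("express_data_cache.read_duration_us", "type" => result_type)
        .record(start.elapsed().as_micros() as f64);
    result
}
```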

Example metrics:
```
2024-12-06T14:11:43.012775Z  INFO mountpoint_s3::metrics: express_data_cache.block_err[reason=invalid_block_offset,type=read]: 189 (n=189)
2024-12-06T14:11:43.012802Z  INFO mountpoint_s3::metrics: express_data_cache.block_hit: 0 (n=189)
2024-12-06T14:11:43.012817Z  INFO mountpoint_s3::metrics: express_data_cache.read_duration_us[type=error]: n=189: min=3 p10=3 p50=4 avg=3.87 p90=5 p99=5 p99.9=6 max=6
2024-12-06T14:11:43.012831Z  INFO mountpoint_s3::metrics: express_data_cache.total_bytes[type=write]: 380 (n=190)
2024-12-06T14:11:43.012844Z  INFO mountpoint_s3::metrics: express_data_cache.write_duration_us[type=ok]: n=190: min=8256 p10=8511 p50=8895 avg=8882.19 p90=9343 p99=9535 p99.9=9663 max=9663
```

And with a warm cache:
```
2024-12-06T16:06:14.462602Z  INFO mountpoint_s3::metrics: express_data_cache.block_hit: 98 (n=100)
2024-12-06T16:06:14.462628Z  INFO mountpoint_s3::metrics: express_data_cache.read_duration_us[type=miss]: n=2: min=21120 p10=21247 p50=21247 avg=21824.00 p90=22527 p99=22527 p99.9=22527 max=22527
2024-12-06T16:06:14.462641Z  INFO mountpoint_s3::metrics: express_data_cache.read_duration_us[type=ok]: n=98: min=5888 p10=6015 p50=6271 avg=6378.94 p90=6559 p99=14079 p99.9=14079 max=14079
2024-12-06T16:06:14.462652Z  INFO mountpoint_s3::metrics: express_data_cache.total_bytes[type=read]: 196 (n=98)
2024-12-06T16:06:14.462663Z  INFO mountpoint_s3::metrics: express_data_cache.total_bytes[type=write]: 4 (n=2)
2024-12-06T16:06:14.462673Z  INFO mountpoint_s3::metrics: express_data_cache.write_duration_us[type=ok]: n=2: min=9408 p10=9471 p50=9471 avg=19280.00 p90=29183 p99=29183 p99.9=29183 max=29183
```

Additionally, refactors the cache in response to comments on #1146: metric emission now happens in the `get_block`/`put_block` trait methods, which wrap new private `read_block`/`write_block` helpers, and error-to-reason mapping is centralized on `DataCacheError` (see the sketch below).
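
The centralized mapping turns each error variant into a static `reason` label for the `express_data_cache.block_err` counter. A condensed sketch of the idea, with a hypothetical abbreviated enum standing in for the real `DataCacheError`:

```rust
// Abbreviated stand-in for DataCacheError; the real enum has more variants.
#[derive(Debug)]
enum CacheError {
    IoFailure,
    InvalidBlockOffset,
}

impl CacheError {
    // Map each variant to the static label emitted as the metric's `reason`.
    fn reason(&self) -> &'static str {
        match self {
            CacheError::IoFailure => "io_failure",
            CacheError::InvalidBlockOffset => "invalid_block_offset",
        }
    }
}

// One place emits the failure counter, tagged by reason, instead of
// sprinkling per-call-site emit_failure_metric_* helpers.
fn emit_read_error(err: &CacheError) {
    metrics::counter!("express_data_cache.block_err", "reason" => err.reason(), "type" => "read")
        .increment(1);
}
```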

### Does this change impact existing behavior?

Yes, the `express_data_cache.read_duration_us` metric now has a `type` label indicating whether the read was a cache hit (`ok`), a `miss`, or an `error`; `write_duration_us` gains the same label.

### Does this change need a changelog entry?

No, changes to metrics don't need changelog entries.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Simon Beal <[email protected]>
muddyfish authored Dec 20, 2024
1 parent 3ee6fbc commit 641f613
Showing 2 changed files with 94 additions and 64 deletions.
17 changes: 15 additions & 2 deletions mountpoint-s3/src/data_cache.rs
```diff
@@ -32,8 +32,8 @@ pub enum DataCacheError {
     IoFailure(#[source] anyhow::Error),
     #[error("Block header was not valid: {0}")]
     InvalidBlockHeader(String),
-    #[error("Block checksum was not present")]
-    BlockChecksumMissing,
+    #[error("Block checksum was not valid")]
+    InvalidBlockChecksum,
     #[error("Block content was not valid/readable")]
     InvalidBlockContent,
     #[error("Block offset does not match block index")]
@@ -42,6 +42,19 @@ pub enum DataCacheError {
     EvictionFailure,
 }
 
+impl DataCacheError {
+    fn reason(&self) -> &'static str {
+        match self {
+            DataCacheError::IoFailure(_) => "io_failure",
+            DataCacheError::InvalidBlockHeader(_) => "invalid_block_header",
+            DataCacheError::InvalidBlockChecksum => "invalid_block_checksum",
+            DataCacheError::InvalidBlockContent => "invalid_block_content",
+            DataCacheError::InvalidBlockOffset => "invalid_block_offset",
+            DataCacheError::EvictionFailure => "eviction_failure",
+        }
+    }
+}
+
 pub type DataCacheResult<Value> = Result<Value, DataCacheError>;
 
 /// Data cache for fixed-size checksummed buffers.
```
141 changes: 79 additions & 62 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
```diff
@@ -105,30 +105,20 @@ where
             backpressure_handle.increment_read_window(self.config.block_size as usize);
         }
     }
-}
 
-#[async_trait]
-impl<Client> DataCache for ExpressDataCache<Client>
-where
-    Client: ObjectClient + Send + Sync + 'static,
-{
-    async fn get_block(
+    async fn read_block(
         &self,
         cache_key: &ObjectId,
         block_idx: BlockIndex,
         block_offset: u64,
         object_size: usize,
     ) -> DataCacheResult<Option<ChecksummedBytes>> {
-        let start = Instant::now();
-
         if object_size > self.config.max_object_size {
-            metrics::counter!("express_data_cache.block_hit").increment(0);
             metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
             return Ok(None);
         }
 
         if block_offset != block_idx * self.config.block_size {
-            emit_failure_metric_read("invalid_block_offset");
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
@@ -144,11 +134,9 @@ where
         {
             Ok(result) => result,
             Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
-                metrics::counter!("express_data_cache.block_hit").increment(0);
                 return Ok(None);
             }
            Err(e) => {
-                emit_failure_metric_read("get_object_failure");
                 return Err(DataCacheError::IoFailure(e.into()));
             }
         };
@@ -163,7 +151,6 @@ where
             match chunk {
                 Ok((offset, body)) => {
                     if offset != buffer.len() as u64 {
-                        emit_failure_metric_read("invalid_block_offset");
                         return Err(DataCacheError::InvalidBlockOffset);
                     }
 
@@ -180,70 +167,51 @@ where
                     self.ensure_read_window(backpressure_handle.as_mut());
                 }
                 Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
-                    metrics::counter!("express_data_cache.block_hit").increment(0);
                     return Ok(None);
                 }
                 Err(e) => {
-                    emit_failure_metric_read("get_object_failure");
                     return Err(DataCacheError::IoFailure(e.into()));
                 }
             }
         }
 
         let object_metadata = result.get_object_metadata();
 
-        let checksum = result.get_object_checksum().map_err(|err| {
-            emit_failure_metric_read("invalid_block_checksum");
-            DataCacheError::IoFailure(err.into())
-        })?;
-        let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
-            emit_failure_metric_read("missing_block_checksum");
-            DataCacheError::BlockChecksumMissing
-        })?;
-        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
-            emit_failure_metric_read("unparsable_block_checksum");
-            DataCacheError::IoFailure(err.into())
-        })?;
+        let checksum = result
+            .get_object_checksum()
+            .map_err(|_| DataCacheError::InvalidBlockChecksum)?;
+        let crc32c_b64 = checksum
+            .checksum_crc32c
+            .ok_or_else(|| DataCacheError::InvalidBlockChecksum)?;
+        let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?;
 
         let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
-        block_metadata
-            .validate_object_metadata(&object_metadata)
-            .inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?;
-
-        metrics::counter!("express_data_cache.block_hit").increment(1);
-        metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
-        metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
+        block_metadata.validate_object_metadata(&object_metadata)?;
 
         Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
     }
 
-    async fn put_block(
+    async fn write_block(
         &self,
         cache_key: ObjectId,
         block_idx: BlockIndex,
         block_offset: u64,
         bytes: ChecksummedBytes,
         object_size: usize,
     ) -> DataCacheResult<()> {
-        let start = Instant::now();
         if object_size > self.config.max_object_size {
             metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
             return Ok(());
         }
 
         if block_offset != block_idx * self.config.block_size {
-            emit_failure_metric_write("invalid_block_offset");
             return Err(DataCacheError::InvalidBlockOffset);
         }
 
         let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);
 
-        let (data, checksum) = bytes.into_inner().map_err(|_| {
-            emit_failure_metric_write("invalid_block_content");
-            DataCacheError::InvalidBlockContent
-        })?;
+        let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
         let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
-        let data_length = data.len() as u64;
 
         self.client
             .put_object_single(
@@ -254,29 +222,78 @@ where
             )
             .in_current_span()
             .await
-            .inspect_err(|_| {
-                emit_failure_metric_write("put_object_failure");
-            })?;
+            .map_err(|err| DataCacheError::IoFailure(err.into()))?;
 
-        metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
-        metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
         Ok(())
     }
+}
 
-    fn block_size(&self) -> u64 {
-        self.config.block_size
+#[async_trait]
+impl<Client> DataCache for ExpressDataCache<Client>
+where
+    Client: ObjectClient + Send + Sync + 'static,
+{
+    async fn get_block(
+        &self,
+        cache_key: &ObjectId,
+        block_idx: BlockIndex,
+        block_offset: u64,
+        object_size: usize,
+    ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        let start = Instant::now();
+        let (result, result_type) = match self.read_block(cache_key, block_idx, block_offset, object_size).await {
+            Ok(Some(data)) => {
+                metrics::counter!("express_data_cache.block_hit").increment(1);
+                metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(data.len() as u64);
+                (Ok(Some(data)), "ok")
+            }
+            Ok(None) => {
+                metrics::counter!("express_data_cache.block_hit").increment(0);
+                (Ok(None), "miss")
+            }
+            Err(err) => {
+                metrics::counter!("express_data_cache.block_hit").increment(0);
+                metrics::counter!("express_data_cache.block_err", "reason" => err.reason(), "type" => "read")
+                    .increment(1);
+                (Err(err), "error")
+            }
+        };
+        metrics::histogram!("express_data_cache.read_duration_us", "type" => result_type)
+            .record(start.elapsed().as_micros() as f64);
+        result
     }
-}
 
-#[inline]
-fn emit_failure_metric_read(reason: &'static str) {
-    metrics::counter!("express_data_cache.block_hit").increment(0);
-    metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
-}
+    async fn put_block(
+        &self,
+        cache_key: ObjectId,
+        block_idx: BlockIndex,
+        block_offset: u64,
+        bytes: ChecksummedBytes,
+        object_size: usize,
+    ) -> DataCacheResult<()> {
+        let start = Instant::now();
+        let (result, result_type) = match self
+            .write_block(cache_key, block_idx, block_offset, bytes, object_size)
+            .await
+        {
+            Ok(()) => {
+                metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(object_size as u64);
+                (Ok(()), "ok")
+            }
+            Err(err) => {
+                metrics::counter!("express_data_cache.block_err", "reason" => err.reason(), "type" => "write")
+                    .increment(1);
+                (Err(err), "error")
+            }
+        };
+        metrics::histogram!("express_data_cache.write_duration_us", "type" => result_type)
+            .record(start.elapsed().as_micros() as f64);
+        result
+    }
 
-#[inline]
-fn emit_failure_metric_write(reason: &'static str) {
-    metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
+    fn block_size(&self) -> u64 {
+        self.config.block_size
+    }
 }
 
 /// Metadata about the cached object to ensure that the object we've retrieved is the one we were
@@ -335,7 +352,7 @@ impl BlockMetadata {
     }
 
     /// Validate the object metadata headers received match this BlockMetadata object.
-    pub fn validate_object_metadata(&self, headers: &HashMap<String, String>) -> Result<(), DataCacheError> {
+    pub fn validate_object_metadata(&self, headers: &HashMap<String, String>) -> DataCacheResult<()> {
         self.validate_header(headers, "cache-version", |version| version == CACHE_VERSION)?;
         self.validate_header(headers, "block-idx", |block_idx| {
             block_idx.parse() == Ok(self.block_idx)
@@ -362,7 +379,7 @@ impl BlockMetadata {
         headers: &HashMap<String, String>,
         header: &str,
         is_valid: F,
-    ) -> Result<(), DataCacheError> {
+    ) -> DataCacheResult<()> {
         let value = headers
             .get(header)
             .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))?;
@@ -600,7 +617,7 @@ mod tests {
             .get_block(&cache_key, 0, 0, data.len())
             .await
             .expect_err("cache should return error if checksum isn't present");
-        assert!(matches!(err, DataCacheError::BlockChecksumMissing));
+        assert!(matches!(err, DataCacheError::InvalidBlockChecksum));
 
         // Remove the object metadata when writing.
         client
```
