Skip to content

Commit

Permalink
Adds header to bifrost payloads
Browse files Browse the repository at this point in the history
This also changes the loglet API to accept Bytes to avoid duplication of serialization across loglet implementations. This holds the current invariant that payloads are opaque to loglets.
  • Loading branch information
AhmedSoliman committed May 21, 2024
1 parent 55d7f03 commit 8f8a640
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 137 deletions.
20 changes: 16 additions & 4 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::BytesMut;
use enum_map::EnumMap;
use once_cell::sync::OnceCell;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;
use tracing::{error, instrument};

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
Expand Down Expand Up @@ -158,14 +160,20 @@ impl BifrostInner {
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
loglet.append(payload).await
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await
}

pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single(after).await
Ok(loglet
.read_next_single(after)
.await?
.decode()
.expect("decoding a bifrost envelope succeeds"))
}

pub async fn read_next_single_opt(
Expand All @@ -176,7 +184,11 @@ impl BifrostInner {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single_opt(after).await
Ok(loglet.read_next_single_opt(after).await?.map(|record| {
record
.decode()
.expect("decoding a bifrost envelope succeeds")
}))
}

pub async fn find_tail(
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ mod error;
mod loglet;
mod loglets;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
pub use types::*;
21 changes: 12 additions & 9 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::sync::Arc;

use async_trait::async_trait;

use bytes::Bytes;
use restate_types::config::Configuration;
use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::{Lsn, Payload, SequenceNumber};
use restate_types::logs::{Lsn, SequenceNumber};

use crate::{Error, LogRecord, LsnExt, ProviderError};

Expand Down Expand Up @@ -104,7 +105,7 @@ pub trait LogletBase: Send + Sync {
type Offset: SequenceNumber;

/// Append a record to the loglet.
async fn append(&self, payload: Payload) -> Result<Self::Offset, Error>;
async fn append(&self, data: Bytes) -> Result<Self::Offset, Error>;

/// Find the tail of the loglet. If the loglet is empty or have been trimmed, the loglet should
/// return `None`.
Expand All @@ -116,22 +117,24 @@ pub trait LogletBase: Send + Sync {

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
async fn read_next_single(&self, after: Self::Offset)
-> Result<LogRecord<Self::Offset>, Error>;
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, Error>;

/// Read the next record if it's been committed, otherwise, return None without waiting.
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error>;
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error>;
}

#[async_trait]
impl LogletBase for LogletWrapper {
type Offset = Lsn;

async fn append(&self, payload: Payload) -> Result<Lsn, Error> {
let offset = self.loglet.append(payload).await?;
async fn append(&self, data: Bytes) -> Result<Lsn, Error> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}
Expand All @@ -146,7 +149,7 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn>, Error> {
async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn, Bytes>, Error> {
// convert LSN to loglet offset
let offset = after.into_offset(self.base_lsn);
self.loglet
Expand All @@ -158,7 +161,7 @@ impl LogletBase for LogletWrapper {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
let offset = after.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
Expand Down
17 changes: 10 additions & 7 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use log_store::LogStoreError;
use metrics::{counter, histogram};
pub use provider::LocalLogletProvider;
use restate_core::ShutdownError;
use restate_types::logs::{Payload, SequenceNumber};
use restate_types::logs::SequenceNumber;
use tokio::sync::Mutex;
use tracing::{debug, warn};

Expand Down Expand Up @@ -95,7 +95,10 @@ impl LocalLoglet {
self.release_watch.notify(release_pointer);
}

fn read_after(&self, after: LogletOffset) -> Result<Option<LogRecord<LogletOffset>>, Error> {
fn read_after(
&self,
after: LogletOffset,
) -> Result<Option<LogRecord<LogletOffset, Bytes>>, Error> {
let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
// Are we reading after before the trim point? Note that if `trim_point` == `after`
// then we don't return a trim gap, because the next record is potentially a data
Expand Down Expand Up @@ -138,15 +141,15 @@ impl LocalLoglet {
return Ok(None);
}
let data = Bytes::from(data);
Ok(Some(LogRecord::new_data(key.offset, Payload::from(data))))
Ok(Some(LogRecord::new_data(key.offset, data)))
}
}
}

#[async_trait]
impl LogletBase for LocalLoglet {
type Offset = LogletOffset;
async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
async fn append(&self, payload: Bytes) -> Result<LogletOffset, Error> {
counter!(BIFROST_LOCAL_APPEND).increment(1);
let start_time = std::time::Instant::now();
// We hold the lock to ensure that offsets are enqueued in the order of
Expand All @@ -162,7 +165,7 @@ impl LogletBase for LocalLoglet {
.enqueue_put_record(
self.log_id,
offset,
payload.into(),
payload,
true, /* release_immediately */
)
.await?;
Expand Down Expand Up @@ -199,7 +202,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset>, Error> {
) -> Result<LogRecord<Self::Offset, Bytes>, Error> {
loop {
let next_record = self.read_after(after)?;
if let Some(next_record) = next_record {
Expand All @@ -213,7 +216,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
self.read_after(after)
}
}
43 changes: 23 additions & 20 deletions crates/bifrost/src/loglets/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use restate_types::logs::metadata::LogletParams;
use restate_types::logs::{Payload, SequenceNumber};
use restate_types::logs::SequenceNumber;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, info};
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Ord for OffsetWatcher {
pub struct MemoryLoglet {
// We treat params as an opaque identifier for the underlying loglet.
params: LogletParams,
log: Mutex<Vec<Payload>>,
log: Mutex<Vec<Bytes>>,
// internal offset of the first record (or slot available)
trim_point_offset: AtomicU64,
last_committed_offset: AtomicU64,
Expand Down Expand Up @@ -173,7 +174,10 @@ impl MemoryLoglet {
}
}

fn read_after(&self, after: LogletOffset) -> Result<Option<LogRecord<LogletOffset>>, Error> {
fn read_after(
&self,
after: LogletOffset,
) -> Result<Option<LogRecord<LogletOffset, Bytes>>, Error> {
let guard = self.log.lock().unwrap();
let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire));
// are we reading after before the trim point? Note that if trim_point == after then we
Expand Down Expand Up @@ -201,7 +205,7 @@ impl MemoryLoglet {
impl LogletBase for MemoryLoglet {
type Offset = LogletOffset;

async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
async fn append(&self, payload: Bytes) -> Result<LogletOffset, Error> {
let mut log = self.log.lock().unwrap();
let offset = self.index_to_offset(log.len());
debug!(
Expand Down Expand Up @@ -233,24 +237,23 @@ impl LogletBase for MemoryLoglet {
async fn read_next_single(
&self,
after: LogletOffset,
) -> Result<LogRecord<Self::Offset>, Error> {
) -> Result<LogRecord<Self::Offset, Bytes>, Error> {
loop {
let next_record = self.read_after(after)?;
if let Some(next_record) = next_record {
break Ok(next_record);
} else {
// Wait and respond when available.
let receiver = self.watch_for_offset(after.next());
receiver.await.unwrap();
continue;
}
// Wait and respond when available.
let receiver = self.watch_for_offset(after.next());
receiver.await.unwrap();
continue;
}
}

async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
self.read_after(after)
}
}
Expand All @@ -273,19 +276,19 @@ mod tests {
assert_eq!(None, loglet.find_tail().await?);

// Append 1
let offset = loglet.append(Payload::from("record1")).await?;
let offset = loglet.append(Bytes::from_static(b"record1")).await?;
assert_eq!(LogletOffset::OLDEST, offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset::OLDEST), loglet.find_tail().await?);

// Append 2
let offset = loglet.append(Payload::from("record2")).await?;
let offset = loglet.append(Bytes::from_static(b"record2")).await?;
assert_eq!(LogletOffset(2), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(2)), loglet.find_tail().await?);

// Append 3
let offset = loglet.append(Payload::from("record3")).await?;
let offset = loglet.append(Bytes::from_static(b"record3")).await?;
assert_eq!(LogletOffset(3), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(3)), loglet.find_tail().await?);
Expand All @@ -296,17 +299,17 @@ mod tests {
assert_eq!(offset, loglet.get_trim_point().await?.next());
assert_eq!(LogletOffset::OLDEST, offset);
assert!(record.is_data());
assert_eq!(Payload::from("record1"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record1")), record.payload());

// read record 2 (reading next after OLDEST)
let LogRecord { offset, record } = loglet.read_next_single(offset).await?;
assert_eq!(LogletOffset(2), offset);
assert_eq!(Payload::from("record2"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record2")), record.payload());

// read record 3
let LogRecord { offset, record } = loglet.read_next_single(offset).await?;
assert_eq!(LogletOffset(3), offset);
assert_eq!(Payload::from("record3"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record3")), record.payload());

// read from the future returns None
assert!(loglet
Expand All @@ -320,7 +323,7 @@ mod tests {
// read future record 4
let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(3)).await?;
assert_eq!(LogletOffset(4), offset);
assert_eq!(Payload::from("record4"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record4")), record.payload());
Ok(())
}
});
Expand All @@ -332,7 +335,7 @@ mod tests {
// read future record 10
let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(9)).await?;
assert_eq!(LogletOffset(10), offset);
assert_eq!(Payload::from("record10"), record.into_payload_unchecked());
assert_eq!(Some(&Bytes::from_static(b"record10")), record.payload());
Ok(())
}
});
Expand All @@ -342,7 +345,7 @@ mod tests {
assert!(!handle1.is_finished());

// Append 4
let offset = loglet.append(Payload::from("record4")).await?;
let offset = loglet.append(Bytes::from_static(b"record4")).await?;
assert_eq!(LogletOffset(4), offset);
assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?);
assert_eq!(Some(LogletOffset(4)), loglet.find_tail().await?);
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ mod tests {
info!(?record, "read record");
assert_eq!(expected_lsn, record.offset);
assert_eq!(
Payload::from(format!("record{}", expected_lsn)),
Payload::new(format!("record{}", expected_lsn)),
record.record.into_payload_unchecked()
);
assert_eq!(expected_lsn, reader.current_read_pointer());
Expand All @@ -133,7 +133,7 @@ mod tests {
// append 5 records to the log
for i in 1..=5 {
let lsn = bifrost
.append(LogId::from(0), format!("record{}", i).into())
.append(LogId::from(0), Payload::new(format!("record{}", i)))
.await?;
info!(?lsn, "appended record");
}
Expand All @@ -147,7 +147,7 @@ mod tests {
// write 5 more records.
for i in 6..=10 {
bifrost
.append(LogId::from(0), format!("record{}", i).into())
.append(LogId::from(0), Payload::new(format!("record{}", i)))
.await?;
}

Expand Down
Loading

0 comments on commit 8f8a640

Please sign in to comment.