From 8f8a6406764bc1a5af445b6590c2401e00a24e2d Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 21 May 2024 10:05:19 +0100 Subject: [PATCH] Adds header to bifrost payloads This also changes the loglet API to accept Bytes to avoid duplication of serialization across loglet implementations. This holds the current invariant that payloads are opaque to loglets. --- crates/bifrost/src/bifrost.rs | 20 +++- crates/bifrost/src/lib.rs | 2 + crates/bifrost/src/loglet.rs | 21 ++-- .../bifrost/src/loglets/local_loglet/mod.rs | 17 ++-- crates/bifrost/src/loglets/memory_loglet.rs | 43 +++++---- crates/bifrost/src/read_stream.rs | 6 +- crates/bifrost/src/record.rs | 96 +++++++++++++++++++ crates/bifrost/src/types.rs | 67 +------------ crates/ingress-dispatcher/src/dispatcher.rs | 4 +- crates/types/src/logs/mod.rs | 92 +++++++++++++----- crates/wal-protocol/src/lib.rs | 2 +- crates/worker/src/partition/mod.rs | 2 +- .../worker/src/partition_processor_manager.rs | 2 +- 13 files changed, 237 insertions(+), 137 deletions(-) create mode 100644 crates/bifrost/src/record.rs diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index aea2d24ec..cc7c8a874 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -15,14 +15,16 @@ use std::ops::Deref; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use bytes::BytesMut; use enum_map::EnumMap; use once_cell::sync::OnceCell; +use tracing::{error, instrument}; use restate_core::{metadata, Metadata, MetadataKind}; use restate_types::logs::metadata::ProviderKind; use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; +use restate_types::storage::StorageCodec; use restate_types::Version; -use tracing::{error, instrument}; use crate::loglet::{LogletBase, LogletProvider, LogletWrapper}; use crate::watchdog::{WatchdogCommand, WatchdogSender}; @@ -158,14 +160,20 @@ impl BifrostInner { pub async fn append(&self, log_id: LogId, payload: Payload) -> Result { self.fail_if_shutting_down()?; let loglet = self.writeable_loglet(log_id).await?; - loglet.append(payload).await + let mut buf = BytesMut::default(); + StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible"); + loglet.append(buf.freeze()).await } pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result { self.fail_if_shutting_down()?; let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?; - loglet.read_next_single(after).await + Ok(loglet + .read_next_single(after) + .await? + .decode() + .expect("decoding a bifrost envelope succeeds")) } pub async fn read_next_single_opt( @@ -176,7 +184,11 @@ impl BifrostInner { self.fail_if_shutting_down()?; let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?; - loglet.read_next_single_opt(after).await + Ok(loglet.read_next_single_opt(after).await?.map(|record| { + record + .decode() + .expect("decoding a bifrost envelope succeeds") + })) } pub async fn find_tail( diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index 969bdbe17..6fded3cc8 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -13,6 +13,7 @@ mod error; mod loglet; mod loglets; mod read_stream; +mod record; mod service; mod types; mod watchdog; @@ -20,5 +21,6 @@ mod watchdog; pub use bifrost::Bifrost; pub use error::{Error, ProviderError}; pub use read_stream::LogReadStream; +pub use record::*; pub use service::BifrostService; pub use types::*; diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs index 3dc3435e9..314389c68 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet.rs @@ -12,9 +12,10 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use restate_types::config::Configuration; use restate_types::logs::metadata::{LogletParams, ProviderKind}; -use restate_types::logs::{Lsn, Payload, SequenceNumber}; +use restate_types::logs::{Lsn, SequenceNumber}; use crate::{Error, LogRecord, LsnExt, ProviderError}; @@ -104,7 +105,7 @@ pub trait LogletBase: Send + Sync { type Offset: SequenceNumber; /// Append a record to the loglet. - async fn append(&self, payload: Payload) -> Result; + async fn append(&self, data: Bytes) -> Result; /// Find the tail of the loglet. If the loglet is empty or have been trimmed, the loglet should /// return `None`. @@ -116,22 +117,24 @@ pub trait LogletBase: Send + Sync { /// Read or wait for the record at `from` offset, or the next available record if `from` isn't /// defined for the loglet. - async fn read_next_single(&self, after: Self::Offset) - -> Result, Error>; + async fn read_next_single( + &self, + after: Self::Offset, + ) -> Result, Error>; /// Read the next record if it's been committed, otherwise, return None without waiting. async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>, Error>; + ) -> Result>, Error>; } #[async_trait] impl LogletBase for LogletWrapper { type Offset = Lsn; - async fn append(&self, payload: Payload) -> Result { - let offset = self.loglet.append(payload).await?; + async fn append(&self, data: Bytes) -> Result { + let offset = self.loglet.append(data).await?; // Return the LSN given the loglet offset. Ok(self.base_lsn.offset_by(offset)) } @@ -146,7 +149,7 @@ impl LogletBase for LogletWrapper { Ok(self.base_lsn.offset_by(offset)) } - async fn read_next_single(&self, after: Lsn) -> Result, Error> { + async fn read_next_single(&self, after: Lsn) -> Result, Error> { // convert LSN to loglet offset let offset = after.into_offset(self.base_lsn); self.loglet @@ -158,7 +161,7 @@ impl LogletBase for LogletWrapper { async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>, Error> { + ) -> Result>, Error> { let offset = after.into_offset(self.base_lsn); self.loglet .read_next_single_opt(offset) diff --git a/crates/bifrost/src/loglets/local_loglet/mod.rs b/crates/bifrost/src/loglets/local_loglet/mod.rs index 6d228c590..6238094cb 100644 --- a/crates/bifrost/src/loglets/local_loglet/mod.rs +++ b/crates/bifrost/src/loglets/local_loglet/mod.rs @@ -22,7 +22,7 @@ pub use log_store::LogStoreError; use metrics::{counter, histogram}; pub use provider::LocalLogletProvider; use restate_core::ShutdownError; -use restate_types::logs::{Payload, SequenceNumber}; +use restate_types::logs::SequenceNumber; use tokio::sync::Mutex; use tracing::{debug, warn}; @@ -95,7 +95,10 @@ impl LocalLoglet { self.release_watch.notify(release_pointer); } - fn read_after(&self, after: LogletOffset) -> Result>, Error> { + fn read_after( + &self, + after: LogletOffset, + ) -> Result>, Error> { let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); // Are we reading after before the trim point? Note that if `trim_point` == `after` // then we don't return a trim gap, because the next record is potentially a data @@ -138,7 +141,7 @@ impl LocalLoglet { return Ok(None); } let data = Bytes::from(data); - Ok(Some(LogRecord::new_data(key.offset, Payload::from(data)))) + Ok(Some(LogRecord::new_data(key.offset, data))) } } } @@ -146,7 +149,7 @@ impl LocalLoglet { #[async_trait] impl LogletBase for LocalLoglet { type Offset = LogletOffset; - async fn append(&self, payload: Payload) -> Result { + async fn append(&self, payload: Bytes) -> Result { counter!(BIFROST_LOCAL_APPEND).increment(1); let start_time = std::time::Instant::now(); // We hold the lock to ensure that offsets are enqueued in the order of @@ -162,7 +165,7 @@ impl LogletBase for LocalLoglet { .enqueue_put_record( self.log_id, offset, - payload.into(), + payload, true, /* release_immediately */ ) .await?; @@ -199,7 +202,7 @@ impl LogletBase for LocalLoglet { async fn read_next_single( &self, after: Self::Offset, - ) -> Result, Error> { + ) -> Result, Error> { loop { let next_record = self.read_after(after)?; if let Some(next_record) = next_record { @@ -213,7 +216,7 @@ impl LogletBase for LocalLoglet { async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>, Error> { + ) -> Result>, Error> { self.read_after(after) } } diff --git a/crates/bifrost/src/loglets/memory_loglet.rs b/crates/bifrost/src/loglets/memory_loglet.rs index d6ef6da7f..2c03d752c 100644 --- a/crates/bifrost/src/loglets/memory_loglet.rs +++ b/crates/bifrost/src/loglets/memory_loglet.rs @@ -15,8 +15,9 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; +use bytes::Bytes; use restate_types::logs::metadata::LogletParams; -use restate_types::logs::{Payload, SequenceNumber}; +use restate_types::logs::SequenceNumber; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, info}; @@ -111,7 +112,7 @@ impl Ord for OffsetWatcher { pub struct MemoryLoglet { // We treat params as an opaque identifier for the underlying loglet. params: LogletParams, - log: Mutex>, + log: Mutex>, // internal offset of the first record (or slot available) trim_point_offset: AtomicU64, last_committed_offset: AtomicU64, @@ -173,7 +174,10 @@ impl MemoryLoglet { } } - fn read_after(&self, after: LogletOffset) -> Result>, Error> { + fn read_after( + &self, + after: LogletOffset, + ) -> Result>, Error> { let guard = self.log.lock().unwrap(); let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire)); // are we reading after before the trim point? Note that if trim_point == after then we @@ -201,7 +205,7 @@ impl MemoryLoglet { impl LogletBase for MemoryLoglet { type Offset = LogletOffset; - async fn append(&self, payload: Payload) -> Result { + async fn append(&self, payload: Bytes) -> Result { let mut log = self.log.lock().unwrap(); let offset = self.index_to_offset(log.len()); debug!( @@ -233,24 +237,23 @@ impl LogletBase for MemoryLoglet { async fn read_next_single( &self, after: LogletOffset, - ) -> Result, Error> { + ) -> Result, Error> { loop { let next_record = self.read_after(after)?; if let Some(next_record) = next_record { break Ok(next_record); - } else { - // Wait and respond when available. - let receiver = self.watch_for_offset(after.next()); - receiver.await.unwrap(); - continue; } + // Wait and respond when available. + let receiver = self.watch_for_offset(after.next()); + receiver.await.unwrap(); + continue; } } async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>, Error> { + ) -> Result>, Error> { self.read_after(after) } } @@ -273,19 +276,19 @@ mod tests { assert_eq!(None, loglet.find_tail().await?); // Append 1 - let offset = loglet.append(Payload::from("record1")).await?; + let offset = loglet.append(Bytes::from_static(b"record1")).await?; assert_eq!(LogletOffset::OLDEST, offset); assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?); assert_eq!(Some(LogletOffset::OLDEST), loglet.find_tail().await?); // Append 2 - let offset = loglet.append(Payload::from("record2")).await?; + let offset = loglet.append(Bytes::from_static(b"record2")).await?; assert_eq!(LogletOffset(2), offset); assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?); assert_eq!(Some(LogletOffset(2)), loglet.find_tail().await?); // Append 3 - let offset = loglet.append(Payload::from("record3")).await?; + let offset = loglet.append(Bytes::from_static(b"record3")).await?; assert_eq!(LogletOffset(3), offset); assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?); assert_eq!(Some(LogletOffset(3)), loglet.find_tail().await?); @@ -296,17 +299,17 @@ mod tests { assert_eq!(offset, loglet.get_trim_point().await?.next()); assert_eq!(LogletOffset::OLDEST, offset); assert!(record.is_data()); - assert_eq!(Payload::from("record1"), record.into_payload_unchecked()); + assert_eq!(Some(&Bytes::from_static(b"record1")), record.payload()); // read record 2 (reading next after OLDEST) let LogRecord { offset, record } = loglet.read_next_single(offset).await?; assert_eq!(LogletOffset(2), offset); - assert_eq!(Payload::from("record2"), record.into_payload_unchecked()); + assert_eq!(Some(&Bytes::from_static(b"record2")), record.payload()); // read record 3 let LogRecord { offset, record } = loglet.read_next_single(offset).await?; assert_eq!(LogletOffset(3), offset); - assert_eq!(Payload::from("record3"), record.into_payload_unchecked()); + assert_eq!(Some(&Bytes::from_static(b"record3")), record.payload()); // read from the future returns None assert!(loglet @@ -320,7 +323,7 @@ mod tests { // read future record 4 let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(3)).await?; assert_eq!(LogletOffset(4), offset); - assert_eq!(Payload::from("record4"), record.into_payload_unchecked()); + assert_eq!(Some(&Bytes::from_static(b"record4")), record.payload()); Ok(()) } }); @@ -332,7 +335,7 @@ mod tests { // read future record 10 let LogRecord { offset, record } = loglet.read_next_single(LogletOffset(9)).await?; assert_eq!(LogletOffset(10), offset); - assert_eq!(Payload::from("record10"), record.into_payload_unchecked()); + assert_eq!(Some(&Bytes::from_static(b"record10")), record.payload()); Ok(()) } }); @@ -342,7 +345,7 @@ mod tests { assert!(!handle1.is_finished()); // Append 4 - let offset = loglet.append(Payload::from("record4")).await?; + let offset = loglet.append(Bytes::from_static(b"record4")).await?; assert_eq!(LogletOffset(4), offset); assert_eq!(LogletOffset::INVALID, loglet.get_trim_point().await?); assert_eq!(Some(LogletOffset(4)), loglet.find_tail().await?); diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 0149735ea..9947ac34c 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -116,7 +116,7 @@ mod tests { info!(?record, "read record"); assert_eq!(expected_lsn, record.offset); assert_eq!( - Payload::from(format!("record{}", expected_lsn)), + Payload::new(format!("record{}", expected_lsn)), record.record.into_payload_unchecked() ); assert_eq!(expected_lsn, reader.current_read_pointer()); @@ -133,7 +133,7 @@ mod tests { // append 5 records to the log for i in 1..=5 { let lsn = bifrost - .append(LogId::from(0), format!("record{}", i).into()) + .append(LogId::from(0), Payload::new(format!("record{}", i))) .await?; info!(?lsn, "appended record"); } @@ -147,7 +147,7 @@ mod tests { // write 5 more records. for i in 6..=10 { bifrost - .append(LogId::from(0), format!("record{}", i).into()) + .append(LogId::from(0), Payload::new(format!("record{}", i))) .await?; } diff --git a/crates/bifrost/src/record.rs b/crates/bifrost/src/record.rs new file mode 100644 index 000000000..ac4e58755 --- /dev/null +++ b/crates/bifrost/src/record.rs @@ -0,0 +1,96 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bytes::Bytes; +use restate_types::logs::{Lsn, Payload, SequenceNumber}; +use restate_types::storage::{StorageCodec, StorageDecodeError}; + +use crate::{LsnExt, SealReason}; + +/// A single entry in the log. +#[derive(Debug, Clone)] +pub struct LogRecord { + pub offset: S, + pub record: Record, +} + +impl LogRecord { + pub(crate) fn new_data(offset: S, payload: D) -> Self { + Self { + offset, + record: Record::Data(payload), + } + } + + pub(crate) fn new_trim_gap(offset: S, until: S) -> Self { + LogRecord { + offset, + record: Record::TrimGap(TrimGap { until }), + } + } + + pub(crate) fn with_base_lsn(self, base_lsn: Lsn) -> LogRecord { + let record = match self.record { + Record::TrimGap(_) => todo!(), + Record::Data(payload) => Record::Data(payload), + Record::Seal(reason) => Record::Seal(reason), + }; + + LogRecord { + offset: base_lsn.offset_by(self.offset), + record, + } + } +} + +impl LogRecord { + pub(crate) fn decode(self) -> Result, StorageDecodeError> { + let record = match self.record { + Record::Data(mut payload) => Record::Data(StorageCodec::decode(&mut payload)?), + Record::TrimGap(t) => Record::TrimGap(t), + Record::Seal(reason) => Record::Seal(reason), + }; + Ok(LogRecord { + offset: self.offset, + record, + }) + } +} + +#[derive(Debug, Clone, strum_macros::EnumIs)] +pub enum Record { + TrimGap(TrimGap), + Data(D), + Seal(SealReason), +} + +impl Record { + pub fn payload(&self) -> Option<&D> { + match self { + Record::Data(payload) => Some(payload), + _ => None, + } + } +} + +#[cfg(any(test, feature = "test-util"))] +impl Record { + pub fn into_payload_unchecked(self) -> Payload { + match self { + Record::Data(payload) => payload, + _ => panic!("not a data record"), + } + } +} + +#[derive(Debug, Clone)] +pub struct TrimGap { + pub until: S, +} diff --git a/crates/bifrost/src/types.rs b/crates/bifrost/src/types.rs index d69f0a0ac..6b92cb20a 100644 --- a/crates/bifrost/src/types.rs +++ b/crates/bifrost/src/types.rs @@ -12,7 +12,7 @@ #![allow(dead_code)] use crate::loglet::LogletOffset; -use restate_types::logs::{Lsn, Payload, SequenceNumber}; +use restate_types::logs::{Lsn, SequenceNumber}; use serde::{Deserialize, Serialize}; pub(crate) trait LsnExt: SequenceNumber { @@ -52,71 +52,6 @@ pub enum SealReason { Other(String), } -/// A single entry in the log. -#[derive(Debug, Clone)] -pub struct LogRecord { - pub offset: S, - pub record: Record, -} - -impl LogRecord { - pub(crate) fn new_data(offset: S, payload: Payload) -> Self { - Self { - offset, - record: Record::Data(payload), - } - } - - pub(crate) fn new_trim_gap(offset: S, until: S) -> Self { - LogRecord { - offset, - record: Record::TrimGap(TrimGap { until }), - } - } - - pub(crate) fn with_base_lsn(self, base_lsn: Lsn) -> LogRecord { - let record = match self.record { - Record::TrimGap(_) => todo!(), - Record::Data(payload) => Record::Data(payload), - Record::Seal(reason) => Record::Seal(reason), - }; - - LogRecord { - offset: base_lsn.offset_by(self.offset), - record, - } - } -} - -#[derive(Debug, Clone, strum_macros::EnumIs)] -pub enum Record { - TrimGap(TrimGap), - Data(Payload), - Seal(SealReason), -} - -impl Record { - pub fn payload(&self) -> Option<&Payload> { - match self { - Record::Data(payload) => Some(payload), - _ => None, - } - } - - #[cfg(test)] - pub fn into_payload_unchecked(self) -> Payload { - match self { - Record::Data(payload) => payload, - _ => panic!("not a data record"), - } - } -} - -#[derive(Debug, Clone)] -pub struct TrimGap { - pub until: S, -} - #[derive(Debug, Clone, Default)] pub struct FindTailAttributes { // Ensure that we are reading the most recent metadata. This should be used when diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index cd6510c08..273149ede 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -397,7 +397,7 @@ mod tests { let log_record = bifrost.read_next_single(log_id, Lsn::INVALID).await?; let output_message = - Envelope::from_bytes(log_record.record.payload().unwrap().as_ref())?; + Envelope::from_bytes(log_record.record.into_payload_unchecked().into_body())?; let_assert!( Envelope { @@ -484,7 +484,7 @@ mod tests { let bifrost_messages = bifrost.read_all(LogId::from(partition_id)).await?; let output_message_1 = - Envelope::from_bytes(bifrost_messages[0].record.payload().unwrap().as_ref())?; + Envelope::from_bytes(bifrost_messages[0].record.payload().unwrap().body())?; assert_that!( output_message_1.command, diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index 54df28954..22e1f29f2 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -9,8 +9,11 @@ // by the Apache License, Version 2.0. use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use crate::flexbuffers_storage_encode_decode; use crate::identifiers::PartitionId; +use crate::time::MillisSinceEpoch; pub mod metadata; @@ -26,8 +29,8 @@ pub mod metadata; derive_more::Display, derive_more::From, derive_more::Into, - serde::Serialize, - serde::Deserialize, + Serialize, + Deserialize, )] pub struct LogId(u64); @@ -59,8 +62,8 @@ impl From for LogId { derive_more::From, derive_more::Add, derive_more::Display, - serde::Serialize, - serde::Deserialize, + Serialize, + Deserialize, )] pub struct Lsn(u64); @@ -106,28 +109,71 @@ where fn prev(self) -> Self; } -/// Owned payload. -#[derive( - Debug, - Clone, - Default, - Eq, - PartialEq, - derive_more::From, - derive_more::Into, - derive_more::Deref, - derive_more::DerefMut, -)] -pub struct Payload(Bytes); +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct Header { + created_at: MillisSinceEpoch, + // additional custom headers can be added here. Those should be somewhat + // generic and values must be optional. + pub custom_data_1: Option, +} -impl From<&str> for Payload { - fn from(value: &str) -> Self { - Payload(Bytes::copy_from_slice(value.as_bytes())) +impl Default for Header { + fn default() -> Self { + Self { + created_at: MillisSinceEpoch::now(), + custom_data_1: None, + } } } -impl From for Payload { - fn from(value: String) -> Self { - Payload(Bytes::from(value)) +/// Owned payload that loglets accept and return as is. This payload is converted +/// into Payload by bifrost on read and write. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub(crate) struct Envelope { + header: Header, + body: Bytes, +} + +/// Owned payload. +#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct Payload { + header: Header, + body: Bytes, +} + +impl Payload { + pub fn new(body: impl Into) -> Self { + Self { + header: Header::default(), + body: body.into(), + } + } + + /// Sets the custom data 1 field on the record header + pub fn with_custom_data_1(mut self, value: u64) -> Self { + self.header.custom_data_1 = Some(value); + self + } + + pub fn body(&self) -> &Bytes { + &self.body + } + + pub fn split(self) -> (Header, Bytes) { + (self.header, self.body) + } + + pub fn into_body(self) -> Bytes { + self.body + } + + pub fn header(&self) -> &Header { + &self.header + } + + pub fn into_header(self) -> Header { + self.header } } + +flexbuffers_storage_encode_decode!(Payload); diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index af5e36731..3521dcfb9 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -187,7 +187,7 @@ pub async fn append_envelope_to_bifrost( let partition_id = partition_table.find_partition_id(envelope.partition_key())?; let log_id = LogId::from(*partition_id); - let payload = Payload::from(envelope.to_bytes()?); + let payload = Payload::new(envelope.to_bytes()?); let lsn = bifrost.append(log_id, payload).await?; Ok((log_id, lsn)) diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 918924ad3..d770c4a96 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -438,7 +438,7 @@ impl LogReader { fn deserialize_record(record: Record) -> anyhow::Result { match record { Record::Data(payload) => { - let envelope = Envelope::from_bytes(payload.as_ref())?; + let envelope = Envelope::from_bytes(payload.into_body())?; Ok(envelope) } Record::TrimGap(_) => { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 2b0d2ccc9..302f1b140 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -397,7 +397,7 @@ impl PartitionProcessorManager { leader_epoch, }), ); - let payload = Payload::from(envelope.to_bytes()?); + let payload = Payload::new(envelope.to_bytes()?); bifrost .append(LogId::from(partition_id), payload)