Skip to content

Commit

Permalink
Partition-scoped storage reader in invoker
Browse files Browse the repository at this point in the history
This changes the invoker to allow partitions to pass down scoped storage readers along registration messages in invoker. This means that the invoker uses the partition-supplied storage reader for invocations owned by this particular partition.
Additionally, this attempts to consolidate (in most places) the journal reader and state reader under a single generic type.
  • Loading branch information
AhmedSoliman committed Apr 29, 2024
1 parent 3ce2709 commit 94af803
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 142 deletions.
3 changes: 2 additions & 1 deletion crates/invoker-api/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub enum InvokeInputJournal {
CachedJournal(JournalMetadata, Vec<PlainRawEntry>),
}

pub trait ServiceHandle {
pub trait ServiceHandle<SR> {
type Future: Future<Output = Result<(), NotRunningError>>;

fn invoke(
Expand Down Expand Up @@ -65,6 +65,7 @@ pub trait ServiceHandle {
&mut self,
partition: PartitionLeaderEpoch,
partition_key_range: RangeInclusive<PartitionKey>,
storage_reader: SR,
sender: mpsc::Sender<Effect>,
) -> Self::Future;
}
25 changes: 0 additions & 25 deletions crates/invoker-api/src/journal_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,3 @@ pub trait JournalReader {
fid: &'a InvocationId,
) -> impl Future<Output = Result<(JournalMetadata, Self::JournalStream), Self::Error>> + Send;
}

#[cfg(any(test, feature = "mocks"))]
pub mod mocks {
use super::*;
use restate_types::invocation::ServiceInvocationSpanContext;
use std::convert::Infallible;

#[derive(Debug, Clone)]
pub struct EmptyJournalReader;

impl JournalReader for EmptyJournalReader {
type JournalStream = futures::stream::Empty<PlainRawEntry>;
type Error = Infallible;

async fn read_journal<'a>(
&'a mut self,
_sid: &'a InvocationId,
) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> {
Ok((
JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None),
futures::stream::empty(),
))
}
}
}
41 changes: 41 additions & 0 deletions crates/invoker-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,44 @@ pub use handle::*;
pub use journal_reader::{JournalMetadata, JournalReader};
pub use state_reader::{EagerState, StateReader};
pub use status_handle::{InvocationErrorReport, InvocationStatusReport, StatusHandle};

#[cfg(any(test, feature = "mocks"))]
pub mod mocks {
use super::*;
use bytes::Bytes;
use restate_types::identifiers::{InvocationId, ServiceId};
use restate_types::invocation::ServiceInvocationSpanContext;
use restate_types::journal::raw::PlainRawEntry;
use std::convert::Infallible;
use std::iter::empty;

#[derive(Debug, Clone)]
pub struct EmptyStorageReader;

impl JournalReader for EmptyStorageReader {
type JournalStream = futures::stream::Empty<PlainRawEntry>;
type Error = Infallible;

async fn read_journal<'a>(
&'a mut self,
_sid: &'a InvocationId,
) -> Result<(JournalMetadata, Self::JournalStream), Self::Error> {
Ok((
JournalMetadata::new(0, ServiceInvocationSpanContext::empty(), None),
futures::stream::empty(),
))
}
}

impl StateReader for EmptyStorageReader {
type StateIter = std::iter::Empty<(Bytes, Bytes)>;
type Error = Infallible;

async fn read_state<'a>(
&'a mut self,
_service_id: &'a ServiceId,
) -> Result<EagerState<Self::StateIter>, Self::Error> {
Ok(EagerState::new_complete(empty()))
}
}
}
24 changes: 0 additions & 24 deletions crates/invoker-api/src/state_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,3 @@ pub trait StateReader {
service_id: &'a ServiceId,
) -> impl Future<Output = Result<EagerState<Self::StateIter>, Self::Error>> + Send;
}

#[cfg(any(test, feature = "mocks"))]
pub mod mocks {
use crate::{EagerState, StateReader};
use bytes::Bytes;
use restate_types::identifiers::ServiceId;
use std::convert::Infallible;
use std::iter::empty;

#[derive(Debug, Clone)]
pub struct EmptyStateReader;

impl StateReader for EmptyStateReader {
type StateIter = std::iter::Empty<(Bytes, Bytes)>;
type Error = Infallible;

async fn read_state<'a>(
&'a mut self,
_service_id: &'a ServiceId,
) -> Result<EagerState<Self::StateIter>, Self::Error> {
Ok(EagerState::new_complete(empty()))
}
}
}
12 changes: 7 additions & 5 deletions crates/invoker-impl/src/input_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub(crate) struct InvokeCommand {
}

#[derive(Debug)]
pub(crate) enum InputCommand {
pub(crate) enum InputCommand<SR> {
Invoke(InvokeCommand),
Completion {
partition: PartitionLeaderEpoch,
Expand Down Expand Up @@ -58,20 +58,20 @@ pub(crate) enum InputCommand {
RegisterPartition {
partition: PartitionLeaderEpoch,
partition_key_range: RangeInclusive<PartitionKey>,
storage_reader: SR,
sender: mpsc::Sender<Effect>,
},
}

// -- Handles implementations. This is just glue code between the Input<Command> and the interfaces

#[derive(Debug, Clone)]
pub struct ChannelServiceHandle {
pub(super) input: mpsc::UnboundedSender<InputCommand>,
pub struct InvokerHandle<SR> {
pub(super) input: mpsc::UnboundedSender<InputCommand<SR>>,
}

impl ServiceHandle for ChannelServiceHandle {
impl<SR> ServiceHandle<SR> for InvokerHandle<SR> {
type Future = futures::future::Ready<Result<(), NotRunningError>>;

fn invoke(
&mut self,
partition: PartitionLeaderEpoch,
Expand Down Expand Up @@ -152,6 +152,7 @@ impl ServiceHandle for ChannelServiceHandle {
&mut self,
partition: PartitionLeaderEpoch,
partition_key_range: RangeInclusive<PartitionKey>,
storage_reader: SR,
sender: mpsc::Sender<Effect>,
) -> Self::Future {
futures::future::ready(
Expand All @@ -160,6 +161,7 @@ impl ServiceHandle for ChannelServiceHandle {
partition,
partition_key_range,
sender,
storage_reader,
})
.map_err(|_| NotRunningError),
)
Expand Down
12 changes: 6 additions & 6 deletions crates/invoker-impl/src/invocation_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl From<InvocationTaskError> for InvocationTaskOutputInner {
}

/// Represents an open invocation stream
pub(super) struct InvocationTask<JR, SR, EE, DMR> {
pub(super) struct InvocationTask<SR, JR, EE, DMR> {
// Shared client
client: ServiceClient,

Expand All @@ -253,8 +253,8 @@ pub(super) struct InvocationTask<JR, SR, EE, DMR> {
disable_eager_state: bool,

// Invoker tx/rx
journal_reader: JR,
state_reader: SR,
journal_reader: JR,
entry_enricher: EE,
deployment_metadata_resolver: DMR,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
Expand Down Expand Up @@ -297,11 +297,11 @@ macro_rules! shortcircuit {
};
}

impl<JR, SR, EE, DMR> InvocationTask<JR, SR, EE, DMR>
impl<SR, JR, EE, DMR> InvocationTask<SR, JR, EE, DMR>
where
SR: StateReader + StateReader + Clone + Send + Sync + 'static,
JR: JournalReader + Clone + Send + Sync + 'static,
<JR as JournalReader>::JournalStream: Unpin + Send + 'static,
SR: StateReader + Clone + Send + Sync + 'static,
<SR as StateReader>::StateIter: Send,
EE: EntryEnricher,
DMR: DeploymentResolver,
Expand All @@ -318,8 +318,8 @@ where
disable_eager_state: bool,
message_size_warning: usize,
message_size_limit: Option<usize>,
journal_reader: JR,
state_reader: SR,
journal_reader: JR,
entry_enricher: EE,
deployment_metadata_resolver: DMR,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
Expand All @@ -334,8 +334,8 @@ where
abort_timeout,
disable_eager_state,
next_journal_index: 0,
journal_reader,
state_reader,
journal_reader,
entry_enricher,
deployment_metadata_resolver,
invoker_tx,
Expand Down
Loading

0 comments on commit 94af803

Please sign in to comment.