diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index aab50a3f5..497a23f9a 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -137,8 +137,9 @@ impl QueryContext { crate::idempotency::register_self( &ctx, partition_selector.clone(), - partition_store_manager, + partition_store_manager.clone(), )?; + crate::promise::register_self(&ctx, partition_selector.clone(), partition_store_manager)?; let ctx = ctx .datafusion_context diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs index 05c89aeed..ccd710ad1 100644 --- a/crates/storage-query-datafusion/src/lib.rs +++ b/crates/storage-query-datafusion/src/lib.rs @@ -19,6 +19,7 @@ mod journal; mod keyed_service_status; mod partition_store_scanner; mod physical_optimizer; +mod promise; mod service; mod state; mod table_macro; diff --git a/crates/storage-query-datafusion/src/promise/mod.rs b/crates/storage-query-datafusion/src/promise/mod.rs new file mode 100644 index 000000000..dca1c728e --- /dev/null +++ b/crates/storage-query-datafusion/src/promise/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod row; +mod schema; +mod table; + +pub(crate) use table::register_self; diff --git a/crates/storage-query-datafusion/src/promise/row.rs b/crates/storage-query-datafusion/src/promise/row.rs new file mode 100644 index 000000000..1a2d33bd9 --- /dev/null +++ b/crates/storage-query-datafusion/src/promise/row.rs @@ -0,0 +1,55 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use super::schema::PromiseBuilder; + +use crate::table_util::format_using; +use restate_storage_api::promise_table::{OwnedPromiseRow, PromiseState}; +use restate_types::errors::InvocationError; +use restate_types::identifiers::WithPartitionKey; +use restate_types::journal::EntryResult; + +#[inline] +pub(crate) fn append_promise_row( + builder: &mut PromiseBuilder, + output: &mut String, + owned_promise_row: OwnedPromiseRow, +) { + let mut row = builder.row(); + row.partition_key(owned_promise_row.service_id.partition_key()); + + row.service_name(&owned_promise_row.service_id.service_name); + row.service_key(&owned_promise_row.service_id.key); + row.key(&owned_promise_row.key); + + match owned_promise_row.metadata.state { + PromiseState::Completed(c) => { + row.completed(true); + match c { + EntryResult::Success(s) => { + row.completion_success_value(&s); + if row.is_completion_success_value_utf8_defined() { + if let Ok(str) = std::str::from_utf8(&s) { + row.completion_success_value_utf8(str); + } + } + } + EntryResult::Failure(c, m) => { + if row.is_completion_failure_defined() { + row.completion_failure(format_using(output, &InvocationError::new(c, m))) + } + } + } + } + PromiseState::NotCompleted(_) => { + row.completed(false); + } + } +} diff --git a/crates/storage-query-datafusion/src/promise/schema.rs b/crates/storage-query-datafusion/src/promise/schema.rs new file mode 100644 index 000000000..17cf0f377 --- /dev/null +++ b/crates/storage-query-datafusion/src/promise/schema.rs @@ -0,0 +1,29 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#![allow(dead_code)] + +use crate::table_macro::*; + +use datafusion::arrow::datatypes::DataType; + +define_table!(promise( + partition_key: DataType::UInt64, + + service_name: DataType::LargeUtf8, + service_key: DataType::LargeUtf8, + + key: DataType::LargeUtf8, + completed: DataType::Boolean, + + completion_success_value: DataType::LargeBinary, + completion_success_value_utf8: DataType::LargeUtf8, + completion_failure: DataType::LargeUtf8 +)); diff --git a/crates/storage-query-datafusion/src/promise/table.rs b/crates/storage-query-datafusion/src/promise/table.rs new file mode 100644 index 000000000..5338ea546 --- /dev/null +++ b/crates/storage-query-datafusion/src/promise/table.rs @@ -0,0 +1,86 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::fmt::Debug; +use std::ops::RangeInclusive; +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use futures::{Stream, StreamExt}; +use tokio::sync::mpsc::Sender; + +use restate_partition_store::{PartitionStore, PartitionStoreManager}; +use restate_storage_api::promise_table::{OwnedPromiseRow, ReadOnlyPromiseTable}; +use restate_types::identifiers::PartitionKey; + +use super::row::append_promise_row; +use super::schema::PromiseBuilder; +use crate::context::{QueryContext, SelectPartitions}; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; + +pub(crate) fn register_self( + ctx: &QueryContext, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, +) -> datafusion::common::Result<()> { + let table = PartitionedTableProvider::new( + partition_selector, + PromiseBuilder::schema(), + LocalPartitionsScanner::new(partition_store_manager, PromiseScanner), + ); + + ctx.as_ref() + .register_table("sys_promise", Arc::new(table)) + .map(|_| ()) +} + +#[derive(Clone, Debug)] +struct PromiseScanner; + +impl ScanLocalPartition for PromiseScanner { + async fn scan_partition_store( + mut partition_store: PartitionStore, + tx: Sender>, + range: RangeInclusive, + projection: SchemaRef, + ) { + for_each_state(projection, tx, partition_store.all_promises(range)).await; + } +} + +async fn for_each_state( + schema: SchemaRef, + tx: Sender>, + rows: impl Stream>, +) { + let mut builder = PromiseBuilder::new(schema.clone()); + let mut temp = String::new(); + + tokio::pin!(rows); + while let Some(Ok(owned_promise_row)) = rows.next().await { + append_promise_row(&mut builder, &mut temp, owned_promise_row); + if builder.full() { + let batch = builder.finish(); + if tx.send(Ok(batch)).await.is_err() { + // not sure what to do here? + // the other side has hung up on us. + // we probably don't want to panic, is it will cause the entire process to exit + return; + } + builder = PromiseBuilder::new(schema.clone()); + } + } + if !builder.empty() { + let result = builder.finish(); + let _ = tx.send(Ok(result)).await; + } +}