Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set and validate supported service discovery protocol version #1526

Merged
merged 6 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/admin/src/schema_registry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use http::Uri;
use restate_core::metadata_store::ReadModifyWriteError;
use restate_core::ShutdownError;
use restate_schema_api::invocation_target::BadInputContentType;
use restate_service_protocol::discovery::schema;
use restate_types::endpoint_manifest;
use restate_types::errors::GenericError;
use restate_types::identifiers::DeploymentId;
use restate_types::invocation::ServiceType;
Expand Down Expand Up @@ -91,7 +91,7 @@ pub enum ServiceError {
BadOutputContentType(String, InvalidHeaderValue),
#[error("invalid combination of service type and handler type '({0}, {1:?})'")]
#[code(unknown)]
BadServiceAndHandlerType(ServiceType, Option<schema::HandlerType>),
BadServiceAndHandlerType(ServiceType, Option<endpoint_manifest::HandlerType>),
#[error("modifying retention time for service type {0} is unsupported")]
#[code(unknown)]
CannotModifyRetentionTime(ServiceType),
Expand Down
67 changes: 34 additions & 33 deletions crates/admin/src/schema_registry/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_schema_api::invocation_target::{
use restate_schema_api::subscription::{
EventReceiverServiceType, Sink, Source, Subscription, SubscriptionValidator,
};
use restate_service_protocol::discovery::schema;
use restate_types::endpoint_manifest;
use restate_types::identifiers::{DeploymentId, SubscriptionId};
use restate_types::invocation::{
InvocationTargetType, ServiceType, VirtualObjectHandlerType, WorkflowHandlerType,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl SchemaUpdater {
&mut self,
requested_deployment_id: Option<DeploymentId>,
deployment_metadata: DeploymentMetadata,
services: Vec<schema::Service>,
services: Vec<endpoint_manifest::Service>,
force: bool,
) -> Result<DeploymentId, SchemaError> {
let deployment_id: Option<DeploymentId>;
Expand Down Expand Up @@ -450,22 +450,23 @@ struct DiscoveredHandlerMetadata {
impl DiscoveredHandlerMetadata {
fn from_schema(
service_type: ServiceType,
handler: schema::Handler,
handler: endpoint_manifest::Handler,
) -> Result<Self, ServiceError> {
let ty = match (service_type, handler.ty) {
(ServiceType::Service, None | Some(schema::HandlerType::Shared)) => {
(ServiceType::Service, None | Some(endpoint_manifest::HandlerType::Shared)) => {
InvocationTargetType::Service
}
(ServiceType::VirtualObject, None | Some(schema::HandlerType::Exclusive)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
}
(ServiceType::VirtualObject, Some(schema::HandlerType::Shared)) => {
(
ServiceType::VirtualObject,
None | Some(endpoint_manifest::HandlerType::Exclusive),
) => InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive),
(ServiceType::VirtualObject, Some(endpoint_manifest::HandlerType::Shared)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Shared)
}
(ServiceType::Workflow, None | Some(schema::HandlerType::Shared)) => {
(ServiceType::Workflow, None | Some(endpoint_manifest::HandlerType::Shared)) => {
InvocationTargetType::Workflow(WorkflowHandlerType::Shared)
}
(ServiceType::Workflow, Some(schema::HandlerType::Workflow)) => {
(ServiceType::Workflow, Some(endpoint_manifest::HandlerType::Workflow)) => {
InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
}
_ => {
Expand Down Expand Up @@ -494,7 +495,7 @@ impl DiscoveredHandlerMetadata {

fn input_rules_from_schema(
handler_name: &str,
schema: schema::InputPayload,
schema: endpoint_manifest::InputPayload,
) -> Result<InputRules, ServiceError> {
let required = schema.required.unwrap_or(false);

Expand Down Expand Up @@ -527,7 +528,7 @@ impl DiscoveredHandlerMetadata {
}

fn output_rules_from_schema(
schema: schema::OutputPayload,
schema: endpoint_manifest::OutputPayload,
) -> Result<OutputRules, ServiceError> {
Ok(if let Some(ct) = schema.content_type {
OutputRules {
Expand Down Expand Up @@ -589,11 +590,11 @@ mod tests {
const GREETER_SERVICE_NAME: &str = "greeter.Greeter";
const ANOTHER_GREETER_SERVICE_NAME: &str = "greeter.AnotherGreeter";

fn greeter_service() -> schema::Service {
schema::Service {
ty: schema::ServiceType::Service,
fn greeter_service() -> endpoint_manifest::Service {
endpoint_manifest::Service {
ty: endpoint_manifest::ServiceType::Service,
name: GREETER_SERVICE_NAME.parse().unwrap(),
handlers: vec![schema::Handler {
handlers: vec![endpoint_manifest::Handler {
name: "greet".parse().unwrap(),
ty: None,
input: None,
Expand All @@ -602,11 +603,11 @@ mod tests {
}
}

fn greeter_virtual_object() -> schema::Service {
schema::Service {
ty: schema::ServiceType::VirtualObject,
fn greeter_virtual_object() -> endpoint_manifest::Service {
endpoint_manifest::Service {
ty: endpoint_manifest::ServiceType::VirtualObject,
name: GREETER_SERVICE_NAME.parse().unwrap(),
handlers: vec![schema::Handler {
handlers: vec![endpoint_manifest::Handler {
name: "greet".parse().unwrap(),
ty: None,
input: None,
Expand All @@ -615,11 +616,11 @@ mod tests {
}
}

fn another_greeter_service() -> schema::Service {
schema::Service {
ty: schema::ServiceType::Service,
fn another_greeter_service() -> endpoint_manifest::Service {
endpoint_manifest::Service {
ty: endpoint_manifest::ServiceType::Service,
name: ANOTHER_GREETER_SERVICE_NAME.parse().unwrap(),
handlers: vec![schema::Handler {
handlers: vec![endpoint_manifest::Handler {
name: "another_greeter".parse().unwrap(),
ty: None,
input: None,
Expand Down Expand Up @@ -918,18 +919,18 @@ mod tests {
use restate_test_util::{check, let_assert};
use test_log::test;

fn greeter_v1_service() -> schema::Service {
schema::Service {
ty: schema::ServiceType::Service,
fn greeter_v1_service() -> endpoint_manifest::Service {
endpoint_manifest::Service {
ty: endpoint_manifest::ServiceType::Service,
name: GREETER_SERVICE_NAME.parse().unwrap(),
handlers: vec![
schema::Handler {
endpoint_manifest::Handler {
name: "greet".parse().unwrap(),
ty: None,
input: None,
output: None,
},
schema::Handler {
endpoint_manifest::Handler {
name: "doSomething".parse().unwrap(),
ty: None,
input: None,
Expand All @@ -939,11 +940,11 @@ mod tests {
}
}

fn greeter_v2_service() -> schema::Service {
schema::Service {
ty: schema::ServiceType::Service,
fn greeter_v2_service() -> endpoint_manifest::Service {
endpoint_manifest::Service {
ty: endpoint_manifest::ServiceType::Service,
name: GREETER_SERVICE_NAME.parse().unwrap(),
handlers: vec![schema::Handler {
handlers: vec![endpoint_manifest::Handler {
name: "greet".parse().unwrap(),
ty: None,
input: None,
Expand Down
8 changes: 8 additions & 0 deletions crates/errors/src/error_codes/RT0013.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## RT0013

The service endpoint does not support any of the supported service protocol versions of the server. Therefore, the server cannot talk to this endpoint. Please make sure that the service endpoint's SDK and the Restate server are compatible.

Suggestions:

* Register a service endpoint which uses an SDK which is compatible with the used server
* Upgrade the server to a version which is compatible with the used SDK
8 changes: 8 additions & 0 deletions crates/errors/src/error_codes/RT0014.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## RT0014

The server cannot resume an in-flight invocation which has been started with a now incompatible service protocol version. Restate does not support upgrading service protocols yet.

Suggestions:

* Downgrade the server to a version which is compatible with the used service protocol version
* Kill the affected invocation via the CLI.
5 changes: 3 additions & 2 deletions crates/errors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ mod helper;
// META are meta related errors.

declare_restate_error_codes!(
RT0001, RT0002, RT0003, RT0004, RT0005, RT0006, RT0007, RT0009, RT0010, RT0011, RT0012,
META0003, META0004, META0005, META0006, META0009, META0010, META0011, META0012, META0013
RT0001, RT0002, RT0003, RT0004, RT0005, RT0006, RT0007, RT0009, RT0010, RT0011, RT0012, RT0013,
RT0014, META0003, META0004, META0005, META0006, META0009, META0010, META0011, META0012,
META0013
);

// -- Some commonly used errors
Expand Down
3 changes: 2 additions & 1 deletion crates/invoker-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ mocks = []
serde = ["dep:serde"]

[dependencies]
restate-types = { workspace = true }
restate-errors = { workspace = true }
restate-types = { workspace = true }


anyhow = { workspace = true }
bytes = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions crates/invoker-api/src/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::deployment::PinnedDeployment;
use restate_types::errors::InvocationError;
use restate_types::identifiers::EntryIndex;
use restate_types::identifiers::InvocationId;
use restate_types::identifiers::{DeploymentId, EntryIndex};
use restate_types::journal::enriched::EnrichedRawEntry;
use std::collections::HashSet;

Expand All @@ -25,7 +26,7 @@ pub struct Effect {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EffectKind {
/// This is sent before any new entry is created by the invoker. This won't be sent if the deployment_id is already set.
SelectedDeployment(DeploymentId),
PinnedDeployment(PinnedDeployment),
JournalEntry {
entry_index: EntryIndex,
entry: EnrichedRawEntry,
Expand Down
9 changes: 5 additions & 4 deletions crates/invoker-api/src/journal_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
// by the Apache License, Version 2.0.

use futures::Stream;
use restate_types::identifiers::{DeploymentId, InvocationId};
use restate_types::deployment::PinnedDeployment;
use restate_types::identifiers::InvocationId;
use restate_types::invocation::ServiceInvocationSpanContext;
use restate_types::journal::raw::PlainRawEntry;
use restate_types::journal::EntryIndex;
Expand All @@ -20,17 +21,17 @@ use std::future::Future;
pub struct JournalMetadata {
pub length: EntryIndex,
pub span_context: ServiceInvocationSpanContext,
pub deployment_id: Option<DeploymentId>,
pub pinned_deployment: Option<PinnedDeployment>,
}

impl JournalMetadata {
pub fn new(
length: EntryIndex,
span_context: ServiceInvocationSpanContext,
deployment_id: Option<DeploymentId>,
pinned_deployment: Option<PinnedDeployment>,
) -> Self {
Self {
deployment_id,
pinned_deployment,
span_context,
length,
}
Expand Down
Loading
Loading