From f5817212035ad6a7d4db3eee7f8a65e4a7fca0e7 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 28 Nov 2024 10:12:53 +0100 Subject: [PATCH] Use case pages python (#483) --- .github/workflows/pre-release.yml | 6 +- .gitignore | 2 + code_snippets/go/develop/serving.go | 1 + code_snippets/go/getstarted/tour.go | 2 +- .../main/java/develop/JournalingResults.java | 38 -- .../main/java/develop/ServingIdentity.java | 2 +- .../main/java/develop/clients/Ingress.java | 2 +- .../main/java/develop/signals/MyWorkflow.java | 41 -- .../synctoasync/DataPreparationService.java | 13 +- .../asynctasks/synctoasync/MyClient.java | 11 +- .../usecases/workflows/SignupWorkflow.java | 5 - .../kotlin/src/main/kotlin/develop/Greeter.kt | 3 - .../main/kotlin/develop/JournalingResults.kt | 29 - .../kotlin/develop/ServiceCommunication.kt | 4 +- .../main/kotlin/develop/ServingIdentity.kt | 2 +- .../main/kotlin/develop/clients/Ingress.kt | 6 +- .../synctoasync/DataPreparationService.kt | 8 +- .../asynctasks/synctoasync/MyClient.kt | 11 +- .../usecases/workflows/SignupWorkflow.kt | 5 - code_snippets/python/README.md | 11 + code_snippets/python/hypercorn-config.toml | 6 + code_snippets/python/requirements.txt | 7 +- .../python/src/develop/journaling_results.py | 2 +- code_snippets/python/src/develop/serving.py | 2 +- .../python/src/get_started/checkout.py | 4 +- code_snippets/python/src/get_started/tour.py | 2 +- .../python/src/use_cases/__init__.py | 0 .../src/use_cases/async_tasks/__init__.py | 0 .../use_cases/async_tasks/fan_out_worker.py | 54 ++ .../async_tasks/simple_async_task/__init__.py | 0 .../simple_async_task/async_task_service.py | 26 + .../simple_async_task/task_submitter.py | 36 ++ .../async_tasks/sync_to_async/__init__.py | 0 .../async_tasks/sync_to_async/client.py | 38 ++ .../sync_to_async/data_preparation_service.py | 46 ++ .../use_cases/event_processing/__init__.py | 0 .../event_processing/event_processing.py | 62 +++ .../event_processing/events_state.py | 68 +++ .../src/use_cases/microservices/__init__.py | 0 .../use_cases/microservices/idempotency.py | 12 + .../use_cases/microservices/role_updater.py | 74 +++ .../src/use_cases/workflows/__init__.py | 0 .../use_cases/workflows/signup_workflow.py | 79 +++ .../python/src/use_cases/workflows/submit.py | 25 + code_snippets/ts/package-lock.json | 3 +- code_snippets/ts/package.json | 2 +- .../ts/src/develop/clients/ingress.ts | 2 +- code_snippets/ts/src/develop/serving.ts | 6 +- .../ts/src/develop/serving_lambda.ts | 2 +- code_snippets/ts/src/get_started/tour.ts | 6 +- .../async_tasks/sync_to_async/client.ts | 21 +- .../sync_to_async/data_preparation_service.ts | 3 +- .../event_processing/events_state.ts | 9 +- .../use_cases/microservices/idempotency.ts | 2 +- .../use_cases/workflows/signup_workflow.ts | 5 +- docs/develop/java/service-communication.mdx | 2 - docs/use-cases/async-tasks.mdx | 524 +++++++++++++----- docs/use-cases/event-processing.mdx | 165 +++++- docs/use-cases/microservice-orchestration.mdx | 324 ++++++----- docs/use-cases/workflows.mdx | 117 +++- src/components/FeatureWidget/index.tsx | 28 + src/components/code/scrollycoding.module.css | 1 + src/components/code/scrollycoding.tsx | 7 +- 63 files changed, 1462 insertions(+), 512 deletions(-) delete mode 100644 code_snippets/java/src/main/java/develop/signals/MyWorkflow.java create mode 100644 code_snippets/python/README.md create mode 100644 code_snippets/python/hypercorn-config.toml create mode 100644 code_snippets/python/src/use_cases/__init__.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/__init__.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/fan_out_worker.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/simple_async_task/__init__.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/simple_async_task/async_task_service.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/simple_async_task/task_submitter.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/sync_to_async/__init__.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/sync_to_async/client.py create mode 100644 code_snippets/python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py create mode 100644 code_snippets/python/src/use_cases/event_processing/__init__.py create mode 100644 code_snippets/python/src/use_cases/event_processing/event_processing.py create mode 100644 code_snippets/python/src/use_cases/event_processing/events_state.py create mode 100644 code_snippets/python/src/use_cases/microservices/__init__.py create mode 100644 code_snippets/python/src/use_cases/microservices/idempotency.py create mode 100644 code_snippets/python/src/use_cases/microservices/role_updater.py create mode 100644 code_snippets/python/src/use_cases/workflows/__init__.py create mode 100644 code_snippets/python/src/use_cases/workflows/signup_workflow.py create mode 100644 code_snippets/python/src/use_cases/workflows/submit.py diff --git a/.github/workflows/pre-release.yml b/.github/workflows/pre-release.yml index cfde6f7d..836c2e16 100644 --- a/.github/workflows/pre-release.yml +++ b/.github/workflows/pre-release.yml @@ -136,9 +136,9 @@ jobs: cd code_snippets/python python3 -m venv .venv source .venv/bin/activate - pip install -r requirements.txt - pip install mypy - python3 -m mypy . + python3 -m pip install -r requirements.txt + python3 -m pip install mypy + python3 -m mypy ./src --install-types deactivate - name: Install dasel diff --git a/.gitignore b/.gitignore index 2c4b93e3..88279928 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ yarn-error.log* /code_snippets/ts/node_modules/ /code_snippets/ts/dist/ /code_snippets/python/venv/ +/code_snippets/python/.venv/ +*__pycache__/ diff --git a/code_snippets/go/develop/serving.go b/code_snippets/go/develop/serving.go index ae450bae..9c5249d5 100644 --- a/code_snippets/go/develop/serving.go +++ b/code_snippets/go/develop/serving.go @@ -32,6 +32,7 @@ func serving() { // if err := server.NewRestate(). Bind(restate.Reflect(MyService{})). + // !mark WithIdentityV1("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f"). Start(context.Background(), ":9080"); err != nil { log.Fatal(err) diff --git a/code_snippets/go/getstarted/tour.go b/code_snippets/go/getstarted/tour.go index e953e408..2db6a8bb 100644 --- a/code_snippets/go/getstarted/tour.go +++ b/code_snippets/go/getstarted/tour.go @@ -38,7 +38,7 @@ type CheckoutRequest struct { /* // func (CheckoutService) Handle(ctx restate.Context, request CheckoutRequest) (bool, error) { - // withClass(1:3) highlight-line + // !mark(1:3) idempotencyKey := restate.Rand(ctx).UUID().String() ctx.Log().Info("Generated idempotency key", "idempotencyKey", idempotencyKey) return false, fmt.Errorf("Something happened!") diff --git a/code_snippets/java/src/main/java/develop/JournalingResults.java b/code_snippets/java/src/main/java/develop/JournalingResults.java index 3949c62e..0b6362b1 100644 --- a/code_snippets/java/src/main/java/develop/JournalingResults.java +++ b/code_snippets/java/src/main/java/develop/JournalingResults.java @@ -4,7 +4,6 @@ import dev.restate.sdk.Awakeable; import dev.restate.sdk.Context; import dev.restate.sdk.JsonSerdes; -import dev.restate.sdk.common.TerminalException; import java.util.UUID; class JournalingResults { @@ -15,43 +14,6 @@ void sideEffect(Context ctx) { String output = ctx.run(JsonSerdes.STRING, () -> doDbRequest()); // - var paymentClient = new PaymentClient(); - String txId = ""; - int amount = 1; - - // - ctx.run( - JsonSerdes.BOOLEAN, - () -> { - boolean result = paymentClient.call(txId, amount); - if (result) { - // withClass highlight-line - throw new IllegalStateException("Payment failed"); - } else { - return result; - } - }); - // - - // - try { - ctx.run( - JsonSerdes.BOOLEAN, - () -> { - boolean result = paymentClient.call(txId, amount); - if (result) { - // withClass highlight-line - throw new TerminalException( - TerminalException.INTERNAL_SERVER_ERROR_CODE, "Payment failed"); - } else { - return result; - } - }); - } catch (TerminalException e) { - // handle terminal error - } - // - Awakeable a1 = ctx.awakeable(JsonSerdes.BOOLEAN); Awakeable a2 = ctx.awakeable(JsonSerdes.BOOLEAN); Awakeable a3 = ctx.awakeable(JsonSerdes.BOOLEAN); diff --git a/code_snippets/java/src/main/java/develop/ServingIdentity.java b/code_snippets/java/src/main/java/develop/ServingIdentity.java index 224c04cf..c4511557 100644 --- a/code_snippets/java/src/main/java/develop/ServingIdentity.java +++ b/code_snippets/java/src/main/java/develop/ServingIdentity.java @@ -8,7 +8,7 @@ class MySecureApp { public static void main(String[] args) { RestateHttpEndpointBuilder.builder() .bind(new MyService()) - // withClass(1:3) highlight-line + // !mark .withRequestIdentityVerifier( RestateRequestIdentityVerifier.fromKeys( "publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f")) diff --git a/code_snippets/java/src/main/java/develop/clients/Ingress.java b/code_snippets/java/src/main/java/develop/clients/Ingress.java index 70477bb4..3f1ad94f 100644 --- a/code_snippets/java/src/main/java/develop/clients/Ingress.java +++ b/code_snippets/java/src/main/java/develop/clients/Ingress.java @@ -55,7 +55,7 @@ public void idempotentInvoke() { Client rs = Client.connect("http://localhost:8080"); GreetCounterObjectClient.fromClient(rs, "Mary") .send() - // withClass highlight-line + // !mark .greet("Hi", CallRequestOptions.DEFAULT.withIdempotency("abcde")); // } diff --git a/code_snippets/java/src/main/java/develop/signals/MyWorkflow.java b/code_snippets/java/src/main/java/develop/signals/MyWorkflow.java deleted file mode 100644 index 86fea76e..00000000 --- a/code_snippets/java/src/main/java/develop/signals/MyWorkflow.java +++ /dev/null @@ -1,41 +0,0 @@ -package develop.signals; - -import dev.restate.sdk.JsonSerdes; -import dev.restate.sdk.SharedWorkflowContext; -import dev.restate.sdk.WorkflowContext; -import dev.restate.sdk.annotation.Handler; -import dev.restate.sdk.annotation.Workflow; -import dev.restate.sdk.common.DurablePromiseKey; - -// -@Workflow -public class MyWorkflow { - - // withClass highlight-line - DurablePromiseKey MY_BOOLEAN_SIGNAL = - DurablePromiseKey.of("my-boolean-signal", JsonSerdes.BOOLEAN); - - @Workflow - public String run(WorkflowContext ctx, String input) { - - // do some steps... - - // withClass highlight-line - // Creation of the Durable Promise - // withClass highlight-line - boolean signal = ctx.promise(MY_BOOLEAN_SIGNAL).awaitable().await(); - - // do some steps... - - return "success"; - } - - @Handler - public void resolveMySignal(SharedWorkflowContext ctx, boolean signal) { - // withClass highlight-line - // Resolution of the Durable Promise - // withClass highlight-line - ctx.promiseHandle(MY_BOOLEAN_SIGNAL).resolve(signal); - } -} -// diff --git a/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java b/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java index cbe2a17b..1cf3152a 100644 --- a/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java +++ b/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java @@ -7,7 +7,6 @@ import dev.restate.sdk.annotation.Shared; import dev.restate.sdk.annotation.Workflow; import dev.restate.sdk.common.DurablePromiseKey; -import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; import dev.restate.sdk.serde.jackson.JacksonSerdes; import develop.workflows.Email; import usecases.utils.URL; @@ -20,22 +19,24 @@ public class DataPreparationService { DurablePromiseKey.of("url", JacksonSerdes.of(URL.class)); @Workflow - public URL run(WorkflowContext ctx, String userId) { + public URL run(WorkflowContext ctx) { + // URL url = ctx.run(JacksonSerdes.of(URL.class), () -> createS3Bucket()); ctx.run(() -> uploadData(url)); + // ctx.promiseHandle(URL_PROMISE).resolve(url); + // return url; + // } @Shared public void resultAsEmail(SharedWorkflowContext ctx, Email email) { + // URL url = ctx.promise(URL_PROMISE).awaitable().await(); + // ctx.run(() -> sendEmail(url, email)); } - - public static void main(String[] args) { - RestateHttpEndpointBuilder.builder().bind(new DataPreparationService()).buildAndListen(); - } } // diff --git a/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java b/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java index 7ca34aae..99c79b1a 100644 --- a/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java +++ b/code_snippets/java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java @@ -6,18 +6,15 @@ import java.util.concurrent.TimeUnit; import usecases.asynctasks.synctoasync.DataPreparationServiceClient.IngressClient; -// public class MyClient { - - private static final Client rs = Client.connect("http://localhost:8080"); - + // public void downloadData(String userId, Email email) { // + Client rs = Client.connect("http://localhost:8080"); IngressClient client = DataPreparationServiceClient.fromClient(rs, userId); // - // - client.submit(userId); + client.submit(); // try { @@ -34,5 +31,5 @@ public void downloadData(String userId, Email email) { // // ... process directly ... } + // } -// diff --git a/code_snippets/java/src/main/java/usecases/workflows/SignupWorkflow.java b/code_snippets/java/src/main/java/usecases/workflows/SignupWorkflow.java index 14620b80..862141b4 100644 --- a/code_snippets/java/src/main/java/usecases/workflows/SignupWorkflow.java +++ b/code_snippets/java/src/main/java/usecases/workflows/SignupWorkflow.java @@ -71,11 +71,6 @@ public String getStage(SharedWorkflowContext ctx) { public void approveEmail(SharedWorkflowContext ctx, String secret) { ctx.promiseHandle(EMAIL_LINK).resolve(secret); } - - @Handler - public void rejectEmail(SharedWorkflowContext ctx) { - ctx.promiseHandle(EMAIL_LINK).reject("Abort verification"); - } // } // diff --git a/code_snippets/kotlin/src/main/kotlin/develop/Greeter.kt b/code_snippets/kotlin/src/main/kotlin/develop/Greeter.kt index 779d3bf2..466cce6f 100644 --- a/code_snippets/kotlin/src/main/kotlin/develop/Greeter.kt +++ b/code_snippets/kotlin/src/main/kotlin/develop/Greeter.kt @@ -6,16 +6,13 @@ import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder import dev.restate.sdk.kotlin.KtStateKey import dev.restate.sdk.kotlin.ObjectContext -// withClass tooltip java-overview-virtual-object @VirtualObject class Greeter { - // withClass(1:3) tooltip java-overview-state-key companion object { private val COUNT = KtStateKey.json("count") } - // withClass tooltip java-overview-virtual-object-handler @Handler suspend fun greet(ctx: ObjectContext, greeting: String): String { // Get the count and increment it diff --git a/code_snippets/kotlin/src/main/kotlin/develop/JournalingResults.kt b/code_snippets/kotlin/src/main/kotlin/develop/JournalingResults.kt index 91d4ee16..d19df67b 100644 --- a/code_snippets/kotlin/src/main/kotlin/develop/JournalingResults.kt +++ b/code_snippets/kotlin/src/main/kotlin/develop/JournalingResults.kt @@ -1,6 +1,5 @@ package develop -import dev.restate.sdk.common.TerminalException import dev.restate.sdk.kotlin.* import java.util.UUID @@ -14,34 +13,6 @@ internal class SideEffects { val txId = "" val amount = 1 - // - ctx.runBlock { - val result = paymentClient.call(txId, amount) - if (result) { - // withClass highlight-line - throw IllegalStateException("Payment failed") - } else { - result - } - } - // - - // - try { - ctx.runBlock { - val result = paymentClient.call(txId, amount) - if (result) { - // withClass highlight-line - throw TerminalException(TerminalException.INTERNAL_SERVER_ERROR_CODE, "Payment failed") - } else { - result - } - } - } catch (e: TerminalException) { - // handle terminal error - } - // - val a1 = ctx.awakeable() val a2 = ctx.awakeable() val a3 = ctx.awakeable() diff --git a/code_snippets/kotlin/src/main/kotlin/develop/ServiceCommunication.kt b/code_snippets/kotlin/src/main/kotlin/develop/ServiceCommunication.kt index 4d66f2d4..a89cac74 100644 --- a/code_snippets/kotlin/src/main/kotlin/develop/ServiceCommunication.kt +++ b/code_snippets/kotlin/src/main/kotlin/develop/ServiceCommunication.kt @@ -40,7 +40,7 @@ class ServiceCommunication { // MyServiceClient.fromContext(ctx) - // withClass highlight-line + // !mark .send() .myHandler(request) // @@ -51,7 +51,7 @@ class ServiceCommunication { // MyServiceClient.fromContext(ctx) - // withClass highlight-line + // !mark .send(1.seconds) .myHandler(request) // diff --git a/code_snippets/kotlin/src/main/kotlin/develop/ServingIdentity.kt b/code_snippets/kotlin/src/main/kotlin/develop/ServingIdentity.kt index ae4ddca1..274f0d76 100644 --- a/code_snippets/kotlin/src/main/kotlin/develop/ServingIdentity.kt +++ b/code_snippets/kotlin/src/main/kotlin/develop/ServingIdentity.kt @@ -7,7 +7,7 @@ import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder fun main() { RestateHttpEndpointBuilder.builder() .bind(MyService()) - // withClass(1:5) highlight-line + // !mark(1:5) .withRequestIdentityVerifier( RestateRequestIdentityVerifier.fromKeys( "publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f", diff --git a/code_snippets/kotlin/src/main/kotlin/develop/clients/Ingress.kt b/code_snippets/kotlin/src/main/kotlin/develop/clients/Ingress.kt index 7b29ec25..15ed3d87 100644 --- a/code_snippets/kotlin/src/main/kotlin/develop/clients/Ingress.kt +++ b/code_snippets/kotlin/src/main/kotlin/develop/clients/Ingress.kt @@ -39,12 +39,12 @@ class Ingress { // val rs = Client.connect("http://localhost:8080") GreeterServiceClient.fromClient(rs) - // withClass highlight-line + // !mark .send(1.seconds) .greet("Hi") GreetCounterObjectClient.fromClient(rs, "Mary") - // withClass highlight-line + // !mark .send(1000.milliseconds) .greet("Hi") // @@ -55,7 +55,7 @@ class Ingress { val rs = Client.connect("http://localhost:8080") GreetCounterObjectClient.fromClient(rs, "Mary") .send() - // withClass highlight-line + // !mark .greet("Hi", CallRequestOptions.DEFAULT.withIdempotency("abcde")) // } diff --git a/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt b/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt index c9f8d6ac..9f640cfc 100644 --- a/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt +++ b/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt @@ -18,17 +18,23 @@ class DataPreparationService { } @Workflow - suspend fun run(ctx: WorkflowContext, userId: String): URL { + suspend fun run(ctx: WorkflowContext): URL { + // val url: URL = ctx.runBlock { createS3Bucket() } ctx.runBlock { uploadData(url) } + // ctx.promiseHandle(URL_PROMISE).resolve(url) + // return url + // } @Shared suspend fun resultAsEmail(ctx: SharedWorkflowContext, email: Email) { + // val url: URL = ctx.promise(URL_PROMISE).awaitable().await() + // ctx.runBlock { sendEmail(url, email) } } } diff --git a/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt b/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt index ff8f0dee..a7ba1d56 100644 --- a/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt +++ b/code_snippets/kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt @@ -4,20 +4,17 @@ import dev.restate.sdk.client.Client import develop.workflows.Email import java.util.concurrent.TimeUnit -// class MyClient { - companion object { - private val rs: Client = Client.connect("http://localhost:8080") - } - + // suspend fun downloadData(userId: String, email: Email) { // + val rs: Client = Client.connect("http://localhost:8080") val client = DataPreparationServiceClient.fromClient(rs, userId) // // - client.submit(userId) + client.submit() // try { @@ -32,5 +29,5 @@ class MyClient { // // ... process directly ... } + // } -// diff --git a/code_snippets/kotlin/src/main/kotlin/usecases/workflows/SignupWorkflow.kt b/code_snippets/kotlin/src/main/kotlin/usecases/workflows/SignupWorkflow.kt index b6b87897..09cf6330 100644 --- a/code_snippets/kotlin/src/main/kotlin/usecases/workflows/SignupWorkflow.kt +++ b/code_snippets/kotlin/src/main/kotlin/usecases/workflows/SignupWorkflow.kt @@ -67,11 +67,6 @@ class SignupWorkflow { suspend fun approveEmail(ctx: SharedWorkflowContext, secret: String) { ctx.promiseHandle(EMAIL_LINK).resolve(secret) } - - @Handler - suspend fun rejectEmail(ctx: SharedWorkflowContext) { - ctx.promiseHandle(EMAIL_LINK).reject("Abort verification") - } // } // diff --git a/code_snippets/python/README.md b/code_snippets/python/README.md new file mode 100644 index 00000000..49e84868 --- /dev/null +++ b/code_snippets/python/README.md @@ -0,0 +1,11 @@ +# Python code snippets + +```shell +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +```shell +python -m hypercorn --config hypercorn-config.toml example:app +``` diff --git a/code_snippets/python/hypercorn-config.toml b/code_snippets/python/hypercorn-config.toml new file mode 100644 index 00000000..f7ae2f8d --- /dev/null +++ b/code_snippets/python/hypercorn-config.toml @@ -0,0 +1,6 @@ +bind = "0.0.0.0:9080" +h2_max_concurrent_streams = 2147483647 +keep_alive_max_requests = 2147483647 +keep_alive_timeout = 2147483647 +workers = 8 + diff --git a/code_snippets/python/requirements.txt b/code_snippets/python/requirements.txt index 918ae906..35b440ff 100644 --- a/code_snippets/python/requirements.txt +++ b/code_snippets/python/requirements.txt @@ -1,2 +1,5 @@ -restate_sdk==0.2.1 -hypercorn \ No newline at end of file +restate_sdk==0.4.1 +hypercorn +pydantic +requests +types-requests \ No newline at end of file diff --git a/code_snippets/python/src/develop/journaling_results.py b/code_snippets/python/src/develop/journaling_results.py index b5f2e7e4..f6d44acf 100644 --- a/code_snippets/python/src/develop/journaling_results.py +++ b/code_snippets/python/src/develop/journaling_results.py @@ -10,6 +10,6 @@ async def do_db_request(): # ... implement ... return "my_result" - # withClass highlight-line + # !mark result = await ctx.run("database request", do_db_request) # diff --git a/code_snippets/python/src/develop/serving.py b/code_snippets/python/src/develop/serving.py index b9d714a9..07b8be24 100644 --- a/code_snippets/python/src/develop/serving.py +++ b/code_snippets/python/src/develop/serving.py @@ -10,7 +10,7 @@ # app = restate.app( services=[my_service], - # withClass highlight-line + # !mark identity_keys=["publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f"] ) # diff --git a/code_snippets/python/src/get_started/checkout.py b/code_snippets/python/src/get_started/checkout.py index e3d965b8..0a9db510 100644 --- a/code_snippets/python/src/get_started/checkout.py +++ b/code_snippets/python/src/get_started/checkout.py @@ -15,12 +15,12 @@ # @checkout.handler() async def handle(ctx: ObjectContext, order: Order) -> bool: - # withClass highlight-line + # !mark total_price = len(order['tickets']) * 40 idempotency_key = await ctx.run("idempotency_key", lambda: str(uuid.uuid4())) - # withClass(1:3) highlight-line + # !mark(1:3) async def pay(): return await payment_client.call(idempotency_key, total_price) success = await ctx.run("payment", pay) diff --git a/code_snippets/python/src/get_started/tour.py b/code_snippets/python/src/get_started/tour.py index 5275285f..5db5581c 100644 --- a/code_snippets/python/src/get_started/tour.py +++ b/code_snippets/python/src/get_started/tour.py @@ -52,7 +52,7 @@ class Order(TypedDict): # @checkout.handler() async def handle(ctx: ObjectContext, order: Order) -> bool: - # withClass(1:3) highlight-line + # !mark(1:3) idempotency_key = await ctx.run("idempotency_key", lambda: str(uuid.uuid4())) print("My idempotency key is: ", idempotency_key) raise Exception("Something happened!") diff --git a/code_snippets/python/src/use_cases/__init__.py b/code_snippets/python/src/use_cases/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/async_tasks/__init__.py b/code_snippets/python/src/use_cases/async_tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/async_tasks/fan_out_worker.py b/code_snippets/python/src/use_cases/async_tasks/fan_out_worker.py new file mode 100644 index 00000000..e1bbc3e8 --- /dev/null +++ b/code_snippets/python/src/use_cases/async_tasks/fan_out_worker.py @@ -0,0 +1,54 @@ +import restate +from pydantic import BaseModel +from restate import Service, Context + + +class Task(BaseModel): + task_id: str + + +class Subtask(BaseModel): + task_id: str + subtask_id: str + + +def split(task) -> list[Subtask]: + return list() + + +def aggregate(results) -> list[Task]: + return list() + + +# +worker_service = Service("worker") + + +@worker_service.handler() +async def run(ctx: Context, task: Task): + subtasks = await ctx.run("split task", lambda: split(task)) + + result_promises = [] + # + for subtask in subtasks: + sub_result_promise = ctx.service_call(run_subtask, arg=subtask) + # + result_promises.append(sub_result_promise) + + # + results = [await promise for promise in result_promises] + # + return aggregate(results) + + +@worker_service.handler() +async def run_subtask(ctx: Context, subtask: Subtask): + # Processing logic goes here... + # Can be moved to a separate service to scale independently + pass + + +# +app = restate.app([worker_service]) +# +# diff --git a/code_snippets/python/src/use_cases/async_tasks/simple_async_task/__init__.py b/code_snippets/python/src/use_cases/async_tasks/simple_async_task/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/async_tasks/simple_async_task/async_task_service.py b/code_snippets/python/src/use_cases/async_tasks/simple_async_task/async_task_service.py new file mode 100644 index 00000000..cc745273 --- /dev/null +++ b/code_snippets/python/src/use_cases/async_tasks/simple_async_task/async_task_service.py @@ -0,0 +1,26 @@ +from typing import TypedDict + +import restate +from restate import Service, Context + + +class TaskOpts(TypedDict): + id: str + + +def some_heavy_work(params: TaskOpts): + return "Work!" + + +# +async_task_service = Service("taskWorker") + + +# +@async_task_service.handler("runTask") +async def run_task(ctx: Context, params: TaskOpts): + return some_heavy_work(params) +# + +app = restate.app([async_task_service]) +# \ No newline at end of file diff --git a/code_snippets/python/src/use_cases/async_tasks/simple_async_task/task_submitter.py b/code_snippets/python/src/use_cases/async_tasks/simple_async_task/task_submitter.py new file mode 100644 index 00000000..bf2966eb --- /dev/null +++ b/code_snippets/python/src/use_cases/async_tasks/simple_async_task/task_submitter.py @@ -0,0 +1,36 @@ +import json + +import requests + +from src.use_cases.async_tasks.simple_async_task.async_task_service import TaskOpts + +RESTATE_URL = "http://localhost:8080" + + +# +def submit_and_await_task(task: TaskOpts): + # + idempotency_key = task["id"] + headers = { + # + "idempotency-key": idempotency_key, + # + "Content-Type": "application/json" + } + url = f"{RESTATE_URL}/taskWorker/runTask/send" + requests.post(url, json=json.dumps(task), headers=headers) + # + + # Do something else, with task running in the background + + # Attach back to the task to retrieve the result + # + # + attach_url = f"{RESTATE_URL}/restate/invocation/taskWorker/runTask/{idempotency_key}/attach" + response = requests.get(attach_url) + # + # +# + + +submit_and_await_task(TaskOpts(id="myTask123")) diff --git a/code_snippets/python/src/use_cases/async_tasks/sync_to_async/__init__.py b/code_snippets/python/src/use_cases/async_tasks/sync_to_async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/async_tasks/sync_to_async/client.py b/code_snippets/python/src/use_cases/async_tasks/sync_to_async/client.py new file mode 100644 index 00000000..866b7cad --- /dev/null +++ b/code_snippets/python/src/use_cases/async_tasks/sync_to_async/client.py @@ -0,0 +1,38 @@ +import asyncio +import requests +from pydantic import BaseModel + +class User(BaseModel): + id: str + email: str + + +RESTATE_URL = "http://localhost:8080" + + +# +def download_data(user: User): + headers = {"Content-Type": "application/json"} + try: + # + # + # + url = f"{RESTATE_URL}/dataPrep/{user.id}/run/send" + data_prep = requests.post(url, headers=headers, timeout=30) + # + # + # + # + except requests.exceptions.Timeout: + # Hit timeout... Mail us the link later + email_url = f"{RESTATE_URL}/dataPrep/{user.id}/resultAsEmail/send" + requests.post(email_url, json=user.email, headers=headers) + return + + # ... process result directly ... + # +# + + +async def read_line(prompt: str) -> str: + return await asyncio.get_event_loop().run_in_executor(None, input, prompt) diff --git a/code_snippets/python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py b/code_snippets/python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py new file mode 100644 index 00000000..efaf3dc9 --- /dev/null +++ b/code_snippets/python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py @@ -0,0 +1,46 @@ +from restate import WorkflowContext, WorkflowSharedContext, Workflow +from typing import TypedDict + + +class DataPrepService(TypedDict): + name: str + handlers: dict + + +def create_s3_bucket() -> str: + return "" + + +def upload_data(target: str): + pass + + +def send_email(url: str, email: str): + print(f" >>> Sending email to '{email}' with URL {url}") + + +# +data_preparation = Workflow("dataPrep") + + +@data_preparation.main() +async def run(ctx: WorkflowContext) -> str: + # + url = await ctx.run("bucket creation", lambda: create_s3_bucket()) + await ctx.run("upload", lambda: upload_data(url)) + + # + await ctx.promise("url").resolve(url) + # + + return url + # + + +@data_preparation.handler("resultAsEmail") +async def result_as_email(ctx: WorkflowSharedContext, email: str): + # + url = await ctx.promise("url").value() + # + await ctx.run("email", lambda: send_email(url, email)) +# diff --git a/code_snippets/python/src/use_cases/event_processing/__init__.py b/code_snippets/python/src/use_cases/event_processing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/event_processing/event_processing.py b/code_snippets/python/src/use_cases/event_processing/event_processing.py new file mode 100644 index 00000000..fd91996f --- /dev/null +++ b/code_snippets/python/src/use_cases/event_processing/event_processing.py @@ -0,0 +1,62 @@ +import random + +from pydantic import BaseModel +from restate import VirtualObject +from restate.exceptions import TerminalError + + +class UserUpdateEvent(BaseModel): + profile: str + permissions: str + resources: str + + +NOT_READY = "NOT_READY" + + +async def update_profile(profile: str) -> str: + return profile + "-id" if random.random() >= 0.8 else NOT_READY + + +async def set_permissions(user_id: str, permissions: str) -> str: + return permissions + + +async def provision_resources(user: str, role: str, resources: str): + pass + + +def verify_event(request: UserUpdateEvent) -> UserUpdateEvent: + return request + + +# +user_updates = VirtualObject("userUpdates") + + +# +@user_updates.handler() +async def update(ctx, event: UserUpdateEvent) -> None: + # + # + user_id = await ctx.run("update profile", + lambda: update_profile(event.profile)) + # + # + while user_id == NOT_READY: + # + await ctx.sleep(5000) + # + # + user_id = await ctx.run("update profile", + lambda: update_profile(event.profile)) + # + + # + role_id = await ctx.run("set permissions", + lambda: set_permissions(user_id, event.permissions)) + await ctx.run("provision resources", + lambda: provision_resources(user_id, role_id, event.resources)) + # + # +# diff --git a/code_snippets/python/src/use_cases/event_processing/events_state.py b/code_snippets/python/src/use_cases/event_processing/events_state.py new file mode 100644 index 00000000..e8888517 --- /dev/null +++ b/code_snippets/python/src/use_cases/event_processing/events_state.py @@ -0,0 +1,68 @@ +from datetime import timedelta + +import restate +from pydantic import BaseModel +from restate import ObjectContext, VirtualObject +from typing import Optional, List + +from restate.exceptions import TerminalError + + +class UserProfile(BaseModel): + id: str + name: str + email: str + features: Optional[List[str]] + + +def send(key: str, user: Optional[UserProfile]): + pass + + +# +event_enricher = VirtualObject("profile") + + +# +@event_enricher.handler() +async def process_name(ctx: ObjectContext, name: str): + # + # + ctx.set("user", {"name": name}) + # + # Schedule a delayed RPC call to emit the event + # + ctx.object_send(emit, ctx.key(), arg=None, send_delay=timedelta(seconds=60)) + # + + +# +@event_enricher.handler() +async def process_email(ctx: ObjectContext, email: str): + # + # + user_event = await ctx.get("user") + # + if user_event is None: + raise TerminalError("No user found") + + user_event["email"] = email + # + ctx.set("user", user_event) + # + + +# +@event_enricher.handler() +async def emit(ctx: ObjectContext): + # + # + user = await ctx.get("user") + # + send(ctx.key(), user) + # + ctx.clear_all() + # +# + +app = restate.app([event_enricher]) \ No newline at end of file diff --git a/code_snippets/python/src/use_cases/microservices/__init__.py b/code_snippets/python/src/use_cases/microservices/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/microservices/idempotency.py b/code_snippets/python/src/use_cases/microservices/idempotency.py new file mode 100644 index 00000000..9092378d --- /dev/null +++ b/code_snippets/python/src/use_cases/microservices/idempotency.py @@ -0,0 +1,12 @@ +import requests + +url = "http://localhost:8080/productService/reserve" +payload = {"product_id": "myProduct123"} +headers = { + # + "idempotency-key": "myReservation123", + # + "Content-Type": "application/json" +} + +response = requests.post(url, json=payload, headers=headers) \ No newline at end of file diff --git a/code_snippets/python/src/use_cases/microservices/role_updater.py b/code_snippets/python/src/use_cases/microservices/role_updater.py new file mode 100644 index 00000000..2e991229 --- /dev/null +++ b/code_snippets/python/src/use_cases/microservices/role_updater.py @@ -0,0 +1,74 @@ +from typing import List + +import restate +from pydantic import BaseModel +from restate import VirtualObject, ObjectContext +from restate.exceptions import TerminalError + + +class Permission(BaseModel): + role: str + + +class Update(BaseModel): + user_id: str + role: str + permissions: List[Permission] + + +def get_current_role(user_id): + pass + + +def apply_user_role(user_id, role): + pass + + +def apply_permission(user_id, permission): + pass + + +async def rollback(ctx, user_id, previous_role, previous_permissions): + pass + + +# +# +role_updater = VirtualObject("roleUpdater") +# + + +# +# +@role_updater.handler() +async def update(ctx: ObjectContext, req: Update) -> bool: + + # Persist current role, then apply new role + # + # + # + previous_role = await ctx.run("current role", + lambda: get_current_role(req.user_id)) + await ctx.run("apply role", + lambda: apply_user_role(req.user_id, req.role)) + # + + # Apply permissions sequentially. Rollback if any fails + previous_permissions: List[Permission] = [] + for permission in req.permissions: + # + try: + # + previous = await ctx.run("apply permissions", + lambda: apply_permission(req.user_id, permission)) + # + previous_permissions.append(previous) + except TerminalError as err: + await rollback(ctx, req.user_id, previous_role, previous_permissions) + raise err + # + + return True +# + +app = restate.app([role_updater]) \ No newline at end of file diff --git a/code_snippets/python/src/use_cases/workflows/__init__.py b/code_snippets/python/src/use_cases/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/code_snippets/python/src/use_cases/workflows/signup_workflow.py b/code_snippets/python/src/use_cases/workflows/signup_workflow.py new file mode 100644 index 00000000..5180cd57 --- /dev/null +++ b/code_snippets/python/src/use_cases/workflows/signup_workflow.py @@ -0,0 +1,79 @@ +import uuid +from typing import TypedDict + +from pydantic import BaseModel +from restate import Workflow, WorkflowContext, WorkflowSharedContext +import restate +from restate.exceptions import TerminalError + + +class User(BaseModel): + email: str + name: str + + +def create_user_entry(user): + pass + + +def send_email_with_link(email, secret): + pass + + +# +# +signup_workflow = Workflow("signupWorkflow") +# + + +# +@signup_workflow.main() +async def run(ctx: WorkflowContext, user: User) -> bool: + # + # + ctx.set("stage", "Creating user") + # + # + await ctx.run("create user", lambda: create_user_entry(user)) + # + + # + ctx.set("stage", "Email verification") + # + # + secret = await ctx.run("generate secret", lambda: str(uuid.uuid4())) + await ctx.run("send email", lambda: send_email_with_link(user.email, secret)) + # + + # + click_secret = await ctx.promise("email_link_clicked").value() + # + # + if click_secret != secret: + # + ctx.set("stage", "Email verification failed") + # + raise TerminalError("Wrong secret from email link") + + # + ctx.set("stage", "Email verified") + # + return True + # + + +# +@signup_workflow.handler("getStage") +async def get_stage(ctx: WorkflowSharedContext) -> str: + return await ctx.get("stage") or "unknown" + # + + +# +@signup_workflow.handler("approveEmail") +async def approve_email(ctx: WorkflowSharedContext, secret: str): + await ctx.promise("email_link_clicked").resolve(secret) + # + # + +app = restate.app([signup_workflow]) \ No newline at end of file diff --git a/code_snippets/python/src/use_cases/workflows/submit.py b/code_snippets/python/src/use_cases/workflows/submit.py new file mode 100644 index 00000000..a5f573c6 --- /dev/null +++ b/code_snippets/python/src/use_cases/workflows/submit.py @@ -0,0 +1,25 @@ +# +import requests + +restate = "http://localhost:8080" +workflow_id = "myUser123" +payload = { + "email": "user@user.com", + "name": "Pete" +} +headers = {"Content-Type": "application/json"} + +# !mark[/Submit/] blue +# 1. Submit the workflow +# !mark[/\/send/] blue +url = f'${restate}/signupWorkflow/${workflow_id}/run/send' +response = requests.post(url, json=payload, headers=headers) + +# Do something else, with workflow running in the background + +# !mark[/Attach/] blue +# 2. Attach back to the workflow +# !mark[/\/attach/] blue +attach_url = f'${restate}/restate/workflow/signupWorkflow/${workflow_id}/attach' +response = requests.get(attach_url) +# diff --git a/code_snippets/ts/package-lock.json b/code_snippets/ts/package-lock.json index 2815bdff..68f6a128 100644 --- a/code_snippets/ts/package-lock.json +++ b/code_snippets/ts/package-lock.json @@ -3350,8 +3350,7 @@ "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", - "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==", - "dev": true + "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==" }, "node_modules/bare-events": { "version": "2.5.0", diff --git a/code_snippets/ts/package.json b/code_snippets/ts/package.json index 93ea96ac..e520a671 100644 --- a/code_snippets/ts/package.json +++ b/code_snippets/ts/package.json @@ -16,7 +16,7 @@ "test": "vitest run" }, "dependencies": { - "@restatedev/restate-cdk": "^1.0.0", + "@restatedev/restate-cdk": "^1.0.1", "@restatedev/restate-sdk": "^1.4.0", "@restatedev/restate-sdk-clients": "^1.4.0", "@restatedev/restate-sdk-testcontainers": "^1.4.0", diff --git a/code_snippets/ts/src/develop/clients/ingress.ts b/code_snippets/ts/src/develop/clients/ingress.ts index b9a9f013..ee9a78c7 100644 --- a/code_snippets/ts/src/develop/clients/ingress.ts +++ b/code_snippets/ts/src/develop/clients/ingress.ts @@ -48,7 +48,7 @@ const servicesIdempotent = async () => { // await rs .serviceSendClient(greeterService) - // withClass highlight-line + // !mark .greet(request, clients.rpc.sendOpts({ idempotencyKey: "abcde" })); // }; diff --git a/code_snippets/ts/src/develop/serving.ts b/code_snippets/ts/src/develop/serving.ts index 33e6ceba..612af469 100644 --- a/code_snippets/ts/src/develop/serving.ts +++ b/code_snippets/ts/src/develop/serving.ts @@ -31,9 +31,9 @@ const http2Handler = restate .bind(myService) .bind(myVirtualObject) .bind(myWorkflow) - // withClass highlight-line + // !mark .http2Handler(); -// withClass highlight-line +// !mark const httpServer = http2.createServer(http2Handler); httpServer.listen(); // @@ -42,7 +42,7 @@ httpServer.listen(); restate .endpoint() .bind(myService) - // withClass higlight-line + // !mark .withIdentityV1("publickeyv1_w7YHemBctH5Ck2nQRQ47iBBqhNHy4FV7t2Usbye2A6f") .listen(); // diff --git a/code_snippets/ts/src/develop/serving_lambda.ts b/code_snippets/ts/src/develop/serving_lambda.ts index df6d71f4..1f10fbeb 100644 --- a/code_snippets/ts/src/develop/serving_lambda.ts +++ b/code_snippets/ts/src/develop/serving_lambda.ts @@ -20,6 +20,6 @@ export const handler = restate .bind(myService) .bind(myVirtualObject) .bind(myWorkflow) - // withClass highlight-line + // !mark .handler(); // diff --git a/code_snippets/ts/src/get_started/tour.ts b/code_snippets/ts/src/get_started/tour.ts index eca3bbec..d2ca7c67 100644 --- a/code_snippets/ts/src/get_started/tour.ts +++ b/code_snippets/ts/src/get_started/tour.ts @@ -26,7 +26,7 @@ const checkoutService = restate.service({ ctx: restate.Context, request: { userId: string; tickets: string[] } ) { - // withClass(1:3) highlight-line + // !mark(1:3) const idempotencyKey = ctx.rand.uuidv4(); console.info("My idempotency key: " + idempotencyKey); throw new Error("Something happened!"); @@ -45,11 +45,11 @@ const secondCheckoutService = restate.service({ ctx: restate.Context, request: { userId: string; tickets: string[] } ) { - // withClass highlight-line + // !mark const totalPrice = request.tickets.length * 40; const idempotencyKey = ctx.rand.uuidv4(); - // withClass(1:3) highlight-line + // !mark(1:3) const success = await ctx.run(() => PaymentClient.get().call(idempotencyKey, totalPrice) ); diff --git a/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/client.ts b/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/client.ts index 191e1751..cfd8e5fc 100644 --- a/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/client.ts +++ b/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/client.ts @@ -13,13 +13,13 @@ const RESTATE_URL = process.env.RESTATE_URL ?? "http://localhost:8080"; const rs = restate.connect({ url: RESTATE_URL }); const dataPrepService: DataPrepService = { name: "dataPrep" }; -async function downloadData(userId: string) { +async function downloadData(user: { id: string, email: string }) { // - const dataPrep = rs.workflowClient(dataPrepService, userId); + const dataPrep = rs.workflowClient(dataPrepService, user.id); // // - await dataPrep.workflowSubmit({ userId }); + await dataPrep.workflowSubmit(); // // @@ -28,8 +28,8 @@ async function downloadData(userId: string) { // if (result === Timeout) { - const email = await readLine("This takes long... Mail us the link later"); - await dataPrep.resultAsEmail({ email }); + // Hit timeout... Mail us the link later + await dataPrep.resultAsEmail({ email: user.email }); return; } // @@ -48,14 +48,3 @@ function withTimeout( ); return Promise.race([promise, timeoutPromise]); } - -async function readLine(prompt: string): Promise { - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, - }); - - return new Promise((resolve) => rl.question(prompt, resolve)).finally( - () => rl.close() - ); -} diff --git a/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts b/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts index c64765b0..d101ca09 100644 --- a/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts +++ b/code_snippets/ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts @@ -8,7 +8,7 @@ import { const dataPreparationService = restate.workflow({ name: "dataPrep", handlers: { - run: async (ctx: WorkflowContext, args: { userId: string }) => { + run: async (ctx: WorkflowContext) => { // const url = await ctx.run(() => createS3Bucket()); await ctx.run(() => uploadData(url)); @@ -16,7 +16,6 @@ const dataPreparationService = restate.workflow({ // await ctx.promise("url").resolve(url); // - return url; // }, diff --git a/code_snippets/ts/src/use_cases/event_processing/events_state.ts b/code_snippets/ts/src/use_cases/event_processing/events_state.ts index 4a58f2d0..11325430 100644 --- a/code_snippets/ts/src/use_cases/event_processing/events_state.ts +++ b/code_snippets/ts/src/use_cases/event_processing/events_state.ts @@ -11,6 +11,7 @@ import * as restate from "@restatedev/restate-sdk"; import { ObjectContext } from "@restatedev/restate-sdk"; +import { TerminalError } from "@restatedev/restate-sdk"; // const eventEnricher = restate.object({ @@ -35,7 +36,11 @@ const eventEnricher = restate.object({ // const userEvent = await ctx.get("user"); // - (userEvent!.features ??= []).push(featureEvent); + if(!userEvent) { + throw new TerminalError("User not found"); + } + + userEvent.features.push(featureEvent); // ctx.set("user", userEvent); // @@ -54,10 +59,10 @@ const eventEnricher = restate.object({ }, }, }); +// type EventEnricherType = typeof eventEnricher; const EventEnricher: EventEnricherType = { name: "profile" }; -// type UserProfile = { id: string; diff --git a/code_snippets/ts/src/use_cases/microservices/idempotency.ts b/code_snippets/ts/src/use_cases/microservices/idempotency.ts index 058f4c42..e42b9c6b 100644 --- a/code_snippets/ts/src/use_cases/microservices/idempotency.ts +++ b/code_snippets/ts/src/use_cases/microservices/idempotency.ts @@ -14,7 +14,7 @@ const productService: ProductService = { name: "product" }; app.get("/reserve/:product/:reservationId", async (req, res) => { const { product, reservationId } = req.params; - // withClass(1:5) highlight-line + // !mark(1:5) const products = rs.serviceClient(productService); const reservation = await products.reserve( product, diff --git a/code_snippets/ts/src/use_cases/workflows/signup_workflow.ts b/code_snippets/ts/src/use_cases/workflows/signup_workflow.ts index c974618a..822d7830 100644 --- a/code_snippets/ts/src/use_cases/workflows/signup_workflow.ts +++ b/code_snippets/ts/src/use_cases/workflows/signup_workflow.ts @@ -52,10 +52,7 @@ const signUpWorkflow = restate.workflow({ // approveEmail: (ctx: WorkflowSharedContext, secret: string) => - ctx.promise("email-link").resolve(secret), - - rejectEmail: (ctx: WorkflowSharedContext) => - ctx.promise("email-link").reject("Abort verification"), + ctx.promise("email-link").resolve(secret) // }, }); diff --git a/docs/develop/java/service-communication.mdx b/docs/develop/java/service-communication.mdx index 86f363ad..af0cac1e 100644 --- a/docs/develop/java/service-communication.mdx +++ b/docs/develop/java/service-communication.mdx @@ -3,11 +3,9 @@ sidebar_position: 2 description: "Find out how Restate services can send requests to each other." --- -import clsx from "clsx"; import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import Admonition from '@theme/Admonition'; -import {Scrollycoding} from "../../../src/components/code/scrollycoding"; # Service Communication A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response. diff --git a/docs/use-cases/async-tasks.mdx b/docs/use-cases/async-tasks.mdx index b4718d06..e918d481 100644 --- a/docs/use-cases/async-tasks.mdx +++ b/docs/use-cases/async-tasks.mdx @@ -16,6 +16,13 @@ import Tabs from "@theme/Tabs" import TabItem from "@theme/TabItem" import {Scrollycoding} from "../../src/components/code/scrollycoding"; + + + + + + +

Async tasks

Flexible, durable scheduling across processes and time.

@@ -30,7 +37,7 @@ import {Scrollycoding} from "../../src/components/code/scrollycoding"; description: "Register timers with Restate to make them durable. Restate makes sure that they get fired when they should, whether it's millis or months.", }, { - title: 'Message queue', + title: '(Delayed) task queue', iconPath: '/img/use_cases/queue.svg', description: "Schedule tasks asynchronously, by using Restate as message queue. Tasks execute with workflow-like semantics and durability. Restate handles retries and recovery of progress." }, @@ -47,58 +54,77 @@ import {Scrollycoding} from "../../src/components/code/scrollycoding"; ## Async tasks with Restate Schedule tasks for now or later with the Restate SDK. - + + ### !!steps Execute any handler async - ### !!steps Execute any handler async + Every handler in Restate is executed asynchronously and can be treated + as a reliable asynchronous task. No matter whether it is a simple function, or a complex workflow. + Restate persists the requests to this handler and makes sure they run to completion. + Restate handles retries and recovery upon failures. - Every handler in Restate is executed asynchronously and can be treated - as a reliable asynchronous task. No matter whether it is a simple function, or a complex workflow. - Restate persists the requests to this handler and makes sure they run to completion. - Restate handles retries and recovery upon failures. + ```ts !!windows async_task_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts?1 + ``` - ```ts ! async_task_service.ts - CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts?1 - ``` + ```ts !!windows task_submitter.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts + ``` - ### !!steps Schedule tasks reliably + ### !!steps Schedule tasks reliably - Schedule tasks asynchronously, by using Restate as message queue. - Restate reliably queues them, also under backpressure/load. + Schedule tasks asynchronously, by using Restate as message queue. + Restate reliably queues them, also under backpressure/load. - Handlers can be called asynchronously from anywhere. - This returns a task handle once the call in enqueued. + Handlers can be called asynchronously from anywhere. + This returns a task handle once the call in enqueued. + + ```ts !!windows async_task_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts + ``` + + ```ts !!windows task_submitter.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?1 + ``` + + ### !!steps Idempotent task submission - ```ts ! task_submitter.ts - CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?1 - ``` + Use an idempotency key to ensure that the task is only scheduled once. + Restate will deduplicate the request and return the previous response. - ### !!steps Idempotent task submission + ```ts !!windows async_task_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts + ``` - Use an idempotency key to ensure that the task is only scheduled once. - Restate will deduplicate the request and return the previous response. + ```ts !!windows task_submitter.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?2 + ``` - ```ts ! task_submitter.ts - CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?2 - ``` + ### !!steps Latch on to the task + + For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish. - ### !!steps Latch on to the task + ```ts !!windows async_task_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts + ``` - For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish. + ```ts !!windows task_submitter.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?3 + ``` - ```ts ! task_submitter.ts - CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?3 - ``` + ### !!steps + This works across processes, so you can have a separate process latch on to the task later. - ### !!steps - This works across processes, so you can have a separate process latch on to the task later. + ```ts !!windows async_task_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/async_task_service.ts + ``` - ```ts ! task_submitter.ts - CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?4 - ``` + ```ts !!windows task_submitter.ts + CODE_LOAD::ts/src/use_cases/async_tasks/simple_async_task/task_submitter.ts?4 + ``` - + @@ -110,10 +136,14 @@ Schedule tasks for now or later with the Restate SDK. Restate persists the requests to this handler and makes sure they run to completion. Restate handles retries and recovery upon failures. - ```java ! AsyncTaskService.java + ```java !!windows AsyncTaskService.java CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/AsyncTaskService.java?1 ``` + ```java !!windows TaskSubmitter.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/TaskSubmitter.java + ``` + ### !!steps Schedule tasks reliably Schedule tasks asynchronously, by using Restate as message queue. @@ -122,7 +152,11 @@ Schedule tasks for now or later with the Restate SDK. Handlers can be called asynchronously from anywhere. This returns a task handle once the call in enqueued. - ```java ! TaskSubmitter.java + ```java !!windows AsyncTaskService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/AsyncTaskService.java + ``` + + ```java !!windows TaskSubmitter.java CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/TaskSubmitter.java?1 ``` @@ -131,7 +165,11 @@ Schedule tasks for now or later with the Restate SDK. Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response. - ```java ! TaskSubmitter.java + ```java !!windows AsyncTaskService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/AsyncTaskService.java + ``` + + ```java !!windows TaskSubmitter.java CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/TaskSubmitter.java?2 ``` @@ -141,7 +179,11 @@ Schedule tasks for now or later with the Restate SDK. This works across processes, so you can have a separate process latch on to the task later. - ```java ! TaskSubmitter.java + ```java !!windows AsyncTaskService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/AsyncTaskService.java + ``` + + ```java !!windows TaskSubmitter.java CODE_LOAD::java/src/main/java/usecases/asynctasks/simple/TaskSubmitter.java?3 ``` @@ -194,6 +236,68 @@ Schedule tasks for now or later with the Restate SDK. + + + + ### !!steps Execute any handler async + + Every handler in Restate can be treated as a reliable asynchronous task. + No matter whether it is a simple function, or a complex workflow. + Restate persists the requests to this handler and makes sure they run to completion. + Restate handles retries and recovery upon failures. + + ```python !!windows async_task_service.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/async_task_service.py?1 + ``` + + ```python !!windows task_submitter.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/task_submitter.py + ``` + + ### !!steps Schedule tasks reliably + + Schedule async tasks by using Restate as message queue. + Restate reliably queues them, also under backpressure/load. + + Handlers can be called asynchronously from anywhere. + This returns a task handle once the call in enqueued. + + ```python !!windows async_task_service.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/async_task_service.py + ``` + + ```python !!windows task_submitter.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/task_submitter.py?1 + ``` + + ### !!steps Idempotent task submission + + Use an idempotency key to ensure that the task is only scheduled once. + Restate will deduplicate the request and return the previous response. + + ```python !!windows async_task_service.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/async_task_service.py + ``` + + ```python !!windows task_submitter.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/task_submitter.py?2 + ``` + + ### !!steps Latch on to the task + For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish. + + This works across processes, so you can have a separate process latch on to the task later. + + ```python !!windows async_task_service.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/async_task_service.py + ``` + + ```python !!windows task_submitter.py + CODE_LOAD::python/src/use_cases/async_tasks/simple_async_task/task_submitter.py?3 + ``` + + + @@ -213,62 +317,48 @@ Schedule tasks for now or later with the Restate SDK. Write flexible scheduling logic via durable building blocks. - +Restate makes it easy to parallelize async work by fanning out tasks. +Afterwards, you can collect the result by fanning in the partial results. +Durable Execution ensures that the fan-out and fan-in steps happen +reliably exactly once. + + - ### !!steps - Restate makes it easy to parallelize async work by fanning out tasks. - Afterwards, you can collect the result by fanning in the partial results. - Durable Execution ensures that the fan-out and fan-in steps happen - reliably exactly once. - - ```ts ! fan_out_worker.ts - CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts - ``` - - ### !!steps Fan out + ### !!steps Fan out - Fan out tasks by calling the subtask handler for each subtask. - Every handler is an asynchronous task, for which Restate serves as the queue. + Fan out tasks by calling the subtask handler for each subtask. + Every handler is an asynchronous task, for which Restate serves as the queue. - The subtasks might run in different processes, if this is deployed in a parallel setup. + The subtasks might run in different processes, if this is deployed in a parallel setup. - ```ts ! fan_out_worker.ts - CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?1 - ``` + ```ts ! fan_out_worker.ts + CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?1 + ``` - ### !!steps Fan in - Invocations produce durable promises that can be awaited and combined. - Fan in by simply awaiting the combined promise. - Invocation promises recover from failures, re-connect to running subtasks. + ### !!steps Fan in + Invocations produce durable promises that can be awaited and combined. + Fan in by simply awaiting the combined promise. + Invocation promises recover from failures, re-connect to running subtasks. - ```ts ! fan_out_worker.ts - CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?2 - ``` + ```ts ! fan_out_worker.ts + CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?2 + ``` - ### !!steps Server(less) - Deploy this service on an platform like Kubernetes or AWS Lambda to - automatically get parallel scale out. + ### !!steps Server(less) + Deploy this service on an platform like Kubernetes or AWS Lambda to + automatically get parallel scale out. - ```ts ! fan_out_worker.ts - CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?3 - ``` + ```ts ! fan_out_worker.ts + CODE_LOAD::ts/src/use_cases/async_tasks/fan_out_worker.ts?3 + ``` - + - Restate makes it easy to parallelize async work by fanning out tasks. - Afterwards, you can collect the result by fanning in the partial results. - Durable Execution ensures that the fan-out and fan-in steps happen - reliably exactly once. - - ```java ! FanOutWorker.java - CODE_LOAD::java/src/main/java/usecases/asynctasks/FanOutWorker.java - ``` - ### !!steps Fan out Fan out tasks by calling the subtask handler for each subtask. @@ -299,19 +389,9 @@ Write flexible scheduling logic via durable building blocks. - + - ### !!steps - Restate makes it easy to parallelize async work by fanning out tasks. - Afterwards, you can collect the result by fanning in the partial results. - Durable Execution ensures that the fan-out and fan-in steps happen - reliably exactly once. - - ```kotlin ! FanOutWorker.kt - CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/FanOutWorker.kt - ``` - ### !!steps Fan out Fan out tasks by calling the subtask handler for each subtask. @@ -342,6 +422,38 @@ Write flexible scheduling logic via durable building blocks. + + + ### !!steps Fan out + + Fan out tasks by calling the subtask handler for each subtask. + Every handler is an asynchronous task, for which Restate serves as the queue. + + The subtasks might run in different processes, if this is deployed in a parallel setup. + + ```python ! fan_out_worker.py + CODE_LOAD::python/src/use_cases/async_tasks/fan_out_worker.py?1 + ``` + + ### !!steps Fan in + Invocations produce durable promises that can be awaited and combined. + Fan in by simply awaiting the combined promise. + Invocation promises recover from failures, re-connect to running subtasks. + + ```python ! fan_out_worker.py + CODE_LOAD::python/src/use_cases/async_tasks/fan_out_worker.py?2 + ``` + + ### !!steps Server(less) + Deploy this service on an platform like Kubernetes or AWS Lambda to + automatically get parallel scale out. + + ```python ! fan_out_worker.py + CODE_LOAD::python/src/use_cases/async_tasks/fan_out_worker.py?3 + ``` + + + @@ -354,78 +466,112 @@ Write flexible scheduling logic via durable building blocks. ## Switch between async and sync with Restate - + ### !!steps - Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL. - Let's now kick off this workflow from another process. + Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL. + + Let's now kick off this workflow from another process. + + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts?1 + ``` + + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts + ``` + + ### !!steps + + 1. Connect to the Restate server and create a client for the data preparation workflow. - ```ts ! data_preparation_service.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts?1 - ``` + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts + ``` - ### !!steps + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?1 + ``` - 1. Connect to the Restate server and create a client for the data preparation workflow. + ### !!steps - ```ts ! client.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?1 - ``` + 2. Kick off a new data preparation workflow. This is idempotent per workflow ID. - ### !!steps + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts + ``` - 2. Kick off a new data preparation workflow. This is idempotent per workflow ID. + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?2 + ``` - ```ts ! client.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?2 - ``` + ### !!steps - ### !!steps + 3. Wait for the result for 30 seconds. - 3. Wait for the result for 30 seconds. + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts + ``` - ```ts ! client.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?3 - ``` + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?3 + ``` + + ### !!steps - ### !!steps + 4. If it takes longer, rewire the workflow to send an email instead. + If returns within 30 seconds, process the URL directly. + + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts + ``` - 4. If it takes longer, rewire the workflow to send an email instead. - If returns within 30 seconds, process the URL directly. + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?4 + ``` - ```ts ! client.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts?4 - ``` + ### !!steps - ### !!steps + 5. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. + It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email. - 5. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. - It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email. + ```ts !!windows data_preparation_service.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts?2 + ``` - ```ts ! data_preparation_service.ts - CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/data_preparation_service.ts?2 - ``` + ```ts !!windows client.ts + CODE_LOAD::ts/src/use_cases/async_tasks/sync_to_async/client.ts + ``` - + ### !!steps + Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL. Let's now kick off this workflow from another process. - ```java ! DataPreparationService.java + ```java !!windows DataPreparationService.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java?1 ``` + ```java !!windows MyClient.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java + ``` + ### !!steps 1. Connect to the Restate server and create a client for the data preparation workflow. - ```java ! MyClient.java + ```java !!windows DataPreparationService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java + ``` + + ```java !!windows MyClient.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java?1 ``` @@ -433,7 +579,11 @@ Write flexible scheduling logic via durable building blocks. 2. Kick off a new data preparation workflow. This is idempotent per workflow ID. - ```java ! MyClient.java + ```java !!windows DataPreparationService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java + ``` + + ```java !!windows MyClient.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java?2 ``` @@ -441,7 +591,11 @@ Write flexible scheduling logic via durable building blocks. 3. Wait for the result for 30 seconds. - ```java ! MyClient.java + ```java !!windows DataPreparationService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java + ``` + + ```java !!windows MyClient.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java?3 ``` @@ -450,7 +604,11 @@ Write flexible scheduling logic via durable building blocks. 4. If it takes longer, rewire the workflow to send an email instead. If returns within 30 seconds, process the URL directly. - ```java ! MyClient.java + ```java !!windows DataPreparationService.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java + ``` + + ```java !!windows MyClient.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java?4 ``` @@ -459,29 +617,41 @@ Write flexible scheduling logic via durable building blocks. 5. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email. - ```java ! DataPreparationService.java + ```java !!windows DataPreparationService.java CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/DataPreparationService.java?2 ``` + ```java !!windows MyClient.java + CODE_LOAD::java/src/main/java/usecases/asynctasks/synctoasync/MyClient.java + ``` + - + ### !!steps + Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL. Let's now kick off this workflow from another process. - - ```kotlin ! DataPreparationService.kt + ```kotlin !!windows DataPreparationService.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt?1 ``` + ```kotlin !!windows MyClient.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt + ``` + ### !!steps 1. Connect to the Restate server and create a client for the data preparation workflow. - ```kotlin ! MyClient.kt + ```kotlin !!windows DataPreparationService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt + ``` + + ```kotlin !!windows MyClient.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt?1 ``` @@ -489,7 +659,11 @@ Write flexible scheduling logic via durable building blocks. 2. Kick off a new data preparation workflow. This is idempotent per workflow ID. - ```kotlin ! MyClient.kt + ```kotlin !!windows DataPreparationService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt + ``` + + ```kotlin !!windows MyClient.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt?2 ``` @@ -497,7 +671,11 @@ Write flexible scheduling logic via durable building blocks. 3. Wait for the result for 30 seconds. - ```kotlin ! MyClient.kt + ```kotlin !!windows DataPreparationService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt + ``` + + ```kotlin !!windows MyClient.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt?3 ``` @@ -506,7 +684,11 @@ Write flexible scheduling logic via durable building blocks. 4. If it takes longer, rewire the workflow to send an email instead. If returns within 30 seconds, process the URL directly. - ```kotlin ! MyClient.kt + ```kotlin !!windows DataPreparationService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt + ``` + + ```kotlin !!windows MyClient.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt?4 ``` @@ -515,10 +697,59 @@ Write flexible scheduling logic via durable building blocks. 5. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email. - ```kotlin ! DataPreparationService.kt + ```kotlin !!windows DataPreparationService.kt CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/DataPreparationService.kt?2 ``` + ```kotlin !!windows MyClient.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/asynctasks/synctoasync/MyClient.kt + ``` + + + + + + ### !!steps + + Send a request to Restate to kick off a new data preparation workflow. + This is idempotent per workflow ID. + + Wait for the result for 30 seconds. + + ```python !!windows client.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/client.py?1 + ``` + + ```py !!windows data_preparation_service.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py + ``` + + ### !!steps + + If it takes too long, rewire the workflow to send an email instead. + If returns within 30 seconds, process the URL directly. + + ```python !!windows client.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/client.py?4 + ``` + + ```py !!windows data_preparation_service.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py + ``` + + ### !!steps + + This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. + It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email. + + ```python !!windows client.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/client.py + ``` + + ```py !!windows data_preparation_service.py + CODE_LOAD::python/src/use_cases/async_tasks/sync_to_async/data_preparation_service.py?2 + ``` + @@ -556,15 +787,22 @@ Write flexible scheduling logic via durable building blocks. { title: 'Docs', description: ( -

Read the docs to learn more. +

-

), }, + { + title: 'Docs: schedulers & timers', + ts: "/develop/ts/durable-timers", + java: "/develop/java/durable-timers?sdk=java", + kotlin: "/develop/java/durable-timers?sdk=kotlin", + go: "/develop/go/durable-timers", + python: "/develop/python/durable-timers" + }, { title: 'Need help?', description: "Join the Restate Discord or Slack communities", diff --git a/docs/use-cases/event-processing.mdx b/docs/use-cases/event-processing.mdx index 8cb39244..21f17fe2 100644 --- a/docs/use-cases/event-processing.mdx +++ b/docs/use-cases/event-processing.mdx @@ -16,6 +16,14 @@ import Tabs from "@theme/Tabs" import TabItem from "@theme/TabItem" import {Scrollycoding} from "../../src/components/code/scrollycoding"; + + + + + + + +

Event Processing

Lightweight, transactional event processing.

Process Kafka events with flexible flows of transactional steps.
@@ -47,7 +55,7 @@ import {Scrollycoding} from "../../src/components/code/scrollycoding"; Connect functions to Kafka topics. Restate pushes the events to your function. - + @@ -63,7 +71,12 @@ Restate pushes the events to your function. Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering... - ```shell ! curl + ```ts !!windows user_updates.ts + CODE_LOAD::ts/src/use_cases/event_processing/event_processing.ts + ``` + + ```shell !!windows curl + # !mark(1:5) curl restate:9070/subscriptions --json '{ "source": "kafka://my-cluster/user-events", "sink": "service://userUpdates/updateUserEvent" @@ -129,7 +142,12 @@ Restate pushes the events to your function. Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering... - ```shell ! curl + ```java !!windows UserUpdatesService.java + CODE_LOAD::java/src/main/java/usecases/eventprocessing/UserUpdatesService.java + ``` + + ```shell !!windows curl + # !mark(1:5) curl restate:9070/subscriptions --json '{ "source": "kafka://my-cluster/user-events", "sink": "service://UserUpdatesService/updateUserEvent" @@ -195,7 +213,12 @@ Restate pushes the events to your function. Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering... - ```shell ! curl + ```kotlin !!windows UserUpdatesService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/eventprocessing/UserUpdatesService.kt + ``` + + ```shell !!windows curl + # !mark(1:5) curl restate:9070/subscriptions --json '{ "source": "kafka://my-cluster/user-events", "sink": "service://UserUpdatesService/updateUserEvent" @@ -214,8 +237,6 @@ Restate pushes the events to your function. CODE_LOAD::kotlin/src/main/kotlin/usecases/eventprocessing/UserUpdatesService.kt?1 ``` - - ### !!steps Postpone processing Flexibly postpone processing of events until later. @@ -228,8 +249,6 @@ Restate pushes the events to your function. CODE_LOAD::kotlin/src/main/kotlin/usecases/eventprocessing/UserUpdatesService.kt?2 ``` - - ### !!steps Durable side effects The results of interactions with external systems are tracked and recovered after failures. This simplifies writing flows that keep multiple systems in sync. @@ -238,8 +257,6 @@ Restate pushes the events to your function. CODE_LOAD::kotlin/src/main/kotlin/usecases/eventprocessing/UserUpdatesService.kt?3 ``` - - ### !!steps Flexible control flow As opposed to many stream processing systems, Restate does not put any restrictions on the control flow (e.g. DAG). @@ -267,7 +284,12 @@ Restate pushes the events to your function. Let Restate subscribe to a Kafka topic and specify to which function to push the events. Restate will take care of the event plumbing: polling for records, committing offsets, recovering... - ```shell ! curl + ```go !!windows userupdates.go + CODE_LOAD::go/usecases/eventprocessing/eventprocessing.go?1 + ``` + + ```shell !!windows curl + # !mark(1:5) curl restate:9070/subscriptions --json '{ "source": "kafka://my-cluster/user-events", "sink": "service://userUpdates/updateUserEvent" @@ -318,6 +340,77 @@ Restate pushes the events to your function. + + + + ### !!steps Lightweight Durable Functions + Write functions that take Kafka events as inputs. + Functions execute with Durable Execution: their progress is tracked and they can be retried from the exact point before the crash, as if you are taking micro-checkpoints throughout the function execution. + + ```python ! user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py?1 + ``` + + ### !!steps Push events to functions + Let Restate subscribe to a Kafka topic and specify to which function to push the events. + Restate will take care of the event plumbing: polling for records, committing offsets, recovering... + + ```python !!windows user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py + ``` + + ```shell !!windows curl + # !mark(1:5) + curl restate:9070/subscriptions --json '{ + "source": "kafka://my-cluster/user-events", + "sink": "service://userUpdates/update" + }' + ``` + + ### !!steps Queue per key + Events get sent to objects based on the Kafka key. + For each key, Restate ensures that events are processed sequentially and in order. + Slow events on other keys do not block processing. + + In the example, we process user updates in a queue per user. + Slow updates for one user do not block updates for other users. + + ```python ! user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py?1 + ``` + + ### !!steps Postpone processing + + Flexibly postpone processing of events until later. + Restate tracks the timers and re-invokes. + When sleeping, other events for that key are enqueued. + + Here, we postpone processing for 5 seconds if the user profile is not ready yet. + + ```python ! user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py?2 + ``` + + ### !!steps Durable side effects + + The results of interactions with external systems are tracked and recovered after failures. This simplifies writing flows that keep multiple systems in sync. + + ```python ! user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py?3 + ``` + + ### !!steps Flexible control flow + + As opposed to many stream processing systems, Restate does not put any restrictions on the control flow (e.g. DAG). + Each event crafts its own path through the code + and builds up its own recovery log. + + ```python ! user_updates.py + CODE_LOAD::python/src/use_cases/event_processing/event_processing.py?4 + ``` + + + @@ -337,9 +430,9 @@ Restate pushes the events to your function. Implement stateful event handlers with Restate. - + - + ### !!steps K/V State Store state in Restate and access it from other handlers. @@ -479,7 +572,7 @@ Implement stateful event handlers with Restate. - + ### !!steps K/V State @@ -526,6 +619,47 @@ CODE_LOAD::go/usecases/eventprocessing/eventsstate.go?2 + + + + ### !!steps K/V State + Store state in Restate and access it from other handlers. + Restate guarantees that it is consistent and persistent. + The state gets delivered together with the request, so you operate on local state. + + ```py ! profile_service.py + CODE_LOAD::python/src/use_cases/event_processing/events_state.py?1 + ``` + + ### !!steps Event Enrichment + + Enrich events with data from multiple sources by storing it in state and eventually exposing it via other functions. + + ```py ! profile_service.py + CODE_LOAD::python/src/use_cases/event_processing/events_state.py?2 + ``` + + ### !!steps Delayed actions + Schedule async follow-up tasks for now or for later. + Restate tracks the timers and triggers them when the time comes. + + Here, we wait one second for other user features to arrive before sending the event to downstream processing. + + ```py ! profile_service.py + CODE_LOAD::python/src/use_cases/event_processing/events_state.py?3 + ``` + + ### !!steps Combine Kafka and RPC + + Functions can be called over RPC or Kafka without changing the code. + In the example, the registration can come over Kafka, while the email gets called via HTTP. + + ```py ! profile_service.py + CODE_LOAD::python/src/use_cases/event_processing/events_state.py?2 + ``` + + + @@ -584,7 +718,8 @@ CODE_LOAD::go/usecases/eventprocessing/eventsstate.go?2 ), ts: "/invoke/kafka?sdk=ts", java: "/invoke/kafka?sdk=java", - go: "/invoke/kafka?sdk=go" + go: "/invoke/kafka?sdk=go", + python: "/invoke/kafka?sdk=python", }, { title: 'Need help?', diff --git a/docs/use-cases/microservice-orchestration.mdx b/docs/use-cases/microservice-orchestration.mdx index ae61cda6..b9e41b6e 100644 --- a/docs/use-cases/microservice-orchestration.mdx +++ b/docs/use-cases/microservice-orchestration.mdx @@ -15,6 +15,14 @@ import Tabs from "@theme/Tabs" import TabItem from "@theme/TabItem" import {Scrollycoding} from "../../src/components/code/scrollycoding"; + + + + + + + +

Microservice Orchestration

The simplest way to build resilient applications.

Regular functions and services, but now resilient, consistent, and scalable.
@@ -44,217 +52,256 @@ import {Scrollycoding} from "../../src/components/code/scrollycoding"; Turn functions into durable handlers with the Restate SDK. - - - + + + - ### !!steps Distributed, durable building blocks - Work with objects, functions, and promises as if failures don’t happen. - Restate makes them distributed and durable out-of-the-box. + ### !!steps Distributed, durable building blocks + Work with objects, functions, and promises as if failures don’t happen. + Restate makes them distributed and durable out-of-the-box. - ```ts ! role_updater.ts - CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?1 - ``` + ```ts ! role_updater.ts + CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?1 + ``` - ### !!steps Scalability, concurrency, consistency + ### !!steps Scalability, concurrency, consistency - Restate sequences requests per key, if desired. - Scale out without fearing issues such as race conditions, multiple writers to state, etc. + Restate sequences requests per key, if desired. + Scale out without fearing issues such as race conditions, multiple writers to state, etc. - This handler updates a user’s role in set of systems. - Other updates to the same user are queued and processed in order. - Updates either succeed or fail, but never leave the user in an inconsistent state. + This handler updates a user’s role in set of systems. + Other updates to the same user are queued and processed in order. + Updates either succeed or fail, but never leave the user in an inconsistent state. - ```ts ! role_updater.ts - CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?2 - ``` + ```ts ! role_updater.ts + CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?2 + ``` - ### !!steps Persist progress + ### !!steps Persist progress - Store results of interaction with external systems or non-deterministic actions. - On retries, the action does not get re-executed but the previous result will be returned. + Store results of interaction with external systems or non-deterministic actions. + On retries, the action does not get re-executed but the previous result will be returned. - ```ts ! role_updater.ts - CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?3 - ``` + ```ts ! role_updater.ts + CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?3 + ``` - ### !!steps Sagas and rollbacks + ### !!steps Sagas and rollbacks - Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. + Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. - Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. + Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. - ```ts ! role_updater.ts - CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?4 - ``` + ```ts ! role_updater.ts + CODE_LOAD::ts/src/use_cases/microservices/role_updater.ts?4 + ``` - - - - + + + + - ### !!steps Distributed, durable building blocks - Work with objects, functions, and promises as if failures don’t happen. - Restate makes them distributed and durable out-of-the-box. + ### !!steps Distributed, durable building blocks + Work with objects, functions, and promises as if failures don’t happen. + Restate makes them distributed and durable out-of-the-box. - ```java ! RoleUpdateService.java - CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?1 - ``` + ```java ! RoleUpdateService.java + CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?1 + ``` - ### !!steps Scalability, concurrency, consistency + ### !!steps Scalability, concurrency, consistency - Restate sequences requests per key, if desired. - Scale out without fearing issues such as race conditions, multiple writers to state, etc. + Restate sequences requests per key, if desired. + Scale out without fearing issues such as race conditions, multiple writers to state, etc. - This handler updates a user’s role in set of systems. - Other updates to the same user are queued and processed in order. - Updates either succeed or fail, but never leave the user in an inconsistent state. + This handler updates a user’s role in set of systems. + Other updates to the same user are queued and processed in order. + Updates either succeed or fail, but never leave the user in an inconsistent state. - ```java ! RoleUpdateService.java - CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?2 - ``` + ```java ! RoleUpdateService.java + CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?2 + ``` - ### !!steps Persist progress + ### !!steps Persist progress - Store results of interaction with external systems or non-deterministic actions. - On retries, the action does not get re-executed but the previous result will be returned. + Store results of interaction with external systems or non-deterministic actions. + On retries, the action does not get re-executed but the previous result will be returned. - ```java ! RoleUpdateService.java - CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?3 - ``` + ```java ! RoleUpdateService.java + CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?3 + ``` - ### !!steps Sagas and rollbacks + ### !!steps Sagas and rollbacks - Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. + Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. - Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. + Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. - ```java ! RoleUpdateService.java - CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?4 - ``` + ```java ! RoleUpdateService.java + CODE_LOAD::java/src/main/java/usecases/microservices/RoleUpdateService.java?4 + ``` - - - - + + + + - ### !!steps Distributed, durable building blocks - Work with objects, functions, and promises as if failures don’t happen. - Restate makes them distributed and durable out-of-the-box. + ### !!steps Distributed, durable building blocks + Work with objects, functions, and promises as if failures don’t happen. + Restate makes them distributed and durable out-of-the-box. - ```kotlin ! RoleUpdateService.kt - CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?1 - ``` + ```kotlin ! RoleUpdateService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?1 + ``` - ### !!steps Scalability, concurrency, consistency + ### !!steps Scalability, concurrency, consistency - Restate sequences requests per key, if desired. - Scale out without fearing issues such as race conditions, multiple writers to state, etc. + Restate sequences requests per key, if desired. + Scale out without fearing issues such as race conditions, multiple writers to state, etc. - This handler updates a user’s role in set of systems. - Other updates to the same user are queued and processed in order. - Updates either succeed or fail, but never leave the user in an inconsistent state. + This handler updates a user’s role in set of systems. + Other updates to the same user are queued and processed in order. + Updates either succeed or fail, but never leave the user in an inconsistent state. - ```kotlin ! RoleUpdateService.kt - CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?2 - ``` + ```kotlin ! RoleUpdateService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?2 + ``` - ### !!steps Persist progress + ### !!steps Persist progress - Store results of interaction with external systems or non-deterministic actions. - On retries, the action does not get re-executed but the previous result will be returned. + Store results of interaction with external systems or non-deterministic actions. + On retries, the action does not get re-executed but the previous result will be returned. - ```kotlin ! RoleUpdateService.kt - CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?3 - ``` + ```kotlin ! RoleUpdateService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?3 + ``` - ### !!steps Sagas and rollbacks + ### !!steps Sagas and rollbacks - Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. + Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. - Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. + Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. - ```kotlin ! RoleUpdateService.kt - CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?4 - ``` + ```kotlin ! RoleUpdateService.kt + CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/RoleUpdateService.kt?4 + ``` - - - - + + + + - ### !!steps Distributed, durable building blocks - Work with objects, functions, and promises as if failures don’t happen. - Restate makes them distributed and durable out-of-the-box. + ### !!steps Distributed, durable building blocks + Work with objects, functions, and promises as if failures don’t happen. + Restate makes them distributed and durable out-of-the-box. - ```go ! roleupdater.go - CODE_LOAD::go/usecases/microservices/roleupdater.go?1 - ``` + ```go ! roleupdater.go + CODE_LOAD::go/usecases/microservices/roleupdater.go?1 + ``` + ### !!steps Scalability, concurrency, consistency + Restate sequences requests per key, if desired. + Scale out without fearing issues such as race conditions, multiple writers to state, etc. - ### !!steps Scalability, concurrency, consistency + This handler updates a user’s role in set of systems. + Other updates to the same user are queued and processed in order. + Updates either succeed or fail, but never leave the user in an inconsistent state. - Restate sequences requests per key, if desired. - Scale out without fearing issues such as race conditions, multiple writers to state, etc. + ```go ! roleupdater.go + CODE_LOAD::go/usecases/microservices/roleupdater.go?2 + ``` - This handler updates a user’s role in set of systems. - Other updates to the same user are queued and processed in order. - Updates either succeed or fail, but never leave the user in an inconsistent state. + ### !!steps Persist progress - ```go ! roleupdater.go - CODE_LOAD::go/usecases/microservices/roleupdater.go?2 - ``` + Store results of interaction with external systems or non-deterministic actions. + On retries, the action does not get re-executed but the previous result will be returned. + ```go ! roleupdater.go + CODE_LOAD::go/usecases/microservices/roleupdater.go?3 + ``` + ### !!steps Sagas and rollbacks - ### !!steps Persist progress + Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. - Store results of interaction with external systems or non-deterministic actions. - On retries, the action does not get re-executed but the previous result will be returned. + Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. - ```go ! roleupdater.go - CODE_LOAD::go/usecases/microservices/roleupdater.go?3 - ``` + ```go ! roleupdater.go + CODE_LOAD::go/usecases/microservices/roleupdater.go?4 + ``` + + + + + ### !!steps Distributed, durable building blocks + Work with objects, functions, and promises as if failures don’t happen. + Restate makes them distributed and durable out-of-the-box. - ### !!steps Sagas and rollbacks + ```py ! role_updater.py + CODE_LOAD::python/src/use_cases/microservices/role_updater.py?1 + ``` - Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. + ### !!steps Scalability, concurrency, consistency - Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. + Restate sequences requests per key, if desired. + Scale out without fearing issues such as race conditions, multiple writers to state, etc. - ```go ! roleupdater.go - CODE_LOAD::go/usecases/microservices/roleupdater.go?4 - ``` + This handler updates a user’s role in set of systems. + Other updates to the same user are queued and processed in order. + Updates either succeed or fail, but never leave the user in an inconsistent state. - + ```py ! role_updater.py + CODE_LOAD::python/src/use_cases/microservices/role_updater.py?2 + ``` + + ### !!steps Persist progress + + Store results of interaction with external systems or non-deterministic actions. + On retries, the action does not get re-executed but the previous result will be returned. + + ```py ! role_updater.py + CODE_LOAD::python/src/use_cases/microservices/role_updater.py?3 + ``` + + ### !!steps Sagas and rollbacks + + Implement compensation logic with Restate’s durable building blocks and Restate takes care of running it till completion. + + Here, we update the user’s role in multiple systems. If one fails, we rollback the changes in the other systems. + + ```py ! role_updater.py + CODE_LOAD::python/src/use_cases/microservices/role_updater.py?4 + ``` + +

Proxy RPC calls to other services via Restate and get durability and idempotency for free.

- + @@ -329,6 +376,30 @@ Turn functions into durable handlers with the Restate SDK. CODE_LOAD::kotlin/src/main/kotlin/usecases/microservices/Idempotency.kt?1 ``` + + + + + + ### !!steps Durable RPC + Send requests to other services and Restate ensures they are delivered and processed. + Requests can be done as request-response, as message, or delayed action. + + ```py ! product_reservation.py + CODE_LOAD::python/src/use_cases/microservices/idempotency.py + ``` + + ### !!steps Idempotency for free + + Add an idempotency token to your request and let Restate take care of deduplication for you. + Duplicate requests latch on to the previous one and see the same response. + + Here, we reserve a product for a user. We connect to Restate and send a `reserve` request with an idempotency key so retries wouldn't lead to double reservations. + + ```py ! product_reservation.py + CODE_LOAD::python/src/use_cases/microservices/idempotency.py?1 + ``` + @@ -403,7 +474,8 @@ Turn functions into durable handlers with the Restate SDK. ), ts: "/get_started/tour?sdk=ts", java: "/get_started/tour?sdk=java", - go: "/get_started/tour?sdk=go" + go: "/get_started/tour?sdk=go", + python: "/get_started/tour?sdk=python" }, { title: 'Need help?', diff --git a/docs/use-cases/workflows.mdx b/docs/use-cases/workflows.mdx index 3da2e2fd..b91665d7 100644 --- a/docs/use-cases/workflows.mdx +++ b/docs/use-cases/workflows.mdx @@ -15,8 +15,13 @@ import SpotlightCard from "../../src/components/SpotlightCard" import TwoColumnText from "../../src/components/TwoColumnText" import Tabs from "@theme/Tabs" import TabItem from "@theme/TabItem" -import {Scrollycoding} from "../../src/components/code/scrollycoding"; + + + + + +

Workflows-as-code

Lightweight, flexible, durable.

@@ -48,7 +53,7 @@ import {Scrollycoding} from "../../src/components/code/scrollycoding"; Implement the `run` function of your workflow, using the Restate SDK. - + @@ -60,8 +65,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?1 ``` - - ### !!steps Persist progress Store results of intermediate steps, interaction with external systems, or non-deterministic actions. @@ -72,8 +75,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?2 ``` - - ### !!steps Workflow state Use Restate’s built-in key-value store to store workflow state. @@ -83,8 +84,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?3 ``` - - ### !!steps Query workflow state Retrieve the current state of the workflow from within other handlers and expose it to external clients. @@ -93,8 +92,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?4 ``` - - ### !!steps Wait on external signals Make Promises/Futures resilient by registering them in Restate. @@ -104,8 +101,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?5 ``` - - ### !!steps Signal in-flight workflows Notify the workflow of external signals, callbacks or Kafka events. @@ -115,8 +110,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::ts/src/use_cases/workflows/signup_workflow.ts?6 ``` - - ### !!steps Flexible failure handling Implement sagas and compensations in code, as per your requirements. @@ -138,8 +131,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?1 ``` - - ### !!steps Persist progress Store results of intermediate steps, interaction with external systems, or non-deterministic actions. @@ -150,8 +141,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?2 ``` - - ### !!steps Workflow state Use Restate’s built-in key-value store to store workflow state. @@ -161,8 +150,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?3 ``` - - ### !!steps Query workflow state Retrieve the current state of the workflow from within other handlers and expose it to external clients. @@ -171,8 +158,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?4 ``` - - ### !!steps Wait on external signals Make Promises/Futures resilient by registering them in Restate. @@ -182,8 +167,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?5 ``` - - ### !!steps Signal in-flight workflows Notify the workflow of external signals, callbacks or Kafka events. @@ -193,8 +176,6 @@ Implement the `run` function of your workflow, using the Restate SDK. CODE_LOAD::java/src/main/java/usecases/workflows/SignupWorkflow.java?6 ``` - - ### !!steps Flexible failure handling Implement sagas and compensations in code, as per your requirements. @@ -283,6 +264,72 @@ Implement the `run` function of your workflow, using the Restate SDK. + + + + ### !!steps Run once, idempotently + A workflow runs exactly one time. + Restate makes sure that duplicate requests do not lead to duplicate execution. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?1 + ``` + + ### !!steps Persist progress + + Store results of intermediate steps, interaction with external systems, or non-deterministic actions. + Restate makes sure that on retries, the code does not get re-executed and the previous result is returned. + This lets you execute the steps of your workflows durably. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?2 + ``` + + ### !!steps Workflow state + + Use Restate’s built-in key-value store to store workflow state. + Restate guarantees that it is consistent and persistent, since state updates are tracked together with the rest of the execution progress. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?3 + ``` + + ### !!steps Query workflow state + + Retrieve the current state of the workflow from within other handlers and expose it to external clients. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?4 + ``` + + ### !!steps Wait on external signals + + Make Promises/Futures resilient by registering them in Restate. + Share them and wait until other functions resolve them. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?5 + ``` + + ### !!steps Signal in-flight workflows + + Notify the workflow of external signals, callbacks or Kafka events. + Resolve or reject shared promises on which the workflow is waiting. The workflow handles the outcome. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?6 + ``` + + ### !!steps Flexible failure handling + + Implement sagas and compensations in code, as per your requirements. + + ```python ! user_sign_up_flow.py + CODE_LOAD::python/src/use_cases/workflows/signup_workflow.py?7 + ``` + + + @@ -298,7 +345,7 @@ Implement the `run` function of your workflow, using the Restate SDK. ]}/> - + @@ -335,6 +382,18 @@ Implement the `run` function of your workflow, using the Restate SDK. + + + ### Latch on to a workflow + A workflow runs exactly one time. + If the caller loses the connection to the workflow, he can latch on again to retrieve the result. + + ```python !result workflow_submitter.py + CODE_LOAD::python/src/use_cases/workflows/submit.py + ``` + + + @@ -392,7 +451,9 @@ Implement the `run` function of your workflow, using the Restate SDK. "Read the docs to learn more." ), ts: "/develop/ts/workflows", - java: "/develop/java/workflows" + java: "/develop/java/workflows?sdk=java", + kotlin: "/develop/java/workflows?sdk=kotlin", + python: "/develop/python/workflows", }, { title: 'Need help?', diff --git a/src/components/FeatureWidget/index.tsx b/src/components/FeatureWidget/index.tsx index c28fa783..f598f305 100644 --- a/src/components/FeatureWidget/index.tsx +++ b/src/components/FeatureWidget/index.tsx @@ -10,7 +10,9 @@ type FeatureItem = { java: string; ts: string; kotlin: string; + python: string; go: string; + rust: string; link: { icon: string; url: string }; links: [{ icon: string; url: string }]; }; @@ -25,6 +27,8 @@ function Feature({ link, kotlin, go, + python, + rust, links, }: FeatureItem) { const colWidth = itemsPerRow ? Math.floor(12 / itemsPerRow) : 4; @@ -91,6 +95,30 @@ function Feature({ ) : null} + {python ? ( +
+ + + +
+ ) : null} + {rust ? ( +
+ + + +
+ ) : null} {link ? (