From c7e23f7e4c3d015bba726c955d0c0df980c341f8 Mon Sep 17 00:00:00 2001 From: Matthew Miller Date: Wed, 1 May 2019 14:48:07 -0700 Subject: [PATCH] Add design for automatic event streaming reconnects. --- .../reconnect/CurrentState.java | 117 ++++++++++++++++++ .../core/event-streaming/reconnect/README.md | 81 ++++++++++++ .../reconnect/prototype/Option1.java | 79 ++++++++++++ .../reconnect/prototype/Option2.java | 89 +++++++++++++ 4 files changed, 366 insertions(+) create mode 100644 docs/design/core/event-streaming/reconnect/CurrentState.java create mode 100644 docs/design/core/event-streaming/reconnect/README.md create mode 100644 docs/design/core/event-streaming/reconnect/prototype/Option1.java create mode 100644 docs/design/core/event-streaming/reconnect/prototype/Option2.java diff --git a/docs/design/core/event-streaming/reconnect/CurrentState.java b/docs/design/core/event-streaming/reconnect/CurrentState.java new file mode 100644 index 000000000000..d17b57e00766 --- /dev/null +++ b/docs/design/core/event-streaming/reconnect/CurrentState.java @@ -0,0 +1,117 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package software.amazon.awssdk.services.transcribestreaming; + +import com.github.davidmoten.rx2.Bytes; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.junit.Test; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent; +import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; +import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; +import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; + +public class CurrentState { + private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile()); + + @Test + public void demoCurrentState() throws FileNotFoundException { + try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) { + // Create the audio stream for transcription - we have to create a publisher that resumes where it left off. + // If we don't, we'll replay the whole thing again on a reconnect. 
+ Publisher audioStream = + Bytes.from(new FileInputStream(audioFile)) + .map(SdkBytes::fromByteArray) + .map(bytes -> AudioEvent.builder().audioChunk(bytes).build()) + .cast(AudioStream.class); + + CompletableFuture result = printAudio(client, audioStream, null, 3); + result.join(); + } + } + + private CompletableFuture printAudio(TranscribeStreamingAsyncClient client, + Publisher audioStream, + String sessionId, + int resumesRemaining) { + if (resumesRemaining == 0) { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(new IllegalStateException("Failed to resume audio, because the maximum resumes " + + "have been exceeded.")); + return result; + } + + // Create the request for transcribe that includes the audio metadata + StartStreamTranscriptionRequest audioMetadata = + StartStreamTranscriptionRequest.builder() + .languageCode(LanguageCode.EN_US) + .mediaEncoding(MediaEncoding.PCM) + .mediaSampleRateHertz(16_000) + .sessionId(sessionId) + .build(); + + // Create the transcription handler + AtomicReference atomicSessionId = new AtomicReference<>(sessionId); + Consumer reader = event -> { + if (event instanceof TranscriptEvent) { + TranscriptEvent transcriptEvent = (TranscriptEvent) event; + System.out.println(transcriptEvent.transcript().results()); + } + }; + + StartStreamTranscriptionResponseHandler responseHandler = + StartStreamTranscriptionResponseHandler.builder() + .onResponse(r -> atomicSessionId.set(r.sessionId())) + .subscriber(reader) + .build(); + + // Start talking with transcribe + return client.startStreamTranscription(audioMetadata, audioStream, responseHandler) + .handle((x, error) -> resumePrintAudio(client, audioStream, atomicSessionId.get(), resumesRemaining, error)) + .thenCompose(flatten -> flatten); + } + + private CompletableFuture resumePrintAudio(TranscribeStreamingAsyncClient client, + Publisher audioStream, + String sessionId, + int resumesRemaining, + Throwable error) { + if (error == null) { + 
+ return CompletableFuture.completedFuture(null); + } + + error.printStackTrace(); + + if (sessionId == null) { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(error); + return result; + } + + // A session exists, so announce the reconnect and recursively call printAudio with one fewer resume remaining + System.out.println("Error happened. Reconnecting and trying again..."); + return printAudio(client, audioStream, sessionId, resumesRemaining - 1); + } +} diff --git a/docs/design/core/event-streaming/reconnect/README.md b/docs/design/core/event-streaming/reconnect/README.md new file mode 100644 index 000000000000..deae6745426f --- /dev/null +++ b/docs/design/core/event-streaming/reconnect/README.md @@ -0,0 +1,81 @@ +# Event Stream Reconnects + +Event streaming allows long-running bi-directional communication between +customers and AWS services over HTTP/2 connections. + +Because a single request is intended to be long-running, services +usually provide a way for a client to "resume" an interrupted session on +a new TCP connection. In Kinesis's subscribe-to-shard API, each response +event includes a `continuationSequenceNumber` that can be specified in a +request message to pick up from where the disconnect occurred. In +Transcribe's streaming-transcription API, each response includes a +`sessionId` with similar semantics. + +The current implementation requires the service to write a high-level +library for handling this logic (e.g. Kinesis's consumer library), or +for each customer to write this logic by hand. +[This hand-written code](CurrentState.java) is verbose and error prone. + +This mini-design outlines API options for the SDK automatically +reconnecting when a network error occurs. + +## [API Option 1: New Method](prototype/Option1.java) + +This option adds a new method to each streaming operation that the +customer can use to enable automatic reconnects. The customer selects to +work with or without reconnects based on the method that they use. 
+ +```Java +try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) { + // ... + // Current method (behavior is unchanged) + client.startStreamTranscription(audioMetadata, + audioStream, + responseHandler); + + // New method that transparently reconnects on network errors (name to be bikeshed) + client.startStreamTranscriptionWithAutoReconnect(audioMetadata, + audioStream, + responseHandler); + // ... +} +``` + +## [API Option 2: New Client Configuration](prototype/Option2.java) + +This option adds a new setting on the client that the customer can use +to *disable* automatic reconnects. The customer gets automatic +reconnects by default, and would need to explicitly disable them if they +do not want them for their use-case. + +```Java +// Current method is updated to transparently reconnect on network errors +try (TranscribeStreamingAsyncClient client = + TranscribeStreamingAsyncClient.create()) { + // ... + client.startStreamTranscription(audioMetadata, + audioStream, + responseHandler); + // ... +} + +// New client configuration option can be used to configure reconnect behavior +try (TranscribeStreamingAsyncClient client = + TranscribeStreamingAsyncClient.builder() + .overrideConfiguration(c -> c.reconnectPolicy(ReconnectPolicy.none())) + .build()) { + // ... + client.startStreamTranscription(audioMetadata, + audioStream, + responseHandler); + // ... +} +``` + +## Comparison + +| | Option 1 | Option 2 | +| --- | --- | --- | +| Discoverability | - | + | +| Configurability | - | + | +| Backwards Compatibility | + | - | \ No newline at end of file diff --git a/docs/design/core/event-streaming/reconnect/prototype/Option1.java b/docs/design/core/event-streaming/reconnect/prototype/Option1.java new file mode 100644 index 000000000000..10befa055c16 --- /dev/null +++ b/docs/design/core/event-streaming/reconnect/prototype/Option1.java @@ -0,0 +1,79 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.transcribestreaming; + +import com.github.davidmoten.rx2.Bytes; +import io.reactivex.Flowable; +import java.io.File; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.junit.Test; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent; +import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; +import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; +import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; + +/** + * Option 1: Add a new method to hide: (1) the need for non-replayable publishers, (2) the reconnect boilerplate. 
+ */ +public class Option1 { + private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile()); + + @Test + public void option1() { + try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) { + // Create the request for transcribe that includes the audio metadata + StartStreamTranscriptionRequest audioMetadata = + StartStreamTranscriptionRequest.builder() + .languageCode(LanguageCode.EN_US) + .mediaEncoding(MediaEncoding.PCM) + .mediaSampleRateHertz(16_000) + .build(); + + // Create the audio stream for transcription by wrapping the audio file's bytes in AudioEvents + Publisher audioStream = + Bytes.from(audioFile) + .map(SdkBytes::fromByteArray) + .map(bytes -> AudioEvent.builder().audioChunk(bytes).build()) + .cast(AudioStream.class); + + // Create the consumer that prints the results of each TranscriptEvent received from transcribe + Consumer reader = event -> { + if (event instanceof TranscriptEvent) { + TranscriptEvent transcriptEvent = (TranscriptEvent) event; + System.out.println(transcriptEvent.transcript().results()); + } + }; + + StartStreamTranscriptionResponseHandler responseHandler = StartStreamTranscriptionResponseHandler.builder() + .subscriber(reader) + .build(); + + // Start talking with transcribe using a new auto-reconnect method (method name to be bikeshed) + CompletableFuture result = client.startStreamTranscriptionWithAutoReconnect(audioMetadata, + audioStream, + responseHandler); + result.join(); + } + } + +} diff --git a/docs/design/core/event-streaming/reconnect/prototype/Option2.java b/docs/design/core/event-streaming/reconnect/prototype/Option2.java new file mode 100644 index 000000000000..2ccbf6187f93 --- /dev/null +++ b/docs/design/core/event-streaming/reconnect/prototype/Option2.java @@ -0,0 +1,89 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. 
+ * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.transcribestreaming; + +import com.github.davidmoten.rx2.Bytes; +import io.reactivex.Flowable; +import java.io.File; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.junit.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent; +import software.amazon.awssdk.services.transcribestreaming.model.AudioStream; +import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode; +import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest; +import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent; +import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream; + +/** + * Option 2: Update current method to automatically reconnect and hide: (1) the need for non-replayable publishers, + * (2) the reconnect boilerplate. + * + * This behavior can be configured (e.g. disabled) via the "reconnect policy" on the client constructor. 
+ */ +public class Option2 { + private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile()); + + @Test + public void option2() { + try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) { + // Create the request for transcribe that includes the audio metadata + StartStreamTranscriptionRequest audioMetadata = + StartStreamTranscriptionRequest.builder() + .languageCode(LanguageCode.EN_US) + .mediaEncoding(MediaEncoding.PCM) + .mediaSampleRateHertz(16_000) + .build(); + + // Create the audio stream for transcription by wrapping the audio file's bytes in AudioEvents + Flowable audioStream = + Bytes.from(audioFile) + .map(SdkBytes::fromByteArray) + .map(bytes -> AudioEvent.builder().audioChunk(bytes).build()) + .cast(AudioStream.class); + + // Create the consumer that prints the results of each TranscriptEvent received from transcribe + Consumer reader = event -> { + if (event instanceof TranscriptEvent) { + TranscriptEvent transcriptEvent = (TranscriptEvent) event; + System.out.println(transcriptEvent.transcript().results()); + } + }; + + StartStreamTranscriptionResponseHandler responseHandler = StartStreamTranscriptionResponseHandler.builder() + .subscriber(reader) + .build(); + + // Start talking with transcribe using the existing method + CompletableFuture result = client.startStreamTranscription(audioMetadata, audioStream, responseHandler); + result.join(); + } + } + + @Test + public void disableReconnectsDemo() { + // Turn off reconnects + try (TranscribeStreamingAsyncClient client = + TranscribeStreamingAsyncClient.builder() + .overrideConfiguration(c -> c.reconnectPolicy(ReconnectPolicy.none())) + .build()) { + // .... + } + } +}