From 3141987e93893d87f73c7f72e3f05e7faf19b2cb Mon Sep 17 00:00:00 2001 From: Marcin Czeczko Date: Tue, 19 May 2020 10:04:53 +0200 Subject: [PATCH] AWS SQS guide --- docs/src/main/asciidoc/amazon-sqs.adoc | 463 +++++++++++++++++++++++++ 1 file changed, 463 insertions(+) create mode 100644 docs/src/main/asciidoc/amazon-sqs.adoc diff --git a/docs/src/main/asciidoc/amazon-sqs.adoc b/docs/src/main/asciidoc/amazon-sqs.adoc new file mode 100644 index 0000000000000..883bc87161304 --- /dev/null +++ b/docs/src/main/asciidoc/amazon-sqs.adoc @@ -0,0 +1,463 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/master/docs/src/main/asciidoc +//// += Quarkus - Amazon SQS Client +:extension-status: preview + +include::./attributes.adoc[] + +Amazon Simple Queue Service (SQS) is a fully managed message queuing service. +Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other +services to be available. +SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering and at-least-once delivery. +SQS FIFO queues are designed to guarantee that messages are processes exactly once, on the exact order that they were sent. + +You can find more information about SQS at https://aws.amazon.com/sws/[the Amazon SQS website]. + +NOTE: The SQS extension is based on https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/welcome.html[AWS Java SDK 2.x]. +It's a major rewrite of the 1.x code base that offers two programming models (Blocking & Async). + +include::./status-include.adoc[] + +The Quarkus extension supports two programming models: + +* Blocking access using URL Connection HTTP client (by default) or the Apache HTTP Client +* https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/basics-async.html[Asynchronous programming] based on JDK's `CompletableFuture` objects and the Netty HTTP client. + +In this guide, we see how you can get your REST services to use SQS locally and on AWS. + +== Prerequisites + +To complete this guide, you need: + +* JDK 1.8+ installed with `JAVA_HOME` configured appropriately +* an IDE +* Apache Maven {maven-version} +* An AWS Account to access the SQS service +* Docker for your system to run SQS locally for testing purposes + +=== Set up SQS locally + +The easiest way to start working with SQS is to run a local instance as a container. + +[source,shell,subs="verbatim,attributes"] +---- +docker run --rm --name local-sqs 8010:4576 -e SERVICES=sqs -e START_WEB=0 -d localstack/localstack:0.11.1 +---- +This starts a SQS instance that is accessible on port `8010`. + +Create an AWS profile for your local instance using AWS CLI: +[source,shell,subs="verbatim,attributes"] +---- +$ aws configure --profile localstack +AWS Access Key ID [None]: test-key +AWS Secret Access Key [None]: test-secret +Default region name [None]: us-east-1 +Default output format [None]: +---- + +=== Create a SQS queue + +Create a SQS queue using AWS CLI and store in `QUEUE_URL` environment variable. + +[source,shell,subs="verbatim,attributes"] +---- +QUEUE_URL=`aws sqs create-queue --queue-name=ColliderQueue --profile localstack --endpoint-url=http://localhost:8010` +---- + +Or, if you want to use your SQS queue on your AWS account create a queue using your default profile +[source,shell,subs="verbatim,attributes"] +---- +QUEUE_URL=`aws sqs create-queue --queue-name=ColliderQueue` +---- + +== Solution +The application built here allows shooting an elementary particles (quarks) into a `ColliderQueue` queue of the AWS SQS. +Additionally, we create a resource that allows receiving those quarks from the `ColliderQueue` queue in the order they were sent. + +We recommend that you follow the instructions in the next sections and create the application step by step. +However, you can go right to the completed example. + +Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive]. + +The solution is located in the `amazon-sqs-quickstart` {quickstarts-tree-url}/amazon-sqs-quickstart[directory]. + +== Creating the Maven project + +First, we need a new project. Create a new project with the following command: + +[source,shell,subs=attributes+] +---- +mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ + -DprojectGroupId=org.acme \ + -DprojectArtifactId=amazon-sqs-quickstart \ + -DclassName="org.acme.sqs.QuarksCannonSyncResource" \ + -Dpath="/sync-cannon" \ + -Dextensions="resteasy-jsonb,amazon-sqs,resteasy-mutiny" +cd amazon-sqs-quickstart +---- + +This command generates a Maven structure importing the RESTEasy/JAX-RS, Mutiny and Amazon SQS Client extensions. +After this, the `amazon-sqs` extension has been added to your `pom.xml` as well as the Mutiny support for RESTEasy. + +== Creating JSON REST service + +In this example, we will create an application that sends quarks via the queue. The example application will demonstrate the two programming models supported by the extension. + +First, let's create the `Quark` bean as follows: + +[source,java] +---- +package org.acme.sqs.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; +import java.util.Objects; + +@RegisterForReflection +public class Quark { + + private String flavor; + private String spin; + + public Quark() { + } + + public String getFlavor() { + return flavor; + } + + public void setFlavor(String flavor) { + this.flavor = flavor; + } + + public String getSpin() { + return spin; + } + + public void setSpin(String spin) { + this.spin = spin; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Quark)) { + return false; + } + + Quark other = (Quark) obj; + + return Objects.equals(other.flavor, this.flavor); + } + + @Override + public int hashCode() { + return Objects.hash(this.flavor); + } +} +---- +Then, create a `org.acme.sqs.QuarksCannonSyncResource` that will provide an API to shoot quarks into the SQS queue using the synchronous client. + +[source,java] +---- +package org.acme.sqs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.acme.sqs.model.Quark; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; + +@Path("/sync/cannon") +@Produces(MediaType.TEXT_PLAIN) +public class QuarksCannonSyncResource { + + private static final Logger LOGGER = Logger.getLogger(QuarksCannonSyncResource.class); + + @Inject + SqsClient sqs; + + @ConfigProperty(name = "queue.url") + String queueUrl; + + static ObjectWriter QUARK_WRITER = new ObjectMapper().writerFor(Quark.class); + + @POST + @Path("/shoot") + @Consumes(MediaType.APPLICATION_JSON) + public Response sendMessage(Quark quark) throws Exception { + String message = QUARK_WRITER.writeValueAsString(quark); + SendMessageResponse response = sqs.sendMessage(m -> m.queueUrl(queueUrl).messageBody(message)); + LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin()); + return Response.ok().entity(response.messageId()).build(); + } +} +---- + +Because of the fact messages sent to the queue must be a `String`, we're using Jackson's `ObjectWriter` in order to serialize our `Quark` objects into a `String`. + +Now, create the `org.acme.QuarksShieldSyncResource` REST resources that provides an endpoint to read the messages from the `ColliderQueue` queue. + +[source,java] +---- +package org.acme.sqs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import java.util.List; +import java.util.stream.Collectors; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.acme.sqs.model.Quark; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +@Path("/sync/shield") +public class QuarksShieldSyncResource { + + private static final Logger LOGGER = Logger.getLogger(QuarksShieldSyncResource.class); + + @Inject + SqsClient sqs; + + @ConfigProperty(name = "queue.url") + String queueUrl; + + static ObjectReader QUARK_READER = new ObjectMapper().readerFor(Quark.class); + + @GET + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public List receive() { + List messages = sqs.receiveMessage(m -> m.maxNumberOfMessages(10).queueUrl(queueUrl)).messages(); + + return messages.stream() + .map(Message::body) + .map(this::toQuark) + .collect(Collectors.toList()); + } + + private Quark toQuark(String message) { + Quark quark = null; + try { + quark = QUARK_READER.readValue(message); + } catch (Exception e) { + LOGGER.error("Error decoding message", e); + throw new RuntimeException(e); + } + return quark; + } +} +---- + +We are using here a Jackson's `ObjectReader` in order to deserialize queue messages into our `Quark` POJOs. + +== Configuring SQS clients + +Both SQS clients (sync and async) are configurable via the `application.properties` file that can be provided in the `src/main/resources` directory. +Additionally, you need to add to the classpath a proper implementation of the sync client. By default the extension uses the URL connection HTTP client, so +you need to add a URL connection client dependency to the `pom.xml` file: + +[source,xml] +---- + + software.amazon.awssdk + url-connection-client + +---- + +If you want to use Apache HTTP client instead, configure it as follows: +[source,properties] +---- +quarkus.sqs.sync-client.type=apache +---- + +And add the following dependency to the application `pom.xml`: +[source,xml] +---- + + software.amazon.awssdk + apache-client + +---- + +If you're going to use a local SQS instance, configure it as follows: + +[source,properties] +---- +quarkus.sqs.endpoint-override=http://localhost:8010 + +quarkus.sqs.aws.region=us-east-1 +quarkus.sqs.aws.credentials.type=static +quarkus.sqs.aws.credentials.static-provider.access-key-id=test-key +quarkus.sqs.aws.credentials.static-provider.secret-access-key=test-secret +---- + +- `quarkus.sqs.aws.region` - It's required by the client, but since you're using a local SQS instance use `us-east-1` as it's a default region of localstack's SQS. +- `quarkus.sqs.aws.credentials.type` - Set `static` credentials provider with any values for `access-key-id` and `secret-access-key` +- `quarkus.sqs.endpoint-override` - Override the SQS client to use a local instance instead of an AWS service + +If you want to work with an AWS account, you can simply remove or comment out all SQS related properties. By default, the SQS client extension +will use the `default` credentials provider chain that looks for credentials in this order: +- Java System Properties - `aws.accessKeyId` and `aws.secretKey` +* Environment Variables - `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` +* Credential profiles file at the default location (`~/.aws/credentials`) shared by all AWS SDKs and the AWS CLI +* Credentials delivered through the Amazon EC2 container service if the `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` environment variable is set and the security manager has permission to access the variable, +* Instance profile credentials delivered through the Amazon EC2 metadata service + +And the region from your AWS CLI profile will be used. + +== Next steps + +=== Packaging + +Packaging your application is as simple as `./mvnw clean package`. +It can be run with `java -Dqueue.url=$QUEUE_URL -jar target/amazon-sqs-quickstart-1.0-SNAPSHOT-runner.jar`. + +With GraalVM installed, you can also create a native executable binary: `./mvnw clean package -Dnative`. +Depending on your system, that will take some time. + +=== Going asynchronous + +Thanks to the AWS SDK v2.x used by the Quarkus extension, you can use the asynchronous programming model out of the box. + +Create a `org.acme.sqs.QuarksCannonAsyncResource` REST resource that will be similar to our `QuarksCannonSyncResource` but using an asynchronous programming model. + +[source,java] +---- +package org.acme.sqs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import io.smallrye.mutiny.Uni; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.acme.sqs.model.Quark; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; + +@Path("/async/cannon") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class QuarksCannonAsyncResource { + + private static final Logger LOGGER = Logger.getLogger(QuarksCannonAsyncResource.class); + + @Inject + SqsAsyncClient sqs; + + @ConfigProperty(name = "queue.url") + String queueUrl; + + static ObjectWriter QUARK_WRITER = new ObjectMapper().writerFor(Quark.class); + + @POST + @Path("/shoot") + @Consumes(MediaType.APPLICATION_JSON) + public Uni sendMessage(Quark quark) throws Exception { + String message = QUARK_WRITER.writeValueAsString(quark); + return Uni.createFrom() + .completionStage(sqs.sendMessage(m -> m.queueUrl(queueUrl).messageBody(message))) + .onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}}]", quark.getFlavor(), quark.getSpin())) + .onItem().apply(SendMessageResponse::messageId) + .onItem().apply(id -> Response.ok().entity(id).build()); + } +} +---- +We create `Uni` instances from the `CompletionStage` objects returned by the asynchronous SQS client, and then transform the emitted item. + +And the corresponding async receiver of the queue messages `org.acme.sqs.QuarksShieldAsyncResource` +[source,java] +---- +package org.acme.sqs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import io.smallrye.mutiny.Uni; +import java.util.List; +import java.util.stream.Collectors; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import org.acme.sqs.model.Quark; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +@Path("/async/shield") +public class QuarksShieldAsyncResource { + + private static final Logger LOGGER = Logger.getLogger(QuarksShieldAsyncResource.class); + + @Inject + SqsAsyncClient sqs; + + @ConfigProperty(name = "queue.url") + String queueUrl; + + static ObjectReader QUARK_READER = new ObjectMapper().readerFor(Quark.class); + + @GET + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Uni> receive() { + return Uni.createFrom() + .completionStage(sqs.receiveMessage(m -> m.maxNumberOfMessages(10).queueUrl(queueUrl))) + .onItem().apply(ReceiveMessageResponse::messages) + .onItem().apply(m -> m.stream().map(Message::body).map(this::toQuark).collect(Collectors.toList())); + } + + private Quark toQuark(String message) { + Quark quark = null; + try { + quark = QUARK_READER.readValue(message); + } catch (Exception e) { + LOGGER.error("Error decoding message", e); + throw new RuntimeException(e); + } + return quark; + } +} +---- + +And we need to add the Netty HTTP client dependency to the `pom.xml`: + +[source,xml] +---- + + software.amazon.awssdk + netty-nio-client + +---- + +== Configuration Reference + +include::{generated-dir}/config/quarkus-amazon-sqs.adoc[opts=optional, leveloffset=+1]