From c739b588e3b5741403e2b412f46bacf62a692088 Mon Sep 17 00:00:00 2001 From: Pascal Schulze Date: Sun, 2 Jan 2022 21:29:24 +0100 Subject: [PATCH] [FLINK-21373] RabbitMQ Connector using FLIP-143 Sink API RabbitMQ Connector using new Sink API https://issues.apache.org/jira/browse/FLINK-21373 Co-authored-by: Yannik Schroeder Co-authored-by: Jan Westphal --- flink-connector-rabbitmq/README.md | 145 +++++++- .../common/ConsistencyMode.java | 0 .../common/RabbitMQConnectionConfig.java | 17 +- .../flink/connector/rabbitmq/sink/README.md | 14 + .../connector/rabbitmq/sink/RabbitMQSink.java | 351 ++++++++++++++++++ .../sink/common/RabbitMQSinkConnection.java | 167 +++++++++ .../common/RabbitMQSinkMessageWrapper.java | 55 +++ .../common/RabbitMQSinkPublishOptions.java | 99 +++++ .../common/SerializableReturnListener.java | 34 ++ .../sink/state/RabbitMQSinkWriterState.java | 45 +++ .../RabbitMQSinkWriterStateSerializer.java | 127 +++++++ .../sink/writer/RabbitMQSinkWriterBase.java | 135 +++++++ .../RabbitMQSinkWriterAtLeastOnce.java | 173 +++++++++ .../RabbitMQSinkWriterAtMostOnce.java | 62 ++++ .../RabbitMQSinkWriterExactlyOnce.java | 137 +++++++ .../flink/connector/rabbitmq/source/README.md | 14 + .../source/RabbitMQSource.java | 2 +- .../common/RabbitMQSourceMessageWrapper.java | 0 .../enumerator/RabbitMQSourceEnumState.java | 0 .../RabbitMQSourceEnumStateSerializer.java | 0 .../enumerator/RabbitMQSourceEnumerator.java | 8 +- .../source/reader/RabbitMQCollector.java | 6 +- .../reader/RabbitMQSourceReaderBase.java | 0 .../RabbitMQSourceReaderAtLeastOnce.java | 0 .../RabbitMQSourceReaderAtMostOnce.java | 2 +- .../RabbitMQSourceReaderExactlyOnce.java | 0 .../source/split/RabbitMQSourceSplit.java | 0 .../split/RabbitMQSourceSplitSerializer.java | 0 .../connector/rabbitmq2/source/README.md | 87 ----- .../rabbitmq/common/RabbitMQBaseTest.java | 27 ++ .../common/RabbitMQContainerClient.java | 4 +- .../rabbitmq/sink/RabbitMQSinkITCase.java | 156 ++++++++ ...RabbitMQSinkWriterStateSerializerTest.java | 119 ++++++ 33 files changed, 1873 insertions(+), 113 deletions(-) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/common/ConsistencyMode.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/common/RabbitMQConnectionConfig.java (97%) create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/README.md create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkConnection.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkPublishOptions.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/SerializableReturnListener.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterState.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializer.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java create 
mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtMostOnce.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java create mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/RabbitMQSource.java (99%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/common/RabbitMQSourceMessageWrapper.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/enumerator/RabbitMQSourceEnumState.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/enumerator/RabbitMQSourceEnumStateSerializer.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/enumerator/RabbitMQSourceEnumerator.java (95%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/reader/RabbitMQCollector.java (95%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/reader/RabbitMQSourceReaderBase.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java (96%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/split/RabbitMQSourceSplit.java (100%) rename flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/{rabbitmq2 => rabbitmq}/source/split/RabbitMQSourceSplitSerializer.java (100%) delete mode 100644 flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java create mode 100644 flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializerTest.java diff --git a/flink-connector-rabbitmq/README.md b/flink-connector-rabbitmq/README.md index 26b065e8b97..f66e374a75f 100644 --- a/flink-connector-rabbitmq/README.md +++ b/flink-connector-rabbitmq/README.md @@ -1,4 +1,4 @@ -# License of the Rabbit MQ Connector +# License of the RabbitMQ Connector Flink's RabbitMQ connector defines a Maven dependency on the "RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), @@ -19,6 +19,143 @@ and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/dis For more information about RabbitMQ visit https://www.rabbitmq.com/. 
-In order to view how to use the connector inspect -[Source](src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md) and -[Sink](src/main/java/org/apache/flink/connector/rabbitmq2/sink/README.md). +# RabbitMQ Source + +Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages +from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once, +and exactly-once. + +## Consistency Modes + +With **at-most-once**, the source receives each message and automatically acknowledges it to +RabbitMQ. The message content is then polled by the output. If the system crashes in the meantime, +the messages that the source buffers are lost. + +By contrast, the messages in the **at-least-once** mode are not automatically acknowledged but +instead the delivery tag is stored in order to acknowledge it later to RabbitMQ. Messages are polled +by the output, and when the notification for a completed checkpoint is received, the messages that were +polled are acknowledged to RabbitMQ. Therefore, the mode requires _checkpointing enabled_. This way, +it is ensured that the messages are correctly processed by the system. If the system crashes in the +meantime, the unacknowledged messages will be resent by RabbitMQ to ensure at-least-once behavior. + +The **exactly-once** mode uses _correlation ids_ to deduplicate messages. Correlation ids are +properties of the messages and need to be set by the message publisher (who publishes the messages +to RabbitMQ) in order for the mode to function. The user is responsible for ensuring that the +correlation id set for a message is unique; otherwise exactly-once cannot be guaranteed, since +RabbitMQ itself has no support for automatic exactly-once ids or the required behavior. In addition, +this mode requires _checkpointing enabled_ and only _parallelism 1_ is allowed. Similar to at-least-once, +the messages are received from RabbitMQ, buffered, and passed to the output when polled. A set of +seen correlation ids is maintained to apply the deduplication. During a checkpoint, the seen +correlation ids are stored so that in case of failure they can be recovered and used for +deduplication. When the notification for a completed checkpoint is received, all polled messages are +acknowledged as one transaction to confirm their reception to RabbitMQ. Afterwards, the set of +correlation ids is updated as RabbitMQ will not send the acknowledged messages again. This behavior +ensures exactly-once processing but also has a performance drawback. Committing many messages will +take time and will thus increase the overall time it takes to do a checkpoint. This can result in +checkpoint delays and in peaks where checkpoints have either many or just a few messages.
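
Because exactly-once relies on publisher-supplied correlation ids, the producing side has to attach one to every message. The following is a minimal sketch (not part of this connector) using the plain RabbitMQ Java client; the queue name, payload, and id scheme are placeholders:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class CorrelationIdPublisher {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumes a local broker with default credentials

        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.queueDeclare("consume-queue", true, false, false, null);

            // Each message gets a unique correlation id; the source uses it for deduplication.
            AMQP.BasicProperties props =
                    new AMQP.BasicProperties.Builder()
                            .correlationId(UUID.randomUUID().toString())
                            .build();

            channel.basicPublish(
                    "", "consume-queue", props, "payload".getBytes(StandardCharsets.UTF_8));
        }
    }
}
```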
+ +## How to use it + +```java +public class Main { + public static void main(String[] args) { + + RabbitMQSource source = + RabbitMQSource.builder() + .setConnectionConfig(RMQ_CONNECTION_CONFIG) + .setQueueName(RABBITMQ_QUEUE_NAME) + .setDeserializationSchema(DESERIALIZATION_SCHEMA) + .setConsistencyMode(CONSISTENCY_MODE) + .build(); + + // ******************* An example usage looks like this ******************* + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RMQConnectionConfig rmqConnectionConfig = + new RMQConnectionConfig.Builder() + .setHost("localhost") + .setVirtualHost("/") + .setUserName("guest") + .setPassword("guest") + .setPort(5672) + .build(); + + RabbitMQSource rmqSource = + RabbitMQSource.builder() + .setConnectionConfig(rmqConnectionConfig) + .setQueueName("consume-queue") + .setDeserializationSchema(new SimpleStringSchema()) + .setConsistencyMode(ConsistencyMode.AT_MOST_ONCE) + .build(); + + DataStream stream = env.fromSource(rmqSource, WatermarkStrategy.noWatermarks(), "RMQSource"); + } +} +``` + +# RabbitMQ Sink + +Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly +to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once, +and exactly-once. Furthermore, user-defined publish options can be used to customize each message's +exchange and publish settings in RabbitMQ (a sketch of custom publish options follows the consistency modes below). + +## Consistency Modes + +With **at-most-once**, the sink simply takes each message and publishes its serialization +(with publish options, if given) to RabbitMQ. At this point the sink gives up ownership of the message. + +For **at-least-once**, the same process as for at-most-once is executed, except that ownership of +the message does not end immediately after publishing it. The sink keeps the individual publishing +id for each message as well as the message itself and buffers it until the corresponding +acknowledgment is received from RabbitMQ. Since messages need to be buffered while waiting +for their asynchronous acknowledgment, this requires _checkpointing enabled_. On each checkpoint, +all messages buffered (and thus sent) up to this point but not yet acknowledged will be stored. Simultaneously, +on each checkpoint a resend will be triggered to send all unacknowledged messages once again, since +we have to assume that something went wrong during the publishing process. Since it can take a +moment until messages get acknowledged by RabbitMQ, this can and probably will result in message +duplication, which is why this mode provides at-least-once semantics. + +By contrast, the **exactly-once** mode will not send messages as they arrive. All incoming messages +will be buffered until a checkpoint is triggered. On each checkpoint all messages will be +published/committed as one transaction to ensure that RabbitMQ acknowledges their reception. +If successful, all committed messages will be given up; otherwise they will be stored +and committed again in the next transaction during the next checkpoint. +This consistency mode ensures that each message will be stored in RabbitMQ exactly once but also has +a performance drawback. Committing many messages will take time and will thus increase the overall +time it takes to do a checkpoint. This can result in checkpoint delays and in peaks where +checkpoints have either many or just a few messages. This also affects the latency of each message.
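
For illustration, a custom publish options implementation might look roughly like this. This is a sketch only: the class name, exchange, and routing rule are made up, and the deserialization schema is only needed when combining publish options with at-least-once or exactly-once.

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;

import com.rabbitmq.client.AMQP.BasicProperties;

import java.util.Optional;

/** Routes each message to a (hypothetical) exchange, picking the routing key from its content. */
public class MyPublishOptions implements RabbitMQSinkPublishOptions<String> {

    @Override
    public String computeRoutingKey(String message) {
        // Hypothetical rule: error messages get their own routing key.
        return message.startsWith("error") ? "errors" : "events";
    }

    @Override
    public BasicProperties computeProperties(String message) {
        return null; // no special AMQP properties
    }

    @Override
    public String computeExchange(String message) {
        return "my-exchange"; // assumed to exist in RabbitMQ
    }

    @Override
    public Optional<DeserializationSchema<String>> getDeserializationSchema() {
        // Lets the sink restore buffered messages from state and recompute the options above.
        return Optional.of(new SimpleStringSchema());
    }
}
```

Such an implementation would be passed to the builder via `setPublishOptions(new MyPublishOptions())`.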
+ +## How to use it + +```java +RabbitMQSink sink = + RabbitMQSink.builder() + .setConnectionConfig() + .setQueueName() + .setSerializationSchema() + .setConsistencyMode() + .build(); + +// ******************* An example usage looks like this ******************* + +RMQConnectionConfig rmqConnectionConfig = + new RMQConnectionConfig.Builder() + .setHost("localhost") + .setVirtualHost("/") + .setUserName("guest") + .setPassword("guest") + .setPort(5672) + .build(); + +RabbitMQSink rmqSink = + RabbitMQSink.builder() + .setConnectionConfig(rmqConnectionConfig) + .setQueueName("publish-queue") + .setSerializationSchema(new SimpleStringSchema()) + .setConsistencyMode(ConsistencyMode.AT_MOST_ONCE) + .build(); + +(DataStream).sinkTo(rmqSink) +``` diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/ConsistencyMode.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/ConsistencyMode.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/ConsistencyMode.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/ConsistencyMode.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java similarity index 97% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java index 0c7e0937a41..e1b237f9319 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/common/RabbitMQConnectionConfig.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java @@ -30,6 +30,8 @@ import java.security.NoSuchAlgorithmException; import java.util.Optional; +import static java.util.Objects.requireNonNull; + /** * This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. 
If * {@link Builder#setUri(String)} has been set then {@link @@ -92,16 +94,11 @@ private RabbitMQConnectionConfig( Integer requestedFrameMax, Integer requestedHeartbeat, Integer prefetchCount) { - Preconditions.checkNotNull(host, "host can not be null"); - Preconditions.checkNotNull(port, "port can not be null"); - Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); - Preconditions.checkNotNull(username, "username can not be null"); - Preconditions.checkNotNull(password, "password can not be null"); - this.host = host; - this.port = port; - this.virtualHost = virtualHost; - this.username = username; - this.password = password; + this.host = requireNonNull(host); + this.port = requireNonNull(port); + this.virtualHost = requireNonNull(virtualHost); + this.username = requireNonNull(username); + this.password = requireNonNull(password); this.networkRecoveryInterval = networkRecoveryInterval; this.automaticRecovery = automaticRecovery; diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/README.md b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/README.md new file mode 100644 index 00000000000..eaee8d0ffea --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/README.md @@ -0,0 +1,14 @@ +# License of the RabbitMQ Connector + +Flink's RabbitMQ connector defines a Maven dependency on the +"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), +the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client" +nor packages binaries from the "RabbitMQ AMQP Java Client". + +Users that create and publish derivative work based on Flink's +RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") +must be aware that this may be subject to conditions declared in the +Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") +and the Apache License version 2 ("ASL"). diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java new file mode 100644 index 00000000000..ef0f313a140 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterStateSerializer; +import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once, + * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once, + * checkpointing needs to be enabled. + * + *
{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }
+ * + *

When creating the sink, a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ Java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to publish to and a {@link + * SerializationSchema} for the sink input type are required. {@code publishOptions} can be added + * optionally to route messages in RabbitMQ. + * + *

If at-least-once is required, messages are buffered until an acknowledgement arrives because + * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent + * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resent. + * + *

In the case of exactly-once, a transactional RabbitMQ channel is used to ensure that all + * messages within a checkpoint are delivered once and only once. All messages that arrive in a + * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint + * is triggered. If the transaction fails, all messages that were part of the transaction are put + * back into the buffer and a resend is issued in the next checkpoint. + * + *

Keep in mind that transactional channels are heavyweight and will reduce throughput. + * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + *

If publish options are used and the checkpointing mode is at-least-once or exactly-once, they + * require a {@link DeserializationSchema} to be provided because messages that were persisted as + * part of an earlier checkpoint are needed to recompute routing/exchange. + */ +public class RabbitMQSink implements Sink, Void> { + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final SerializationSchema serializationSchema; + private final RabbitMQSinkPublishOptions publishOptions; + private final ConsistencyMode consistencyMode; + private final SerializableReturnListener returnListener; + + private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = ConsistencyMode.AT_MOST_ONCE; + + private RabbitMQSink( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema serializationSchema, + ConsistencyMode consistencyMode, + @Nullable SerializableReturnListener returnListener, + @Nullable RabbitMQSinkPublishOptions publishOptions) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.serializationSchema = requireNonNull(serializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + this.returnListener = returnListener; + + Preconditions.checkState( + verifyPublishOptions(), + "If consistency mode is stronger than at-most-once and publish options are defined " + + "then publish options need a deserialization schema"); + this.publishOptions = publishOptions; + } + + private boolean verifyPublishOptions() { + // If at-most-once, doesn't matter if publish options are provided (no state in writer). + if (consistencyMode == ConsistencyMode.AT_MOST_ONCE || publishOptions == null) { + return true; + } + + // If we are at-least or exactly-once and publish options are set, we need a deserialization + // schema to recover the original messages from the state to recompute publish options. + return publishOptions.getDeserializationSchema().isPresent(); + } + + /** + * Get a {@link RabbitMQSinkBuilder} for the sink. + * + * @param type of the sink + * @return a sink builder + * @see RabbitMQSinkBuilder + */ + public static RabbitMQSinkBuilder builder() { + return new RabbitMQSinkBuilder<>(); + } + + /** + * Create and return an extension of {@link RabbitMQSinkWriterBase} based on the selected {@link + * ConsistencyMode}. + * + * @param context The initialization context of the Sink + * @param states A list of states to initialize the writer with + * @return The SinkWriter implementation depending on the consistency mode set by the user + */ + @Override + public SinkWriter> createWriter( + InitContext context, List> states) { + try { + RabbitMQSinkWriterBase sinkWriter = createSpecializedWriter(); + // Setup RabbitMQ needs to be called before recover from states as the writer might send + // messages to RabbitMQ on recover. 
+ sinkWriter.setupRabbitMQ(); + sinkWriter.recoverFromStates(states); + return sinkWriter; + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + private RabbitMQSinkWriterBase createSpecializedWriter() throws IllegalStateException { + switch (consistencyMode) { + case AT_MOST_ONCE: + return new RabbitMQSinkWriterAtMostOnce<>( + connectionConfig, + queueName, + serializationSchema, + publishOptions, + returnListener); + case AT_LEAST_ONCE: + return new RabbitMQSinkWriterAtLeastOnce<>( + connectionConfig, + queueName, + serializationSchema, + publishOptions, + returnListener); + case EXACTLY_ONCE: + return new RabbitMQSinkWriterExactlyOnce<>( + connectionConfig, + queueName, + serializationSchema, + publishOptions, + returnListener); + default: + throw new IllegalStateException( + "Error in creating a SinkWriter: No valid consistency mode (" + + consistencyMode + + ") was specified."); + } + } + + @Override + public Optional> createCommitter() { + return Optional.empty(); + } + + @Override + public Optional> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + public Optional> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + /** + * If publish options are specified and the sink writer has state (at-least-once or + * exactly-once) the deserialization schema for the messages need to be provided for the state + * serializer. + * + * @return The serializer for the state of the SinkWriter + * @see RabbitMQSinkWriterStateSerializer + */ + @Override + public Optional>> + getWriterStateSerializer() { + if (publishOptions != null && publishOptions.getDeserializationSchema().isPresent()) { + return Optional.of( + new RabbitMQSinkWriterStateSerializer<>( + publishOptions.getDeserializationSchema().get())); + } else { + return Optional.of(new RabbitMQSinkWriterStateSerializer<>()); + } + } + + /** + * A Builder for the {@link RabbitMQSink}. Available consistency modes are contained in {@link + * ConsistencyMode} Required parameters are a {@code queueName}, a {@code connectionConfig} and + * a {@code serializationSchema}. Optional parameters include {@code publishOptions}, a {@code + * minimalResendIntervalMilliseconds} (for at-least-once) and a {@code returnListener}. + * + *

{@code
+     * RabbitMQSink
+     *   .builder()
+     *   .setConnectionConfig(connectionConfig)
+     *   .setQueueName("queue")
+     *   .setSerializationSchema(new SimpleStringSchema())
+     *   .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+     *   .build();
+     * }
+ */ + public static class RabbitMQSinkBuilder { + + private String queueName; + private RabbitMQConnectionConfig connectionConfig; + private SerializationSchema serializationSchema; + private ConsistencyMode consistencyMode; + private RabbitMQSinkPublishOptions publishOptions; + private SerializableReturnListener returnListener; + + public RabbitMQSinkBuilder() { + this.consistencyMode = RabbitMQSink.DEFAULT_CONSISTENCY_MODE; + } + + /** + * Builds the sink instance. + * + * @return new Sink instance that has the specified configuration + */ + public RabbitMQSink build() { + return new RabbitMQSink<>( + connectionConfig, + queueName, + serializationSchema, + consistencyMode, + returnListener, + publishOptions); + } + + /** + * Sets the RMQConnectionConfig for this sink. + * + * @param connectionConfig configuration required to connect to RabbitMQ + * @return this builder + */ + public RabbitMQSinkBuilder setConnectionConfig( + RabbitMQConnectionConfig connectionConfig) { + this.connectionConfig = connectionConfig; + return this; + } + + /** + * Sets the name of the queue to publish to. + * + * @param queueName name of an existing queue in RabbitMQ + * @return this builder + */ + public RabbitMQSinkBuilder setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + + /** + * Sets the SerializationSchema used to serialize incoming objects. + * + * @param serializationSchema the serialization schema to use + * @return this builder + */ + public RabbitMQSinkBuilder setSerializationSchema( + SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + /** + * Sets the RabbitMQSinkPublishOptions for this sink. Publish options can be used for + * routing in an exchange in RabbitMQ. + * + * @param publishOptions the publish options to be used + * @return this builder + */ + public RabbitMQSinkBuilder setPublishOptions( + RabbitMQSinkPublishOptions publishOptions) { + this.publishOptions = publishOptions; + return this; + } + + /** + * Set the ConsistencyMode for this sink to operate in. Available modes are AT_MOST_ONCE, + * AT_LEAST_ONCE and EXACTLY_ONCE + * + * @param consistencyMode set the consistency mode + * @return this builder + */ + public RabbitMQSinkBuilder setConsistencyMode(ConsistencyMode consistencyMode) { + this.consistencyMode = consistencyMode; + return this; + } + + /** + * Set the {@link SerializableReturnListener} for this sink. If no ReturnListener is set, + * unrouted messages, which are returned by RabbitMQ, will be dropped silently. + * + * @param returnListener the return listener to use + * @return this builder + */ + public RabbitMQSinkBuilder setReturnListener(SerializableReturnListener returnListener) { + this.returnListener = returnListener; + return this; + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkConnection.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkConnection.java new file mode 100644 index 00000000000..0e7aa543c79 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkConnection.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.common; + +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +/** + * This class provides basic RabbitMQ functionality and common behaviour such as establishing and + * closing a connection via the {@code connectionConfig}. In addition, it provides methods for + * serializing and sending messages to RabbitMQ (with or without publish options). + * + * @param The type of the messages that are published + */ +public class RabbitMQSinkConnection { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkConnection.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private Connection rmqConnection; + private Channel rmqChannel; + + @Nullable private final RabbitMQSinkPublishOptions publishOptions; + + @Nullable private final SerializableReturnListener returnListener; + + public RabbitMQSinkConnection( + RabbitMQConnectionConfig connectionConfig, + String queueName, + @Nullable RabbitMQSinkPublishOptions publishOptions, + @Nullable SerializableReturnListener returnListener) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.publishOptions = publishOptions; + this.returnListener = returnListener; + } + + /** + * Setup the RabbitMQ connection and a channel to send messages to. + * + * @throws Exception that might occur when setting up the connection and channel. + */ + public void setupRabbitMQ() throws Exception { + LOG.info("Setup RabbitMQ"); + this.rmqConnection = setupConnection(connectionConfig); + this.rmqChannel = setupChannel(rmqConnection, queueName, returnListener); + } + + private Connection setupConnection(RabbitMQConnectionConfig connectionConfig) throws Exception { + return connectionConfig.getConnectionFactory().newConnection(); + } + + private Channel setupChannel( + Connection rmqConnection, String queueName, SerializableReturnListener returnListener) + throws IOException { + final Channel rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(queueName, true, false, false, null); + if (returnListener != null) { + rmqChannel.addReturnListener(returnListener); + } + return rmqChannel; + } + + /** + * Only used by at-least-once and exactly-once for resending messages that could not be + * delivered. + * + * @param message sink message wrapping the atomic message object + */ + public void send(RabbitMQSinkMessageWrapper message) throws IOException { + send(message.getMessage(), message.getBytes()); + } + + /** + * Publish a message to a queue in RabbitMQ. 
With publish options enabled, first compute the + * necessary publishing information. + * + * @param message original message, only required for publishing with publish options present + * @param serializedMessage serialized message to send to RabbitMQ + */ + public void send(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + rmqChannel.basicPublish("", queueName, null, serializedMessage); + } else { + publishWithOptions(message, serializedMessage); + } + } + + private void publishWithOptions(T message, byte[] serializedMessage) throws IOException { + if (publishOptions == null) { + throw new RuntimeException("Try to publish with options without publishOptions."); + } + + boolean mandatory = publishOptions.computeMandatory(message); + boolean immediate = publishOptions.computeImmediate(message); + + Preconditions.checkState( + !(returnListener == null && (mandatory || immediate)), + "Setting mandatory and/or immediate flags to true requires a ReturnListener."); + + String rk = publishOptions.computeRoutingKey(message); + String exchange = publishOptions.computeExchange(message); + + rmqChannel.basicPublish( + exchange, + rk, + mandatory, + immediate, + publishOptions.computeProperties(message), + serializedMessage); + } + + /** + * Close the channel and connection to RabbitMQ. + * + * @throws Exception channel or connection closing failed + */ + public void close() throws Exception { + // close the channel + if (rmqChannel != null) { + rmqChannel.close(); + rmqChannel = null; + } + + // close the connection + if (rmqConnection != null) { + rmqConnection.close(); + rmqConnection = null; + } + } + + /** + * Get the internally used RabbitMQ channel. + * + * @return RabbitMQ channel object + */ + public Channel getRmqChannel() { + return rmqChannel; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java new file mode 100644 index 00000000000..7ce40c8a51b --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.sink.common; + +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; + +import static java.util.Objects.requireNonNull; + +/** + * A wrapper class for messages that need to be persisted in the state of a {@link + * RabbitMQSinkWriterAtLeastOnce} or {@link RabbitMQSinkWriterExactlyOnce}. + * + *

It holds the message in its serialized format which gets sent to RabbitMQ. In the case of + * publish options being present and checkpointing modes of at-least-once or exactly-once the + * original message needs to be stored as well because it is needed for recomputing the + * exchange/routing key from the message content. + */ +public class RabbitMQSinkMessageWrapper { + private T message; + private final byte[] bytes; + + public RabbitMQSinkMessageWrapper(byte[] bytes) { + this.bytes = requireNonNull(bytes); + } + + public RabbitMQSinkMessageWrapper(T message, byte[] bytes) { + this(bytes); + this.message = requireNonNull(message); + } + + public byte[] getBytes() { + return bytes; + } + + public T getMessage() { + return message; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkPublishOptions.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkPublishOptions.java new file mode 100644 index 00000000000..e6613ead400 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkPublishOptions.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; + +import com.rabbitmq.client.AMQP.BasicProperties; + +import java.util.Optional; + +/** + * This class was copied from the old RabbitMQ connector and got extended by the serialization + * schema which is required for at-least-once and exactly-once. + * + *

The message computation provides methods to compute the message routing key and/or the + * properties. + * + * @param The type of the data used by the sink. + */ +@PublicEvolving +public interface RabbitMQSinkPublishOptions extends java.io.Serializable { + + /** + * Compute the message's routing key from the data. + * + * @param a The data used by the sink + * @return The routing key of the message null will raise a NullPointerException + */ + String computeRoutingKey(IN a); + + /** + * Compute the message's properties from the data. + * + * @param a The data used by the sink + * @return The message's properties (can be null) + */ + BasicProperties computeProperties(IN a); + + /** + * Compute the exchange from the data. + * + * @param a The data used by the sink + * @return The exchange to publish the message to null will raise a NullPointerException + */ + String computeExchange(IN a); + + /** + * Compute the mandatory flag passed to method {@link + * com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, + * byte[])}. A {@link SerializableReturnListener} is mandatory if this flag can be true. + * + * @param a The data used by the sink + * @return The mandatory flag + */ + default boolean computeMandatory(IN a) { + return false; + } + + /** + * Compute the immediate flag passed to method {@link + * com.rabbitmq.client.Channel#basicPublish(String, String, boolean, boolean, BasicProperties, + * byte[])}. A {@link SerializableReturnListener} is mandatory if this flag can be true. + * + * @param a The data used by the sink + * @return The mandatory flag + */ + default boolean computeImmediate(IN a) { + return false; + } + + /** + * Get the deserialization schema for the serialized messages send to RabbitMQ by the + * SinkWriter. This is necessary if at-least or exactly-once is required. In these cases, + * messages need to be stored serialized in checkpoints.On initialization of a SinkWriter after + * a failure, checkpointed message need to be retrieved, deserialize and resend. The + * deserialization step is important to support the other publish options. + * + * @return a optional deserialization schema + */ + default Optional> getDeserializationSchema() { + return Optional.empty(); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/SerializableReturnListener.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/SerializableReturnListener.java new file mode 100644 index 00000000000..42ab127309d --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/SerializableReturnListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.sink.common; + +import com.rabbitmq.client.ReturnListener; + +import java.io.Serializable; + +/** + * This class was copied from the old RabbitMQ connector. A serializable {@link ReturnListener} to + * handle unroutable but "mandatory" messages. + * + *

If a message has the "mandatory" flag set, but cannot be routed, RabbitMQ's broker will return + * the message to the publishing client (via an AMQP.Basic.Return command). This ReturnListener + * implements a callback handler to get notified in such returns and act on these messages as + * wanted. + */ +public interface SerializableReturnListener extends Serializable, ReturnListener {} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterState.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterState.java new file mode 100644 index 00000000000..01aa4034cf7 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.state; + +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The state of a {@link SinkWriter} implementation. Contains {@code outstandingMessages} that could + * not be delivered in a checkpoint. Used in the {@link RabbitMQSinkWriterAtLeastOnce} and {@link + * RabbitMQSinkWriterExactlyOnce} implementations. + */ +public class RabbitMQSinkWriterState { + private final List> outstandingMessages; + + public RabbitMQSinkWriterState(List> outstandingMessages) { + this.outstandingMessages = requireNonNull(outstandingMessages); + } + + public List> getOutstandingMessages() { + return outstandingMessages; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializer.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializer.java new file mode 100644 index 00000000000..b9edcc25fb3 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.state; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Serializer for a {@link RabbitMQSinkWriterState} used for at-least and exactly-once consistency + * of the sink. + */ +public class RabbitMQSinkWriterStateSerializer + implements SimpleVersionedSerializer> { + private final DeserializationSchema deserializationSchema; + + public RabbitMQSinkWriterStateSerializer( + @Nullable DeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + public RabbitMQSinkWriterStateSerializer() { + this(null); + } + + @Override + public int getVersion() { + return 1; + } + + /** + * Serializes all {@code outstandingMessages} of a state of a single sink writer. + * + * @param rabbitMQSinkWriterState A state containing a list of {@code outstandingMessages} + * @throws IOException If output stream cant write the required data + */ + @Override + public byte[] serialize(RabbitMQSinkWriterState rabbitMQSinkWriterState) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + serializeV1(out, rabbitMQSinkWriterState.getOutstandingMessages()); + return baos.toByteArray(); + } + + private void serializeV1(DataOutputStream out, List> messages) + throws IOException { + out.writeInt(messages.size()); + for (RabbitMQSinkMessageWrapper message : messages) { + out.writeInt(message.getBytes().length); + out.write(message.getBytes()); + } + out.flush(); + } + + /** + * Deserializes {@link RabbitMQSinkMessageWrapper} objects that wrap the byte representation of + * a message that needs to be delivered to RabbitMQ as well as the original object + * representation if a deserialization schema is provided. 
+ * + * @param version which deserialization version should be used + * @param bytes Serialized outstanding sink messages + * @return A list of messages that need to be redelivered to RabbitMQ + * @throws IOException If input stream cant read the required data + */ + @Override + public RabbitMQSinkWriterState deserialize(int version, byte[] bytes) throws IOException { + switch (version) { + case 1: + return deserializeV1(bytes); + default: + throw new IOException("Unrecognized version or corrupt state: " + version); + } + } + + private RabbitMQSinkWriterState deserializeV1(byte[] bytes) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream in = new DataInputStream(bais); + return new RabbitMQSinkWriterState<>(readSinkMessages(in)); + } + + private List> readSinkMessages(DataInputStream in) + throws IOException { + final int numberOfMessages = in.readInt(); + List> messages = new ArrayList<>(); + for (int i = 0; i < numberOfMessages; i++) { + byte[] bytes = new byte[in.readInt()]; + in.read(bytes); + + // In this case, the messages need to be deserialized again, so we can recompute publish + // options + if (deserializationSchema != null) { + messages.add( + new RabbitMQSinkMessageWrapper<>( + deserializationSchema.deserialize(bytes), bytes)); + } else { + messages.add(new RabbitMQSinkMessageWrapper<>(bytes)); + } + } + return messages; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java new file mode 100644 index 00000000000..567101a8ff9 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.sink.writer; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkConnection; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce; +import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; + +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQSinkWriterBase is the common abstract class of {@link RabbitMQSinkWriterAtMostOnce}, + * {@link RabbitMQSinkWriterAtLeastOnce} and {@link RabbitMQSinkWriterExactlyOnce}. + * + * @param Type of the elements in this sink + */ +public abstract class RabbitMQSinkWriterBase + implements SinkWriter> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterBase.class); + + private final RabbitMQSinkConnection rmqSinkConnection; + private final SerializationSchema serializationSchema; + + public RabbitMQSinkWriterBase( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema serializationSchema, + RabbitMQSinkPublishOptions publishOptions, + SerializableReturnListener returnListener) { + this.rmqSinkConnection = + new RabbitMQSinkConnection<>( + connectionConfig, queueName, publishOptions, returnListener); + this.serializationSchema = requireNonNull(serializationSchema); + } + + /** + * Receive the next stream element and publish it to RabbitMQ. + * + * @param element element from upstream flink task + * @param context context of this sink writer + */ + @Override + public void write(T element, Context context) throws IOException { + getRmqSinkConnection() + .send( + new RabbitMQSinkMessageWrapper<>( + element, serializationSchema.serialize(element))); + } + + /** + * Recover the writer with a specific state. + * + * @param states a list of states to recover the reader with + * @throws IOException that can be thrown as specialized writers might want to send messages. + */ + public void recoverFromStates(List> states) throws IOException {} + + /** + * Setup the RabbitMQ connection and a channel to send messages to. In the end specialized + * writers can configure the channel through {@link #configureChannel()}. + * + * @throws Exception that might occur when setting up the connection and channel. + */ + public void setupRabbitMQ() throws Exception { + this.rmqSinkConnection.setupRabbitMQ(); + configureChannel(); + } + + /** + * This method provides a hook to apply additional configuration to the channel. 
+ * + * @throws IOException possible IOException that might be thrown when configuring the channel + */ + protected void configureChannel() throws IOException {} + + @Override + public List prepareCommit(boolean flush) { + return Collections.emptyList(); + } + + @Override + public List> snapshotState() throws IOException { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + LOG.info("Close Sink Writer"); + rmqSinkConnection.close(); + } + + protected RabbitMQSinkConnection getRmqSinkConnection() { + return rmqSinkConnection; + } + + protected Channel getRmqChannel() { + return rmqSinkConnection.getRmqChannel(); + } + + protected SerializationSchema getSerializationSchema() { + return serializationSchema; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java new file mode 100644 index 00000000000..40cefddb5cd --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase; + +import com.rabbitmq.client.ConfirmCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics, + * meaning it guarantees that outgoing message arrive at RabbitMQ at least once. + * + *
<p>
At-least-once consistency is implemented by assigning sequence numbers to arriving messages + * and buffering them together in the state of the writer until an ack arrives. + * + *
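<p>Publisher confirms are enabled on the channel via {@code confirmSelect()}; a registered + * {@code ConfirmCallback} then removes the sequence numbers of confirmed messages from this + * buffer. + * + *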
<p>
Checkpointing is required for at-least-once to work because messages are resent only when a + * checkpoint is triggered (to avoid complex time tracking mechanisms for each individual message). + * Thus, on each checkpoint, all messages that were sent to RabbitMQ at least once before but are + * still unacknowledged will be sent again, so duplicates are possible with this behavior. + * + *
<p>
After a failure, a new writer gets initialized with one or more states that contain + * unacknowledged messages. These messages get resend immediately while buffering them in the new + * state of the writer. + * + * @param Type of the elements in this sink + */ +public class RabbitMQSinkWriterAtLeastOnce extends RabbitMQSinkWriterBase { + protected final ConcurrentNavigableMap> outstandingConfirms; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterAtLeastOnce.class); + + private Set lastSeenMessageIds; + + /** + * Create a new RabbitMQSinkWriterAtLeastOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener returnListener + */ + public RabbitMQSinkWriterAtLeastOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema serializationSchema, + RabbitMQSinkPublishOptions publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + this.outstandingConfirms = new ConcurrentSkipListMap<>(); + this.lastSeenMessageIds = new HashSet<>(); + } + + /** + * On recover all stored messages in the states get resend. + * + * @param states a list of states to recover the reader with + * @throws IOException as messages are send to RabbitMQ + */ + @Override + public void recoverFromStates(List> states) throws IOException { + for (RabbitMQSinkWriterState state : states) { + for (RabbitMQSinkMessageWrapper message : state.getOutstandingMessages()) { + send(message); + } + } + } + + private void send(RabbitMQSinkMessageWrapper msg) throws IOException { + long sequenceNumber = getRmqChannel().getNextPublishSeqNo(); + getRmqSinkConnection().send(msg); + outstandingConfirms.put(sequenceNumber, msg); + } + + private void resendMessages() throws IOException { + Set temp = outstandingConfirms.keySet(); + Set messagesToResend = new HashSet<>(temp); + messagesToResend.retainAll(lastSeenMessageIds); + for (Long id : messagesToResend) { + // remove the old message from the map, since the message was added a second time + // under a new id or is put into the list of messages to resend + RabbitMQSinkMessageWrapper msg = outstandingConfirms.remove(id); + if (msg != null) { + send(msg); + } + } + lastSeenMessageIds = temp; + } + + private ConfirmCallback handleAcknowledgements() { + return (sequenceNumber, multiple) -> { + // multiple flag indicates that all messages < sequenceNumber can be safely acknowledged + if (multiple) { + // create a view of the portion of the map that contains keys < sequenceNumber + ConcurrentNavigableMap> confirmed = + outstandingConfirms.headMap(sequenceNumber, true); + // changes to the view are reflected in the original map + confirmed.clear(); + } else { + outstandingConfirms.remove(sequenceNumber); + } + }; + } + + private ConfirmCallback handleNegativeAcknowledgements() { + return (sequenceNumber, multiple) -> { + RabbitMQSinkMessageWrapper message = outstandingConfirms.get(sequenceNumber); + LOG.error( + "Message with body {} has been nack-ed. 
Sequence number: {}, multiple: {}", + message.getMessage(), + sequenceNumber, + multiple); + }; + } + + @Override + protected void configureChannel() throws IOException { + ConfirmCallback ackCallback = handleAcknowledgements(); + ConfirmCallback nackCallback = handleNegativeAcknowledgements(); + // register callbacks for cases of ack and negative ack of messages (seq numbers) + getRmqChannel().addConfirmListener(ackCallback, nackCallback); + getRmqChannel().confirmSelect(); + } + + /** + * All messages that are sent to RabbitMQ and not acknowledged yet will be resend. A single + * state is returned that contains just the messages that could not be acknowledged within the + * last checkpoint. + * + * @return A singleton list of RabbitMQSinkWriterState with outstanding confirms + * @throws IOException if resend of messages fails + */ + @Override + public List> snapshotState() throws IOException { + resendMessages(); + return Collections.singletonList( + new RabbitMQSinkWriterState<>(new ArrayList<>(outstandingConfirms.values()))); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtMostOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtMostOnce.java new file mode 100644 index 00000000000..30b2539858d --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtMostOnce.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase; + +import java.io.IOException; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink}. + * + *
<p>
It uses exclusively the basic functionalities provided by {@link RabbitMQSinkWriterBase} for + * publishing messages to RabbitMQ (serializing a stream element and publishing it to RabbitMQ in a + * fire-and-forget fashion). + */ +public class RabbitMQSinkWriterAtMostOnce extends RabbitMQSinkWriterBase { + + /** + * Create a new RabbitMQSinkWriterExactlyOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener returnListener + */ + public RabbitMQSinkWriterAtMostOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema serializationSchema, + RabbitMQSinkPublishOptions publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + } + + @Override + public void write(T element, Context context) throws IOException { + getRmqSinkConnection().send(element, getSerializationSchema().serialize(element)); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java new file mode 100644 index 00000000000..ef25d6f99e3 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterExactlyOnce.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.sink.writer.specialized; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A {@link SinkWriter} implementation for {@link RabbitMQSink} that provides exactly-once delivery + * guarantees, which means incoming stream elements will be delivered to RabbitMQ exactly once. For + * this, checkpointing needs to be enabled. + * + *
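<p>Checkpointing can be enabled on the {@code StreamExecutionEnvironment}, e.g. via + * {@code env.enableCheckpointing(1000)} (the interval here is only an example). + * + *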
<p>
Exactly-once consistency is implemented using a transactional RabbitMQ channel. All incoming + * stream elements are buffered in the state of this writer until the next checkpoint is triggered. + * All buffered {@code messages} are then sent to RabbitMQ in a single transaction. When successful, + * all committed messages are removed from the state. If the transaction is aborted, all messages + * remain in the state and are sent again on the next checkpoint. + * + *
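<p>Internally, the channel is switched into transaction mode via {@code txSelect()}; on each + * checkpoint the buffered messages are published and committed with {@code txCommit()}, and + * {@code txRollback()} is invoked if the commit fails. + * + *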
<p>
The transactional channel is heavyweight and will decrease throughput. If the system is under + * heavy load, consecutive checkpoints can be delayed if commits take longer than the checkpointing + * interval specified. Only use exactly-once if necessary (no duplicated messages in RabbitMQ + * allowed), otherwise consider using at-least-once. + * + * @param Type of the elements in this sink + */ +public class RabbitMQSinkWriterExactlyOnce extends RabbitMQSinkWriterBase { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterExactlyOnce.class); + + // All messages that arrived and could not be committed this far. + private List> messages; + + /** + * Create a new RabbitMQSinkWriterExactlyOnce. + * + * @param connectionConfig configuration parameters used to connect to RabbitMQ + * @param queueName name of the queue to publish to + * @param serializationSchema serialization schema to turn elements into byte representation + * @param publishOptions optionally used to compute routing/exchange for messages + * @param returnListener return listener + */ + public RabbitMQSinkWriterExactlyOnce( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema serializationSchema, + RabbitMQSinkPublishOptions publishOptions, + SerializableReturnListener returnListener) { + super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener); + this.messages = Collections.synchronizedList(new ArrayList<>()); + } + + /** + * On recover the messages are set to the outstanding messages from the states. + * + * @param states a list of states to recover the reader with + */ + @Override + public void recoverFromStates(List> states) { + for (RabbitMQSinkWriterState state : states) { + this.messages.addAll(state.getOutstandingMessages()); + } + } + + @Override + protected void configureChannel() throws IOException { + // puts channel in commit mode + getRmqChannel().txSelect(); + } + + @Override + public void write(T element, Context context) { + messages.add( + new RabbitMQSinkMessageWrapper<>( + element, getSerializationSchema().serialize(element))); + } + + @Override + public List> snapshotState() { + commitMessages(); + return Collections.singletonList(new RabbitMQSinkWriterState<>(messages)); + } + + private void commitMessages() { + List> messagesToSend = messages.subList(0, messages.size()); + try { + for (RabbitMQSinkMessageWrapper msg : messagesToSend) { + getRmqSinkConnection().send(msg); + } + getRmqChannel().txCommit(); + LOG.info("Successfully committed {} messages.", messagesToSend.size()); + messagesToSend.clear(); + } catch (IOException commitException) { + try { + getRmqChannel().txRollback(); + } catch (IOException rollbackException) { + throw new RuntimeException( + "Cannot rollback RabbitMQ transaction after commit failure, this might leave the transaction in a pending state. Commit Error: " + + commitException.getMessage() + + " Rollback Error: " + + rollbackException.getMessage()); + } + throw new RuntimeException( + "Error during transactional commit of messages. Rollback was successful. 
Error: " + + commitException.getMessage()); + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md new file mode 100644 index 00000000000..eaee8d0ffea --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md @@ -0,0 +1,14 @@ +# License of the RabbitMQ Connector + +Flink's RabbitMQ connector defines a Maven dependency on the +"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), +the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client" +nor packages binaries from the "RabbitMQ AMQP Java Client". + +Users that create and publish derivative work based on Flink's +RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") +must be aware that this may be subject to conditions declared in the +Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") +and the Apache License version 2 ("ASL"). diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java similarity index 99% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java index 86c1fd0b901..31acf2f3778 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java @@ -145,7 +145,7 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader( SourceReaderContext sourceReaderContext) { - LOG.info("New Source Reader of type " + consistencyMode + " requested."); + LOG.info("New Source Reader of type {} requested.", consistencyMode); switch (consistencyMode) { case AT_MOST_ONCE: return new RabbitMQSourceReaderAtMostOnce<>( diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/common/RabbitMQSourceMessageWrapper.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/common/RabbitMQSourceMessageWrapper.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumState.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumState.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java diff --git 
a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumStateSerializer.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumStateSerializer.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumStateSerializer.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumStateSerializer.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java similarity index 95% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java index 0860405334b..5a708ae607a 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java @@ -75,13 +75,13 @@ public void start() { @Override public void handleSplitRequest(int i, @Nullable String s) { - LOG.info("Split request from reader " + i); + LOG.info("Split request from reader {}.", i); assignSplitToReader(i, split); } @Override public void addSplitsBack(List list, int i) { - if (list.size() == 0) { + if (list == null || list.size() == 0) { return; } @@ -91,7 +91,7 @@ public void addSplitsBack(List list, int i) { "There should only be one split added back at a time. per reader"); } - LOG.info("Split returned from reader " + i); + LOG.info("Split returned from reader {}.", i); // In case of exactly-once (parallelism 1) the single split gets updated with the // correlation ids and in case of a recovery we have to store this split until we can // assign it to the recovered reader. 
@@ -109,7 +109,7 @@ public void addReader(int i) {} /** @return empty enum state object */ @Override - public RabbitMQSourceEnumState snapshotState() { + public RabbitMQSourceEnumState snapshotState(long checkpointId) throws Exception { return new RabbitMQSourceEnumState(); } diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQCollector.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java similarity index 95% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQCollector.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java index 7935a2c63ec..1886dc8f75e 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQCollector.java +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java @@ -42,12 +42,8 @@ public class RabbitMQCollector implements Collector { private long deliveryTag; private String correlationId; - private RabbitMQCollector(int capacity) { - this.unpolledMessageQueue = new LinkedBlockingQueue<>(capacity); - } - public RabbitMQCollector() { - this(Integer.MAX_VALUE); + this.unpolledMessageQueue = new LinkedBlockingQueue<>(); } /** @return boolean true if there are messages remaining in the collector. */ diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/RabbitMQSourceReaderBase.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java similarity index 96% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java index 8cc7714232f..73073a336bb 100644 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java +++ 
b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java @@ -24,7 +24,7 @@ /** * The RabbitMQSourceReaderAtMostOnce provides at-most-once guarantee. Messages are automatically - * acknowledged when received from rabbitmq and afterwards consumed by the output. In case of a + * acknowledged when received from RabbitMQ and afterwards consumed by the output. In case of a * failure in Flink messages might be lost. * * @param The output type of the source. diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/split/RabbitMQSourceSplit.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplit.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/split/RabbitMQSourceSplit.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplit.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/split/RabbitMQSourceSplitSerializer.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializer.java similarity index 100% rename from flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/split/RabbitMQSourceSplitSerializer.java rename to flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializer.java diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md deleted file mode 100644 index 2bf6447651a..00000000000 --- a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq2/source/README.md +++ /dev/null @@ -1,87 +0,0 @@ -# License of the Rabbit MQ Connector - -Flink's RabbitMQ connector defines a Maven dependency on the -"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), -the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). - -Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client" -nor packages binaries from the "RabbitMQ AMQP Java Client". - -Users that create and publish derivative work based on Flink's -RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") -must be aware that this may be subject to conditions declared in the -Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") -and the Apache License version 2 ("ASL"). 
- -# RabbitMQ Source - -Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages -from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once, -and exactly-once. - -## Consistency Modes -With __at-most-once__, the source will receive each message and automatically acknowledges it to -RabbitMQ. The message content is then polled by the output. If the system crashes in the meantime, -the messages that the source buffers are lost. - -By contrast, the messages in the __at-least-once__ mode are not automatically acknowledged but -instead the delivery tag is stored in order to acknowledge it later to RabbitMQ. Messages are polled -by the output and when the notification for a completed checkpoint is received the messages that were -polled are acknowledged to RabbitMQ. Therefore, the mode requires _checkpointing enabled_. This way, -it is assured that the messages are correctly processed by the system. If the system crashes in the -meantime, the unacknowledged messages will be resend by RabbitMQ to assure at-least-once behavior. - -The __exactly-once-mode__ mode uses _correlation ids_ to deduplicate messages. Correlation ids are -properties of the messages and need to be set by the message publisher (who publishes the messages -to RabbitMQ) in order for the mode to function. The user has the obligation to ensure that the set -correlation id for a message is unique, otherwise no exactly-once can be guaranteed here since -RabbitMQ itself has no support for automatic exactly-once ids or the required behavior. In addition, -it requires _checkpointing enabled_and only _parallelism 1_ is allowed. Similar to at-least-once, -the messages are received from RabbitMQ,buffered, and passed to the output when polled. A set of -seen correlation ids is maintained to apply the deduplication. During a checkpoint, the seen -correlation ids are stored so that in case of failure they can be recovered and used for -deduplication. When the notification for a completed checkpoint is received, all polled messages are -acknowledged as one transaction to ensure the reception by RabbitMQ. Afterwards, the set of -correlation ids is updated as RabbitMQ will not send the acknowledged messages again. This behavior -assures exactly-once processing but also has a performance drawback. Committing many messages will -take time and will thus increase the overall time it takes to do a checkpoint. This can result in -checkpoint delays and in peaks where checkpoint have either many or just a few messages. 
- -## How to use it -```java -public class Main { - public static void main(String[]args) { - - RabbitMQSource source = - RabbitMQSource.builder() - .setConnectionConfig(RMQ_CONNECTION_CONFIG) - .setQueueName(RABBITMQ_QUEUE_NAME) - .setDeserializationSchema(DESERIALIZATION_SCHEMA) - .setConsistencyMode(CONSISTENCY_MODE) - .build(); - - // ******************* An example usage looks like this ******************* - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - RMQConnectionConfig rmqConnectionConfig = - new RMQConnectionConfig.Builder() - .setHost("localhost") - .setVirtualHost("/") - .setUserName("guest") - .setPassword("guest") - .setPort(5672) - .build(); - - RabbitMQSource rmqSource = - RabbitMQSource.builder() - .setConnectionConfig(rmqConnectionConfig) - .setQueueName("consume-queue") - .setDeserializationSchema(new SimpleStringSchema()) - .setConsistencyMode(ConsistencyMode.AT_MOST_ONCE) - .build(); - - DataStream stream = env.fromSource(rmqSource, WatermarkStrategy.noWatermarks(), "RMQSource"); - } -} -``` diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQBaseTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQBaseTest.java index de6f0fd5b5d..d72627a5f00 100644 --- a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQBaseTest.java +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQBaseTest.java @@ -81,6 +81,33 @@ protected void executeFlinkJob() { flinkCluster.getClusterClient().submitJob(job); } + public RabbitMQContainerClient addSinkOn( + DataStream stream, ConsistencyMode consistencyMode, int countDownLatchSize) + throws IOException, TimeoutException { + RabbitMQContainerClient client = + new RabbitMQContainerClient<>( + rabbitMq, new SimpleStringSchema(), countDownLatchSize); + String queueName = client.createQueue(); + final RabbitMQConnectionConfig connectionConfig = + new RabbitMQConnectionConfig.Builder() + .setHost(rabbitMq.getHost()) + .setVirtualHost("/") + .setUserName(rabbitMq.getAdminUsername()) + .setPassword(rabbitMq.getAdminPassword()) + .setPort(rabbitMq.getMappedPort(RABBITMQ_PORT)) + .build(); + + RabbitMQSink sink = + RabbitMQSink.builder() + .setConnectionConfig(connectionConfig) + .setQueueName(queueName) + .setSerializationSchema(new SimpleStringSchema()) + .setConsistencyMode(consistencyMode) + .build(); + stream.sinkTo(sink).setParallelism(1); + return client; + } + protected DataStream addSourceOn( StreamExecutionEnvironment env, ConsistencyMode consistencyMode) throws IOException, TimeoutException { diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java index b3af891e093..66bdb9a098b 100644 --- a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java @@ -119,7 +119,9 @@ public void await() throws InterruptedException { private void handleMessageReceivedCallback(String consumerTag, Delivery delivery) { byte[] body = delivery.getBody(); messages.add(body); - latch.countDown(); + if (latch != null) { + latch.countDown(); + } } private Connection getRabbitMQConnection() throws 
TimeoutException, IOException { diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java new file mode 100644 index 00000000000..4753213c03a --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink; + +import org.apache.flink.connector.rabbitmq.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq.common.RabbitMQBaseTest; +import org.apache.flink.connector.rabbitmq.common.RabbitMQContainerClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * The tests for the RabbitMQ sink with different consistency modes. As the tests are working a lot + * with timeouts to uphold stream it is possible that tests might fail. 
+ */ +public class RabbitMQSinkITCase extends RabbitMQBaseTest { + + private static AtomicBoolean shouldFail; + + @Before + public void setup() { + shouldFail = new AtomicBoolean(true); + } + + private static class GeneratorFailureSource implements SourceFunction { + + private final BlockingQueue messagesToSend; + private int failAtNthMessage; + + public GeneratorFailureSource(BlockingQueue messagesToSend, int failAtNthMessage) { + this.messagesToSend = messagesToSend; + this.failAtNthMessage = failAtNthMessage; + shouldFail.set(true); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + while (true) { + if (failAtNthMessage == 0 && shouldFail.get()) { + shouldFail.set(false); + throw new Exception("Supposed to Fail"); + } + failAtNthMessage -= 1; + String message = messagesToSend.take(); + sourceContext.collect(message); + } + } + + @Override + public void cancel() {} + } + + @Test + public void atMostOnceTest() throws Exception { + List messages = getRandomMessages(100); + + DataStream stream = env.fromCollection(messages); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE, messages.size()); + executeFlinkJob(); + client.await(); + + List receivedMessages = client.getConsumedMessages(); + assertEquals(messages, receivedMessages); + } + + @Test + public void atLeastOnceTest() throws Exception { + List messages = getRandomMessages(100); + DataStream stream = env.fromCollection(messages); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, messages.size()); + + executeFlinkJob(); + client.await(); + + List receivedMessages = client.getConsumedMessages(); + assertEquals(messages, receivedMessages); + } + + @Test + public void atLeastOnceWithFlinkFailureTest() throws Exception { + LinkedBlockingQueue messages = new LinkedBlockingQueue<>(getRandomMessages(100)); + + GeneratorFailureSource source = new GeneratorFailureSource(messages, 30); + + DataStream stream = env.addSource(source); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, messages.size() + 30); + + executeFlinkJob(); + client.await(); + + List receivedMessages = client.getConsumedMessages(); + assertTrue(receivedMessages.containsAll(messages)); + } + + @Test + public void exactlyOnceTest() throws Exception { + LinkedBlockingQueue messages = new LinkedBlockingQueue<>(getRandomMessages(100)); + env.enableCheckpointing(100); + + GeneratorFailureSource source = new GeneratorFailureSource(messages, -1); + DataStream stream = env.addSource(source); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.EXACTLY_ONCE, messages.size()); + + executeFlinkJob(); + client.await(); + + assertArrayEquals(messages.toArray(), client.getConsumedMessages().toArray()); + } + + @Test + public void exactlyOnceWithFlinkFailureTest() throws Exception { + LinkedBlockingQueue messages = new LinkedBlockingQueue<>(getRandomMessages(100)); + env.enableCheckpointing(100); + + GeneratorFailureSource source = new GeneratorFailureSource(messages, 80); + DataStream stream = env.addSource(source); + RabbitMQContainerClient client = + addSinkOn(stream, ConsistencyMode.EXACTLY_ONCE, messages.size()); + + executeFlinkJob(); + client.await(); + + assertArrayEquals(messages.toArray(), client.getConsumedMessages().toArray()); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializerTest.java 
b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializerTest.java new file mode 100644 index 00000000000..2566dc5fd4b --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializerTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.sink.state; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** Test the sink writer state serializer. */ +public class RabbitMQSinkWriterStateSerializerTest { + + private RabbitMQSinkWriterState getSinkWriterState() { + List> outStandingMessages = new ArrayList<>(); + SimpleStringSchema serializer = new SimpleStringSchema(); + for (int i = 0; i < 5; i++) { + String message = "Message " + i; + RabbitMQSinkMessageWrapper messageWrapper = + new RabbitMQSinkMessageWrapper<>(message, serializer.serialize(message)); + outStandingMessages.add(messageWrapper); + } + return new RabbitMQSinkWriterState<>(outStandingMessages); + } + + @Test + public void testWriterStateSerializer() throws IOException { + RabbitMQSinkWriterState writerState = getSinkWriterState(); + RabbitMQSinkWriterStateSerializer serializer = + new RabbitMQSinkWriterStateSerializer<>(); + + byte[] serializedWriterState = serializer.serialize(writerState); + RabbitMQSinkWriterState deserializedWriterState = + serializer.deserialize(serializer.getVersion(), serializedWriterState); + + List expectedBytes = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + List actualBytes = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + + assertEquals(expectedBytes.size(), actualBytes.size()); + for (int i = 0; i < expectedBytes.size(); i++) { + Assert.assertArrayEquals(expectedBytes.get(i), actualBytes.get(i)); + } + + List actualMessages = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getMessage) + .collect(Collectors.toList()); + + for (String message : actualMessages) { + assertNull(message); + } + } + + @Test + public void testWriterStateSerializerWithDeserializationSchema() throws IOException { + RabbitMQSinkWriterState writerState = getSinkWriterState(); + 
SimpleStringSchema deserializer = new SimpleStringSchema(); + RabbitMQSinkWriterStateSerializer serializer = + new RabbitMQSinkWriterStateSerializer<>(deserializer); + + byte[] serializedWriterState = serializer.serialize(writerState); + RabbitMQSinkWriterState deserializedWriterState = + serializer.deserialize(serializer.getVersion(), serializedWriterState); + + List expectedMessages = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getMessage) + .collect(Collectors.toList()); + List expectedBytes = + writerState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + + List actualMessages = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getMessage) + .collect(Collectors.toList()); + List actualBytes = + deserializedWriterState.getOutstandingMessages().stream() + .map(RabbitMQSinkMessageWrapper::getBytes) + .collect(Collectors.toList()); + + assertEquals(expectedMessages, actualMessages); + assertEquals(expectedBytes.size(), actualBytes.size()); + for (int i = 0; i < expectedBytes.size(); i++) { + Assert.assertArrayEquals(expectedBytes.get(i), actualBytes.get(i)); + } + } +}
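As a usage reference, the builder calls exercised in RabbitMQBaseTest#addSinkOn can be wired into a job roughly as sketched below. This is only an illustrative sketch: the queue name, connection values, checkpoint interval, and the generic `RabbitMQSink.<String>builder()` invocation are assumptions made for the example, not part of the patch.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RabbitMQSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The at-least-once and exactly-once writers rely on checkpoints to (re)send buffered messages.
        env.enableCheckpointing(1000);

        // Connection values are placeholders for a locally running broker.
        RabbitMQConnectionConfig connectionConfig =
                new RabbitMQConnectionConfig.Builder()
                        .setHost("localhost")
                        .setVirtualHost("/")
                        .setUserName("guest")
                        .setPassword("guest")
                        .setPort(5672)
                        .build();

        // Same builder calls as in RabbitMQBaseTest#addSinkOn; queue name is illustrative.
        RabbitMQSink<String> sink =
                RabbitMQSink.<String>builder()
                        .setConnectionConfig(connectionConfig)
                        .setQueueName("publish-queue")
                        .setSerializationSchema(new SimpleStringSchema())
                        .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
                        .build();

        env.fromElements("message 1", "message 2", "message 3").sinkTo(sink);
        env.execute("RabbitMQ sink example");
    }
}
```

The same builder is used for exactly-once by passing ConsistencyMode.EXACTLY_ONCE with checkpointing enabled, as exercised in RabbitMQSinkITCase.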