Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20628] RabbitMQ Connector using FLIP-27 Source API #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions flink-connector-rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# License of the RabbitMQ Connector

Flink's RabbitMQ connector defines a Maven dependency on the
"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").

Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
nor packages binaries from the "RabbitMQ AMQP Java Client".

Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").

This connector allows consuming messages from and publishing to RabbitMQ. It implements the
Source API specified in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API).

For more information about RabbitMQ visit https://www.rabbitmq.com/.

# RabbitMQ Source

Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages
from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once,
and exactly-once.

## Consistency Modes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all belongs into the documentation, not a readme.


With **at-most-once**, the source will receive each message and automatically acknowledges it to
RabbitMQ. The message content is then polled by the output. If the system crashes in the meantime,
the messages that the source buffers are lost.

By contrast, the messages in the **at-least-once** mode are not automatically acknowledged but
instead the delivery tag is stored in order to acknowledge it later to RabbitMQ. Messages are polled
by the output and when the notification for a completed checkpoint is received the messages that were
polled are acknowledged to RabbitMQ. Therefore, the mode requires _checkpointing enabled_. This way,
it is assured that the messages are correctly processed by the system. If the system crashes in the
meantime, the unacknowledged messages will be resend by RabbitMQ to assure at-least-once behavior.

The **exactly-once-mode** mode uses _correlation ids_ to deduplicate messages. Correlation ids are
properties of the messages and need to be set by the message publisher (who publishes the messages
to RabbitMQ) in order for the mode to function. The user has the obligation to ensure that the set
correlation id for a message is unique, otherwise no exactly-once can be guaranteed here since
RabbitMQ itself has no support for automatic exactly-once ids or the required behavior. In addition,
it requires _checkpointing enabled_and only \_parallelism 1_ is allowed. Similar to at-least-once,
the messages are received from RabbitMQ,buffered, and passed to the output when polled. A set of
seen correlation ids is maintained to apply the deduplication. During a checkpoint, the seen
correlation ids are stored so that in case of failure they can be recovered and used for
deduplication. When the notification for a completed checkpoint is received, all polled messages are
acknowledged as one transaction to ensure the reception by RabbitMQ. Afterwards, the set of
correlation ids is updated as RabbitMQ will not send the acknowledged messages again. This behavior
assures exactly-once processing but also has a performance drawback. Committing many messages will
take time and will thus increase the overall time it takes to do a checkpoint. This can result in
checkpoint delays and in peaks where checkpoint have either many or just a few messages.

## How to use it

```java
public class Main {
public static void main(String[]args) {

RabbitMQSource<T> source =
RabbitMQSource.<T>builder()
.setConnectionConfig(RMQ_CONNECTION_CONFIG)
.setQueueName(RABBITMQ_QUEUE_NAME)
.setDeserializationSchema(DESERIALIZATION_SCHEMA)
.setConsistencyMode(CONSISTENCY_MODE)
.build();

// ******************* An example usage looks like this *******************

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQConnectionConfig rmqConnectionConfig =
new RMQConnectionConfig.Builder()
.setHost("localhost")
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.setPort(5672)
.build();

RabbitMQSource<String> rmqSource =
RabbitMQSource.<String>builder()
.setConnectionConfig(rmqConnectionConfig)
.setQueueName("consume-queue")
.setDeserializationSchema(new SimpleStringSchema())
.setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
.build();

DataStream<String> stream = env.fromSource(rmqSource, WatermarkStrategy.noWatermarks(), "RMQSource");
}
}
```

# RabbitMQ Sink

Flink's RabbitMQ connector provides a sink which enables you to publish your stream directly
to a RabbitMQ exchange in three different consistency modes: at-most-once, at-least-once,
and exactly-once. Furthermore, user defined publish options can be used to customize each message
options in regard to exchange and publish settings in the RabbitMQ context.

## Consistency Mode

With **at-most-once**, the sink will simply take each message and publish the serialization of it
(with publish options if given) to RabbitMQ. At this point the sink gives up the ownership of the message.

For **at-least-once** the same process as for at-most-once is executed except that the ownership of
the message does not end immediately with publishing it. The sink will keep the individual publishing
id for each message as well as the message itself and buffer it as long as it takes to receive the
message received acknowledgment from RabbitMQ. Since we are in the need of buffering messages and waiting
for their asynchronous acknowledgment, this requires _checkpointing enabled_. On each checkpoint,
all to this point buffered (and thus send) but unacknowledged messages will be stored. Simultaneously,
on each checkpoint a resend will be triggered to send all unacknowledged messages once again since
we have to assume that something went wrong for it during the publishing process. Since it can take a
moment until messages get acknowledged from RabbitMQ this can and probably will result in a message
duplication and therefore this logic becomes at-least-once.

By contrast, the **exactly-once-mode** mode will not send messages on receive. All incoming messages
will be buffered until a checkpoint is triggered. On each checkpoint all messages will be
published/committed as one transaction to ensure the reception acknowledge by RabbitMQ.
If successful, all messages which were committed will be given up, otherwise they will be stored
and tried to commit again in the next transaction during the next checkpoint.
This consistency mode ensures that each message will be stored in RabbitMQ exactly once but also has
a performance drawback. Committing many messages will take time and will thus increase the overall
time it takes to do a checkpoint. This can result in checkpoint delays and in peaks where
checkpoint have either many or just a few messages. This also correlates to the latency of each message.

## How to use it

```java
RabbitMQSink<T> sink =
RabbitMQSink.<T>builder()
.setConnectionConfig(<RMQConnectionConfig>)
.setQueueName(<RabbitMQ Queue Name>)
.setSerializationSchema(<Serialization Schema>)
.setConsistencyMode(<ConsistencyMode>)
.build();

// ******************* An example usage looks like this *******************

RMQConnectionConfig rmqConnectionConfig =
new RMQConnectionConfig.Builder()
.setHost("localhost")
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.setPort(5672)
.build();

RabbitMQSink<String> rmqSink =
RabbitMQSink.<String>builder()
.setConnectionConfig(rmqConnectionConfig)
.setQueueName("publish-queue")
.setSerializationSchema(new SimpleStringSchema())
.setConsistencyMode(ConsistencyMode.AT_MOST_ONCE)
.build();

(DataStream<String>).sinkTo(rmqSink)
```
119 changes: 119 additions & 0 deletions flink-connector-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
<version>1.16-SNAPSHOT</version>
</parent>

<artifactId>flink-connector-rabbitmq</artifactId>
<name>Flink : Connectors : RabbitMQ</name>

<packaging>jar</packaging>

<!-- Allow users to pass custom connector versions -->
<properties>
<rabbitmq.version>5.9.0</rabbitmq.version>
</properties>

<dependencies>

<!-- Core -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- RabbitMQ -->

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>

<!-- Tests -->

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.15.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Comment on lines +102 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we publishing a test-jar?


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.rabbitmq.common;

/**
* The different consistency modes that can be defined for the sink and source individually.
*
* <p>The available consistency modes are as follows.
*
* <ul>
* <li><code>AT_MOST_ONCE</code> Messages are consumed by the output once or never.
* <li><code>AT_LEAST_ONCE</code> Messages are consumed by the output at least once.
* <li><code>EXACTLY_ONCE</code> Messages are consumed by the output exactly once.
* </ul>
*
* <p>Note that the higher the consistency guarantee gets, fewer messages can be processed by the
* system. At-least-once and exactly-once should only be used if necessary.
*/
public enum ConsistencyMode {
AT_MOST_ONCE,
AT_LEAST_ONCE,
EXACTLY_ONCE,
}
Loading