Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20628] RabbitMQ Connector using FLIP-27 Source API #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

pscls
Copy link

@pscls pscls commented May 23, 2022

What is the purpose of the change

This pull request ports the RabbitMQ connector implementation to the new Connector’s API described in FLIP-27 and FLIP-143. It includes both source and sink with at-most-once, at-least-once, and exactly-once behavior, respectively.

This pull request closes the following issues (separated RabbitMQ connector Source and Sink tickets): FLINK-20628 and FLINK-21373

Brief change log

  • Source and Sink use the RabbitMQ’s Java Client API to interact with RabbitMQ
  • The RabbitMQ Source reads messages from a queue
  • At-least-once
    • Messages are acknowledged on checkpoint completion
  • Exactly-once
    • Messages are acknowledged in a transaction
  • The user has to set correlation ids for deduplication
  • The RabbitMQ Sink publishes messages to a queue
  • At-least-once
    • Unacknowledged messages are resend on checkpoints
  • Exactly-once
    • Messages between two checkpoints are published in a transaction

Verifying this change

This change added tests and can be verified as follows:

All changes are within the flink-connectors/flink-connector-rabbitmq2/ module.
Added Integration Tests can be find under org.apache.flink.connector.rabbitmq2.source and org.apache.flink.connector.rabbitmq2.sink package in the test respective directories.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (don't know)
  • The runtime per-record code paths (performance sensitive): (don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (don't know)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduces a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

Co-authored-by: Yannik Schroeder [email protected]
Co-authored-by: Jan Westphal [email protected]

@pscls
Copy link
Author

pscls commented May 23, 2022

This is a copy from the original PR (apache/flink#15140) against the Flink repository.

@pscls
Copy link
Author

pscls commented May 23, 2022

@MartijnVisser We are not exactly sure what has to be part of the root-pom.

@MartijnVisser
Copy link
Contributor

@pscls I think you've done a good job already with the root-pom; it looks like the one we currently have for Elasticsearch. I've just approved the run, so we can also see how the build behaves. When I tried it locally, it complained about https://github.com/pscls/flink-connector-rabbitmq/blob/new-api-connector/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java#L124 having a whitespace, but now the tests are running for me.

I'll work on finding someone who can help with the review for this.

@MartijnVisser
Copy link
Contributor

@pscls Can you have a look at the failing build? It's a checkstyle error.

@wanglijie95
Copy link

@MartijnVisser @pscls I noticed that the GitBox of flink-connector-rabbitmq sent emails to the [email protected]. Is it expected?

@MartijnVisser
Copy link
Contributor

@wanglijie95 No. Most likely this is caused because the PR was created/is not yet using the ASF config as defined in https://github.com/apache/flink-connector-rabbitmq/blob/main/.asf.yaml

@MartijnVisser
Copy link
Contributor

@pscls The CI fails due to spotless; can you fix that? (By running mvn spotless:apply)

@pscls
Copy link
Author

pscls commented Aug 9, 2022

@pscls The CI fails due to spotless; can you fix that? (By running mvn spotless:apply)

@MartijnVisser I've nothing to commit when running mvn spotless:apply.
image

@MartijnVisser
Copy link
Contributor

@pscls Weird. Could you push once more, since the logs are no longer available?

westphal-jan and others added 2 commits September 12, 2022 18:37
RabbitMQ Connector using the new Source API
https://issues.apache.org/jira/browse/FLINK-20628

Co-authored-by: Yannik Schroeder <[email protected]>
Co-authored-by: Jan Westphal <[email protected]>
RabbitMQ Connector using the new Sink API
https://issues.apache.org/jira/browse/FLINK-21373

Co-authored-by: Yannik Schroeder <[email protected]>
Co-authored-by: Jan Westphal <[email protected]>
Comment on lines +10 to +14
Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This belongs into a NOTICE, both contained in the source release and jars.

Suggested change
Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this may be subject to conditions declared in the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
and the Apache License version 2 ("ASL").
Users that create and publish derivative work based on Flink's
RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
must be aware that this is subject to conditions declared in either the
Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
OR the Apache License version 2 ("ASL").

Additionally this needs clarification that users can choose which license they use for the derivative work.

<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is missing a fair amount of cleanups that were later applied to the ES parent pom.

See apache/flink-connector-elasticsearch#31 and replicate the changes.

Comment on lines +102 to +117

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we publishing a test-jar?


import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;

import org.junit.Test;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good if we could start out with junit5 and not have to go through a migration again.

from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once,
and exactly-once.

## Consistency Modes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all belongs into the documentation, not a readme.

Comment on lines +167 to +172
return CompletableFuture.runAsync(
() -> {
while (!collector.hasUnpolledMessages()) {
// supposed to be empty
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is problematic for a few reasons.

a) It runs in a non-specified thread pool, which means it runs in the JVMs common pool which may also be in use by other components. Use a dedicated executor.
b) It hot-loops, which both blocks an entire thread from doing anything and blows through CPU cycles. Consider restructuring the collector to return a sort of availability future that is completed once a message was added, or use basic locking to at least prevent hot-looping.


@Override
public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();
return split != null ? Collections.singletonList(split.copy()) : Collections.emptyList();

Comment on lines +185 to +187
if (split != null) {
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems wrong.

*/
public RabbitMQSourceBuilder<T> setDeserializationSchema(
DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing null checks; preferable to do them here to fail as early as possible.

* provides behavior to easily add onto the stream, send message to RabbitMQ and get the messages in
* RabbitMQ.
*/
public abstract class RabbitMQBaseTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use test bases; they've always ended up creating problems.

Model this as an extension instead.

@zentol zentol self-assigned this Sep 29, 2022
@zentol
Copy link
Contributor

zentol commented Sep 30, 2022

You can avoid a lot of boilerplate by using the preliminary flink-connector-parent pom as shown here: apache/flink-connector-elasticsearch@6e30d5d

@RocMarshal
Copy link
Contributor

Hi, @pscls Thank you very much for the contribution.
I notice that this PR has not been updated for a long time.
Would you like to continue advancing it ?
After the PR completed, FLINK-25380 will be introduced.
Looking forward to your opinion.
Thanks.

@MartijnVisser
Copy link
Contributor

Would you like to continue advancing it ?

@RocMarshal Do you want to take this over?

@RocMarshal
Copy link
Contributor

Would you like to continue advancing it ?

@RocMarshal Do you want to take this over?

Hi, @MartijnVisser Glad to get your attention.
In fact, you have already assigned this ticket to me on https://issues.apache.org/jira/browse/FLINK-20628 . I am working for it now.
Thank you ~

@MartijnVisser
Copy link
Contributor

I am working for it now.

Any update to report from your end on this @RocMarshal ?

@RocMarshal
Copy link
Contributor

I am working for it now.

Any update to report from your end on this @RocMarshal ?

Hi, @MartijnVisser Still in doing. I'll update in the end of this week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants