Feature: add support for ingesting from rabbitmq super streams #14137

jamiechapmanbrn
Contributor

@jamiechapmanbrn jamiechapmanbrn commented Apr 21, 2023

This PR aims to add support for ingesting logs from a RabbitMQ super stream, as a peer to Kafka or Kinesis. Because super streams support scalable exactly-once delivery, this provides a good gasket between RabbitMQ and Druid.

Currently, this is a WIP for a few reasons:

  1. There is no support yet for username/password and some other features. I welcome suggestions here, as I'm not yet sure how that plumbing works and could use some guidance.

  2. The algorithm is pretty poor: it simply creates a new RabbitMQ consumer for each poll. It should do something similar to Kinesis, fetching in the background and occasionally backing off if it gets too far ahead.

  3. Testing is extremely bare-bones, primarily because the design of the record supplier could still change, and the test layout would change with it.

  4. Documentation on how exactly to use it is missing. I've been testing it with the supervisor spec below. The primary configuration mechanism is the 'uri', which configures how it connects to RabbitMQ for reading metadata and for getting messages.
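As a rough illustration of the background-fetch idea in item 2, here is a minimal sketch that uses only the Java standard library. It is not the Kinesis or RabbitMQ implementation: `BackgroundFetcherSketch` and the `Iterator<String>` source are hypothetical stand-ins, and "backing off" is modelled simply as a bounded buffer that makes the fetcher wait and retry while the reader is behind.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a worker thread fetches ahead of the reader, but only
// as far as the bounded buffer allows.
class BackgroundFetcherSketch implements AutoCloseable {
    private final BlockingQueue<String> buffer;
    private final Thread worker;
    private volatile boolean running = true;

    BackgroundFetcherSketch(Iterator<String> source, int capacity, long backoffMillis) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(() -> {
            try {
                while (running && source.hasNext()) {
                    String record = source.next();
                    // offer() returns false if the buffer stays full for the whole
                    // back-off period; retry the same record until it fits.
                    while (running && !buffer.offer(record, backoffMillis, TimeUnit.MILLISECONDS)) {
                        // The fetcher is too far ahead of the reader: wait again.
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-fetcher");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    // Waits up to timeoutMillis for the first record, then takes whatever else
    // is already buffered, up to maxRecords in total.
    List<String> poll(int maxRecords, long timeoutMillis) {
        List<String> out = new ArrayList<>();
        try {
            String first = buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                out.add(first);
                buffer.drainTo(out, maxRecords - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    @Override
    public void close() {
        running = false;
        worker.interrupt();
    }

    public static void main(String[] args) {
        List<String> source = List.of("m1", "m2", "m3", "m4", "m5");
        try (BackgroundFetcherSketch fetcher =
                 new BackgroundFetcherSketch(source.iterator(), 2, 10)) {
            List<String> all = new ArrayList<>();
            while (all.size() < source.size()) {
                all.addAll(fetcher.poll(10, 100));
            }
            System.out.println(all);
        }
    }
}
```

The buffer capacity bounds how far the fetcher can run ahead, so memory use stays predictable even when polling is slow.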

The meat of the change is in RabbitStreamRecordSupplier, which contains the components that interface with RabbitMQ.
It uses two interfaces. The low-level 'Client' interface reads the partitions of a super stream so that Druid can distribute work. The 'Consumer' interface is the conventional one: it creates a thread that listens for messages for you and calls back a handler, which puts the messages in a queue.

The current implementation simply manages a map of ConsumerBuilders as partitions are assigned. When it is polled for messages, it creates the consumers and waits for the timeout. Once the timeout has elapsed, it closes all the consumers, then collects any messages from the queue and returns them. This should ensure no messages are lost, but it probably isn't very efficient, as it could be reconnecting more than necessary.
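The poll cycle described above can be sketched as follows. This is a simplified illustration, not the code in RabbitStreamRecordSupplier: `PartitionConsumer` and `PartitionConsumerBuilder` are hypothetical stand-ins for the RabbitMQ stream client types, and messages are plain strings.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a running stream consumer (not the real client API).
interface PartitionConsumer extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for a ConsumerBuilder: build() starts delivering
// messages to the given handler.
interface PartitionConsumerBuilder {
    PartitionConsumer build(Consumer<String> handler);
}

class PollingSupplierSketch {
    // One builder per assigned partition, kept between polls.
    private final Map<String, PartitionConsumerBuilder> builders = new LinkedHashMap<>();
    // Handlers push received messages here; poll() drains it.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void assign(String partition, PartitionConsumerBuilder builder) {
        builders.put(partition, builder);
    }

    List<String> poll(Duration timeout) {
        List<PartitionConsumer> consumers = new ArrayList<>();
        try {
            // 1. Create a consumer for every assigned partition.
            for (PartitionConsumerBuilder builder : builders.values()) {
                consumers.add(builder.build(queue::add));
            }
            // 2. Let the consumers run for the duration of the timeout.
            Thread.sleep(timeout.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 3. Close all consumers so nothing is delivered after the drain.
            for (PartitionConsumer consumer : consumers) {
                consumer.close();
            }
        }
        // 4. Collect whatever the handlers queued and return it.
        List<String> records = new ArrayList<>();
        queue.drainTo(records);
        return records;
    }

    public static void main(String[] args) {
        PollingSupplierSketch supplier = new PollingSupplierSketch();
        // Fake builder that delivers two messages as soon as it is built.
        supplier.assign("api-audit-0", handler -> {
            handler.accept("first");
            handler.accept("second");
            return () -> { };
        });
        System.out.println(supplier.poll(Duration.ofMillis(50)));
    }
}
```

Closing the consumers before draining is what keeps messages from arriving mid-drain in this sketch. The cost is a reconnect on every poll, which is the inefficiency noted above.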

Release note

New: You can now ingest logs from RabbitMQ via super streams

This feature adds the ability to read directly from RabbitMQ using the new super streams feature. Because super streams allow exactly-once delivery with full support for partitioning, they are compatible with Druid's modern ingestion algorithm, without the downsides of the prior RabbitMQ firehose.

Note that this uses the RabbitMQ streams feature, and not a conventional exchange. You will need to make sure that your messages are in a super stream before consumption. For more information, see https://www.rabbitmq.com/streams.html.

To configure it, create a new index task using the 'rabbit' type, and configure its stream and URI to connect to the RabbitMQ host.

{
  "type": "rabbit",
  "dataSchema": {
    "dataSource": "<your super-stream>"
  },
  "ioConfig": {
    "type": "rabbit",
    "stream": "api-audit",
    "uri": "rabbitmq-stream://localhost:5552",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "rabbit",
    "resetOffsetAutomatically": true
  }
}
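One way to submit a spec like the one above is to POST it to Druid's supervisor endpoint, `/druid/indexer/v1/supervisor`. The sketch below only builds the request. It assumes a Router or Overlord reachable at `http://localhost:8888` and a spec that has been completed for your cluster. `SubmitSupervisorSketch` and `buildRequest` are hypothetical names, not part of this PR.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class SubmitSupervisorSketch {
    // Builds (but does not send) a POST request carrying the supervisor spec.
    static HttpRequest buildRequest(String druidBaseUrl, String specJson) {
        return HttpRequest.newBuilder()
            .uri(URI.create(druidBaseUrl + "/druid/indexer/v1/supervisor"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(specJson, StandardCharsets.UTF_8))
            .build();
    }

    public static void main(String[] args) {
        // Placeholder body; in practice, load the full supervisor spec JSON here.
        String specJson = "{\"type\": \"rabbit\"}";
        HttpRequest request = buildRequest("http://localhost:8888", specJson);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and check the response status.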

Key changed/added classes in this PR

The following classes have been added to Druid in this PR:

  • RabbitStreamSamplerSpec
  • RabbitStreamRecordSupplier
  • RabbitStreamIndexTaskTuningConfig
  • RabbitStreamIndexTaskModule
  • RabbitStreamIndexTaskIOConfig
  • RabbitStreamIndexTaskClientFactory
  • RabbitStreamIndexTask
  • RabbitStreamDataSourceMetadata
  • IncrementalPublishingRabbitStreamIndexTaskRunner
  • RabbitSupervisorTuningConfig
  • RabbitStreamSupervisorSpec
  • RabbitStreamSupervisorReportPayload
  • RabbitStreamSupervisorIOConfig
  • RabbitStreamSupervisorIngestionSpec
  • RabbitStreamSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • [] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jamiechapmanbrn jamiechapmanbrn changed the title from "initial commit, make module available for feedback" to "Feature: add support for ingesting from rabbitmq super streams" Apr 21, 2023
@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 3828173 to 6c49ca7 Compare May 1, 2023 14:37
@abhishekagarwal87
Contributor

@jamiechapmanbrn - let us know if you need any help from the Druid dev community in moving this PR forward.

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 8743213 to 822a191 Compare June 1, 2023 21:38
@jamiechapmanbrn jamiechapmanbrn marked this pull request as ready for review June 1, 2023 22:08
@jamiechapmanbrn
Contributor Author

I've marked this as ready for review. We're currently testing it in our cluster and it seems to be working fine. I've added documentation and extended the tests significantly. I'm not 100% sure whether it has enough coverage to pass CI yet, but I'm running out of steam to work on the tests. They are relatively tricky for me, since I don't work much with Java and I'm not very familiar with the supervisor structures/classes.

@github-advanced-security github-advanced-security bot left a comment

CodeQL found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.

@abhishekagarwal87
Contributor

Have you done any scale testing on this yet? I am wondering what kind of scale you have run it at so far.

@jamiechapmanbrn
Contributor Author

Currently, our org is running it at a relatively low scale, on the order of a few events per minute. What would you consider reasonable for scale testing?

@AmatyaAvadhanula
Contributor

@jamiechapmanbrn, I'll complete my review for this PR soon.
Could you please take a look at the Integration tests that are failing as well?

@jamiechapmanbrn
Contributor Author

Okay, I rebased on the latest master and updated the pom.xml file to reference the latest. I'll have a look at any test failures that come when they're available.

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch 2 times, most recently from f705005 to fd210b7 Compare January 8, 2024 00:13
@github-advanced-security github-advanced-security bot left a comment

CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from fd210b7 to 9d4b9de Compare January 8, 2024 21:54
@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 9d4b9de to 24f5e2c Compare January 8, 2024 21:56
@AmatyaAvadhanula
Contributor

Thank you @jamiechapmanbrn. Overall approach LGTM!
Have a few minor comments that I'll post soon.

Contributor

@AmatyaAvadhanula AmatyaAvadhanula left a comment

Could you please take a pass over the RabbitStreamRecordSupplier and RabbitStreamSupervisor classes to clean up minor things such as unneeded exceptions, unused arguments in newly added methods, and see if any public methods exist that can be made private?

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch 2 times, most recently from d706e3d to 36d9910 Compare January 11, 2024 15:53
Contributor

@AmatyaAvadhanula AmatyaAvadhanula left a comment

LGTM!
+1 after the conflicts and build failures are fixed.

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 36d9910 to 817d229 Compare February 13, 2024 06:00
@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 817d229 to 95ac4e5 Compare February 13, 2024 06:04
@jamiechapmanbrn
Contributor Author

That's great to hear!

I've rebased once again, and added additional test coverage. It should pass now. I'll take another look once the CI has done its magic.

@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 95ac4e5 to 71a1706 Compare February 13, 2024 21:37
@jamiechapmanbrn
Contributor Author

I think I've fixed the pipeline issues.
I see a bunch of dependency pull problems for druid-iceberg-extensions that aren't related to this PR.

Error: Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: org.eclipse.aether.resolution.DependencyResolutionException: Could not find artifact org.apache.druid.extensions:druid-iceberg-extensions:jar:30.0.0-SNAPSHOT in  (https://repo1.maven.org/maven2/)
	at org.apache.druid.cli.PullDependencies.run(PullDependencies.java:333)
	at org.apache.druid.cli.Main.main(Main.java:112)
Caused by: java.lang.RuntimeException: org.eclipse.aether.resolution.DependencyResolutionException: Could not find artifact org.apache.druid.extensions:druid-iceberg-extensions:jar:30.0.0-SNAPSHOT in  (https://repo1.maven.org/maven2/)
	at org.apache.druid.cli.PullDependencies.downloadExtension(PullDependencies.java:418)
	at org.apache.druid.cli.PullDependencies.downloadExtension(PullDependencies.java:363)
	at org.apache.druid.cli.PullDependencies.run(PullDependencies.java:309)
	... 1 more

distribution/pom.xml (outdated review thread, resolved)
@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from 971fdf0 to ff7beb7 Compare February 15, 2024 19:28
@jamiechapmanbrn jamiechapmanbrn force-pushed the feature-add-rabbit-stream-indexing-service branch from ff7beb7 to d1ec691 Compare February 15, 2024 21:30
@AmatyaAvadhanula AmatyaAvadhanula merged commit 80942d5 into apache:master Feb 22, 2024
82 of 83 checks passed
@AmatyaAvadhanula
Contributor

@jamiechapmanbrn Thank you!

@jamiechapmanbrn
Contributor Author

That's great! Thanks for all the help!
