Skip to content

Commit

Permalink
Feature: add support for ingesting from rabbitmq super streams (#14137)
Browse files Browse the repository at this point in the history
* Add support for ingesting from Rabbit MQ Super Streams
  • Loading branch information
jamiechapmanbrn authored Feb 22, 2024
1 parent 59bb72a commit 80942d5
Show file tree
Hide file tree
Showing 29 changed files with 4,538 additions and 0 deletions.
2 changes: 2 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@
<argument>org.apache.druid.extensions.contrib:druid-deltalake-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-rabbit-indexing-service</argument>
</arguments>
</configuration>
</execution>
Expand Down
239 changes: 239 additions & 0 deletions docs/ingestion/rabbit-stream-ingestion.md

Large diffs are not rendered by default.

178 changes: 178 additions & 0 deletions extensions-contrib/rabbit-stream-indexing-service/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Maven build descriptor for the druid-rabbit-indexing-service contrib extension module. -->
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-rabbit-indexing-service</artifactId>
<name>druid-rabbit-indexing-service</name>
<description>druid-rabbit-indexing-service</description>

<!-- Inherits from the root Druid POM two directories up. Dependencies below that omit a
     version element must get their version from configuration inherited through this parent. -->
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>30.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<!-- Dependencies with scope "provided" are available at compile time but are not packaged
     with this module; scope "test" limits a dependency to the test classpath. -->
<dependencies>
<!-- Druid core modules, pinned to the same version as the parent POM. -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<!-- NOTE(review): the netty version is pinned here instead of being inherited; confirm this is intentional. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- RabbitMQ stream client. This is the only dependency in this file without an explicit
     scope, so Maven's default scope (compile) applies unless inherited configuration says otherwise. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.15.0</version>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- test-jar artifacts of the Druid core modules, used only on the test classpath. -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.rabbitstream;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.security.AuthorizerMapper;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Task runner for {@link RabbitStreamIndexTask} that supports incremental segment publishing.
 *
 * <p>It specializes {@link SeekableStreamIndexTaskRunner} with {@code String} partition ids,
 * {@code Long} sequence numbers and {@link ByteEntity} record payloads.
 */
public class IncrementalPublishingRabbitStreamIndexTaskRunner
    extends SeekableStreamIndexTaskRunner<String, Long, ByteEntity>
{
  private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingRabbitStreamIndexTaskRunner.class);

  /** The task this runner was created for; consulted for its IO config when polling. */
  private final RabbitStreamIndexTask task;

  /**
   * Creates a runner for the given task. All arguments are forwarded unchanged to the
   * superclass constructor; the task is additionally retained locally.
   *
   * @param task                 the Rabbit stream index task to run
   * @param parser               optional row parser, may be {@code null}
   * @param authorizerMapper     authorizer mapper handed to the superclass
   * @param lockGranularityToUse lock granularity handed to the superclass
   */
  IncrementalPublishingRabbitStreamIndexTaskRunner(
      RabbitStreamIndexTask task,
      @Nullable InputRowParser<ByteBuffer> parser,
      AuthorizerMapper authorizerMapper,
      LockGranularity lockGranularityToUse
  )
  {
    super(task, parser, authorizerMapper, lockGranularityToUse);
    this.task = task;
  }

  /**
   * Returns the offset that follows {@code sequenceNumber}, i.e. {@code sequenceNumber + 1}.
   */
  @Override
  protected Long getNextStartOffset(@NotNull Long sequenceNumber)
  {
    final long following = sequenceNumber + 1L;
    return following;
  }

  /**
   * Polls {@code recordSupplier} once, using the poll timeout from the task's IO config.
   * The {@code toolbox} argument is not used.
   */
  @Nonnull
  @Override
  protected List<OrderedPartitionableRecord<String, Long, ByteEntity>> getRecords(
      RecordSupplier<String, Long, ByteEntity> recordSupplier,
      TaskToolbox toolbox
  )
  {
    return recordSupplier.poll(
        task.getIOConfig().getPollTimeout()
    );
  }

  /**
   * Converts {@code object} into a {@code SeekableStreamEndSequenceNumbers<String, Long>}
   * using the supplied mapper.
   */
  @Override
  protected SeekableStreamEndSequenceNumbers<String, Long> deserializePartitionsFromMetadata(
      ObjectMapper mapper,
      Object object
  )
  {
    return mapper.convertValue(
        object,
        mapper.getTypeFactory().constructParametrizedType(
            SeekableStreamEndSequenceNumbers.class,
            SeekableStreamEndSequenceNumbers.class,
            String.class,
            Long.class
        )
    );
  }

  /**
   * Wraps {@code partitions} in a {@link RabbitStreamDataSourceMetadata}.
   */
  @Override
  protected SeekableStreamDataSourceMetadata<String, Long> createDataSourceMetadata(
      SeekableStreamSequenceNumbers<String, Long> partitions
  )
  {
    return new RabbitStreamDataSourceMetadata(partitions);
  }

  /**
   * Wraps {@code sequenceNumber} in a {@link RabbitSequenceNumber}.
   */
  @Override
  protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
  {
    return RabbitSequenceNumber.of(sequenceNumber);
  }

  /**
   * Intentionally a no-op for this runner.
   */
  @Override
  protected void possiblyResetDataSourceMetadata(
      TaskToolbox toolbox,
      RecordSupplier<String, Long, ByteEntity> recordSupplier,
      Set<StreamPartition<String>> assignment
  )
  {
    // do nothing
  }

  /**
   * Always reports end offsets as exclusive.
   */
  @Override
  protected boolean isEndOffsetExclusive()
  {
    return true;
  }

  /**
   * Always returns {@code false}; no sequence number is treated as an end-of-shard marker.
   */
  @Override
  protected boolean isEndOfShard(Long seqNum)
  {
    return false;
  }

  /**
   * Returns the Jackson type token for a list of {@code SequenceMetadata<String, Long>}.
   */
  @Override
  public TypeReference<List<SequenceMetadata<String, Long>>> getSequenceMetadataTypeReference()
  {
    return new TypeReference<List<SequenceMetadata<String, Long>>>() {};
  }

  /**
   * Parses the checkpoints string from the task context with the toolbox's JSON mapper.
   *
   * @param toolbox           supplies the JSON mapper
   * @param checkpointsString serialized checkpoints, may be {@code null}
   * @return the parsed checkpoints, or {@code null} when {@code checkpointsString} is {@code null}
   * @throws IOException if the string cannot be read as the expected structure
   */
  @Nullable
  @Override
  protected TreeMap<Integer, Map<String, Long>> getCheckPointsFromContext(
      TaskToolbox toolbox,
      String checkpointsString
  ) throws IOException
  {
    if (checkpointsString == null) {
      return null;
    }

    log.debug("Got checkpoints from task context[%s].", checkpointsString);
    return toolbox.getJsonMapper().readValue(
        checkpointsString,
        new TypeReference<TreeMap<Integer, Map<String, Long>>>() {}
    );
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.rabbitstream;

import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;

import javax.validation.constraints.NotNull;

/**
 * {@link OrderedSequenceNumber} specialization that carries a {@code Long} value.
 * Instances are created through {@link #of(Long)}; the constructor is private.
 *
 * <p>{@code equals} is deliberately not overridden here: OrderedSequenceNumber.equals() should
 * be used instead, which is why the corresponding inspection is suppressed.
 */
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
public class RabbitSequenceNumber extends OrderedSequenceNumber<Long>
{
  /**
   * Factory for a sequence number wrapping {@code sequenceNumber}.
   */
  public static RabbitSequenceNumber of(Long sequenceNumber)
  {
    return new RabbitSequenceNumber(sequenceNumber);
  }

  private RabbitSequenceNumber(Long sequenceNumber)
  {
    // NOTE(review): the boolean is presumably the superclass's "exclusive" flag; confirm
    // against OrderedSequenceNumber.
    super(sequenceNumber, false);
  }

  /**
   * Orders sequence numbers by the natural ordering of their wrapped {@code Long} values.
   */
  @Override
  public int compareTo(@NotNull OrderedSequenceNumber<Long> o)
  {
    final Long mine = this.get();
    final Long theirs = o.get();
    return mine.compareTo(theirs);
  }
}
Loading

0 comments on commit 80942d5

Please sign in to comment.