Skip to content

Commit

Permalink
fix: rudementary support for username+password
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamie Chapman-Brown committed Apr 27, 2023
1 parent a2c5d15 commit 3828173
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.indexing.rabbitstream;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -28,13 +30,15 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class RabbitStreamIndexTask extends SeekableStreamIndexTask<String, Long, ByteEntity>
{
private static final String TYPE = "index_rabbit";

private final ObjectMapper configMapper;
// This value can be tuned in some tests
private long pollRetryMs = 30000;

Expand All @@ -45,7 +49,8 @@ public RabbitStreamIndexTask(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") RabbitStreamIndexTaskTuningConfig tuningConfig,
@JsonProperty("ioConfig") RabbitStreamIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context)
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ObjectMapper configMapper)
{
super(
getOrMakeId(id, dataSchema.getDataSource(), TYPE),
Expand All @@ -55,7 +60,8 @@ public RabbitStreamIndexTask(
ioConfig,
context,
getFormattedGroupId(dataSchema.getDataSource(), TYPE));
// this.rabbitCredentialConfig = rabbitCredentialConfig;
this.configMapper = configMapper;


Preconditions.checkArgument(
ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),
Expand Down Expand Up @@ -83,10 +89,11 @@ protected RabbitStreamRecordSupplier newTaskRecordSupplier()
{

RabbitStreamIndexTaskIOConfig ioConfig = ((RabbitStreamIndexTaskIOConfig) super.ioConfig);
final Map<String, Object> props = new HashMap<>(ioConfig.getConsumerProperties());
// tuning config not yet used
//RabbitStreamIndexTaskTuningConfig tuningConfig = ((RabbitStreamIndexTaskTuningConfig) super.tuningConfig);

return new RabbitStreamRecordSupplier(ioConfig.getUri());
return new RabbitStreamRecordSupplier(props, configMapper, ioConfig.getUri());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Map;

public class RabbitStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, Long>
{
private final long pollTimeout;
private final String uri;
private final Map<String, Object> consumerProperties;

@JsonCreator
public RabbitStreamIndexTaskIOConfig(
Expand All @@ -47,6 +49,7 @@ public RabbitStreamIndexTaskIOConfig(
// versions
@JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, Long> startSequenceNumbers,
@JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, Long> endSequenceNumbers,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
Expand All @@ -67,6 +70,8 @@ public RabbitStreamIndexTaskIOConfig(
this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri;

this.consumerProperties = consumerProperties;

final SeekableStreamEndSequenceNumbers<String, Long> myEndSequenceNumbers = getEndSequenceNumbers();
for (String partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) {
Preconditions.checkArgument(
Expand All @@ -78,6 +83,12 @@ public RabbitStreamIndexTaskIOConfig(
}
}

@JsonProperty
public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}

@JsonProperty
public long getPollTimeout()
{
Expand All @@ -98,6 +109,7 @@ public String toString()
", baseSequenceName='" + getBaseSequenceName() + '\'' +
", startSequenceNumbers=" + getStartSequenceNumbers() +
", endSequenceNumbers=" + getEndSequenceNumbers() +
", consumerProperties=" + consumerProperties +
", pollTimeout=" + pollTimeout +
", useTransaction=" + isUseTransaction() +
", minimumMessageTime=" + getMinimumMessageTime() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.druid.indexing.rabbitstream;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
Expand All @@ -33,6 +36,10 @@
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.metadata.DynamicConfigProvider;

import org.apache.druid.indexing.rabbitstream.supervisor.RabbitStreamSupervisorIOConfig;

import javax.annotation.Nonnull;

Expand All @@ -55,30 +62,67 @@ public class RabbitStreamRecordSupplier implements RecordSupplier<String, Long,
private List<OrderedPartitionableRecord<String, Long, ByteEntity>> queue;
private String superStream;
private String uri;
private ObjectMapper mapper;

private String password;
private String username;

public RabbitStreamRecordSupplier(
Map<String, Object> consumerProperties,
ObjectMapper mapper,
String uri)
{
this.uri = uri;
this.env = this.getRabbitEnvironment();
this.mapper = mapper;

// Messages will be added to this queue from multiple threads
queue = Collections.synchronizedList(new ArrayList<OrderedPartitionableRecord<String, Long, ByteEntity>>());
streamBuilders = new ConcurrentHashMap<>();

this.password = null;
this.username = null;

if (consumerProperties != null) {

// Additional DynamicConfigProvider based extensible support for all consumer properties
Object dynamicConfigProviderJson = consumerProperties.get(RabbitStreamSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY);
if (dynamicConfigProviderJson != null) {
DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();
for (Map.Entry<String, String> e : dynamicConfig.entrySet()) {
System.out.println("dynamic key: " + e.getKey());
System.out.println("dynamic value: " + e.getValue());
if (e.getKey().equals("password")){
this.password = e.getValue();
}
if (e.getKey().equals("username")){
this.username = e.getValue();
}
}
}
}
}

public Environment getRabbitEnvironment()
{
return Environment.builder().uri(this.uri).build();
EnvironmentBuilder envBuilder = Environment.builder().uri(this.uri);

if (this.password != null) {
envBuilder = envBuilder.password(this.password);
}
if (this.username != null) {
envBuilder = envBuilder.username(this.username);
}
return envBuilder.build();
}

@Override
public void assign(Set<StreamPartition<String>> streamPartitions)
{
for (StreamPartition<String> part : streamPartitions) {
ConsumerBuilder builder = this.env.consumerBuilder();
builder = builder.name(part.getPartitionId());
builder = builder.stream(part.getPartitionId());
builder = builder.messageHandler(this);
streamBuilders.put(part.getPartitionId(), builder);
this.superStream = part.getStream();
Expand Down Expand Up @@ -195,6 +239,14 @@ public Set<String> getPartitionIds(String stream)
parameters.port(parsedURI.getPort());
}

if (this.password != null) {
parameters.password(password);
}

if (this.username != null) {
parameters.username(username);
}

Client client = new Client(parameters);

List<String> partitions = client.partitions(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.rabbitstream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -29,24 +30,34 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

public class RabbitStreamSamplerSpec extends SeekableStreamSamplerSpec
{
private final ObjectMapper objectMapper;

@JsonCreator
public RabbitStreamSamplerSpec(
@JsonProperty("spec") final RabbitStreamSupervisorSpec ingestionSpec,
@JsonProperty("samplerConfig") @Nullable final SamplerConfig samplerConfig,
@JacksonInject InputSourceSampler inputSourceSampler)
@JacksonInject InputSourceSampler inputSourceSampler,
@JacksonInject ObjectMapper objectMapper)
{
super(ingestionSpec, samplerConfig, inputSourceSampler);
this.objectMapper = objectMapper;
}

@Override
protected RabbitStreamRecordSupplier createRecordSupplier()
{
RabbitStreamSupervisorIOConfig ioConfig = (RabbitStreamSupervisorIOConfig) RabbitStreamSamplerSpec.this.ioConfig;
final Map<String, Object> props = new HashMap<>(((RabbitStreamSupervisorIOConfig) ioConfig).getConsumerProperties());

return new RabbitStreamRecordSupplier(ioConfig.getUri());
return new RabbitStreamRecordSupplier(
props,
objectMapper,
ioConfig.getUri()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public RabbitStreamSupervisor(
@Override
protected RecordSupplier<String, Long, ByteEntity> setupRecordSupplier()
{
return new RabbitStreamRecordSupplier(spec.getIoConfig().getUri());
return new RabbitStreamRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getUri());
}

@Override
Expand Down Expand Up @@ -187,6 +190,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(ioConfig.getStream(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(ioConfig.getStream(), endPartitions),
rabbitConfig.getConsumerProperties(),
rabbitConfig.getPollTimeout(),
true,
minimumMessageTime,
Expand Down Expand Up @@ -217,7 +221,8 @@ protected List<SeekableStreamIndexTask<String, Long, ByteEntity>> createIndexTas
spec.getDataSchema(),
(RabbitStreamIndexTaskTuningConfig) taskTuningConfig,
(RabbitStreamIndexTaskIOConfig) taskIoConfig,
context));
context,
sortingMapper));
}
return taskList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@

import javax.annotation.Nullable;

import java.util.Map;

public class RabbitStreamSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
{
public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
public static final String KEY_PASSWORD_KEY = "ssl.key.password";

public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";

private final Map<String, Object> consumerProperties;

public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;

private final String uri;
Expand All @@ -51,6 +55,7 @@ public RabbitStreamSupervisorIOConfig(
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
Expand All @@ -77,6 +82,7 @@ public RabbitStreamSupervisorIOConfig(
lateMessageRejectionStartDateTime,
new IdleConfig(null, null));

this.consumerProperties = consumerProperties;
this.uri = uri;
Preconditions.checkNotNull(stream, "uri");

Expand All @@ -89,6 +95,12 @@ public String getUri()
return this.uri;
}

@JsonProperty
public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}

@JsonProperty
public long getPollTimeout()
{
Expand Down
Loading

0 comments on commit 3828173

Please sign in to comment.