Refactoring kafka to kafka pipe to support SPI and multiple sinks (#286)
nsahai8 authored Oct 5, 2020
1 parent 08d246b commit 2becfed
Showing 74 changed files with 1,545 additions and 1,501 deletions.
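The commit title points to SPI (Java's Service Provider Interface) as the mechanism that lets the kafka-to-kafka pipe fan out to multiple sinks. The SPI contract itself is not visible in the excerpt below, so the following is only a minimal sketch of how ServiceLoader-based sink discovery typically works; the SpanSink interface and its send method are hypothetical stand-ins for whatever contract the commit actually introduces.

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical sink contract; the real SPI interface added by this commit is not shown in this excerpt.
interface SpanSink {
    void send(String key, byte[] spanBytes);
}

final class SinkLoader {
    // Each sink jar registers its implementation under META-INF/services/<fully.qualified.SpanSink>;
    // ServiceLoader then instantiates every registered implementation found on the classpath.
    static List<SpanSink> loadSinks() {
        final List<SpanSink> sinks = new ArrayList<>();
        for (final SpanSink sink : ServiceLoader.load(SpanSink.class)) {
            sinks.add(sink);
        }
        return sinks;
    }
}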
3 changes: 2 additions & 1 deletion .gitignore
@@ -8,4 +8,5 @@ target/
*/.project
*/.settings
*/.classpath
*/.factorypath
*/.factorypath
*.DS_Store
8 changes: 6 additions & 2 deletions Makefile
@@ -1,4 +1,4 @@
.PHONY: all clean release json-transformer kafka-producer http-poster firehose-writer secret-detector span-key-extractor
.PHONY: all clean release json-transformer kafka-producer http-poster firehose-writer secret-detector span-key-extractor sample-key-extractor

PWD := $(shell pwd)

@@ -28,11 +28,15 @@ secret-detector:
span-key-extractor:
mvn package -DfinalName=span-key-extractor -pl span-key-extractor -am

sample-key-extractor:
mvn package -DfinalName=sample-key-extractor -pl sample-key-extractor -am

# build all and release
release: clean span-key-extractor json-transformer kafka-producer http-poster firehose-writer secret-detector
release: clean span-key-extractor json-transformer kafka-producer http-poster firehose-writer secret-detector sample-key-extractor
cd json-transformer && $(MAKE) release
cd kafka-producer && $(MAKE) release
cd http-poster && $(MAKE) release
cd firehose-writer && $(MAKE) release
cd secret-detector && $(MAKE) release
cd sample-key-extractor && $(MAKE) release
./.travis/deploy.sh
2 changes: 1 addition & 1 deletion commons/pom.xml
@@ -8,7 +8,7 @@
<parent>
<artifactId>haystack-pipes</artifactId>
<groupId>com.expedia.www</groupId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<properties>
@@ -42,19 +42,22 @@ public class KafkaConsumerStarter {
public final Class<?> containingClass;
public final String clientId;
private final List<ConsumerTask> tasks;
private final KafkaConfig kafkaConfig;

public KafkaConsumerStarter(Class<?> containingClass,
String clientId,
KafkaConfig kafkaConfig,
HealthController healthController) {
this.containingClass = containingClass;
this.clientId = clientId;
this.kafkaConfig = kafkaConfig;
this.healthController = healthController;
this.tasks = new ArrayList<>();
}

public void createAndStartConsumer(SpanProcessorSupplier processorSupplier) {
for(int idx = 0; idx <= getThreadCount(); idx++) {
final ConsumerTask task = new ConsumerTask(getKafkaConfig(), containingClass, processorSupplier, healthController);
final ConsumerTask task = new ConsumerTask(kafkaConfig, containingClass, processorSupplier, healthController);
this.tasks.add(task);
final Thread thread = new Thread(task);
thread.setDaemon(true);
@@ -74,11 +77,6 @@ private void close() {
}

private int getThreadCount() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.threadcount();
}

private static KafkaConfig getKafkaConfig() {
return CONFIGURATION_PROVIDER.bind(HAYSTACK_KAFKA_CONFIG_PREFIX, KafkaConfig.class);
}
}
@@ -13,16 +13,16 @@ public abstract class KafkaStreamBuilderBase implements KafkaStreamBuilder, Main
private final KafkaStreamStarter kafkaStreamStarter;
private final SerdeFactory serdeFactory;
private final String application;
private final KafkaConfigurationProvider kafkaConfigurationProvider;
private final KafkaConfig kafkaConfig;
private final ForeachAction<String, Span> foreachAction;
private final ProcessorSupplier<String, Span> processorSupplier;

public KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter,
SerdeFactory serdeFactory,
String application,
KafkaConfigurationProvider kafkaConfigurationProvider,
KafkaConfig kafkaConfig,
ForeachAction<String, Span> foreachAction) {
this(kafkaStreamStarter, serdeFactory, application, kafkaConfigurationProvider, foreachAction, null);
this(kafkaStreamStarter, serdeFactory, application, kafkaConfig, foreachAction, null);

}

@@ -37,13 +37,13 @@ public KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter,
private KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter,
SerdeFactory serdeFactory,
String application,
KafkaConfigurationProvider kafkaConfigurationProvider,
KafkaConfig kafkaConfig,
ForeachAction<String, Span> foreachAction,
ProcessorSupplier<String, Span> processorSupplier) {
this.kafkaStreamStarter = kafkaStreamStarter;
this.serdeFactory = serdeFactory;
this.application = application;
this.kafkaConfigurationProvider = kafkaConfigurationProvider;
this.kafkaConfig = kafkaConfig;
this.foreachAction = foreachAction;
this.processorSupplier = processorSupplier;
}
@@ -52,7 +52,7 @@ private KafkaStreamBuilderBase(KafkaStreamStarter kafkaStreamStarter,
public void buildStreamTopology(KStreamBuilder kStreamBuilder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Span> spanSerde = serdeFactory.createJsonProtoSpanSerde(application);
final String fromTopic = kafkaConfigurationProvider.fromtopic();
final String fromTopic = kafkaConfig.fromtopic();
final KStream<String, Span> stream = kStreamBuilder.stream(stringSerde, spanSerde, fromTopic);
if(foreachAction != null) {
stream.foreach(foreachAction);
@@ -17,7 +17,6 @@
package com.expedia.www.haystack.pipes.commons.kafka;

import com.expedia.www.haystack.commons.config.Configuration;
import com.expedia.www.haystack.pipes.commons.IntermediateStreamsConfig;
import com.expedia.www.haystack.pipes.commons.SystemExitUncaughtExceptionHandler;
import com.expedia.www.haystack.pipes.commons.health.HealthController;
import com.netflix.servo.util.VisibleForTesting;
@@ -32,8 +31,6 @@

import java.util.Properties;

import static com.expedia.www.haystack.pipes.commons.Configuration.HAYSTACK_KAFKA_CONFIG_PREFIX;
import static com.expedia.www.haystack.pipes.commons.Configuration.HAYSTACK_PIPE_STREAMS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class KafkaStreamStarter {
@@ -58,13 +55,17 @@ public class KafkaStreamStarter {
public final Class<? extends KafkaStreamBuilder> containingClass;
public final String clientId;
private final StreamsConfig streamsConfig;
@VisibleForTesting
static KafkaConfig kafkaConfig;

public KafkaStreamStarter(Class<? extends KafkaStreamBuilder> containingClass,
String clientId,
KafkaConfig kafkaConfig,
HealthController healthController) {
this.containingClass = containingClass;
this.clientId = clientId;
this.healthController = healthController;
this.kafkaConfig = kafkaConfig;
this.streamsConfig = new StreamsConfig(getProperties());
}

@@ -97,47 +98,31 @@ Properties getProperties() {
props.put(ConsumerConfig.GROUP_ID_CONFIG, containingClass.getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, containingClass.getSimpleName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getIpAnPort());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getReplicationFactor());
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), getConsumerSessionTimeout());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, getThreadCount());
return props;
}

private String getIpAnPort() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.brokers() + ":" + kafkaConfig.port();
}

private String getFromTopic() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.fromtopic();
}

private String getToTopic() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.totopic();
}

private int getThreadCount() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.threadcount();
}

private int getReplicationFactor() {
final IntermediateStreamsConfig intermediateStreamsConfig = CONFIGURATION_PROVIDER.bind(
HAYSTACK_PIPE_STREAMS, IntermediateStreamsConfig.class);
return intermediateStreamsConfig.replicationfactor();
}

private int getConsumerSessionTimeout() {
final KafkaConfig kafkaConfig = getKafkaConfig();
return kafkaConfig.sessiontimeout();
}

private static KafkaConfig getKafkaConfig() {
return CONFIGURATION_PROVIDER.bind(HAYSTACK_KAFKA_CONFIG_PREFIX, KafkaConfig.class);
}

static class Factory {
KStreamBuilder createKStreamBuilder() {
return new KStreamBuilder();
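With the constructor change above, KafkaStreamStarter no longer binds KafkaConfig from the static CONFIGURATION_PROVIDER; the caller supplies it. Below is a minimal sketch of the new wiring, mirroring the updated KafkaStreamStarterTest further down in this diff; it assumes KafkaConfigurationProvider implements KafkaConfig, the import paths for KafkaStreamBuilder and KafkaConfigurationProvider are guesses, and the client id literal is illustrative.

import com.expedia.www.haystack.pipes.commons.health.HealthController;
import com.expedia.www.haystack.pipes.commons.kafka.KafkaStreamBuilder;
import com.expedia.www.haystack.pipes.commons.kafka.KafkaStreamStarter;
import com.expedia.www.haystack.pipes.commons.kafka.config.KafkaConfigurationProvider;

final class StreamStarterWiring {
    // Builds a starter for the given stream-builder class with an explicitly supplied KafkaConfig.
    static KafkaStreamStarter buildStarter(final Class<? extends KafkaStreamBuilder> builderClass,
                                           final HealthController healthController) {
        final KafkaConfigurationProvider kafkaConfig = new KafkaConfigurationProvider();
        return new KafkaStreamStarter(builderClass, "my-pipe-client", kafkaConfig, healthController);
    }
}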
@@ -0,0 +1,90 @@
package com.expedia.www.haystack.pipes.commons.kafka.config;

import com.expedia.www.haystack.pipes.commons.kafka.KafkaConfig;

public class KafkaConsumerConfig implements KafkaConfig {

private String brokers;

private int port;

private String fromTopic;

private String toTopic;

private int threadCount;

private int sessionTimeout;

private int maxWakeUps;

private int wakeUpTimeoutMs;

private long pollTimeoutMs;

private long commitMs;

public KafkaConsumerConfig(final String brokers, final int port, final String fromTopic, final String toTopic, final int threadCount, final int sessionTimeout, final int maxWakeUps, final int wakeUpTimeoutMs, final long pollTimeoutMs, final long commitMs) {
this.brokers = brokers;
this.port = port;
this.fromTopic = fromTopic;
this.toTopic = toTopic;
this.threadCount = threadCount;
this.sessionTimeout = sessionTimeout;
this.maxWakeUps = maxWakeUps;
this.wakeUpTimeoutMs = wakeUpTimeoutMs;
this.pollTimeoutMs = pollTimeoutMs;
this.commitMs = commitMs;
}

@Override
public String brokers() {
return this.brokers;
}

@Override
public int port() {
return this.port;
}

@Override
public String fromtopic() {
return this.fromTopic;
}

@Override
public String totopic() {
return this.toTopic;
}

@Override
public int threadcount() {
return this.threadCount;
}

@Override
public int sessiontimeout() {
return this.sessionTimeout;
}

@Override
public int maxwakeups() {
return this.maxWakeUps;
}

@Override
public int wakeuptimeoutms() {
return this.wakeUpTimeoutMs;
}

@Override
public long polltimeoutms() {
return this.pollTimeoutMs;
}

@Override
public long commitms() {
return this.commitMs;
}

}
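KafkaConsumerConfig is a plain value object behind the KafkaConfig interface, so a pipe can hand fully typed settings to the refactored KafkaConsumerStarter without going through a ConfigurationProvider. The sketch below shows that wiring; the MyPipe class, the literal values, and the package assumed for KafkaConsumerStarter (com.expedia.www.haystack.pipes.commons.kafka) are illustrative only.

import com.expedia.www.haystack.pipes.commons.health.HealthController;
import com.expedia.www.haystack.pipes.commons.kafka.KafkaConsumerStarter;
import com.expedia.www.haystack.pipes.commons.kafka.config.KafkaConsumerConfig;

final class MyPipe {
    static KafkaConsumerStarter buildStarter(final HealthController healthController) {
        // Argument order: brokers, port, fromTopic, toTopic, threadCount, sessionTimeout,
        // maxWakeUps, wakeUpTimeoutMs, pollTimeoutMs, commitMs
        final KafkaConsumerConfig kafkaConfig = new KafkaConsumerConfig(
                "kafka.example.com", 9092, "proto-spans", "json-spans",
                1, 15000, 10, 3000, 250L, 3000L);
        return new KafkaConsumerStarter(MyPipe.class, "my-pipe-client", kafkaConfig, healthController);
    }
}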
@@ -73,8 +73,6 @@ public class KafkaStreamStarterTest {
@Mock
private SystemExitUncaughtExceptionHandler mockSystemExitUncaughtExceptionHandler;
@Mock
private ConfigurationProvider mockConfigurationProvider;
@Mock
private KafkaConfig mockKafkaConfig;

private KafkaStreamStarter kafkaStreamStarter;
@@ -86,7 +84,8 @@ public void setUp() {
KafkaStreamStarter.factory = mockFactory;
realLogger = KafkaStreamStarter.logger;
KafkaStreamStarter.logger = mockLogger;
kafkaStreamStarter = new KafkaStreamStarter(mockKafkaStreamBuilder.getClass(), CLIENT_ID, healthController);
KafkaConfigurationProvider configurationProvider = new KafkaConfigurationProvider();
kafkaStreamStarter = new KafkaStreamStarter(mockKafkaStreamBuilder.getClass(), CLIENT_ID,configurationProvider,healthController);
}

@After
@@ -97,7 +96,7 @@ public void tearDown() {
PollScheduler.getInstance().stop();
}
verifyNoMoreInteractions(mockFactory, mockLogger, mockKafkaStreamBuilder, mockKStreamBuilder,
mockKafkaStreams, mockSystemExitUncaughtExceptionHandler, mockConfigurationProvider, mockKafkaConfig);
mockKafkaStreams, mockSystemExitUncaughtExceptionHandler, mockKafkaConfig);
}

@Test
@@ -113,19 +112,17 @@ public void testCreateAndStartStreamWithToTopic() {
@Test
public void testCreateAndStartStreamWithoutToTopic() {
commonWhensForCreateAndStartStream();
when(mockConfigurationProvider.bind(HAYSTACK_KAFKA_CONFIG_PREFIX, KafkaConfig.class)).thenReturn(mockKafkaConfig);
when(mockKafkaConfig.fromtopic()).thenReturn(KAFKA_FROM_TOPIC);
when(mockKafkaConfig.brokers()).thenReturn(BROKERS);
when(mockKafkaConfig.port()).thenReturn(PORT);

final ConfigurationProvider savedConfigurationProvider = KafkaStreamStarter.CONFIGURATION_PROVIDER;
KafkaStreamStarter.CONFIGURATION_PROVIDER = mockConfigurationProvider;
final KafkaConfig savedKafkaConfig = KafkaStreamStarter.kafkaConfig;
KafkaStreamStarter.kafkaConfig = mockKafkaConfig;
kafkaStreamStarter.createAndStartStream(mockKafkaStreamBuilder);
KafkaStreamStarter.CONFIGURATION_PROVIDER = savedConfigurationProvider;
kafkaStreamStarter.kafkaConfig = savedKafkaConfig;

commonVerifiesForCreateAndStartStream();
verify(mockLogger).info(String.format(STARTING_MSG_WITHOUT_TO_TOPIC, KAFKA_IP_AND_PORT, KAFKA_FROM_TOPIC));
verify(mockConfigurationProvider, times(3)).bind(HAYSTACK_KAFKA_CONFIG_PREFIX, KafkaConfig.class);
verify(mockKafkaConfig).fromtopic();
verify(mockKafkaConfig).totopic();
verify(mockKafkaConfig).brokers();
@@ -157,7 +154,6 @@ public void testGetProperties() {
assertEquals(mockKafkaStreamBuilder.getClass().getName(), properties.remove(ConsumerConfig.GROUP_ID_CONFIG));
assertEquals(mockKafkaStreamBuilder.getClass().getSimpleName(), properties.remove(StreamsConfig.APPLICATION_ID_CONFIG));
assertEquals(KAFKA_IP_AND_PORT, properties.remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
assertEquals(2147483645, properties.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG));
assertEquals(THREAD_COUNT_CONFIGURATION_IN_TEST_BASE_DOT_YAML, properties.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
assertEquals(15000, properties.remove(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)));
assertEquals("Properties should be empty but is: " + properties, 0, properties.size());
3 changes: 0 additions & 3 deletions commons/src/test/resources/base.yaml
@@ -10,9 +10,6 @@ haystack:
wakeuptimeoutms: 3000
polltimeoutms: 250
commitms: 3000
pipe:
streams:
replicationfactor: 2147483645
graphite:
prefix: "haystack.graphite.prefix"
host: "haystack.graphite.host"
4 changes: 2 additions & 2 deletions firehose-writer/pom.xml
@@ -6,14 +6,14 @@

<artifactId>haystack-pipes-firehose-writer</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>

<name>haystack-pipes-firehose-writer</name>

<parent>
<artifactId>haystack-pipes</artifactId>
<groupId>com.expedia.www</groupId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<properties>