Skip to content

Commit

Permalink
Merge pull request #268 from ExpediaDotCom/bugfix/kafka-deser-exception
Browse files Browse the repository at this point in the history
Fixed Kafka deserialization bugs #245 and #253
  • Loading branch information
bibinss authored Nov 11, 2018
2 parents 77eab4a + dd1382e commit 405c457
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* @author Willie Wheeler
*/
@Slf4j
public final class AnomalyDetectorManager {
public class AnomalyDetectorManager {

/**
* Detectors configuration key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import static com.expedia.adaptivealerting.core.util.AssertUtil.notNull;

/**
* Kafka streams wrapper around {@link AnomalyDetectorManager}.
*
Expand All @@ -42,12 +44,14 @@ public final class KafkaAnomalyDetectorManager extends AbstractStreamsApp {
public static void main(String[] args) {
val tsConfig = new TypesafeConfigLoader(CK_AD_MANAGER).loadMergedConfig();
val saConfig = new StreamsAppConfig(tsConfig);
new KafkaAnomalyDetectorManager(saConfig).start();
val manager = buildManager(saConfig);
new KafkaAnomalyDetectorManager(saConfig, manager).start();
}

public KafkaAnomalyDetectorManager(StreamsAppConfig config) {
public KafkaAnomalyDetectorManager(StreamsAppConfig config, AnomalyDetectorManager manager) {
super(config);
this.manager = buildManager(config);
notNull(manager, "manager can't be null");
this.manager = manager;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions kafka/src/main/resources/config/base.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ kstream.app.default {
auto.offset.reset = "latest"
default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
default.value.serde = "com.expedia.adaptivealerting.kafka.serde.JsonPojoSerde"
default.deserialization.exception.handler = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
}
health.status.path = "/app/isHealthy"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,123 @@
*/
package com.expedia.adaptivealerting.kafka;

import org.junit.Ignore;
import com.expedia.adaptivealerting.anomdetect.AnomalyDetectorManager;
import com.expedia.adaptivealerting.core.anomaly.AnomalyResult;
import com.expedia.adaptivealerting.core.data.MappedMetricData;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.junit.Assert.fail;
import static com.expedia.adaptivealerting.kafka.KafkaAnomalyDetectorMapper.CK_MODEL_SERVICE_URI_TEMPLATE;
import static org.mockito.Mockito.when;

/**
* Unit test for {@link KafkaAnomalyDetectorManager}. See
* https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html
*
* @author Willie Wheeler
*/
public class KafkaAnomalyDetectorManagerTest {
@Slf4j
public final class KafkaAnomalyDetectorManagerTest {
private static final String KAFKA_KEY = "some-kafka-key";
private static final String INBOUND_TOPIC = "mapped-metrics";
private static final String OUTBOUND_TOPIC = "anomalies";

@Mock
private AnomalyDetectorManager manager;

@Mock
private StreamsAppConfig saConfig;

@Mock
private Config tsConfig;

// Test objects
private MappedMetricData mappedMetricData;
private AnomalyResult anomalyResult;

// Test machinery
private TopologyTestDriver logAndFailDriver;
private TopologyTestDriver logAndContinueDriver;
private ConsumerRecordFactory<String, MappedMetricData> mmdRecordFactory;
private ConsumerRecordFactory<String, String> stringRecordFactory;
private StringDeserializer stringDeser;
private Deserializer<AnomalyResult> arDeserializer;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
initConfig();
initTestObjects();
initDependencies();
initTestMachinery();
}

@After
public void tearDown() {
logAndFailDriver.close();
logAndContinueDriver.close();
}

/**
* Addresses bug https://github.com/ExpediaDotCom/adaptive-alerting/issues/245
*/
@Test(expected = StreamsException.class)
public void failsOnDeserializationException() {
logAndFailDriver.pipeInput(stringRecordFactory.create(INBOUND_TOPIC, KAFKA_KEY, "invalid_input"));
logAndFailDriver.readOutput(OUTBOUND_TOPIC, stringDeser, arDeserializer);
}

/**
* Addresses bug https://github.com/ExpediaDotCom/adaptive-alerting/issues/245
*/
@Test
@Ignore
public void testDummy() {
fail();
public void continuesOnDeserializationException() {
logAndContinueDriver.pipeInput(stringRecordFactory.create(INBOUND_TOPIC, KAFKA_KEY, "invalid_input"));
logAndContinueDriver.readOutput(OUTBOUND_TOPIC, stringDeser, arDeserializer);
}

private void initConfig() {
when(tsConfig.getString(CK_MODEL_SERVICE_URI_TEMPLATE)).thenReturn("https://example.com/");

when(saConfig.getTypesafeConfig()).thenReturn(tsConfig);
when(saConfig.getInboundTopic()).thenReturn(INBOUND_TOPIC);
when(saConfig.getOutboundTopic()).thenReturn(OUTBOUND_TOPIC);
}

private void initTestObjects() {
val metricData = TestObjectMother.metricData();
this.mappedMetricData = TestObjectMother.mappedMetricData(metricData);
}

private void initDependencies() {
// when(mapper.map(any(MetricData.class)))
// .thenReturn(Collections.singleton(mappedMetricData));
}

private void initTestMachinery() {

// Topology test drivers
val topology = new KafkaAnomalyDetectorManager(saConfig, manager).buildTopology();
this.logAndFailDriver = TestObjectMother.topologyTestDriver(topology, false);
this.logAndContinueDriver = TestObjectMother.topologyTestDriver(topology, true);

// MetricData sources
this.mmdRecordFactory = TestObjectMother.mappedMetricDataFactory();
this.stringRecordFactory = TestObjectMother.stringFactory();

// MappedMetricData consumers
this.stringDeser = TestObjectMother.stringDeserializer();
this.arDeserializer = TestObjectMother.anomalyResultDeserializer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,14 @@

import com.expedia.adaptivealerting.anomdetect.AnomalyDetectorMapper;
import com.expedia.adaptivealerting.core.data.MappedMetricData;
import com.expedia.adaptivealerting.kafka.serde.JsonPojoDeserializer;
import com.expedia.adaptivealerting.kafka.serde.JsonPojoSerde;
import com.expedia.adaptivealerting.kafka.serde.JsonPojoSerializer;
import com.expedia.metrics.MetricData;
import com.expedia.metrics.MetricDefinition;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.After;
Expand All @@ -38,11 +33,7 @@
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;

import static com.expedia.adaptivealerting.kafka.KafkaAnomalyDetectorMapper.CK_MODEL_SERVICE_URI_TEMPLATE;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -56,29 +47,30 @@
*/
@Slf4j
public final class KafkaAnomalyDetectorMapperTest {
private static final String KAFKA_KEY = "some-kafka-key";
private static final String INBOUND_TOPIC = "metrics";
private static final String OUTBOUND_TOPIC = "mapped-metrics";

@Mock
private AnomalyDetectorMapper mapper;

@Mock
private StreamsAppConfig streamsAppConfig;
private StreamsAppConfig saConfig;

@Mock
private Config config;

private Properties streamsProps;
private Config tsConfig;

// Test objects
private MetricData metricData;
private MappedMetricData mappedMetricData;

// Test machinery
private TopologyTestDriver logAndFailDriver;
private TopologyTestDriver logAndContinueDriver;
private ConsumerRecordFactory<String, MetricData> mdRecordFactory;
private ConsumerRecordFactory<String, String> stringRecordFactory;
private StringDeserializer stringDeser;
private JsonPojoDeserializer<MappedMetricData> mmdDeserializer;
private ConsumerRecordFactory<String, MetricData> recordFactory;
private TopologyTestDriver testDriver;
private Deserializer<MappedMetricData> mmdDeserializer;

@Before
public void setUp() {
Expand All @@ -91,48 +83,50 @@ public void setUp() {

@After
public void tearDown() {
testDriver.close();
logAndFailDriver.close();
logAndContinueDriver.close();
}

@Test
public void metricDataToMappedMetricData() {
val inputKafkaKey = "some-kafka-key";
testDriver.pipeInput(recordFactory.create(INBOUND_TOPIC, inputKafkaKey, metricData));
val outputRecord = testDriver.readOutput(OUTBOUND_TOPIC, stringDeser, mmdDeserializer);
logAndFailDriver.pipeInput(mdRecordFactory.create(INBOUND_TOPIC, KAFKA_KEY, metricData));

// The streams app remaps the key to the detector UUID. [WLW]
val outputRecord = logAndFailDriver.readOutput(OUTBOUND_TOPIC, stringDeser, mmdDeserializer);
log.trace("outputRecord={}", outputRecord);
val outputKafkaKey = mappedMetricData.getDetectorUuid().toString();
OutputVerifier.compareKeyValue(outputRecord, outputKafkaKey, mappedMetricData);
}

/**
* Addresses bug https://github.com/ExpediaDotCom/adaptive-alerting/issues/253
*/
@Test(expected = StreamsException.class)
public void failsOnDeserializationException() {
logAndFailDriver.pipeInput(stringRecordFactory.create(INBOUND_TOPIC, KAFKA_KEY, "invalid_input"));
logAndFailDriver.readOutput(OUTBOUND_TOPIC, stringDeser, mmdDeserializer);
}

/**
* Addresses bug https://github.com/ExpediaDotCom/adaptive-alerting/issues/253
*/
@Test
public void handlesDeserializationException() {
// TODO
public void continuesOnDeserializationException() {
logAndContinueDriver.pipeInput(stringRecordFactory.create(INBOUND_TOPIC, KAFKA_KEY, "invalid_input"));
logAndContinueDriver.readOutput(OUTBOUND_TOPIC, stringDeser, mmdDeserializer);
}

private void initConfig() {
when(config.getString(CK_MODEL_SERVICE_URI_TEMPLATE)).thenReturn("https://example.com/");

when(streamsAppConfig.getTypesafeConfig()).thenReturn(config);
when(streamsAppConfig.getInboundTopic()).thenReturn(INBOUND_TOPIC);
when(streamsAppConfig.getOutboundTopic()).thenReturn(OUTBOUND_TOPIC);
when(tsConfig.getString(CK_MODEL_SERVICE_URI_TEMPLATE)).thenReturn("https://example.com/");

this.streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
streamsProps.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProps.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonPojoSerde.class.getName());
streamsProps.setProperty(JsonPojoDeserializer.CK_JSON_POJO_CLASS, MetricData.class.getName());

// TODO Activate this to avoid crashing the app when deserialization fails.
// streamsProps.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, TODO);
when(saConfig.getTypesafeConfig()).thenReturn(tsConfig);
when(saConfig.getInboundTopic()).thenReturn(INBOUND_TOPIC);
when(saConfig.getOutboundTopic()).thenReturn(OUTBOUND_TOPIC);
}

private void initTestObjects() {
val metricDefinition = new MetricDefinition("some-metric-key");
val now = Instant.now().getEpochSecond();
this.metricData = new MetricData(metricDefinition, 100.0, now);
this.mappedMetricData = new MappedMetricData(metricData, UUID.randomUUID(), "some-detector-type");
this.metricData = TestObjectMother.metricData();
this.mappedMetricData = TestObjectMother.mappedMetricData(metricData);
}

private void initDependencies() {
Expand All @@ -141,22 +135,18 @@ private void initDependencies() {
}

private void initTestMachinery() {

// Topology test drivers
val topology = new KafkaAnomalyDetectorMapper(saConfig, mapper).buildTopology();
this.logAndFailDriver = TestObjectMother.topologyTestDriver(topology, false);
this.logAndContinueDriver = TestObjectMother.topologyTestDriver(topology, true);

// Test driver
val kafkaMapper = new KafkaAnomalyDetectorMapper(streamsAppConfig, mapper);
val topology = kafkaMapper.buildTopology();
this.testDriver = new TopologyTestDriver(topology, streamsProps);

// MetricData source
val stringSer = new StringSerializer();
val mdSerializer = new JsonPojoSerializer<MetricData>();
this.recordFactory = new ConsumerRecordFactory<>(stringSer, mdSerializer);
// MetricData sources
this.mdRecordFactory = TestObjectMother.metricDataFactory();
this.stringRecordFactory = TestObjectMother.stringFactory();

// MappedMetricData consumer
this.stringDeser = new StringDeserializer();
val mmdDeserProps = new HashMap<String, Object>();
mmdDeserProps.put(JsonPojoDeserializer.CK_JSON_POJO_CLASS, MappedMetricData.class);
this.mmdDeserializer = new JsonPojoDeserializer<>();
mmdDeserializer.configure(mmdDeserProps, false);
// MappedMetricData consumers
this.stringDeser = TestObjectMother.stringDeserializer();
this.mmdDeserializer = TestObjectMother.mappedMetricDataDeserializer();
}
}
Loading

0 comments on commit 405c457

Please sign in to comment.