ad-mapper crashes when wrong metrics sent to aa-metrics #253

Closed

tkamenov-expedia opened this issue Nov 1, 2018 · 1 comment
tkamenov-expedia commented Nov 1, 2018

The ad-mapper crashes when malformed metrics are sent to the aa-metrics Kafka topic.

Steps to reproduce:
Get ad-mapper running locally in Kubernetes, then run salesdata-to-gaas to push to Kafka's mapped-metrics topic.

Example metric:
{"metric":{"tags":[null,null,"m_application=sales-data","unit=count","what=bookings","lob=tshop","region=travelocity-na"],"meta":{"source":"sales-data-api"}},"epochTimeInSeconds":1541112000,"value":1.0}

Result:
ad-mapper crashes and shuts down (and is subsequently restarted by Kubernetes).

Log:

2018-11-01 22:58:00 INFO  KafkaStreams:261 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] State transition from REBALANCING to RUNNING
2018-11-01 22:58:00 INFO  StateChangeListener:40 - State change event called with newState=RUNNING and oldState=REBALANCING
2018-11-01 22:58:00 INFO  Fetcher:561 - [Consumer clientId=ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1-consumer, groupId=ad-mapper] Resetting offset for partition aa-metrics-0 to offset 229.
2018-11-01 22:59:05 ERROR StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: aa-metrics, partition: 0, offset: 229
org.msgpack.core.MessageTypeException: Expected Map, but got Integer (7b)
	at org.msgpack.core.MessageUnpacker.unexpected(MessageUnpacker.java:595)
	at org.msgpack.core.MessageUnpacker.unpackMapHeader(MessageUnpacker.java:1297)
	at com.expedia.metrics.metrictank.MessagePackSerializer.deserialize(MessagePackSerializer.java:144)
	at com.expedia.metrics.metrictank.MessagePackSerializer.deserialize(MessagePackSerializer.java:61)
	at com.expedia.adaptivealerting.kafka.serde.MetricDataSerde$DataDeserializer.deserialize(MetricDataSerde.java:60)
	at com.expedia.adaptivealerting.kafka.serde.MetricDataSerde$DataDeserializer.deserialize(MetricDataSerde.java:50)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
	at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:61)
	at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
	at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
	at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
2018-11-01 22:59:05 INFO  StreamThread:200 - stream-thread [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
2018-11-01 22:59:05 INFO  StreamThread:1108 - stream-thread [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1] Shutting down
2018-11-01 22:59:05 INFO  KafkaProducer:1054 - [Producer clientId=ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 22:59:05 INFO  StreamThread:200 - stream-thread [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2018-11-01 22:59:05 INFO  KafkaStreams:261 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] State transition from RUNNING to ERROR
2018-11-01 22:59:05 INFO  StateChangeListener:40 - State change event called with newState=ERROR and oldState=RUNNING
2018-11-01 22:59:05 WARN  KafkaStreams:413 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] All stream threads have died. The instance will be in error state and should be closed.
2018-11-01 22:59:05 INFO  StreamThread:1128 - stream-thread [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1] Shutdown complete
2018-11-01 22:59:05 ERROR StateChangeListener:50 - uncaught exception occurred running kafka streams for thread=ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
	at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
	at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
	at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.msgpack.core.MessageTypeException: Expected Map, but got Integer (7b)
	at org.msgpack.core.MessageUnpacker.unexpected(MessageUnpacker.java:595)
	at org.msgpack.core.MessageUnpacker.unpackMapHeader(MessageUnpacker.java:1297)
	at com.expedia.metrics.metrictank.MessagePackSerializer.deserialize(MessagePackSerializer.java:144)
	at com.expedia.metrics.metrictank.MessagePackSerializer.deserialize(MessagePackSerializer.java:61)
	at com.expedia.adaptivealerting.kafka.serde.MetricDataSerde$DataDeserializer.deserialize(MetricDataSerde.java:60)
	at com.expedia.adaptivealerting.kafka.serde.MetricDataSerde$DataDeserializer.deserialize(MetricDataSerde.java:50)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
	at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:61)
	... 7 more
2018-11-01 22:59:05 ERROR HealthStatusController:47 - Setting the app status as 'UNHEALTHY'
2018-11-01 22:59:05 INFO  Application:91 - Shutting down topology
2018-11-01 22:59:05 INFO  KafkaStreams:261 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] State transition from ERROR to PENDING_SHUTDOWN
2018-11-01 22:59:05 INFO  StateChangeListener:40 - State change event called with newState=PENDING_SHUTDOWN and oldState=ERROR
2018-11-01 22:59:05 INFO  StreamThread:1094 - stream-thread [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063-StreamThread-1] Informed to shut down
2018-11-01 22:59:05 INFO  KafkaStreams:261 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] State transition from PENDING_SHUTDOWN to NOT_RUNNING
2018-11-01 22:59:05 INFO  KafkaStreams:867 - stream-client [ad-mapper-a0320118-b8eb-44c1-a63a-5403a8d44063] Streams client stopped completely
2018-11-01 22:59:05 INFO  Application:94 - Shutting down jmxReporter

Highlight:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
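
For reference, that handler is set through the Kafka Streams configuration; a minimal sketch using the built-in log-and-continue handler (the property constant and handler class are standard Kafka Streams, the application id and bootstrap values are illustrative):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class AdMapperStreamsConfigSketch {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ad-mapper");         // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        // Log and skip records that fail deserialization instead of killing the
        // stream thread (the default is LogAndFailExceptionHandler, as seen above).
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        return props;
    }
}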

@tkamenov-expedia added the bug label Nov 1, 2018
@williewheeler self-assigned this Nov 2, 2018
@williewheeler commented

Two things going on here:

  • The metric format above is old and no longer valid. We switched over to the metrics-java format a while back.
  • The software fix required here is to add an exception handler for deserialization (see the sketch below). I'll take care of that since I'm already doing this for the ADManager (ad-manager crashes when wrong metrics sent to mapped-metrics #245).
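
A minimal sketch of such a handler against the plain Kafka Streams DeserializationExceptionHandler interface (the class name is illustrative, not necessarily what the fix uses; the built-in LogAndContinueExceptionHandler covers the same ground):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical handler: logs the bad record and tells Streams to keep going.
public class LogAndContinueMetricHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndContinueMetricHandler.class);

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        log.error("Failed to deserialize record from topic={} partition={} offset={}; skipping",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}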

williewheeler added a commit that referenced this issue Nov 10, 2018
- Set the default deserialization exception handler to log/continue
- Added unit tests to verify behavior
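
A minimal sketch of such a unit test, exercising the hypothetical handler above (names are illustrative, not the actual tests from the commit):

import static org.junit.Assert.assertEquals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.junit.Test;

public class LogAndContinueMetricHandlerTest {

    @Test
    public void continuesOnNonMessagePackRecord() {
        LogAndContinueMetricHandler handler = new LogAndContinueMetricHandler();
        // Value is JSON text rather than MessagePack, mirroring the bad metric in this issue.
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "aa-metrics", 0, 229L, new byte[0], "{\"metric\":{}}".getBytes());
        // The ProcessorContext is unused by the handler, so null is fine here.
        DeserializationHandlerResponse response = handler.handle(
                null, record, new RuntimeException("Expected Map, but got Integer (7b)"));
        assertEquals(DeserializationHandlerResponse.CONTINUE, response);
    }
}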
bibinss added a commit that referenced this issue Nov 11, 2018