Endless Loop #135

Open
dheerajkarande opened this issue Apr 11, 2018 · 3 comments
Comments

dheerajkarande commented Apr 11, 2018

Hi @simonsouter,

I have created a JSON consumer as follows:

val consumerConf = KafkaConsumer.Conf(
  new StringDeserializer,
  new JsonDeserializer[SomeClassWithAttributes],
  groupId = config.getString("groupId"),
  enableAutoCommit = false,
  autoOffsetReset = OffsetResetStrategy.EARLIEST)
  .withConf(config.getConfig("consumer"))

val actorConf = KafkaConsumerActor.Conf(1000, 3000)

// Pass the configurations defined above (consumerConf, actorConf) to the consumer actor
val consumer = context.actorOf(
  KafkaConsumerActor.props(consumerConf, actorConf, self)
)

I referred to the Getting Started guide to define JsonDeserializer, which looks like this:

import java.util
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import play.api.libs.json.{Json, Reads}

class JsonDeserializer[A: Reads] extends Deserializer[A] {
  private val stringDeserializer = new StringDeserializer

  override def deserialize(topic: String, data: Array[Byte]): A =
    Json.parse(stringDeserializer.deserialize(topic, data)).as[A]

  override def close(): Unit = stringDeserializer.close()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    stringDeserializer.configure(configs, isKey)
}

I'm able to receive valid JSON and everything works fine, but when an invalid JSON message arrives, the consumer throws org.apache.kafka.common.errors.SerializationException and goes into an endless loop.

My understanding so far is that the following line in the JsonDeserializer class throws an exception on invalid JSON:
Json.parse(stringDeserializer.deserialize(topic, data)).as[A]
As a result, the actor restarts. When it restarts, it reads the same invalid JSON message from the Kafka topic again, throws the exception again, and so ends up in an endless loop.
Right now I confirm a message only after processing it in the receive method. Because the exception occurs before that point, the invalid message is never confirmed, so after a restart the actor cannot move on to new messages.
My receive method looks like this:

override def receive: Receive = {
  case recordsExt(records) =>
    records.pairs.foreach {
      case (None, someClassWithAttributes) => log.error("Please provide valid key")
      case (key, value) => log.info(s"Received [$key,$value]")
    }
    sender() ! Confirm(records.offsets, commit = true)
}

It seems the actor fails before the receive method is reached, so when it restarts it receives the same invalid JSON message again.
I tried handling the exception inside the JsonDeserializer class, but the result was the same.
Is there any way to handle such exceptions in the deserializer, for example by throwing an InvalidArgumentException from the catch block and handling it separately in the receive method?

One possible solution I found is to use StringDeserializer and deserialize the message myself in the receive method, wrapped in a try/catch. That way control stays on my side and I can still confirm the message.
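The StringDeserializer idea above could be sketched roughly as follows. This is a minimal, library-free illustration, not the client's API: `RawRecord`, `ManualDecoding` and `decodeAll` are hypothetical names, and `parse` stands in for something like `Json.parse(_).as[A]`. It only shows the shape of the idea: decode each raw String inside a `Try`, keep failures separate from successes, and leave the caller free to confirm the whole batch afterwards.

```scala
import scala.util.{Failure, Success, Try}

object ManualDecoding {
  // Hypothetical stand-in for a consumed record: its offset plus the raw String payload.
  final case class RawRecord(offset: Long, value: String)

  // Decode every payload with the supplied parser. Exceptions thrown by `parse`
  // are captured per record, so one bad message cannot crash the caller.
  // Returns (failures keyed by offset, successfully decoded values).
  def decodeAll[A](records: Seq[RawRecord])(parse: String => A): (Seq[(Long, Throwable)], Seq[A]) = {
    val results = records.map(r => r.offset -> Try(parse(r.value)))
    val failures = results.collect { case (offset, Failure(e)) => offset -> e }
    val successes = results.collect { case (_, Success(a)) => a }
    (failures, successes)
  }
}
```

In the receive method, the failures could be logged (or forwarded elsewhere), the successes processed, and the batch confirmed with the existing `Confirm(records.offsets, commit = true)` call so that the invalid message is not delivered again after a restart.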

I might be doing this completely wrong, and I may not be aware of the best practice in such cases.
Please help me with this.
If anything in the existing code looks wrong to you, please suggest changes as well.

Regards,
Dheeraj

@dheerajkarande
Author

@simonsouter any thoughts?

@simonsouter
Contributor

Hi @DheerajK, handling message parsing issues is tricky in general. It's not always ideal to just confirm the problematic message: if the problem were actually in your code, you could end up skipping over every message. I'm sure there are some discussions about this on the Kafka mailing list somewhere.

As for the implementation approach, I would have thought it would be possible to catch the parsing error in the JsonDeserializer. Your suggestion of using a StringDeserializer is a sensible one. I have seen a similar approach in one of our teams, which performed the deserialisation manually on receiving the 'raw' message. This gives you the capability to handle parsing errors however you want.
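One way to catch the parsing error inside the deserializer, sketched here under assumptions, is to have it return a `Try[A]` instead of throwing. `TryJsonDeserializer` is a hypothetical name, not part of the library; it reuses the same Kafka `Deserializer` interface and play-json calls as the JsonDeserializer posted above. The receiving actor would then have to work with `Try`-typed values, and that wiring is neither shown nor tested here.

```scala
import java.util
import scala.util.Try
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import play.api.libs.json.{Json, Reads}

// Hypothetical variant of JsonDeserializer: parse and validation errors are
// captured as Failure values rather than thrown inside the Kafka consumer.
class TryJsonDeserializer[A: Reads] extends Deserializer[Try[A]] {
  private val stringDeserializer = new StringDeserializer

  // Try captures non-fatal exceptions, including malformed JSON and
  // JSON that does not match the Reads[A] definition.
  override def deserialize(topic: String, data: Array[Byte]): Try[A] =
    Try(Json.parse(stringDeserializer.deserialize(topic, data)).as[A])

  override def close(): Unit = stringDeserializer.close()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    stringDeserializer.configure(configs, isKey)
}
```

With this shape, a `Failure` reaches the application as ordinary data, so the decision to skip, log, or stop on a bad message is made in the receive method rather than forced by an exception in the consumer.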

@dheerajkarande
Author

@simonsouter Thanks!
