Skip to content

Commit

Permalink
HOTFIX: KAFKA-3160 follow-up, catch decompression errors in constructor
Browse files Browse the repository at this point in the history
After testing KAFKA-3160 a bit more, I found that the error code was not being set properly in ProduceResponse. This happened because the validation error is raised in the CompressionFactory constructor, which was not wrapped in a try / catch.

ijuma junrao

(This contribution is my original work and I license the work under Apache 2.0.)

Author: Dana Powers <[email protected]>
Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>, Gwen Shapira <[email protected]>, Ismael Juma <[email protected]>

Closes apache#1344 from dpkp/decompress_error_code
  • Loading branch information
dpkp authored and ijuma committed May 9, 2016
1 parent 2caf872 commit 4331bf4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 7 deletions.
13 changes: 7 additions & 6 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ import java.nio.ByteBuffer
import java.nio.channels._
import java.io._
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
Expand Down Expand Up @@ -96,7 +92,12 @@ object ByteBufferMessageSet {
if (wrapperMessage.payload == null)
throw new KafkaException(s"Message payload is null: $wrapperMessage")
val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
val compressed = try {
new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
} catch {
case ioe: IOException =>
throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}
var lastInnerOffset = -1L

val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
Expand All @@ -108,7 +109,7 @@ object ByteBufferMessageSet {
case eofe: EOFException =>
compressed.close()
case ioe: IOException =>
throw new CorruptRecordException(ioe)
throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
}
Some(innerMessageAndOffsets)
} else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
* Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
* InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
*/
class InvalidMessageException(message: String) extends CorruptRecordException(message) {
class InvalidMessageException(message: String, throwable: Throwable) extends CorruptRecordException(message, throwable) {
def this(message: String) = this(null, null)
def this() = this(null)
}
86 changes: 86 additions & 0 deletions core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test

import scala.collection.JavaConverters._

/**
* Subclasses of `BaseProduceSendRequestTest` exercise the producer and produce request/response. This class
* complements those classes with tests that require lower-level access to the protocol.
*/
class ProduceRequestTest extends BaseRequestTest {

@Test
def testSimpleProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
System.currentTimeMillis(), 1: Byte)).buffer
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> messageBuffer)
val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
assertEquals(Errors.NONE.code, partitionResponse.errorCode)
assertEquals(0, partitionResponse.baseOffset)
assertEquals(-1, partitionResponse.timestamp)
}

/* returns a pair of partition id and leader id */
private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
val partitionToLeader = TestUtils.createTopic(zkUtils, topic, 3, 2, servers)
partitionToLeader.collectFirst {
case (partition, Some(leader)) if leader != -1 => (partition, leader)
}.getOrElse(fail(s"No leader elected for topic $topic"))
}

@Test
def testCorruptLz4ProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
System.currentTimeMillis(), 1: Byte)).buffer
// Change the lz4 checksum value so that it doesn't match the contents
messageBuffer.array.update(40, 0)
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> messageBuffer)
val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
assertEquals(Errors.CORRUPT_MESSAGE.code, partitionResponse.errorCode)
assertEquals(-1, partitionResponse.baseOffset)
assertEquals(-1, partitionResponse.timestamp)
}

private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
val socket = connect(s = servers.find(_.config.brokerId == leaderId).map(_.socketServer).getOrElse {
fail(s"Could not find broker with id $leaderId")
})
val response = send(socket, request, ApiKeys.PRODUCE, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id))
ProduceResponse.parse(response)
}

}

0 comments on commit 4331bf4

Please sign in to comment.