Skip to content

Commit

Permalink
Improves the error in case the input on BytesFormatWriter is not bina…
Browse files Browse the repository at this point in the history
…ry. (#925)

Matches on ByteBuffer as well, Connect framework through the deserialisers can return that

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Mar 24, 2023
1 parent 9fd0b34 commit ba916f5
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class BytesFormatWriter(outputStreamFn: () => S3OutputStream, bytesWriteMode: By
def convertToBytes(sinkData: SinkData): Either[Throwable, Array[Byte]] =
sinkData match {
case ByteArraySinkData(array, _) => array.asRight
case _ =>
case v =>
new IllegalStateException(
"Non-binary content received. Please check your configuration. It may be advisable to ensure you are using org.apache.kafka.connect.converters.ByteArrayConverter\", exception)\n case Success(value) => value",
s"Non-binary content received: ${v.getClass.getName} . Please check your configuration. It may be advisable to ensure you are using org.apache.kafka.connect.converters.ByteArrayConverter.",
).asLeft
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.ConnectException

import java.nio.ByteBuffer
import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

Expand All @@ -39,6 +40,7 @@ object ValueToSinkDataConverter {
case mapVal: Map[_, _] => MapSinkDataConverter(mapVal, schema)
case mapVal: util.Map[_, _] => MapSinkDataConverter(mapVal.asScala.toMap, schema)
case bytesVal: Array[Byte] => ByteArraySinkData(bytesVal, schema)
case bytesVal: ByteBuffer => ByteArraySinkData(bytesVal.array(), schema)
case arrayVal: Array[_] => ArraySinkDataConverter(arrayVal, schema)
case listVal: util.List[_] => ArraySinkDataConverter(listVal.toArray, schema)
case null => NullSinkData(schema)
Expand Down

0 comments on commit ba916f5

Please sign in to comment.