diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java index e9b09cd62d..048172bea1 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java @@ -11,7 +11,9 @@ import com.linecorp.armeria.server.RequestTimeoutException; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.micrometer.core.instrument.Counter; + import org.opensearch.dataprepper.exceptions.BadRequestException; import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.opensearch.dataprepper.exceptions.RequestCancelledException; @@ -51,6 +53,7 @@ public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { } private Status handleExceptions(final Throwable e) { + String message = e.getMessage(); if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { requestTimeoutsCounter.increment(); return createStatus(e, Status.RESOURCE_EXHAUSTED); @@ -60,6 +63,9 @@ private Status handleExceptions(final Throwable e) { } else if (e instanceof BadRequestException) { badRequestsCounter.increment(); return createStatus(e, Status.INVALID_ARGUMENT); + } else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) { + badRequestsCounter.increment(); + return createStatus(e, Status.INVALID_ARGUMENT); } else if (e instanceof RequestCancelledException) { requestTimeoutsCounter.increment(); return createStatus(e, Status.CANCELLED);