diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java index 1391e930..72b91f27 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java @@ -16,6 +16,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; import java.io.IOException; @@ -47,6 +48,11 @@ public synchronized T deserialize(byte[] bytes) throws IOException { return deserializationSchema.deserialize(bytes); } + @Override + public synchronized void deserialize(byte[] message, Collector out) throws IOException { + deserializationSchema.deserialize(message, out); + } + @Override public synchronized boolean isEndOfStream(T object) { return deserializationSchema.isEndOfStream(object);