diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java index e9d22dbf8e62..b64d8bde095c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java @@ -42,10 +42,14 @@ public class CoderTypeInformation extends TypeInformation implements Atomi private final SerializablePipelineOptions pipelineOptions; public CoderTypeInformation(Coder coder, PipelineOptions pipelineOptions) { + this(coder, new SerializablePipelineOptions(pipelineOptions)); + } + + public CoderTypeInformation(Coder coder, SerializablePipelineOptions pipelineOptions) { checkNotNull(coder); checkNotNull(pipelineOptions); this.coder = coder; - this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); + this.pipelineOptions = pipelineOptions; } public Coder getCoder() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java index 68c891d4f94d..204247b1d836 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java @@ -51,6 +51,6 @@ public ByteBuffer getKey(WindowedValue> value) { @Override public TypeInformation getProducedType() { - return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get()); + return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfByteBufferKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfByteBufferKeySelector.java index 29af81de42f1..8c6f10abf448 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfByteBufferKeySelector.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfByteBufferKeySelector.java @@ -56,6 +56,6 @@ public ByteBuffer getKey(WindowedValue, Double>> value) { @Override public TypeInformation getProducedType() { - return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get()); + return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java index 3cdb0aece9a7..64ea6ca26d4d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java @@ -52,6 +52,6 @@ public ByteBuffer getKey(WindowedValue> value) throws Except @Override public TypeInformation getProducedType() { - return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get()); + return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 5f35fd96ce02..92d0652e11f8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -434,8 +434,7 @@ public void initializeState(FunctionInitializationContext context) throws Except @SuppressWarnings("unchecked") CoderTypeInformation, CheckpointMarkT>> typeInformation = - (CoderTypeInformation) - new CoderTypeInformation<>(checkpointCoder, serializedOptions.get()); + (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder, serializedOptions); stateForCheckpoint = stateStore.getListState( new ListStateDescriptor<>( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index 9830133166ff..10d3ea1f7a5a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.MapState; @@ -102,14 +101,14 @@ public T state( public ValueState bindValue(StateTag> address, Coder coder) { return new FlinkBroadcastValueState<>( - stateBackend, address, namespace, coder, pipelineOptions.get()); + stateBackend, address, namespace, coder, pipelineOptions); } @Override public BagState bindBag(StateTag> address, Coder elemCoder) { return new FlinkBroadcastBagState<>( - stateBackend, address, namespace, elemCoder, pipelineOptions.get()); + stateBackend, address, namespace, elemCoder, pipelineOptions); } @Override @@ -142,7 +141,7 @@ CombiningState bindCombiningValue( Combine.CombineFn combineFn) { return new FlinkCombiningState<>( - stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions.get()); + stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions); } @Override @@ -187,7 +186,7 @@ private abstract class AbstractBroadcastState { String name, StateNamespace namespace, Coder coder, - PipelineOptions pipelineOptions) { + SerializablePipelineOptions pipelineOptions) { this.name = name; this.namespace = namespace; @@ -303,7 +302,7 @@ private class FlinkBroadcastValueState extends AbstractBroadcastState StateTag> address, StateNamespace namespace, Coder coder, - PipelineOptions pipelineOptions) { + SerializablePipelineOptions pipelineOptions) { super(flinkStateBackend, address.getId(), namespace, coder, pipelineOptions); this.namespace = namespace; @@ -363,7 +362,7 @@ private class FlinkBroadcastBagState extends AbstractBroadcastState> StateTag> address, StateNamespace namespace, Coder coder, - PipelineOptions pipelineOptions) { + SerializablePipelineOptions pipelineOptions) { super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder), pipelineOptions); this.namespace = namespace; @@ -451,7 +450,7 @@ private class FlinkCombiningState extends AbstractBroad Combine.CombineFn combineFn, StateNamespace namespace, Coder accumCoder, - PipelineOptions pipelineOptions) { + SerializablePipelineOptions pipelineOptions) { super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions); this.namespace = namespace; @@ -568,7 +567,7 @@ private class FlinkCombiningStateWithContext StateNamespace namespace, Coder accumCoder, CombineWithContext.Context context) { - super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions.get()); + super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions); this.namespace = namespace; this.address = address;