Skip to content

Commit

Permalink
Re-use serializable pipeline options when already available (#24192)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jozef Vilcek committed Nov 16, 2022
1 parent e0e10b9 commit ffdee0b
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
private final SerializablePipelineOptions pipelineOptions;

public CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions) {
this(coder, new SerializablePipelineOptions(pipelineOptions));
}

public CoderTypeInformation(Coder<T> coder, SerializablePipelineOptions pipelineOptions) {
checkNotNull(coder);
checkNotNull(pipelineOptions);
this.coder = coder;
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.pipelineOptions = pipelineOptions;
}

public Coder<T> getCoder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public ByteBuffer getKey(WindowedValue<KV<K, V>> value) {

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ public ByteBuffer getKey(WindowedValue<KV<KV<K, V>, Double>> value) {

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ public ByteBuffer getKey(WindowedValue<KeyedWorkItem<K, V>> value) throws Except

@Override
public TypeInformation<ByteBuffer> getProducedType() {
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions.get());
return new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of(), pipelineOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@SuppressWarnings("unchecked")
CoderTypeInformation<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
typeInformation =
(CoderTypeInformation)
new CoderTypeInformation<>(checkpointCoder, serializedOptions.get());
(CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder, serializedOptions);
stateForCheckpoint =
stateStore.getListState(
new ListStateDescriptor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
Expand Down Expand Up @@ -102,14 +101,14 @@ public <T extends State> T state(
public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> address, Coder<T2> coder) {

return new FlinkBroadcastValueState<>(
stateBackend, address, namespace, coder, pipelineOptions.get());
stateBackend, address, namespace, coder, pipelineOptions);
}

@Override
public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, Coder<T2> elemCoder) {

return new FlinkBroadcastBagState<>(
stateBackend, address, namespace, elemCoder, pipelineOptions.get());
stateBackend, address, namespace, elemCoder, pipelineOptions);
}

@Override
Expand Down Expand Up @@ -142,7 +141,7 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {

return new FlinkCombiningState<>(
stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions.get());
stateBackend, address, combineFn, namespace, accumCoder, pipelineOptions);
}

@Override
Expand Down Expand Up @@ -187,7 +186,7 @@ private abstract class AbstractBroadcastState<T> {
String name,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
this.name = name;

this.namespace = namespace;
Expand Down Expand Up @@ -303,7 +302,7 @@ private class FlinkBroadcastValueState<T> extends AbstractBroadcastState<T>
StateTag<ValueState<T>> address,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, coder, pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -363,7 +362,7 @@ private class FlinkBroadcastBagState<T> extends AbstractBroadcastState<List<T>>
StateTag<BagState<T>> address,
StateNamespace namespace,
Coder<T> coder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder), pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -451,7 +450,7 @@ private class FlinkCombiningState<InputT, AccumT, OutputT> extends AbstractBroad
Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
StateNamespace namespace,
Coder<AccumT> accumCoder,
PipelineOptions pipelineOptions) {
SerializablePipelineOptions pipelineOptions) {
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);

this.namespace = namespace;
Expand Down Expand Up @@ -568,7 +567,7 @@ private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
StateNamespace namespace,
Coder<AccumT> accumCoder,
CombineWithContext.Context context) {
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions.get());
super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);

this.namespace = namespace;
this.address = address;
Expand Down

0 comments on commit ffdee0b

Please sign in to comment.