diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index ea8bc9d089f8..a6e8df6d2b0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1540,7 +1540,8 @@ private void getConfigFromWindmill(String computation) { for (Windmill.GetConfigResponse.ComputationConfigMapEntry computationConfig : response.getComputationConfigMapList()) { Map transformUserNameToStateFamily = - transformUserNameToStateFamilyByComputationId.get(computationConfig.getComputationId()); + transformUserNameToStateFamilyByComputationId.computeIfAbsent( + computationConfig.getComputationId(), k -> new HashMap<>()); for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry entry : computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList()) { transformUserNameToStateFamily.put(entry.getTransformUserName(), entry.getStateFamily()); @@ -1949,7 +1950,10 @@ public ComputationState( this.computationId = computationId; this.mapTask = mapTask; this.executor = executor; - this.transformUserNameToStateFamily = ImmutableMap.copyOf(transformUserNameToStateFamily); + this.transformUserNameToStateFamily = + transformUserNameToStateFamily != null + ? ImmutableMap.copyOf(transformUserNameToStateFamily) + : ImmutableMap.of(); Preconditions.checkNotNull(mapTask.getStageName()); Preconditions.checkNotNull(mapTask.getSystemName()); }