From 20a45bb6eaa271074f8b1e1a3c4e21b9f8bd97ce Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 10 Oct 2024 15:22:05 -0400 Subject: [PATCH] Fix the FlinkRunner for Dataproc (#32742) --- .../interactive/dataproc/dataproc_cluster_manager.py | 4 +++- .../apache_beam/runners/interactive/interactive_beam.py | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 2b39279f43e9..4d260d4a6a56 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -169,6 +169,7 @@ def create_cluster(self, cluster: dict) -> None: def create_flink_cluster(self) -> None: """Calls _create_cluster with a configuration that enables FlinkRunner.""" init_action_path = self.stage_init_action() + # https://cloud.google.com/php/docs/reference/cloud-dataproc/latest/V1.Cluster cluster = { 'project_id': self.cluster_metadata.project_id, 'cluster_name': self.cluster_metadata.cluster_name, @@ -194,7 +195,8 @@ def create_flink_cluster(self) -> None: }, 'service_account_scopes': [ 'https://www.googleapis.com/auth/cloud-platform' - ] + ], + 'internal_ip_only': False }, 'master_config': { # There must be 1 and only 1 instance of master. diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 9554abf3a47a..60453b5066c3 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -408,10 +408,11 @@ class Clusters: To configure a pipeline to run on a local FlinkRunner, explicitly set the default cluster metadata to None: ib.clusters.set_default_cluster(None). """ - # Explicitly set the Flink version here to ensure compatibility with 2.1 + # Explicitly set the Flink version here to ensure compatibility with 2.2 # Dataproc images: - # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1 - DATAPROC_FLINK_VERSION = '1.15' + # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.2 + # you can manually override this by importing Clusters + DATAPROC_FLINK_VERSION = '1.17' # The minimum worker number to create a Dataproc cluster. DATAPROC_MINIMUM_WORKER_NUM = 2