diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index bd90206117..6002133e82 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -18,6 +18,7 @@ import static feast.core.util.PipelineUtil.detectClassPathResourcesToStage; +import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.json.jackson2.JacksonFactory; @@ -66,16 +67,15 @@ public class DataflowJobManager implements JobManager { public DataflowJobManager( Map runnerConfigOptions, MetricsProperties metricsProperties) { + this(runnerConfigOptions, metricsProperties, getGoogleCredential()); + } - DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions); + public DataflowJobManager( + Map runnerConfigOptions, + MetricsProperties metricsProperties, + Credential credential) { - GoogleCredential credential = null; - try { - credential = GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to find credential required for Dataflow monitoring API", e); - } + DataflowRunnerConfig config = new DataflowRunnerConfig(runnerConfigOptions); Dataflow dataflow = null; try { @@ -97,6 +97,17 @@ public DataflowJobManager( this.location = config.getRegion(); } + private static Credential getGoogleCredential() { + GoogleCredential credential = null; + try { + credential = GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all()); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to find credential required for Dataflow monitoring API", e); + } + return credential; + } + @Override public Runner getRunnerType() { return RUNNER_TYPE; diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 340ea2dfee..e610f39373 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; import com.google.api.services.dataflow.Dataflow; import com.google.common.collect.Lists; import com.google.protobuf.Duration; @@ -83,7 +85,14 @@ public void setUp() { defaults.put("subnetwork", "subnetwork"); MetricsProperties metricsProperties = new MetricsProperties(); metricsProperties.setEnabled(false); - dfJobManager = new DataflowJobManager(defaults, metricsProperties); + Credential credential = null; + try { + credential = MockGoogleCredential.getApplicationDefault(); + } catch (IOException e) { + e.printStackTrace(); + } + + dfJobManager = new DataflowJobManager(defaults, metricsProperties, credential); dfJobManager = spy(dfJobManager); }