From a926e6f32c2b41edd1867d8c44178fbec3bfa4f9 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Mon, 31 Aug 2020 15:13:18 +0800 Subject: [PATCH] fetch job created from dataflow (#966) This reverts commit a37d0a6d1e563dddbe4fe2d7f9739ed157842844. --- .../src/main/java/feast/jobcontroller/model/Job.java | 11 ++++++++--- .../runner/dataflow/DataflowJobManager.java | 4 ++++ job-controller/src/main/resources/application.yml | 4 ++-- .../runner/dataflow/DataflowJobManagerTest.java | 9 ++++++++- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/job-controller/src/main/java/feast/jobcontroller/model/Job.java b/job-controller/src/main/java/feast/jobcontroller/model/Job.java index 6419399804..cf592d4567 100644 --- a/job-controller/src/main/java/feast/jobcontroller/model/Job.java +++ b/job-controller/src/main/java/feast/jobcontroller/model/Job.java @@ -88,10 +88,10 @@ public Date getLastUpdated() { } public void preSave() { - if (created == null) { - created = new Date(); + if (this.created == null) { + this.created = new Date(); } - lastUpdated = new Date(); + this.lastUpdated = new Date(); } public void setExtId(String extId) { @@ -102,6 +102,11 @@ public void setStatus(JobStatus status) { this.status = status; } + public void setCreated(Date created) { + this.created = created; + this.lastUpdated = created; + } + public boolean hasTerminated() { return getStatus().isTerminal(); } diff --git a/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowJobManager.java b/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowJobManager.java index 034d3b0f8f..4ceb8f50b8 100644 --- a/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowJobManager.java +++ b/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowJobManager.java @@ -51,6 +51,7 @@ import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; @Slf4j public class DataflowJobManager implements JobManager { @@ -307,6 +308,9 @@ public List listRunningJobs() { job.setExtId(dfJob.getId()); job.setStatus(JobStatus.RUNNING); + if (dfJob.getCreateTime() != null) { + job.setCreated(DateTime.parse(dfJob.getCreateTime()).toDate()); + } return job; }) diff --git a/job-controller/src/main/resources/application.yml b/job-controller/src/main/resources/application.yml index f39a81deac..49e7a777e9 100644 --- a/job-controller/src/main/resources/application.yml +++ b/job-controller/src/main/resources/application.yml @@ -57,8 +57,8 @@ feast: workerMachineType: n1-standard-1 deadLetterTableSpec: project_id:dataset_id.table_id kafkaConsumerProperties: - "max.poll.records": "50000" - "receive.buffer.bytes": "33554432" + "[max.poll.records]": "50000" + "[receive.buffer.bytes]": "33554432" # Configuration options for metric collection for all ingestion jobs metrics: diff --git a/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowJobManagerTest.java b/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowJobManagerTest.java index 4451bec142..5ac80788a5 100644 --- a/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowJobManagerTest.java +++ b/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowJobManagerTest.java @@ -51,6 +51,8 @@ import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; +import org.joda.time.LocalDateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -237,6 +239,8 @@ public void shouldRetrieveRunningJobsFromDataflow() { Printer jsonPrinter = JsonFormat.printer(); + LocalDateTime created = DateTime.now().toLocalDateTime(); + when(dataflow .projects() .locations() @@ -248,6 +252,7 @@ public void shouldRetrieveRunningJobsFromDataflow() { new com.google.api.services.dataflow.model.Job() .setLabels(ImmutableMap.of("application", "feast")) .setId("job-2") + .setCreateTime(created.toString()) .setEnvironment( new Environment() .setSdkPipelineOptions( @@ -268,7 +273,9 @@ public void shouldRetrieveRunningJobsFromDataflow() { hasProperty("id", equalTo("kafka-to-redis")), hasProperty("source", equalTo(source)), hasProperty("stores", hasValue(store)), - hasProperty("extId", equalTo("job-2"))))); + hasProperty("extId", equalTo("job-2")), + hasProperty("created", equalTo(created.toDate())), + hasProperty("lastUpdated", equalTo(created.toDate()))))); } @Test