From ff8da66cd35e7d1f8f2db235154d21b355663d20 Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Mon, 24 Aug 2020 15:58:42 +0800
Subject: [PATCH] kafka consumer props in runner config

lint not required
---
 .../main/java/feast/ingestion/ImportJob.java       |  6 +++++-
 .../feast/ingestion/options/ImportOptions.java     |  5 +----
 .../ingestion/transform/ReadFromSource.java        |  4 ++--
 .../runner/dataflow/DataflowRunnerConfig.java      |  4 ++++
 .../src/main/resources/application.yml             |  3 +++
 .../dataflow/DataflowRunnerConfigTest.java         | 17 ++++++++++++++++-
 6 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java
index 777536cae9..e51d05608a 100644
--- a/ingestion/src/main/java/feast/ingestion/ImportJob.java
+++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -41,6 +41,7 @@
 import feast.storage.api.writer.WriteResult;
 import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -120,7 +121,10 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
                 .setSource(source)
                 .setSuccessTag(FEATURE_ROW_OUT)
                 .setFailureTag(DEADLETTER_OUT)
-                .setKafkaConsumerProperties(options.getKafkaConsumerProperties())
+                .setKafkaConsumerProperties(
+                    options.getKafkaConsumerProperties() == null
+                        ? new HashMap<>()
+                        : options.getKafkaConsumerProperties())
                 .build());
 
     // Step 3. Process and validate incoming FeatureRows
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
index 0f3fb0570a..7416ee01f7 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -18,7 +18,6 @@
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.direct.DirectOptions;
 import org.apache.beam.sdk.options.Default;
@@ -75,9 +74,7 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
   void setStoresJson(List storeJson);
 
-  @Required
-  @Description(
-      "Properties Map for Kafka Consumer used to pull FeatureRows")
+  @Description("Properties Map for Kafka Consumer used to pull FeatureRows")
   Map getKafkaConsumerProperties();
 
   void setKafkaConsumerProperties(Map kafkaConsumerProperties);
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
index 8f5e5bba4a..766e2adaf1 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -34,7 +34,6 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 @AutoValue
@@ -83,7 +82,8 @@ public ReadFromSource build() {
   @Override
   public PCollectionTuple expand(PBegin input) {
     Map consumerProperties = new HashMap<>(getKafkaConsumerProperties());
-    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
+    consumerProperties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
        generateConsumerGroupId(input.getPipeline().getOptions().getJobName()));
 
     return input
diff --git a/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfig.java b/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfig.java
index ec7d0eec8f..1e70d2592e 100644
--- a/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfig.java
+++ b/job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfig.java
@@ -47,6 +47,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
     this.labels = runnerConfigOptions.getLabelsMap();
     this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
     this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
+    this.kafkaConsumerProperties = runnerConfigOptions.getKafkaConsumerPropertiesMap();
     validate();
   }
 
@@ -100,6 +101,9 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
   /* Type of persistent disk to be used by workers */
   public String workerDiskType;
 
+  /* Kafka Consumer Config Properties used in FeatureRow Consumer */
+  public Map kafkaConsumerProperties;
+
   /** Validates Dataflow runner configuration options */
   public void validate() {
     ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
diff --git a/job-controller/src/main/resources/application.yml b/job-controller/src/main/resources/application.yml
index 909ba1b42b..f39a81deac 100644
--- a/job-controller/src/main/resources/application.yml
+++ b/job-controller/src/main/resources/application.yml
@@ -56,6 +56,9 @@ feast:
           usePublicIps: false
           workerMachineType: n1-standard-1
           deadLetterTableSpec: project_id:dataset_id.table_id
+          kafkaConsumerProperties:
+            "max.poll.records": "50000"
+            "receive.buffer.bytes": "33554432"
 
   # Configuration options for metric collection for all ingestion jobs
   metrics:
diff --git a/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfigTest.java b/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfigTest.java
index 1eeada7575..f444af519a 100644
--- a/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfigTest.java
+++ b/job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfigTest.java
@@ -17,13 +17,15 @@
 package feast.jobcontroller.runner.dataflow;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
+import feast.ingestion.options.ImportOptions;
 import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.jupiter.api.Test;
 
 public class DataflowRunnerConfigTest {
@@ -46,6 +48,8 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
             .setDeadLetterTableSpec("project_id:dataset_id.table_id")
             .setDiskSizeGb(100)
             .putLabels("key", "value")
+            .putKafkaConsumerProperties("max.poll.records", "1000")
+            .putKafkaConsumerProperties("receive.buffer.bytes", "1000000")
             .build();
 
     DataflowRunnerConfig dataflowRunnerConfig = new DataflowRunnerConfig(opts);
@@ -65,11 +69,20 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
             "--deadLetterTableSpec=project_id:dataset_id.table_id",
             "--diskSizeGb=100",
"--labels={\"key\":\"value\"}", + "--kafkaConsumerProperties={\"max.poll.records\":\"1000\",\"receive.buffer.bytes\":\"1000000\"}", "--enableStreamingEngine=true", "--workerDiskType=pd-ssd") .toArray(String[]::new); + assertThat(args.size(), equalTo(expectedArgs.length)); assertThat(args, containsInAnyOrder(expectedArgs)); + + ImportOptions pipelineOptions = + PipelineOptionsFactory.fromArgs(dataflowRunnerConfig.toArgs()).as(ImportOptions.class); + + assertThat( + pipelineOptions.getKafkaConsumerProperties(), + equalTo(opts.getKafkaConsumerPropertiesMap())); } @Test @@ -103,8 +116,10 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException { "--usePublicIps=false", "--workerMachineType=n1-standard-1", "--labels={}", + "--kafkaConsumerProperties={}", "--enableStreamingEngine=false") .toArray(String[]::new); + assertThat(args.size(), equalTo(expectedArgs.length)); assertThat(args, containsInAnyOrder(expectedArgs)); }