kafka consumer props in runner config
lint

not required
pyalex committed Aug 25, 2020
1 parent bccc786 commit ff8da66
Showing 6 changed files with 31 additions and 8 deletions.
6 changes: 5 additions & 1 deletion ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -41,6 +41,7 @@
import feast.storage.api.writer.WriteResult;
import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -120,7 +121,10 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.setSource(source)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.setKafkaConsumerProperties(options.getKafkaConsumerProperties())
.setKafkaConsumerProperties(
options.getKafkaConsumerProperties() == null
? new HashMap<>()
: options.getKafkaConsumerProperties())
.build());

// Step 3. Process and validate incoming FeatureRows
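With the null guard above, the consumer-properties option is genuinely optional at the job level. A minimal usage sketch (hypothetical, not part of this commit; the other required ImportOptions are assumed to be set elsewhere):

    import feast.ingestion.options.ImportOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Leaving the option unset is now safe: the getter returns null and
    // runPipeline(options) substitutes an empty HashMap, i.e. plain Kafka consumer defaults.
    ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class);
    assert options.getKafkaConsumerProperties() == null;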
@@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.options.Default;
@@ -75,9 +74,7 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,

void setStoresJson(List<String> storeJson);

@Required
@Description(
"Properties Map for Kafka Consumer used to pull FeatureRows")
@Description("Properties Map for Kafka Consumer used to pull FeatureRows")
Map<String, String> getKafkaConsumerProperties();

void setKafkaConsumerProperties(Map<String, String> kafkaConsumerProperties);
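Since @Required is dropped, the map only needs to be passed when custom consumer settings are wanted. A minimal sketch (not from this diff) of supplying it on the command line; Beam's PipelineOptionsFactory deserializes Map-valued options from a JSON object, the same format the runner-config test below round-trips:

    import feast.ingestion.options.ImportOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    ImportOptions options =
        PipelineOptionsFactory.fromArgs(
                "--kafkaConsumerProperties={\"max.poll.records\":\"50000\",\"receive.buffer.bytes\":\"33554432\"}")
            .as(ImportOptions.class);
    // getKafkaConsumerProperties() => {max.poll.records=50000, receive.buffer.bytes=33554432}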
@@ -34,7 +34,6 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;

@AutoValue
@@ -83,7 +82,8 @@ public ReadFromSource build() {
@Override
public PCollectionTuple expand(PBegin input) {
Map<String, Object> consumerProperties = new HashMap<>(getKafkaConsumerProperties());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
consumerProperties.put(
ConsumerConfig.GROUP_ID_CONFIG,
generateConsumerGroupId(input.getPipeline().getOptions().getJobName()));

return input
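The KafkaIO wiring itself is outside this hunk; a sketch of how the merged map, now carrying the generated group.id, would typically be applied to the Kafka read (assuming Beam's KafkaIO.Read#withConsumerConfigUpdates, with placeholder broker and topic):

    import org.apache.beam.sdk.io.kafka.KafkaIO;

    KafkaIO.Read<byte[], byte[]> read =
        KafkaIO.readBytes()
            .withBootstrapServers("broker:9092")            // hypothetical broker
            .withTopic("feature-rows")                      // hypothetical topic
            .withConsumerConfigUpdates(consumerProperties); // Map<String, Object> built in expand() above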
@@ -47,6 +47,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.labels = runnerConfigOptions.getLabelsMap();
this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
this.kafkaConsumerProperties = runnerConfigOptions.getKafkaConsumerPropertiesMap();
validate();
}

@@ -100,6 +101,9 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
/* Type of persistent disk to be used by workers */
public String workerDiskType;

/* Kafka Consumer Config Properties used in FeatureRow Consumer */
public Map<String, String> kafkaConsumerProperties;

/** Validates Dataflow runner configuration options */
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
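The new field needs no extra serialization code because the runner turns every public field of this config into a pipeline argument, JSON-encoding Map-valued ones; the expected --kafkaConsumerProperties={...} strings in the test below show the resulting format. A rough sketch of that pattern (an assumption; the real toArgs() is unchanged and not shown in this diff):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    static List<String> toArgsSketch(Object config) throws Exception {
      List<String> args = new ArrayList<>();
      ObjectMapper mapper = new ObjectMapper();
      for (Field field : config.getClass().getFields()) {
        Object value = field.get(config);
        // Maps (labels, kafkaConsumerProperties) become JSON; everything else uses toString.
        String encoded =
            value instanceof Map ? mapper.writeValueAsString(value) : String.valueOf(value);
        args.add(String.format("--%s=%s", field.getName(), encoded));
      }
      return args;
    }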
3 changes: 3 additions & 0 deletions job-controller/src/main/resources/application.yml
@@ -56,6 +56,9 @@ feast:
usePublicIps: false
workerMachineType: n1-standard-1
deadLetterTableSpec: project_id:dataset_id.table_id
kafkaConsumerProperties:
"max.poll.records": "50000"
"receive.buffer.bytes": "33554432"

# Configuration options for metric collection for all ingestion jobs
metrics:
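These keys populate the runner config's kafkaConsumerProperties map and are passed to ingestion jobs as a single JSON-valued pipeline argument, in the same format the test below expects, roughly:

    --kafkaConsumerProperties={"max.poll.records":"50000","receive.buffer.bytes":"33554432"}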
@@ -17,13 +17,15 @@
package feast.jobcontroller.runner.dataflow;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

import com.google.common.collect.Lists;
import feast.ingestion.options.ImportOptions;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.Test;

public class DataflowRunnerConfigTest {
@@ -46,6 +48,8 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
.setDiskSizeGb(100)
.putLabels("key", "value")
.putKafkaConsumerProperties("max.poll.records", "1000")
.putKafkaConsumerProperties("receive.buffer.bytes", "1000000")
.build();

DataflowRunnerConfig dataflowRunnerConfig = new DataflowRunnerConfig(opts);
@@ -65,11 +69,20 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--diskSizeGb=100",
"--labels={\"key\":\"value\"}",
"--kafkaConsumerProperties={\"max.poll.records\":\"1000\",\"receive.buffer.bytes\":\"1000000\"}",
"--enableStreamingEngine=true",
"--workerDiskType=pd-ssd")
.toArray(String[]::new);

assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));

ImportOptions pipelineOptions =
PipelineOptionsFactory.fromArgs(dataflowRunnerConfig.toArgs()).as(ImportOptions.class);

assertThat(
pipelineOptions.getKafkaConsumerProperties(),
equalTo(opts.getKafkaConsumerPropertiesMap()));
}

@Test
@@ -103,8 +116,10 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--labels={}",
"--kafkaConsumerProperties={}",
"--enableStreamingEngine=false")
.toArray(String[]::new);

assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
}
