Configurable kafka consumer in IngestionJob #959

Merged · 2 commits · Aug 25, 2020
25 changes: 17 additions & 8 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -41,6 +41,7 @@
import feast.storage.api.writer.WriteResult;
import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -120,6 +121,10 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
.setSource(source)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.setKafkaConsumerProperties(
options.getKafkaConsumerProperties() == null
? new HashMap<>()
: options.getKafkaConsumerProperties())
.build());

// Step 3. Process and validate incoming FeatureRows
@@ -172,14 +177,6 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
DeadletterSink deadletterSink =
new BigQueryDeadletterSink(options.getDeadLetterTableSpec());

convertedFeatureRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());

validatedRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());

writeFeatureRows
.getFailedInserts()
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
@@ -195,6 +192,18 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
}

if (options.getDeadLetterTableSpec() != null) {
DeadletterSink deadletterSink = new BigQueryDeadletterSink(options.getDeadLetterTableSpec());

convertedFeatureRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());

validatedRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());
}

sinkReadiness
.apply(Flatten.pCollections())
.apply(
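A note on the empty-map fallback above: a Map-typed Beam option declared without a @Default resolves to null when the flag is omitted, and AutoValue-generated builders reject null property values, hence the guard. A minimal sketch of the assumed default behavior (the class name is illustrative):

import feast.ingestion.options.ImportOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaConsumerPropertiesDefaultSketch {
  public static void main(String[] args) {
    // No --kafkaConsumerProperties flag given: the Map-typed getter returns null,
    // which is why ImportJob substitutes new HashMap<>() before building ReadFromSource.
    ImportOptions options = PipelineOptionsFactory.as(ImportOptions.class);
    System.out.println(options.getKafkaConsumerProperties()); // prints: null
  }
}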
ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -17,6 +17,7 @@
package feast.ingestion.options;

import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.options.Default;
@@ -73,6 +74,11 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,

void setStoresJson(List<String> storeJson);

@Description("Properties Map for Kafka Consumer used to pull FeatureRows")
Map<String, String> getKafkaConsumerProperties();

void setKafkaConsumerProperties(Map<String, String> kafkaConsumerProperties);

@Description(
"(Optional) Deadletter elements will be written to this BigQuery table."
+ "Table spec must follow this format PROJECT_ID:DATASET_ID.PROJECT_ID"
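For reference, a hedged sketch of how the new Map-typed option can be supplied on the command line. Beam's PipelineOptionsFactory deserializes non-primitive option types from JSON, and DataflowRunnerConfigTest below round-trips the map through exactly this kind of flag; the property values here are illustrative:

import feast.ingestion.options.ImportOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaConsumerPropertiesArgSketch {
  public static void main(String[] args) {
    // The whole map travels as one JSON-encoded flag; keys must be valid Kafka
    // consumer config names and values are plain strings.
    ImportOptions options =
        PipelineOptionsFactory.fromArgs(
                "--kafkaConsumerProperties={\"max.poll.records\":\"50000\",\"receive.buffer.bytes\":\"33554432\"}")
            .as(ImportOptions.class);
    System.out.println(options.getKafkaConsumerProperties().get("max.poll.records")); // 50000
  }
}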
ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -24,6 +24,8 @@
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.storage.api.writer.FailedElement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +34,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;

@AutoValue
public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple> {
@@ -43,6 +45,8 @@ public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple>

public abstract TupleTag<FailedElement> getFailureTag();

public abstract Map<String, String> getKafkaConsumerProperties();

public static Builder newBuilder() {
return new AutoValue_ReadFromSource.Builder();
}
@@ -56,6 +60,8 @@ public abstract static class Builder {

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

public abstract Builder setKafkaConsumerProperties(Map<String, String> kafkaConsumerProperties);

abstract ReadFromSource autobuild();

public ReadFromSource build() {
@@ -75,17 +81,19 @@ public ReadFromSource build() {

@Override
public PCollectionTuple expand(PBegin input) {
Map<String, Object> consumerProperties = new HashMap<>(getKafkaConsumerProperties());
consumerProperties.put(
ConsumerConfig.GROUP_ID_CONFIG,
generateConsumerGroupId(input.getPipeline().getOptions().getJobName()));

return input
.getPipeline()
.apply(
"ReadFromKafka",
KafkaIO.readBytes()
.withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
.withTopic(getSource().getKafkaSourceConfig().getTopic())
.withConsumerConfigUpdates(
ImmutableMap.of(
"group.id",
generateConsumerGroupId(input.getPipeline().getOptions().getJobName())))
.withConsumerConfigUpdates(consumerProperties)
.withReadCommitted()
.commitOffsetsInFinalize())
.apply(
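One behavioral detail worth noting from expand() above: the caller-supplied properties are copied into a fresh HashMap first and group.id is put afterwards, so a group.id passed through kafkaConsumerProperties is overwritten by the generated consumer group. A minimal, Beam-free sketch of that merge order (the generated group id value shown is illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerPropertiesMergeSketch {
  public static void main(String[] args) {
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50000");
    userProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-custom-group");

    // Same order as expand(): copy the user map, then overwrite group.id.
    Map<String, Object> consumerProperties = new HashMap<>(userProperties);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "feast-import-job-<generated>");

    // Prints the generated id, not "my-custom-group".
    System.out.println(consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG));
  }
}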
job-controller/src/main/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfig.java
@@ -47,6 +47,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.labels = runnerConfigOptions.getLabelsMap();
this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
this.kafkaConsumerProperties = runnerConfigOptions.getKafkaConsumerPropertiesMap();
validate();
}

@@ -100,6 +101,9 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
/* Type of persistent disk to be used by workers */
public String workerDiskType;

/* Kafka Consumer Config Properties used in FeatureRow Consumer */
public Map<String, String> kafkaConsumerProperties;

/** Validates Dataflow runner configuration options */
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
3 changes: 3 additions & 0 deletions job-controller/src/main/resources/application.yml
@@ -56,6 +56,9 @@ feast:
usePublicIps: false
workerMachineType: n1-standard-1
deadLetterTableSpec: project_id:dataset_id.table_id
kafkaConsumerProperties:
"max.poll.records": "50000"
"receive.buffer.bytes": "33554432"

# Configuration options for metric collection for all ingestion jobs
metrics:
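The keys under kafkaConsumerProperties are passed through to the consumer verbatim, so they must be valid Kafka consumer configuration names with string values. A small sketch checking the two example keys against the Kafka client's canonical constants:

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerPropertyKeysSketch {
  public static void main(String[] args) {
    System.out.println(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); // max.poll.records
    System.out.println(ConsumerConfig.RECEIVE_BUFFER_CONFIG);   // receive.buffer.bytes
  }
}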
job-controller/src/test/java/feast/jobcontroller/runner/dataflow/DataflowRunnerConfigTest.java
@@ -17,13 +17,15 @@
package feast.jobcontroller.runner.dataflow;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;

import com.google.common.collect.Lists;
import feast.ingestion.options.ImportOptions;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.Test;

public class DataflowRunnerConfigTest {
@@ -46,6 +48,8 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
.setDiskSizeGb(100)
.putLabels("key", "value")
.putKafkaConsumerProperties("max.poll.records", "1000")
.putKafkaConsumerProperties("receive.buffer.bytes", "1000000")
.build();

DataflowRunnerConfig dataflowRunnerConfig = new DataflowRunnerConfig(opts);
@@ -65,11 +69,20 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--diskSizeGb=100",
"--labels={\"key\":\"value\"}",
"--kafkaConsumerProperties={\"max.poll.records\":\"1000\",\"receive.buffer.bytes\":\"1000000\"}",
"--enableStreamingEngine=true",
"--workerDiskType=pd-ssd")
.toArray(String[]::new);

assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));

ImportOptions pipelineOptions =
PipelineOptionsFactory.fromArgs(dataflowRunnerConfig.toArgs()).as(ImportOptions.class);

assertThat(
pipelineOptions.getKafkaConsumerProperties(),
equalTo(opts.getKafkaConsumerPropertiesMap()));
}

@Test
@@ -103,8 +116,10 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--labels={}",
"--kafkaConsumerProperties={}",
"--enableStreamingEngine=false")
.toArray(String[]::new);

assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
}
3 changes: 3 additions & 0 deletions protos/feast/core/Runner.proto
@@ -86,4 +86,7 @@ message DataflowRunnerConfigOptions {

/* Type of persistent disk to be used by workers */
string workerDiskType = 16;

/* Kafka consumer configuration properties */
map<string, string> kafkaConsumerProperties = 17;
}