KE-908 Store dead-letter messages (feast-dev#68)
Closes KE-908

Store dead-letter messages (feature rows that fail validation) as a Delta table on DBFS.
algattik authored Jul 13, 2020
1 parent 86e8a6b commit 29071f1
Showing 10 changed files with 135 additions and 30 deletions.
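The heart of the change is in SparkIngestion: each streaming micro-batch is split by validation status, valid rows continue to the configured sinks, and failed rows are appended as a Delta table at the configured deadLetterPath. Below is a minimal, self-contained sketch of that write pattern. It reuses the six-column dead-letter schema from the diff, but the class name, sample values, and path handling are placeholders, and it assumes a Spark session with the Delta Lake dependency on the classpath; it is not the committed implementation.

import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DeadLetterWriteSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("dead-letter-write-sketch").getOrCreate();

    // Same six-column schema the ingestion job registers for dead letters
    // (see the SparkIngestion diff further down).
    StructType deadLetterType =
        new StructType()
            .add("timestamp", DataTypes.TimestampType, false)
            .add("job_name", DataTypes.StringType, true)
            .add("transform_name", DataTypes.StringType, true)
            .add("payload", DataTypes.StringType, true)
            .add("error_message", DataTypes.StringType, true)
            .add("stack_trace", DataTypes.StringType, true);

    // One hypothetical failed record; in the real job these values come from
    // RowWithValidationResult.getFailedElement() inside foreachBatch.
    Row failedRecord =
        RowFactory.create(
            Timestamp.from(Instant.now()),
            "example-job",
            "ValidateFeatureRow",
            "fields { ... }",
            "FeatureRow contains invalid feature set id",
            null);

    Dataset<Row> invalidRows =
        spark.createDataFrame(Collections.singletonList(failedRecord), deadLetterType);

    // Append to the configured dead-letter location (example path; set via deadLetterPath).
    invalidRows.write().format("delta").mode("append").save("dbfs:/feast/deadletter");
  }
}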
@@ -57,6 +57,7 @@ public class DatabricksJobManager implements JobManager {
private final String checkpointLocation;
private final String jarFile;
private final DatabricksRunnerConfigOptions.DatabricksNewClusterOptions newClusterConfigOptions;
private final String deadLetterPath;
private final HttpClient httpClient;
private static final ObjectMapper mapper = ObjectMapperFactory.createObjectMapper();
private final int timeoutSeconds;
@@ -69,6 +70,7 @@ public DatabricksJobManager(
this.checkpointLocation = runnerConfigOptions.getCheckpointLocation();
this.httpClient = httpClient;
this.newClusterConfigOptions = runnerConfigOptions.getNewCluster();
this.deadLetterPath = runnerConfigOptions.getDeadLetterPath();
this.jarFile = runnerConfigOptions.getJarFile();
this.timeoutSeconds = runnerConfigOptions.getTimeoutSeconds();
}
@@ -222,7 +224,12 @@ private long createDatabricksRun(Job job) {

List<String> params =
Arrays.asList(
job.getId(), checkpointLocation, defaultFeastProject, featureSetsJson, storesJson);
job.getId(),
checkpointLocation,
defaultFeastProject,
deadLetterPath,
featureSetsJson,
storesJson);
RunsSubmitRequest runRequest = getJobRequest(jobName, params);

try {
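With this change the Spark job receives six positional arguments instead of five, with the dead-letter path in fourth position; the order built here has to match what SparkIngestion parses. A small sketch of that argument contract, with placeholder values standing in for the real configuration:

import java.util.Arrays;
import java.util.List;

public class JobArgsSketch {
  public static void main(String[] args) {
    // Placeholder values; in DatabricksJobManager they come from the job and runner config.
    List<String> params =
        Arrays.asList(
            "job-123",                 // job.getId()
            "dbfs:/checkpoints/feast", // checkpointLocation
            "myDefaultFeastProject",   // defaultFeastProject
            "dbfs:/feast/deadletter",  // deadLetterPath (new, fourth position)
            "<featureSetsJson>",       // serialized feature set specs
            "<storesJson>");           // serialized stores
    System.out.println(String.join(" | ", params));
  }
}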
4 changes: 3 additions & 1 deletion core/src/main/resources/application.yml
@@ -61,12 +61,14 @@ feast:
host: databricks_host
# Token for authentication
token: ${DATABRICKS_TOKEN}
# Path to store Spark job checkpoints
checkpointLocation: dbfs:/checkpoints/feast
# Path to the Jar file on the databricks file system
jarFile: jar_file
# Timeout in seconds for Databricks jobs, or -1 for unlimited
timeoutSeconds: -1
# Path to store dead letter data
deadLetterPath: dbfs:/feast/deadletter
newCluster:
# The platform version for scala and spark (e.g. 6.5.x-scala2.11)
sparkVersion: your_spark_version
1 change: 1 addition & 0 deletions infra/docker-compose/core/databricks.yml
@@ -10,6 +10,7 @@ feast:
host: http://databricks-emulator:8080
token: unused
checkpointLocation: /tmp/checkpoints/feast
deadLetterPath: dbfs:/feast/deadletter
jarFile: /opt/sparkjars/spark-ingestion-job.jar
timeoutSeconds: -1
newCluster:
1 change: 1 addition & 0 deletions infra/terraform/app/main.tf
@@ -176,6 +176,7 @@ feast-core:
host: "${var.databricks_workspace_url}"
token: "${databricks_token.feast.token_value}"
checkpointLocation: dbfs:/checkpoints/feast
deadLetterPath: dbfs:/feast/deadletter
jarFile: "${local.databricks_dbfs_jar_folder}/sparkjars/spark-ingestion-job.jar"
timeoutSeconds: 1200
newCluster:
2 changes: 2 additions & 0 deletions protos/feast/core/Runner.proto
@@ -103,4 +103,6 @@ message DatabricksRunnerConfigOptions {
string checkpointLocation = 5;

DatabricksNewClusterOptions newCluster = 6;

string deadLetterPath = 7;
}
@@ -16,11 +16,10 @@
*/
package feast.ingestion.transform;

import feast.ingestion.enums.ValidationStatus;
import feast.ingestion.transform.fn.ProcessFeatureRowDoFn;
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
import feast.ingestion.values.FeatureSet;
import feast.proto.types.FeatureRowProto;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.spark.ingestion.RowWithValidationResult;
import java.util.HashMap;
import org.apache.spark.sql.Dataset;
@@ -35,7 +34,7 @@ public ProcessAndValidateFeatureRows(String defaultFeastProject) {
this.defaultFeastProject = defaultFeastProject;
}

public Dataset<byte[]> processDataset(
public Dataset<RowWithValidationResult> processDataset(
Dataset<Row> input, HashMap<String, FeatureSet> featureSets) {
ValidateFeatureRowDoFn validFeat = new ValidateFeatureRowDoFn(featureSets);

@@ -46,20 +45,11 @@ public Dataset<byte[]> processDataset(
.select("value")
.map(
r -> {
return validFeat.validateElement((byte[]) r.getAs(0));
FeatureRow featureRow = FeatureRow.parseFrom((byte[]) r.getAs(0));
FeatureRow el = procFeat.processElement(featureRow);
return validFeat.validateElement(el);
},
Encoders.kryo(RowWithValidationResult.class));

Dataset<RowWithValidationResult> validRows =
rowsWithValidationResult.filter(
row -> row.getValidationStatus().equals(ValidationStatus.SUCCESS));

return validRows.map(
r -> {
FeatureRowProto.FeatureRow featureRow =
FeatureRowProto.FeatureRow.parseFrom(r.getFeatureRow());
return procFeat.processElement(featureRow).toByteArray();
},
Encoders.BINARY());
return rowsWithValidationResult;
}
}
@@ -38,11 +38,11 @@ public ValidateFeatureRowDoFn(HashMap<String, FeatureSet> featureSets) {
this.featureSets = featureSets;
}

public RowWithValidationResult validateElement(byte[] featureRowBytes)
public RowWithValidationResult validateElement(FeatureRow featureRow)
throws InvalidProtocolBufferException {
// TODO: Abstract duplicated validation logic into shared module.
String error = null;
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
byte[] featureRowBytes = featureRow.toByteArray();
FeatureSet featureSet = featureSets.get(featureRow.getFeatureSet());
List<FieldProto.Field> fields = new ArrayList<>();
if (featureSet != null) {
@@ -19,6 +19,7 @@
import static feast.ingestion.utils.SpecUtil.getFeatureSetReference;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.ingestion.enums.ValidationStatus;
import feast.ingestion.transform.ProcessAndValidateFeatureRows;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.utils.SpecUtil;
@@ -28,11 +29,15 @@
import feast.proto.core.StoreProto.Store;
import feast.spark.ingestion.delta.SparkDeltaSink;
import feast.spark.ingestion.redis.SparkRedisSink;
import feast.storage.api.writer.FailedElement;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,8 +49,21 @@ public class SparkIngestion {
private final String jobId;
private final String checkpointLocation;
private final String defaultFeastProject;
private final String deadLetterPath;
private final List<FeatureSet> featureSets;
private final List<Store> stores;
private static final StructType deadLetterType;

static {
StructType schema = new StructType();
schema = schema.add("timestamp", DataTypes.TimestampType, false);
schema = schema.add("job_name", DataTypes.StringType, true);
schema = schema.add("transform_name", DataTypes.StringType, true);
schema = schema.add("payload", DataTypes.StringType, true);
schema = schema.add("error_message", DataTypes.StringType, true);
schema = schema.add("stack_trace", DataTypes.StringType, true);
deadLetterType = schema;
}

/**
* Run a Spark ingestion job.
@@ -70,7 +88,7 @@ public static void main(String[] args) throws Exception {
* @throws InvalidProtocolBufferException
*/
public SparkIngestion(String[] args) throws InvalidProtocolBufferException {
int numArgs = 5;
int numArgs = 6;
if (args.length != numArgs) {
throw new IllegalArgumentException("Expecting " + numArgs + " arguments");
}
@@ -79,6 +97,7 @@ public SparkIngestion(String[] args) throws InvalidProtocolBufferException {
jobId = args[index++];
checkpointLocation = args[index++];
defaultFeastProject = args[index++];
deadLetterPath = args[index++];
String featureSetSpecsJson = args[index++];
String storesJson = args[index++];

@@ -182,7 +201,7 @@ public StreamingQuery createQuery() {
ProcessAndValidateFeatureRows processAndValidateFeatureRows =
new ProcessAndValidateFeatureRows(defaultFeastProject);

Dataset<byte[]> processedRows =
Dataset<RowWithValidationResult> processedRows =
processAndValidateFeatureRows.processDataset(input, featureSets);

// Start running the query that writes the data to sink
@@ -195,17 +214,50 @@
.foreachBatch(
(batchDF, batchId) -> {
batchDF.persist();
Dataset<byte[]> validRows =
batchDF
.filter(row -> row.getValidationStatus().equals(ValidationStatus.SUCCESS))
.map(RowWithValidationResult::getFeatureRow, Encoders.BINARY());

validRows.persist();
consumerSinks.forEach(
c -> {
try {
c.call(batchDF, batchId);
c.call(validRows, batchId);
} catch (Exception e) {
log.error("Error invoking sink", e);
throw new RuntimeException(e);
}
});
validRows.unpersist();

storeDeadLetter(batchDF);

batchDF.unpersist();
})
.start();
}

private void storeDeadLetter(Dataset<RowWithValidationResult> batchDF) {
Dataset<RowWithValidationResult> invalidRows =
batchDF.filter(row -> row.getValidationStatus().equals(ValidationStatus.FAILURE));

invalidRows
.map(
e -> {
FailedElement element = e.getFailedElement();
return RowFactory.create(
new java.sql.Timestamp(element.getTimestamp().getMillis()),
element.getJobName(),
element.getTransformName(),
element.getPayload(),
element.getErrorMessage(),
element.getStackTrace());
},
RowEncoder.apply(deadLetterType))
.write()
.format("delta")
.mode("append")
.save(deadLetterPath);
}
}
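Records stored this way can be read back with a plain Delta read, which is essentially what the integration test below does. A minimal sketch, assuming the default path from the example configuration and a Spark session with the Delta Lake dependency available; class name and column selection are illustrative only.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeadLetterReadSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("dead-letter-read-sketch").getOrCreate();

    // Path is the example default from application.yml; use your deployment's deadLetterPath.
    Dataset<Row> deadLetters = spark.read().format("delta").load("dbfs:/feast/deadletter");

    // Each record carries the failing payload plus the error message and transform name.
    deadLetters.select("timestamp", "transform_name", "error_message", "payload").show(20, false);
  }
}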
@@ -107,6 +107,8 @@ public class SparkIngestionTest {

@Rule public TemporaryFolder checkpointFolder = new TemporaryFolder();

@Rule public TemporaryFolder deadLetterFolder = new TemporaryFolder();

@Rule public final SparkSessionRule spark = new SparkSessionRule();

@BeforeClass
@@ -146,11 +148,20 @@ public void streamingQueryShouldWriteKafkaPayloadAsDeltaLakeAndRedis() throws Ex
FeatureSet featureSetForDelta = TestUtil.createFeatureSetForDelta(kafka);
FeatureSetSpec specForRedis = featureSetForRedis.getSpec();
FeatureSetSpec specForDelta = featureSetForDelta.getSpec();
FeatureSetSpec invalidSpec =
FeatureSetSpec.newBuilder(specForDelta).setProject("invalid_project").build();

List<FeatureRow> inputForRedis =
TestUtil.generateTestData(specForRedis, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
List<FeatureRow> inputForDelta =
TestUtil.generateTestData(specForDelta, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
List<FeatureRow> invalidInput =
TestUtil.generateTestData(invalidSpec, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
List<FeatureRow> allInputs =
Stream.concat(
Stream.concat(inputForRedis.stream(), invalidInput.stream()),
inputForDelta.stream())
.collect(Collectors.toList());

LOGGER.info("Starting Import Job");

@@ -173,22 +184,26 @@
Dataset<Row> data = null;

String checkpointDir = checkpointFolder.getRoot().getAbsolutePath();
String deadLetterDir = deadLetterFolder.getRoot().getAbsolutePath();

SparkIngestion ingestion =
new SparkIngestion(
new String[] {
TEST_JOB_ID, checkpointDir, "myDefaultFeastProject", featureSetsJson, storesJson
TEST_JOB_ID,
checkpointDir,
"myDefaultFeastProject",
deadLetterDir,
featureSetsJson,
storesJson
});

StreamingQuery query = ingestion.createQuery();

LOGGER.info(
"Publishing {} Feature Row messages to Kafka ...",
inputForRedis.size() + inputForDelta.size());
LOGGER.info("Publishing {} Feature Row messages to Kafka ...", allInputs.size());
TestUtil.publishFeatureRowsToKafka(
KAFKA_BOOTSTRAP_SERVERS,
KAFKA_TOPIC,
Stream.concat(inputForRedis.stream(), inputForDelta.stream()).collect(Collectors.toList()),
allInputs,
ByteArraySerializer.class,
KAFKA_PUBLISH_TIMEOUT_SEC);

@@ -219,6 +234,8 @@ public void streamingQueryShouldWriteKafkaPayloadAsDeltaLakeAndRedis() throws Ex
TestUtil.validateRedis(featureSetForRedis, inputForRedis, redisConfig, TEST_JOB_ID);

validateDelta(featureSetForDelta, inputForDelta, data);

validateDeadLetter(invalidInput);
}

private <T extends MessageOrBuilder> String toJsonLines(Collection<T> items) {
@@ -254,6 +271,39 @@ public static void validateDelta(
assertEquals(new HashSet<>(input), delta);
}

private void validateDeadLetter(List<FeatureRow> invalidInput) throws Exception {
String deadLetterDir = deadLetterFolder.getRoot().getAbsolutePath();
for (int i = 0; i < 60; i++) {

Dataset<Row> data = spark.session.read().format("delta").load(deadLetterDir.toString());
long count = data.count();
assertThat(count, is((long) IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE));
Row f = data.first();
if (f.length() > 0) {
break;
} else {
LOGGER.info("Delta directory not yet created.");
}
Thread.sleep(1000);
}

Dataset<Row> data = spark.session.read().format("delta").load(deadLetterDir.toString());
long count = data.count();
assertThat(count, is((long) IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE));
Row f = data.first();
assertThat(f.length(), is(6));
int i = 0;
assertThat("timestamp", f.get(i++), instanceOf(java.sql.Timestamp.class));
assertThat("jobName", (String) f.getAs(i++), equalTo(""));
assertThat("transformName", (String) f.getAs(i++), is("ValidateFeatureRow"));
assertThat("payload", (String) f.getAs(i++), startsWith("fields"));
assertThat(
"errorMessage",
(String) f.getAs(i++),
containsString("FeatureRow contains invalid feature set id"));
assertThat("stackTrace", (String) f.getAs(i++), equalTo(null));
}

public static FeatureRow sparkRowToFeatureRow(FeatureSetSpec featureSetSpec, Row row) {
java.sql.Timestamp ts = row.getAs(FeatureRowToSparkRow.EVENT_TIMESTAMP_COLUMN);
Builder builder =
@@ -77,7 +77,7 @@ public void shouldOutputFailedElementOnFailedValidation() throws Exception {

ValidateFeatureRowDoFn validFeat = new ValidateFeatureRowDoFn(featureSets);

RowWithValidationResult result = validFeat.validateElement(invalidRow.toByteArray());
RowWithValidationResult result = validFeat.validateElement(invalidRow);

assertThat(result.getValidationStatus(), equalTo(ValidationStatus.FAILURE));
}
@@ -124,7 +124,7 @@ public void shouldOutputSuccessStatusOnSuccessfulValidation() throws Exception {

FeatureRowProto.FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);

RowWithValidationResult result = validFeat.validateElement(randomRow.toByteArray());
RowWithValidationResult result = validFeat.validateElement(randomRow);

assertThat(result.getValidationStatus(), equalTo(ValidationStatus.SUCCESS));
}
