, PDone> write();
+}
diff --git a/storage/api/src/main/java/feast/storage/api/write/FailedElement.java b/storage/api/src/main/java/feast/storage/api/write/FailedElement.java
new file mode 100644
index 0000000000..40d846d6ab
--- /dev/null
+++ b/storage/api/src/main/java/feast/storage/api/write/FailedElement.java
@@ -0,0 +1,83 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.api.write;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.joda.time.Instant;
+
+@AutoValue
+// Use DefaultSchema annotation so this AutoValue class can be serialized by Beam
+// https://issues.apache.org/jira/browse/BEAM-1891
+// https://github.com/apache/beam/pull/7334
+@DefaultSchema(AutoValueSchema.class)
+public abstract class FailedElement {
+ public abstract Instant getTimestamp();
+
+ @Nullable
+ public abstract String getJobName();
+
+ @Nullable
+ public abstract String getProjectName();
+
+ @Nullable
+ public abstract String getFeatureSetName();
+
+ @Nullable
+ public abstract String getFeatureSetVersion();
+
+ @Nullable
+ public abstract String getTransformName();
+
+ @Nullable
+ public abstract String getPayload();
+
+ @Nullable
+ public abstract String getErrorMessage();
+
+ @Nullable
+ public abstract String getStackTrace();
+
+ public static Builder newBuilder() {
+ return new AutoValue_FailedElement.Builder().setTimestamp(Instant.now());
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setTimestamp(Instant timestamp);
+
+ public abstract Builder setProjectName(String projectName);
+
+ public abstract Builder setFeatureSetName(String featureSetName);
+
+ public abstract Builder setFeatureSetVersion(String featureSetVersion);
+
+ public abstract Builder setJobName(String jobName);
+
+ public abstract Builder setTransformName(String transformName);
+
+ public abstract Builder setPayload(String payload);
+
+ public abstract Builder setErrorMessage(String errorMessage);
+
+ public abstract Builder setStackTrace(String stackTrace);
+
+ public abstract FailedElement build();
+ }
+}
diff --git a/storage/api/src/main/java/feast/storage/api/write/FeatureSink.java b/storage/api/src/main/java/feast/storage/api/write/FeatureSink.java
new file mode 100644
index 0000000000..cc93e0f639
--- /dev/null
+++ b/storage/api/src/main/java/feast/storage/api/write/FeatureSink.java
@@ -0,0 +1,54 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.api.write;
+
+import feast.core.FeatureSetProto;
+import feast.types.FeatureRowProto.FeatureRow;
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+
+/** Interface for implementing user defined feature sink functionality. */
+public interface FeatureSink extends Serializable {
+
+ /**
+ * Set up storage backend for write. This method will be called once during pipeline
+ * initialisation.
+ *
+ * Examples when schemas need to be updated:
+ *
+ *
+ * - when a new entity is registered, a table usually needs to be created
+ *
- when a new feature is registered, a column with appropriate data type usually needs to be
+ * created
+ *
+ *
+ * If the storage backend is a key-value or a schema-less database, however, there may not be a
+ * need to manage any schemas.
+ *
+ * @param featureSet Feature set to be written
+ */
+ void prepareWrite(FeatureSetProto.FeatureSet featureSet);
+
+ /**
+ * Get a {@link PTransform} that writes feature rows to the store, and returns a {@link
+ * WriteResult} that splits successful and failed inserts to be separately logged.
+ *
+ * @return {@link PTransform}
+ */
+ PTransform, WriteResult> write();
+}
diff --git a/storage/api/src/main/java/feast/storage/api/write/WriteResult.java b/storage/api/src/main/java/feast/storage/api/write/WriteResult.java
new file mode 100644
index 0000000000..ab92f3600b
--- /dev/null
+++ b/storage/api/src/main/java/feast/storage/api/write/WriteResult.java
@@ -0,0 +1,89 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.api.write;
+
+import com.google.common.collect.ImmutableMap;
+import feast.types.FeatureRowProto.FeatureRow;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.*;
+
+/** The result of a write transform. */
+public final class WriteResult implements Serializable, POutput {
+
+ private final Pipeline pipeline;
+ private final PCollection successfulInserts;
+ private final PCollection failedInserts;
+
+ private static TupleTag successfulInsertsTag = new TupleTag<>("successfulInserts");
+ private static TupleTag failedInsertsTupleTag = new TupleTag<>("failedInserts");
+
+ /** Creates a {@link WriteResult} in the given {@link Pipeline}. */
+ public static WriteResult in(
+ Pipeline pipeline,
+ PCollection successfulInserts,
+ PCollection failedInserts) {
+ return new WriteResult(pipeline, successfulInserts, failedInserts);
+ }
+
+ private WriteResult(
+ Pipeline pipeline,
+ PCollection successfulInserts,
+ PCollection failedInserts) {
+
+ this.pipeline = pipeline;
+ this.successfulInserts = successfulInserts;
+ this.failedInserts = failedInserts;
+ }
+
+ /**
+ * Gets set of feature rows that were unsuccessfully written to the store. The failed feature rows
+ * are wrapped in FailedElement objects so implementations of WriteResult can be flexible in how
+ * errors are stored.
+ *
+ * @return FailedElements of unsuccessfully written feature rows
+ */
+ public PCollection getFailedInserts() {
+ return failedInserts;
+ }
+
+ /**
+ * Gets set of successfully written feature rows.
+ *
+ * @return PCollection of feature rows successfully written to the store
+ */
+ public PCollection getSuccessfulInserts() {
+ return successfulInserts;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public Map, PValue> expand() {
+ return ImmutableMap.of(
+ successfulInsertsTag, successfulInserts, failedInsertsTupleTag, failedInserts);
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform, ?> transform) {}
+}
diff --git a/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java
new file mode 100644
index 0000000000..d26930fbfe
--- /dev/null
+++ b/storage/api/src/main/java/feast/storage/common/testing/TestUtil.java
@@ -0,0 +1,187 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.common.testing;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
+import feast.core.FeatureSetProto.FeatureSet;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FeatureRowProto.FeatureRow.Builder;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.*;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.lang3.RandomStringUtils;
+
+@SuppressWarnings("WeakerAccess")
+public class TestUtil {
+
+ /**
+ * Create a Feature Row with random value according to the FeatureSetSpec
+ *
+ * See {@link #createRandomFeatureRow(FeatureSet, int)}
+ */
+ public static FeatureRow createRandomFeatureRow(FeatureSet featureSet) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int randomStringSizeMaxSize = 12;
+ return createRandomFeatureRow(featureSet, random.nextInt(0, randomStringSizeMaxSize) + 4);
+ }
+
+ /**
+ * Create a Feature Row with random value according to the FeatureSet.
+ *
+ *
The Feature Row created contains fields according to the entities and features defined in
+ * FeatureSet, matching the value type of the field, with randomized value for testing.
+ *
+ * @param featureSet {@link FeatureSet}
+ * @param randomStringSize number of characters for the generated random string
+ * @return {@link FeatureRow}
+ */
+ public static FeatureRow createRandomFeatureRow(FeatureSet featureSet, int randomStringSize) {
+ Builder builder =
+ FeatureRow.newBuilder()
+ .setFeatureSet(getFeatureSetReference(featureSet))
+ .setEventTimestamp(Timestamps.fromMillis(System.currentTimeMillis()));
+
+ featureSet
+ .getSpec()
+ .getEntitiesList()
+ .forEach(
+ field -> {
+ builder.addFields(
+ Field.newBuilder()
+ .setName(field.getName())
+ .setValue(createRandomValue(field.getValueType(), randomStringSize))
+ .build());
+ });
+
+ featureSet
+ .getSpec()
+ .getFeaturesList()
+ .forEach(
+ field -> {
+ builder.addFields(
+ Field.newBuilder()
+ .setName(field.getName())
+ .setValue(createRandomValue(field.getValueType(), randomStringSize))
+ .build());
+ });
+
+ return builder.build();
+ }
+
+ private static String getFeatureSetReference(FeatureSet featureSet) {
+ FeatureSetSpec spec = featureSet.getSpec();
+ return String.format("%s/%s:%d", spec.getProject(), spec.getName(), spec.getVersion());
+ }
+
+ /**
+ * Create a random Feast {@link Value} of {@link ValueType.Enum}.
+ *
+ * @param type {@link ValueType.Enum}
+ * @param randomStringSize number of characters for the generated random string
+ * @return {@link Value}
+ */
+ public static Value createRandomValue(ValueType.Enum type, int randomStringSize) {
+ Value.Builder builder = Value.newBuilder();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ switch (type) {
+ case INVALID:
+ case UNRECOGNIZED:
+ throw new IllegalArgumentException("Invalid ValueType: " + type);
+ case BYTES:
+ builder.setBytesVal(
+ ByteString.copyFrom(RandomStringUtils.randomAlphanumeric(randomStringSize).getBytes()));
+ break;
+ case STRING:
+ builder.setStringVal(RandomStringUtils.randomAlphanumeric(randomStringSize));
+ break;
+ case INT32:
+ builder.setInt32Val(random.nextInt());
+ break;
+ case INT64:
+ builder.setInt64Val(random.nextLong());
+ break;
+ case DOUBLE:
+ builder.setDoubleVal(random.nextDouble());
+ break;
+ case FLOAT:
+ builder.setFloatVal(random.nextFloat());
+ break;
+ case BOOL:
+ builder.setBoolVal(random.nextBoolean());
+ break;
+ case BYTES_LIST:
+ builder.setBytesListVal(
+ BytesList.newBuilder()
+ .addVal(
+ ByteString.copyFrom(
+ RandomStringUtils.randomAlphanumeric(randomStringSize).getBytes()))
+ .build());
+ break;
+ case STRING_LIST:
+ builder.setStringListVal(
+ StringList.newBuilder()
+ .addVal(RandomStringUtils.randomAlphanumeric(randomStringSize))
+ .build());
+ break;
+ case INT32_LIST:
+ builder.setInt32ListVal(Int32List.newBuilder().addVal(random.nextInt()).build());
+ break;
+ case INT64_LIST:
+ builder.setInt64ListVal(Int64List.newBuilder().addVal(random.nextLong()).build());
+ break;
+ case DOUBLE_LIST:
+ builder.setDoubleListVal(DoubleList.newBuilder().addVal(random.nextDouble()).build());
+ break;
+ case FLOAT_LIST:
+ builder.setFloatListVal(FloatList.newBuilder().addVal(random.nextFloat()).build());
+ break;
+ case BOOL_LIST:
+ builder.setBoolListVal(BoolList.newBuilder().addVal(random.nextBoolean()).build());
+ break;
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a field object with given name and type.
+ *
+ * @param name of the field.
+ * @param value of the field. Should be compatible with the valuetype given.
+ * @param valueType type of the field.
+ * @return Field object
+ */
+ public static Field field(String name, Object value, ValueType.Enum valueType) {
+ Field.Builder fieldBuilder = Field.newBuilder().setName(name);
+ switch (valueType) {
+ case INT32:
+ return fieldBuilder.setValue(Value.newBuilder().setInt32Val((int) value)).build();
+ case INT64:
+ return fieldBuilder.setValue(Value.newBuilder().setInt64Val((int) value)).build();
+ case FLOAT:
+ return fieldBuilder.setValue(Value.newBuilder().setFloatVal((float) value)).build();
+ case DOUBLE:
+ return fieldBuilder.setValue(Value.newBuilder().setDoubleVal((double) value)).build();
+ case STRING:
+ return fieldBuilder.setValue(Value.newBuilder().setStringVal((String) value)).build();
+ default:
+ throw new IllegalStateException("Unexpected valueType: " + value.getClass());
+ }
+ }
+}
diff --git a/storage/connectors/pom.xml b/storage/connectors/pom.xml
new file mode 100644
index 0000000000..265aa63247
--- /dev/null
+++ b/storage/connectors/pom.xml
@@ -0,0 +1,30 @@
+
+ pom
+
+
+ dev.feast
+ feast-parent
+ ${revision}
+ ../..
+
+
+ 4.0.0
+ feast-storage-connectors
+
+ Feast Storage Connectors
+
+
+ redis
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+
diff --git a/storage/connectors/redis/pom.xml b/storage/connectors/redis/pom.xml
new file mode 100644
index 0000000000..892838efc9
--- /dev/null
+++ b/storage/connectors/redis/pom.xml
@@ -0,0 +1,24 @@
+
+
+
+ dev.feast
+ feast-storage-connectors
+ ${revision}
+
+
+ 4.0.0
+ feast-storage-connector-redis
+
+ Feast Storage Connector for Redis
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+