From 8ef3e2a15f8a10d9b6927ab49e0ccc06abc9190e Mon Sep 17 00:00:00 2001
From: zhilingc
Date: Mon, 21 Oct 2019 16:55:53 +0800
Subject: [PATCH] Re-implement metrics for ingestion, remove more redundant
 code

---
 .../main/java/feast/ingestion/ImportJob.java  |  11 +
 .../ingestion/transform/FilterFeatureRow.java |  52 --
 .../ingestion/transform/ReadFeatureRow.java   | 132 ----
 .../transform/ToFeatureRowExtended.java       |  48 --
 .../transform/WriteFeaturesTransform.java     |  85 ---
 .../metrics/WriteMetricsTransform.java        |  88 ++-
 ...ricsDoFn.java => WriteRowMetricsDoFn.java} |  46 +-
 .../config/ImportJobSpecsSupplierTest.java    | 122 ----
 .../feast/ingestion/model/FeaturesTest.java   |  34 --
 .../java/feast/ingestion/model/SpecsTest.java |  94 ---
 .../feast/ingestion/model/ValuesTest.java     | 323 ----------
 .../transform/CoalesceFeatureRowsTest.java    | 568 ------------------
 .../transform/ErrorsStoreTransformTest.java   | 159 -----
 .../transform/fn/ConvertTypesDoFnTest.java    |  95 ---
 .../fn/FilterFeatureRowDoFnTest.java          |  76 ---
 .../redis/FeatureRowRedisIOWriteTest.java     | 185 ------
 16 files changed, 99 insertions(+), 2019 deletions(-)
 delete mode 100644 ingestion/src/main/java/feast/ingestion/transform/FilterFeatureRow.java
 delete mode 100644 ingestion/src/main/java/feast/ingestion/transform/ReadFeatureRow.java
 delete mode 100644 ingestion/src/main/java/feast/ingestion/transform/ToFeatureRowExtended.java
 delete mode 100644 ingestion/src/main/java/feast/ingestion/transform/WriteFeaturesTransform.java
 rename ingestion/src/main/java/feast/ingestion/transform/metrics/{WriteMetricsDoFn.java => WriteRowMetricsDoFn.java} (68%)
 delete mode 100644 ingestion/src/test/java/feast/ingestion/config/ImportJobSpecsSupplierTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/model/FeaturesTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/model/SpecsTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/model/ValuesTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/transform/CoalesceFeatureRowsTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/transform/fn/ConvertTypesDoFnTest.java
 delete mode 100644 ingestion/src/test/java/feast/ingestion/transform/fn/FilterFeatureRowDoFnTest.java
 delete mode 100644 ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java

diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java
index da5aed214b..52872ae909 100644
--- a/ingestion/src/main/java/feast/ingestion/ImportJob.java
+++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -7,6 +7,7 @@
 import feast.ingestion.transform.ReadFromSource;
 import feast.ingestion.transform.WriteFailedElementToBigQuery;
 import feast.ingestion.transform.WriteToStore;
+import feast.ingestion.transform.metrics.WriteMetricsTransform;
 import feast.ingestion.utils.ResourceUtil;
 import feast.ingestion.utils.SpecUtil;
 import feast.ingestion.utils.StoreUtil;
@@ -47,6 +48,7 @@ public static PipelineResult runPipeline(ImportOptions options)
      * 1. Read messages from Feast Source as FeatureRow
      * 2. Write FeatureRow to the corresponding Store
      * 3. Write elements that failed to be processed to a dead letter queue.
+     * 4. Write metrics to a metrics sink
      */
     PipelineOptionsValidator.validate(ImportOptions.class, options);
@@ -95,6 +97,15 @@ public static PipelineResult runPipeline(ImportOptions options)
               .setTableSpec(options.getDeadLetterTableSpec())
               .build());
       }
+
+      // Step 4. Write metrics to a metrics sink.
+      convertedFeatureRows
+          .apply("WriteMetrics", WriteMetricsTransform.newBuilder()
+              .setFeatureSetSpec(featureSet)
+              .setStoreName(store.getName())
+              .setSuccessTag(FEATURE_ROW_OUT)
+              .setFailureTag(DEADLETTER_OUT)
+              .build());
     }
   }
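The new WriteMetricsTransform consumes a PCollectionTuple keyed by success and failure tags rather than a plain PCollection. For orientation only, a minimal, hypothetical sketch of the tagged-output pattern it plugs into follows; the DoFn and step name are illustrative stand-ins, and only FEATURE_ROW_OUT and DEADLETTER_OUT mirror the tags referenced in ImportJob above:

    // Illustrative fragment, not part of this patch.
    // Anonymous TupleTag subclasses let Beam recover the element type at runtime.
    static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {};
    static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {};

    static PCollectionTuple convert(PCollection<byte[]> source) {
      return source.apply("ValidateAndConvertRows",            // hypothetical step name
          ParDo.of(new ConvertRowDoFn())                       // hypothetical DoFn
              .withOutputTags(FEATURE_ROW_OUT, TupleTagList.of(DEADLETTER_OUT)));
    }
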
diff --git a/ingestion/src/main/java/feast/ingestion/transform/FilterFeatureRow.java b/ingestion/src/main/java/feast/ingestion/transform/FilterFeatureRow.java
deleted file mode 100644
index fd85de29b1..0000000000
--- a/ingestion/src/main/java/feast/ingestion/transform/FilterFeatureRow.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package feast.ingestion.transform;
-
-import feast.core.FeatureSetProto.FeatureSetSpec;
-import feast.types.FeatureRowProto.FeatureRow;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-@Slf4j
-public class FilterFeatureRow extends PTransform<PCollection<FeatureRow>, PCollection<FeatureRow>> {
-
-  private String featureSetId;
-
-  public FilterFeatureRow(FeatureSetSpec featureSetSpec) {
-    this.featureSetId =
-        String.format("%s:%s", featureSetSpec.getName(), featureSetSpec.getVersion());
-  }
-
-  @Override
-  public PCollection<FeatureRow> expand(PCollection<FeatureRow> input) {
-    return input.apply(
-        "Filter unrelated featureRows",
-        ParDo.of(
-            new DoFn<FeatureRow, FeatureRow>() {
-              @ProcessElement
-              public void processElement(ProcessContext c, @Element FeatureRow element) {
-                if (element.getFeatureSet().equals(featureSetId)) {
-                  c.output(element);
-                }
-              }
-            }));
-  }
-}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFeatureRow.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFeatureRow.java
deleted file mode 100644
index 5d0ca608ec..0000000000
--- a/ingestion/src/main/java/feast/ingestion/transform/ReadFeatureRow.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package feast.ingestion.transform;
-
-import feast.core.FeatureSetProto.FeatureSetSpec;
-import feast.core.SourceProto.KafkaSourceConfig;
-import feast.core.SourceProto.SourceType;
-import feast.ingestion.options.ImportOptions;
-import feast.types.FeatureRowProto.FeatureRow;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.io.kafka.KafkaRecord;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.TupleTag;
-
-public class ReadFeatureRow extends PTransform<PInput, PCollectionTuple> {
-  private final FeatureSetSpec featureSetSpec;
-  private final ImportOptions options;
-  private final String consumerGroupId;
-  private final KafkaSourceConfig sourceConfig;
-
-  public ReadFeatureRow(FeatureSetSpec featureSetSpec, ImportOptions options) {
-    this.featureSetSpec = featureSetSpec;
-    this.options = options;
-    this.consumerGroupId = String.format("feast-import-job-%s", options.getJobName());
-    this.sourceConfig = featureSetSpec.getSource().getKafkaSourceConfig();
-  }
-
-  @Override
-  public PCollectionTuple expand(PInput input) {
-    if (!featureSetSpec.getSource().getType().equals(SourceType.KAFKA)) {
-      throw new IllegalArgumentException(
-          "Only SourceType.KAFKA is supported for Source in Feast import job.");
-    }
-
-    PCollection<KafkaRecord<byte[], byte[]>> out = input.getPipeline().apply(
-        KafkaIO.readBytes().withBootstrapServers(sourceConfig.getBootstrapServers())
-            .withTopic(sourceConfig.getTopic()));
-
-    // input
-    //     .getPipeline()
-    //     .apply(
-    //         "ReadFromKafka",
-    //         KafkaIO.<byte[], byte[]>read()
-    //             .withBootstrapServers(sourceConfig.getBootstrapServers())
-    //             .withTopic(sourceConfig.getTopic())
-    //             .withKeyDeserializer(ByteArrayDeserializer.class)
-    //             .withValueDeserializer(ByteArrayDeserializer.class)
-    //             //.withConsumerConfigUpdates(ImmutableMap.of("group.id", consumerGroupId))
-    //             //.withReadCommitted()
-    //             //.commitOffsetsInFinalize()
-    //         )
-    //     .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>>() {
-    //       @ProcessElement
-    //       public void processElement(ProcessContext context) {
-    //         KafkaRecord<byte[], byte[]> element = context.element();
-    //         byte[] value = element.getKV().getValue();
-    //         context.output(element);
-    //       }
-    //     }));
-    //.apply("KafkaMessageToFeatureRow", new KafkaMessageToFeatureRow(options));
-
-    return PCollectionTuple.of("TODO", out);
-
-    // .apply(
-    //     "Convert KafkaRecord into FeatureRow",
-    //     ParDo.of(
-    //         new DoFn<KafkaRecord<byte[], byte[]>, FeatureRow>() {
-    //           @ProcessElement
-    //           public void processElemennt(ProcessContext context) {
-    //             try {
-    //               FeatureRow featureRow =
-    //                   FeatureRow.parseFrom(context.element().getKV().getValue());
-    //               context.output(featureRow);
-    //             } catch (InvalidProtocolBufferException e) {
-    //               e.printStackTrace();
-    //             }
-    //           }
-    //         }));
-  }
-
-  static class KafkaMessageToFeatureRow
-      extends PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollectionTuple> {
-
-    private final ImportOptions options;
-
-    KafkaMessageToFeatureRow(ImportOptions options) {
-      this.options = options;
-    }
-
-    @Override
-    public PCollectionTuple expand(PCollection<KafkaRecord<byte[], byte[]>> input) {
-      TupleTag<FeatureRow> featureRowTupleTag = new TupleTag<>();
-
-      return input.apply(
-          "TODO",
-          ParDo.of(
-              new DoFn<KafkaRecord<byte[], byte[]>, FeatureRow>() {
-                @ProcessElement
-                public void processElement(ProcessContext context) {
-                  // byte[] value = context.element().getKV().getValue();
-                  // try {
-                  //   FeatureRow featureRow = FeatureRow.parseFrom(value);
-                  //   context.output(featureRow);
-                  // } catch (InvalidProtocolBufferException e) {
-                  //   e.printStackTrace();
-                  // }
                }
-              })
-              .withOutputTags(featureRowTupleTag, null));
-    }
-  }
-}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ToFeatureRowExtended.java b/ingestion/src/main/java/feast/ingestion/transform/ToFeatureRowExtended.java
deleted file mode 100644
index 8cff7c79c9..0000000000
--- a/ingestion/src/main/java/feast/ingestion/transform/ToFeatureRowExtended.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2018 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package feast.ingestion.transform;
-
-import feast.ingestion.utils.DateUtil;
-import feast.types.FeatureRowExtendedProto.Attempt;
-import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
-import feast.types.FeatureRowProto.FeatureRow;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.DateTime;
-
-public class ToFeatureRowExtended extends
-    PTransform<PCollection<FeatureRow>, PCollection<FeatureRowExtended>> {
-
-  @Override
-  public PCollection<FeatureRowExtended> expand(PCollection<FeatureRow> input) {
-    return input.apply(ParDo.of(new DoFn<FeatureRow, FeatureRowExtended>() {
-      @ProcessElement
-      public void processElement(@Element FeatureRow row, OutputReceiver<FeatureRowExtended> out) {
-        out.output(FeatureRowExtended.newBuilder()
-            .setRow(row)
-            .setFirstSeen(DateUtil.toTimestamp(DateTime.now()))
-            .setLastAttempt(Attempt.newBuilder()
-                .setAttempts(0)
-                .build())
-            .build());
-      }
-    }));
-  }
-}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteFeaturesTransform.java b/ingestion/src/main/java/feast/ingestion/transform/WriteFeaturesTransform.java
deleted file mode 100644
index 7e7b73318a..0000000000
--- a/ingestion/src/main/java/feast/ingestion/transform/WriteFeaturesTransform.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package feast.ingestion.transform;
-
-import com.google.api.services.bigquery.model.TimePartitioning;
-import feast.core.FeatureSetProto.FeatureSetSpec;
-import feast.core.StoreProto.Store;
-import feast.core.StoreProto.Store.BigQueryConfig;
-import feast.core.StoreProto.Store.RedisConfig;
-import feast.ingestion.options.ImportOptions;
-import feast.store.serving.bigquery.FeatureRowToTableRowDoFn;
-import feast.store.serving.redis.FeatureRowToRedisMutationDoFn;
-import feast.store.serving.redis.RedisCustomIO;
-import feast.types.FeatureRowProto.FeatureRow;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-@Slf4j
-public class WriteFeaturesTransform extends PTransform<PCollection<FeatureRow>,
-    PDone> {
-
-  private final Store store;
-  private final FeatureSetSpec featureSetSpec;
-
-  public WriteFeaturesTransform(Store store, FeatureSetSpec featureSetSpec) {
-    this.store = store;
-    this.featureSetSpec = featureSetSpec;
-  }
-
-  @Override
-  public PDone expand(PCollection<FeatureRow> input) {
-    ImportOptions options =
-        input.getPipeline().getOptions().as(ImportOptions.class);
-
-    switch (store.getType()) {
-      case REDIS:
-        RedisConfig redisConfig = store.getRedisConfig();
-        String redisHost = redisConfig.getHost();
-        int redisPort = redisConfig.getPort();
-        input
-            .apply(
-                "Create RedisMutation from FeatureRow",
-                ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpec)))
-            .apply(RedisCustomIO.write(redisHost, redisPort));
-        break;
-
-      case BIGQUERY:
-        BigQueryConfig bqConfig = store.getBigqueryConfig();
-        String tableSpec =
-            String.format(
-                "%s:%s.%s_v%s",
-                bqConfig.getProjectId(),
-                bqConfig.getDatasetId(),
-                featureSetSpec.getName(),
-                featureSetSpec.getVersion());
-        TimePartitioning timePartitioning =
-            new TimePartitioning()
-                .setType("DAY")
-                .setField(FeatureRowToTableRowDoFn.getEventTimestampColumn());
-        input
-            .apply(
-                "Create BigQuery TableRow from FeatureRow",
-                ParDo.of(new FeatureRowToTableRowDoFn(options.getJobName())))
-            .apply(
-                BigQueryIO.writeTableRows()
-                    .to(tableSpec)
-                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
-                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
-                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
-                    .withTimePartitioning(timePartitioning));
-        break;
-
-      default:
-        log.warn(
-            "Store of type '{}' is not supported, no FeatureRows will be written.",
-            store.getType());
-        break;
-    }
-    return PDone.in(input.getPipeline());
-  }
-}
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
index e1ced3862f..a9ff112f05 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
@@ -1,59 +1,87 @@
 package feast.ingestion.transform.metrics;
 
+import com.google.auto.value.AutoValue;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.ingestion.options.ImportOptions;
-import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
+import feast.ingestion.values.FailedElement;
 import feast.types.FeatureRowProto.FeatureRow;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
+import org.apache.beam.sdk.values.TupleTag;
 
-public class WriteMetricsTransform extends PTransform<PCollection<FeatureRowExtended>, PDone> {
-  private final String storeName;
-  private final FeatureSetSpec featureSetSpec;
+@Slf4j
+@AutoValue
+public abstract class WriteMetricsTransform extends PTransform<PCollectionTuple, PDone> {
   private static final long WINDOW_SIZE_SECONDS = 15;
 
-  public WriteMetricsTransform(String storeName, FeatureSetSpec featureSetSpec) {
+  public abstract String getStoreName();
 
-    this.storeName = storeName;
-    this.featureSetSpec = featureSetSpec;
+  public abstract FeatureSetSpec getFeatureSetSpec();
+
+  public abstract TupleTag<FeatureRow> getSuccessTag();
+
+  public abstract TupleTag<FailedElement> getFailureTag();
+
+  public static Builder newBuilder() {
+    return new AutoValue_WriteMetricsTransform.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setStoreName(String storeName);
+
+    public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);
+
+    public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
+
+    public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);
+
+    public abstract WriteMetricsTransform build();
   }
 
   @Override
-  public PDone expand(PCollection<FeatureRowExtended> input) {
+  public PDone expand(PCollectionTuple input) {
     ImportOptions options = input.getPipeline().getOptions()
         .as(ImportOptions.class);
     switch (options.getMetricsExporterType()) {
       case "prometheus":
-        input
+
+        input.get(getFailureTag())
             .apply("Window records",
-                Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE_SECONDS))))
-            .apply("Add key", ParDo.of(new DoFn<FeatureRowExtended, KV<Integer, FeatureRow>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                c.output(KV.of(1, c.element().getRow()));
-              }
-            }))
-            .apply("Collect", GroupByKey.create())
-            .apply("Write metrics", ParDo
-                .of(new WriteMetricsDoFn(options.getJobName(), storeName, featureSetSpec,
-                    options.getPrometheusExporterAddress())));
+                new WindowRecords<>(WINDOW_SIZE_SECONDS))
+            .apply("Write deadletter metrics", ParDo.of(
+                WriteDeadletterRowMetricsDoFn.newBuilder()
+                    .setFeatureSetSpec(getFeatureSetSpec())
+                    .setPgAddress(options.getPrometheusExporterAddress())
+                    .setStoreName(getStoreName())
+                    .build()));
+
+        input.get(getSuccessTag())
+            .apply("Window records",
+                new WindowRecords<>(WINDOW_SIZE_SECONDS))
+            .apply("Write row metrics", ParDo
+                .of(WriteRowMetricsDoFn.newBuilder()
+                    .setFeatureSetSpec(getFeatureSetSpec())
+                    .setPgAddress(options.getPrometheusExporterAddress())
+                    .setStoreName(getStoreName())
+                    .build()));
+
         return PDone.in(input.getPipeline());
       case "none":
       default:
-        input.apply("Noop", ParDo.of(new DoFn<FeatureRowExtended, Void>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {}
-        }));
+        input.get(getSuccessTag()).apply("Noop",
+            ParDo.of(new DoFn<FeatureRow, Void>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+              }
+            }));
         return PDone.in(input.getPipeline());
     }
   }
 }
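WindowRecords<T> is applied twice above but its implementation is not included in this diff. Judging from the inline Window.into(FixedWindows...) call it replaces, a plausible reading is a thin reusable wrapper over fixed windowing; the sketch below is an assumption for orientation, not the actual class (imports assumed from the removed code: Window, FixedWindows, Duration):

    // Assumed shape of WindowRecords; purely illustrative.
    public class WindowRecords<T> extends PTransform<PCollection<T>, PCollection<T>> {

      private final long windowSizeSeconds;

      public WindowRecords(long windowSizeSeconds) {
        this.windowSizeSeconds = windowSizeSeconds;
      }

      @Override
      public PCollection<T> expand(PCollection<T> input) {
        // Fixed, non-overlapping windows matching WINDOW_SIZE_SECONDS above.
        return input.apply(
            Window.<T>into(FixedWindows.of(Duration.standardSeconds(windowSizeSeconds))));
      }
    }
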
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
similarity index 68%
rename from ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsDoFn.java
rename to ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
index 7994f77daf..4739ab10bf 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsDoFn.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
@@ -1,5 +1,6 @@
 package feast.ingestion.transform.metrics;
 
+import com.google.auto.value.AutoValue;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.FieldProto.Field;
@@ -13,23 +14,34 @@
 import org.apache.beam.sdk.values.KV;
 
 @Slf4j
-public class WriteMetricsDoFn extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> {
+@AutoValue
+public abstract class WriteRowMetricsDoFn extends DoFn<KV<Integer, Iterable<FeatureRow>>, Void> {
 
   private final String STORE_TAG_KEY = "feast_store";
   private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
   private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
   private final String FEATURE_TAG_KEY = "feast_feature_name";
 
-  private final String pipelineName;
-  private final String storeName;
-  private final FeatureSetSpec featureSetSpec;
-  private String pgAddress;
-
-  public WriteMetricsDoFn(String pipelineName, String storeName, FeatureSetSpec featureSetSpec,
-      String pgAddress) {
-    this.pipelineName = pipelineName;
-    this.storeName = storeName;
-    this.featureSetSpec = featureSetSpec;
-    this.pgAddress = pgAddress;
+
+  public abstract String getStoreName();
+
+  public abstract FeatureSetSpec getFeatureSetSpec();
+
+  public abstract String getPgAddress();
+
+  public static Builder newBuilder() {
+    return new AutoValue_WriteRowMetricsDoFn.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setStoreName(String storeName);
+
+    public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);
+
+    public abstract Builder setPgAddress(String pgAddress);
+
+    public abstract WriteRowMetricsDoFn build();
   }
 
@@ -52,18 +64,20 @@ public void processElement(ProcessContext c) {
             .labelNames(STORE_TAG_KEY, FEATURE_SET_NAME_TAG_KEY, FEATURE_SET_VERSION_TAG_KEY,
                 FEATURE_TAG_KEY)
             .register(registry);
+
     Long currentTimestamp = System.currentTimeMillis();
+    FeatureSetSpec featureSetSpec = getFeatureSetSpec();
     for (FeatureRow row : c.element().getValue()) {
       long eventTimestamp = row.getEventTimestamp().getSeconds() * 1000;
       long lag = currentTimestamp - eventTimestamp;
       rowLag
-          .labels(storeName, featureSetSpec.getName(), String.valueOf(featureSetSpec.getVersion()))
+          .labels(getStoreName(), featureSetSpec.getName(), String.valueOf(featureSetSpec.getVersion()))
          .observe(lag);
       for (Field field : row.getFieldsList()) {
         if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
           featureLag
-              .labels(storeName, featureSetSpec.getName(),
+              .labels(getStoreName(), featureSetSpec.getName(),
                   String.valueOf(featureSetSpec.getVersion()), field.getName())
               .observe(lag);
         }
@@ -71,8 +85,8 @@ public void processElement(ProcessContext c) {
     }
 
     try {
-      PushGateway pg = new PushGateway(pgAddress);
-      pg.pushAdd(registry, pipelineName);
+      PushGateway pg = new PushGateway(getPgAddress());
+      pg.pushAdd(registry, c.getPipelineOptions().getJobName());
     } catch (IOException e) {
       log.warn("Unable to push metrics to server", e);
     }
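A note on the push pattern above: each bundle builds a fresh CollectorRegistry, observes lag into histograms, and pushes with pushAdd, which merges with metrics already held under that job name on the gateway (push would replace them wholesale). A self-contained sketch of the same flow using the Prometheus simpleclient; the metric name, gateway address, and job name are placeholders:

    import io.prometheus.client.CollectorRegistry;
    import io.prometheus.client.Histogram;
    import io.prometheus.client.exporter.PushGateway;

    public class PushGatewayExample {
      public static void main(String[] args) throws Exception {
        // Fresh registry per push, mirroring the per-bundle registry above.
        CollectorRegistry registry = new CollectorRegistry();
        Histogram rowLag = Histogram.build()
            .name("feast_row_lag_ms")                       // placeholder metric name
            .help("Lag between event time and processing time, in milliseconds")
            .labelNames("feast_store", "feast_featureSet_name", "feast_featureSet_version")
            .register(registry);

        rowLag.labels("serving", "my_feature_set", "1").observe(42.0);

        // pushAdd() accumulates under the job name; push() would overwrite.
        PushGateway pg = new PushGateway("localhost:9091"); // placeholder gateway address
        pg.pushAdd(registry, "feast-import-job");           // placeholder job name
      }
    }
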
diff --git a/ingestion/src/test/java/feast/ingestion/config/ImportJobSpecsSupplierTest.java b/ingestion/src/test/java/feast/ingestion/config/ImportJobSpecsSupplierTest.java
deleted file mode 100644
index 32a136ff5f..0000000000
--- a/ingestion/src/test/java/feast/ingestion/config/ImportJobSpecsSupplierTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-// /*
-// * Copyright 2018 The Feast Authors
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * https://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// * -// */ -// -// package feast.ingestion.config; -// -// import static org.junit.Assert.assertEquals; -// -// import com.google.protobuf.util.JsonFormat; -// import feast.ingestion.model.Specs; -// import feast.specs.EntitySpecProto.EntitySpec; -// import feast.specs.FeatureSpecProto.FeatureSpec; -// import feast.specs.ImportJobSpecsProto.ImportJobSpecs; -// import feast.specs.ImportJobSpecsProto.SourceSpec; -// import feast.specs.ImportJobSpecsProto.SourceSpec.SourceType; -// import feast.specs.StorageSpecProto.StorageSpec; -// import feast.types.ValueProto.ValueType.Enum; -// import java.io.File; -// import java.io.IOException; -// import java.io.PrintWriter; -// import java.nio.file.Files; -// import lombok.extern.slf4j.Slf4j; -// import org.junit.Rule; -// import org.junit.Test; -// import org.junit.rules.TemporaryFolder; -// -// @Slf4j -// public class ImportJobSpecsSupplierTest { -// @Rule -// public TemporaryFolder temporaryFolder = new TemporaryFolder(); -// -// public String importSpecYaml = -// "---\n" -// + "sinkStorageSpec:\n" -// + " id: TEST_SERVING\n" -// + " type: serving.mock\n" -// + " options: {}\n" -// + "errorsStorageSpec:\n" -// + " id: ERRORS\n" -// + " type: stdout\n" -// + " options: {}\n" -// + "entitySpec:\n" -// + " name: testEntity\n" -// + " description: This is a test entity\n" -// + " tags: []\n" -// + "featureSpecs:\n" -// + " - id: testEntity.testInt64\n" -// + " entity: testEntity\n" -// + " name: testInt64\n" -// + " owner: feast@example.com\n" -// + " description: This is test feature of type integer\n" -// + " uri: https://example.com/\n" -// + " valueType: INT64\n" -// + " tags: []\n" -// + " options: {}\n" -// + "sourceSpec:\n" -// + " type: KAFKA\n" -// + " options:\n" -// + " bootstrapServers: localhost:8281\n" -// + "\n"; -// -// @Test -// public void testSupplierImportSpecYamlFile() throws IOException { -// File yamlFile = temporaryFolder.newFile("importJobSpecs.yaml"); -// try (PrintWriter printWriter = new PrintWriter(Files.newOutputStream(yamlFile.toPath()))) { -// printWriter.print(importSpecYaml); -// } -// -// ImportJobSpecs importJobSpecs = new ImportJobSpecsSupplier(yamlFile.getParent()).get(); -// Specs specs = new Specs("", importJobSpecs); -// System.out.println( -// JsonFormat.printer().omittingInsignificantWhitespace().print(importJobSpecs)); -// assertEquals( -// SourceSpec.newBuilder() -// .setType(SourceType.KAFKA) -// .putOptions("bootstrapServers", "localhost:8281") -// .build(), -// importJobSpecs.getSourceSpec()); -// -// assertEquals(StorageSpec.newBuilder() -// .setId("TEST_SERVING") -// .setType("serving.mock") -// .build(), importJobSpecs.getSinkStorageSpec()); -// -// assertEquals(StorageSpec.newBuilder() -// .setId("ERRORS") -// .setType("stdout") -// .build(), importJobSpecs.getErrorsStorageSpec()); -// -// assertEquals( -// EntitySpec.newBuilder() -// .setName("testEntity") -// .setDescription("This is a test entity") -// .build(), -// specs.getEntitySpec("testEntity")); -// -// assertEquals( -// FeatureSpec.newBuilder() -// .setId("testEntity.testInt64") -// .setEntity("testEntity") -// .setName("testInt64") -// .setOwner("feast@example.com") -// .setUri("https://example.com/") -// .setValueType(Enum.INT64) -// .setDescription("This is test feature of type integer") -// .build(), -// specs.getFeatureSpec("testEntity.testInt64")); -// } -// } diff --git a/ingestion/src/test/java/feast/ingestion/model/FeaturesTest.java b/ingestion/src/test/java/feast/ingestion/model/FeaturesTest.java deleted file mode 100644 index 
5c70ba2a42..0000000000 --- a/ingestion/src/test/java/feast/ingestion/model/FeaturesTest.java +++ /dev/null @@ -1,34 +0,0 @@ -// /* -// * Copyright 2018 The Feast Authors -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * https://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// * -// */ -// -// package feast.ingestion.model; -// -// import feast.types.FeatureProto.Feature; -// import feast.types.ValueProto.Value; -// import org.junit.Assert; -// import org.junit.Test; -// -// -// public class FeaturesTest { -// -// @Test -// public void testFeatureOf() { -// Value value = Value.newBuilder().setInt32Val(123).build(); -// Feature feature = Features.of("a.feature.id", value); -// Assert.assertEquals("a.feature.id", feature.getId()); -// } -// } \ No newline at end of file diff --git a/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java b/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java deleted file mode 100644 index 879998248c..0000000000 --- a/ingestion/src/test/java/feast/ingestion/model/SpecsTest.java +++ /dev/null @@ -1,94 +0,0 @@ -// /* -// * Copyright 2019 The Feast Authors -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// * -// */ -// -// package feast.ingestion.model; -// -// import static junit.framework.TestCase.assertTrue; -// import static org.junit.Assert.assertEquals; -// -// import com.google.common.io.Resources; -// import feast.ingestion.config.ImportJobSpecsSupplier; -// import feast.specs.EntitySpecProto.EntitySpec; -// import feast.specs.FeatureSpecProto.FeatureSpec; -// import feast.specs.ImportJobSpecsProto.ImportJobSpecs; -// import feast.specs.ImportJobSpecsProto.SourceSpec; -// import feast.specs.ImportJobSpecsProto.SourceSpec.SourceType; -// import feast.specs.ImportSpecProto.Field; -// import feast.specs.ImportSpecProto.ImportSpec; -// import feast.specs.ImportSpecProto.Schema; -// import java.nio.file.Path; -// import java.nio.file.Paths; -// import org.junit.Before; -// import org.junit.Test; -// -// public class SpecsTest { -// -// ImportJobSpecs importJobSpecs; -// -// private Field.Builder newField(String featureId) { -// return Field.newBuilder().setFeatureId(featureId); -// } -// -// @Before -// public void before() { -// Path path = Paths.get(Resources.getResource("specs/").getPath()); -// importJobSpecs = new ImportJobSpecsSupplier(path.toString()).get(); -// } -// -// @Test -// public void testSingleFeatureAndEntity() { -// ImportJobSpecs importJobSpecs = this.importJobSpecs.toBuilder() -// .setSourceSpec(SourceSpec.newBuilder().setType(SourceType.KAFKA) -// .putOptions("bootstrapServers", "localhost:8281")) -// .clearFeatureSpecs() -// .addFeatureSpecs(FeatureSpec.newBuilder().setId("testEntity.testInt32").build()) -// .build(); -// -// Specs specs = Specs.of("testjob", importJobSpecs); -// -// assertEquals("testjob", specs.getJobName()); -// assertEquals(importJobSpecs.getSourceSpec(), specs.getSourceSpec()); -// -// assertEquals(1, specs.getEntitySpecs().size()); -// assertTrue(specs.getEntitySpecs().containsKey("testEntity")); -// -// assertEquals(1, specs.getFeatureSpecs().size()); -// assertTrue(specs.getFeatureSpecs().containsKey("testEntity.testInt32")); -// -// assertTrue(specs.getSinkStorageSpec().getId().equals("TEST_SERVING")); -// } -// -// @Test -// public void testGetFeatureSpec() { -// Specs specs = Specs.of("testjob", importJobSpecs); -// assertEquals( -// "testEntity.testInt32", specs.getFeatureSpec("testEntity.testInt32").getId()); -// } -// -// @Test -// public void testGetEntitySpec() { -// Specs specs = Specs.of("testjob", importJobSpecs); -// assertEquals("testEntity", specs.getEntitySpec("testEntity").getName()); -// } -// -// @Test -// public void testGetStorageSpec() { -// Specs specs = Specs.of("testjob", importJobSpecs); -// assertEquals(specs.getSinkStorageSpec().getId(), "TEST_SERVING"); -// } -// -// } diff --git a/ingestion/src/test/java/feast/ingestion/model/ValuesTest.java b/ingestion/src/test/java/feast/ingestion/model/ValuesTest.java deleted file mode 100644 index 7ef3ce17c1..0000000000 --- a/ingestion/src/test/java/feast/ingestion/model/ValuesTest.java +++ /dev/null @@ -1,323 +0,0 @@ -// /* -// * Copyright 2018 The Feast Authors -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * https://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// * See the License for the specific language governing permissions and -// * limitations under the License. -// * -// */ -// -// package feast.ingestion.model; -// -// import static org.junit.Assert.assertEquals; -// import static org.junit.Assert.assertFalse; -// import static org.junit.Assert.assertTrue; -// import static org.junit.Assert.fail; -// -// import com.google.common.collect.ImmutableMap; -// import feast.ingestion.util.DateUtil; -// import com.google.protobuf.ByteString; -// import com.google.protobuf.Timestamp; -// import feast.types.ValueProto.Value; -// import feast.types.ValueProto.ValueType.Enum; -// import java.util.Base64; -// import java.util.Map; -// import java.util.function.Function; -// import org.joda.time.DateTime; -// import org.joda.time.DateTimeZone; -// import org.junit.Assert; -// import org.junit.Test; -// -// public class ValuesTest { -// static class ValueTest { -// Value input; -// Value expectedValue; -// Class expectedThrowable; -// -// private ValueTest() {} -// -// public static ValueTest of(Value input, Value expected) { -// ValueTest test = new ValueTest(); -// test.input = input; -// test.expectedValue = expected; -// return test; -// } -// -// public static ValueTest of(Value input, Class expected) { -// ValueTest test = new ValueTest(); -// test.input = input; -// test.expectedThrowable = expected; -// return test; -// } -// -// void apply(Function func) { -// try { -// Value output = func.apply(input); -// if (expectedThrowable != null) { -// fail("expected error"); -// } -// assertEquals(expectedValue, output); -// } catch (Throwable e) { -// if (expectedThrowable == null || !expectedThrowable.isInstance(e)) { -// throw e; -// } -// } -// } -// } -// -// @Test -// public void testValuesOfInt32() { -// assertEquals(123, Values.ofInt32(123).getInt32Val()); -// } -// -// @Test -// public void testValuesOfInt64() { -// assertEquals(123, Values.ofInt64(123).getInt64Val()); -// } -// -// @Test -// public void testValuesOfString() { -// assertEquals("123", Values.ofString("123").getStringVal()); -// } -// -// @Test -// public void testValuesOfBytes() { -// ByteString bytes = ByteString.copyFromUtf8("123"); -// Assert.assertArrayEquals( -// bytes.toByteArray(), Values.ofBytes(bytes.toByteArray()).getBytesVal().toByteArray()); -// } -// -// @Test -// public void testValuesOfByteString() { -// ByteString bytes = ByteString.copyFromUtf8("123"); -// assertEquals(bytes, Values.ofBytes(bytes).getBytesVal()); -// } -// -// @Test -// public void testValuesOfTimestampDateTime() { -// DateTime dateTime = DateTime.now().withZone(DateTimeZone.UTC); -// Assert.assertEquals(dateTime, DateUtil.toDateTime(Values.ofTimestamp(dateTime).getTimestampVal())); -// } -// -// @Test -// public void testValuesOfTimestamp() { -// Timestamp timestamp = DateUtil.toTimestamp(DateTime.now()); -// assertEquals(timestamp, Values.ofTimestamp(timestamp).getTimestampVal()); -// } -// -// @Test -// public void testValuesOfBool() { -// assertTrue(Values.ofBool(true).getBoolVal()); -// assertFalse(Values.ofBool(false).getBoolVal()); -// } -// -// @Test -// public void testValuesOfDouble() { -// assertEquals(Math.PI, Values.ofDouble(Math.PI).getDoubleVal(), 0.0); -// } -// -// @Test -// public void testValuesOfFloat() { -// float val = Double.valueOf(Math.PI).floatValue(); -// assertEquals(val, Values.ofFloat(val).getFloatVal(), 0.0); -// } -// -// @Test -// public void testToValueTypeFromValue() { -// assertEquals(Enum.STRING, Values.toValueType(Values.ofString("asdf"))); -// 
assertEquals(Enum.INT64, Values.toValueType(Values.ofInt64(1234))); -// assertEquals(Enum.INT32, Values.toValueType(Values.ofInt32(1234))); -// assertEquals(Enum.TIMESTAMP, Values.toValueType(Values.ofTimestamp(DateTime.now()))); -// assertEquals(Enum.BOOL, Values.toValueType(Values.ofBool(false))); -// assertEquals(Enum.BYTES, Values.toValueType(Values.ofBytes(ByteString.copyFromUtf8("abcd")))); -// assertEquals(Enum.DOUBLE, Values.toValueType(Values.ofDouble(Math.PI))); -// assertEquals(Enum.FLOAT, Values.toValueType(Values.ofFloat(1.234F))); -// } -// -// @Test -// public void testAsType() {} -// -// @Test -// public void testAsString() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Map assertions = -// ImmutableMap.builder() -// .put(Values.ofString("asdf"), "asdf") -// .put(Values.ofBool(true), "true") -// .put(Values.ofFloat(1.234F), "1.234") -// .put(Values.ofDouble(Math.PI), String.valueOf(Math.PI)) -// .put(Values.ofInt32(Integer.MAX_VALUE), String.valueOf(Integer.MAX_VALUE)) -// .put(Values.ofInt64(Long.MAX_VALUE), String.valueOf(Long.MAX_VALUE)) -// .put(Values.ofTimestamp(datetime), String.valueOf(DateUtil.toString(datetime))) -// .put(Values.ofBytes(bytes), Base64.getEncoder().encodeToString(bytes.toByteArray())) -// .build(); -// for (Value value : assertions.keySet()) { -// assertEquals(Values.ofString(assertions.get(value)), Values.asString(value)); -// } -// } -// -// @Test -// public void testAsTimestamp() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asTimestamp; -// ValueTest.of(Values.ofString("asdclk"), IllegalArgumentException.class).apply(castFunc); -// ValueTest.of( -// Values.ofString("2019-01-01T12:01:02.123Z"), -// Values.ofTimestamp(DateUtil.toTimestamp("2019-01-01T12:01:02.123Z"))) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(1.234F), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), Values.ofTimestamp(datetime)).apply(castFunc); -// } -// -// @Test -// public void testAsInt64() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asInt64; -// ValueTest.of(Values.ofString("asdclk"), NumberFormatException.class).apply(castFunc); -// ValueTest.of( -// Values.ofString(String.valueOf(Integer.MAX_VALUE)), Values.ofInt64(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), Values.ofInt64(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), Values.ofInt64(Long.MAX_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(1.234F), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), 
UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// -// @Test -// public void testAsFloat() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asFloat; -// ValueTest.of(Values.ofString("asdclk"), NumberFormatException.class).apply(castFunc); -// ValueTest.of(Values.ofString(String.valueOf(Float.MAX_VALUE)), Values.ofFloat(Float.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), Values.ofFloat(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(Float.MAX_VALUE), Values.ofFloat(Float.MAX_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), Values.ofFloat((float)Math.PI)).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// -// @Test -// public void testAsBool() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asBool; -// ValueTest.of(Values.ofString("asdclk"), Values.ofBool(false)).apply(castFunc); -// ValueTest.of(Values.ofString("True"), Values.ofBool(true)).apply(castFunc); -// ValueTest.of(Values.ofString("true"), Values.ofBool(true)).apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), IllegalArgumentException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(0), Values.ofBool(false)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(1), Values.ofBool(true)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), IllegalArgumentException.class) -// .apply(castFunc); -// -// ValueTest.of(Values.ofInt64(0), Values.ofBool(false)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(1), Values.ofBool(true)) -// .apply(castFunc); -// ValueTest.of(Values.ofBool(true), Values.ofBool(true)).apply(castFunc); -// -// ValueTest.of(Values.ofFloat(1.234F), IllegalArgumentException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(0), Values.ofBool(false)).apply(castFunc); -// ValueTest.of(Values.ofFloat(1), Values.ofBool(true)).apply(castFunc); -// -// ValueTest.of(Values.ofDouble(Math.PI), IllegalArgumentException.class).apply(castFunc); -// ValueTest.of(Values.ofDouble(0), Values.ofBool(false)).apply(castFunc); -// ValueTest.of(Values.ofDouble(1), Values.ofBool(true)).apply(castFunc); -// -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// -// @Test -// public void testAsInt32() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asInt32; -// ValueTest.of(Values.ofString("asdclk"), NumberFormatException.class).apply(castFunc); -// ValueTest.of( -// Values.ofString(String.valueOf(Integer.MAX_VALUE)), 
Values.ofInt32(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), Values.ofInt32(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Integer.MAX_VALUE), Values.ofInt32(Integer.MAX_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofInt64(Integer.MIN_VALUE), Values.ofInt32(Integer.MIN_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), IllegalArgumentException.class); -// ValueTest.of(Values.ofInt64(Long.MIN_VALUE), IllegalArgumentException.class); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(1.234F), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// -// @Test -// public void testAsDouble() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asDouble; -// ValueTest.of( -// Values.ofString(String.valueOf(Double.MAX_VALUE)), Values.ofDouble(Double.MAX_VALUE)) -// .apply(castFunc); -// -// ValueTest.of(Values.ofString("asdclk"), NumberFormatException.class).apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), Values.ofDouble(Integer.MAX_VALUE)) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), Values.ofDouble(Long.MAX_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(Float.MAX_VALUE), Values.ofDouble(Float.MAX_VALUE)).apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), Values.ofDouble(Math.PI)).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// -// @Test -// public void testAsBytes() { -// DateTime datetime = DateTime.now().withZone(DateTimeZone.UTC); -// ByteString bytes = ByteString.copyFromUtf8("asdfasdfasdf"); -// Function castFunc = Values::asBytes; -// ValueTest.of( -// Values.ofString(String.valueOf(Double.MAX_VALUE)), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofInt32(Integer.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofInt64(Long.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofBool(true), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofFloat(Float.MAX_VALUE), UnsupportedOperationException.class) -// .apply(castFunc); -// ValueTest.of(Values.ofDouble(Math.PI), UnsupportedOperationException.class).apply(castFunc); -// ValueTest.of(Values.ofBytes(bytes), Values.ofBytes(bytes)); -// ValueTest.of(Values.ofTimestamp(datetime), UnsupportedOperationException.class).apply(castFunc); -// } -// } diff --git a/ingestion/src/test/java/feast/ingestion/transform/CoalesceFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/CoalesceFeatureRowsTest.java deleted file mode 100644 index a9f1e3bd0d..0000000000 --- a/ingestion/src/test/java/feast/ingestion/transform/CoalesceFeatureRowsTest.java +++ /dev/null @@ -1,568 +0,0 @@ -// /* -// 
* Copyright 2018 The Feast Authors -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * https://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// * -// */ -// -// package feast.ingestion.transform; -// -// import static feast.NormalizeFeatureRows.normalize; -// import static feast.ingestion.transform.CoalesceFeatureRows.toFeatureRow; -// import static org.hamcrest.MatcherAssert.assertThat; -// import static org.hamcrest.Matchers.equalTo; -// import static org.junit.Assert.assertEquals; -// -// import com.google.common.collect.Lists; -// import com.google.protobuf.Timestamp; -// import feast.FeastMatchers; -// import feast.NormalizeFeatureRows; -// import feast.ingestion.model.Features; -// import feast.ingestion.model.Values; -// import feast.types.FeatureProto.Feature; -// import feast.types.FeatureRowProto.FeatureRow; -// import feast_ingestion.types.CoalesceAccumProto.CoalesceAccum; -// import java.util.List; -// import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; -// import org.apache.beam.sdk.testing.PAssert; -// import org.apache.beam.sdk.testing.TestPipeline; -// import org.apache.beam.sdk.testing.TestStream; -// import org.apache.beam.sdk.transforms.Count; -// import org.apache.beam.sdk.transforms.Create; -// import org.apache.beam.sdk.values.PCollection; -// import org.joda.time.Duration; -// import org.joda.time.Instant; -// import org.junit.Rule; -// import org.junit.Test; -// -// public class CoalesceFeatureRowsTest { -// -// private static final Timestamp DEFAULT_TIMESTAMP = Timestamp.getDefaultInstance(); -// private static final FeatureRow DEFAULT_FEATURE_ROW = FeatureRow.getDefaultInstance().toBuilder() -// .setEventTimestamp(DEFAULT_TIMESTAMP).build(); -// -// @Rule -// public TestPipeline pipeline = TestPipeline.create(); -// -// @Test -// public void testBatch_withDistictKeys_shouldPassThroughNonIntersectingKeys() { -// List rows = Lists.newArrayList( -// FeatureRow.newBuilder().setEventTimestamp(Timestamp.getDefaultInstance()).setEntityKey("1") -// .build(), -// FeatureRow.newBuilder().setEventTimestamp(Timestamp.getDefaultInstance()).setEntityKey("2") -// .build()); -// -// PCollection input = pipeline.apply(Create.of(rows)) -// .setCoder(ProtoCoder.of(FeatureRow.class)); -// -// PCollection output = input.apply(new CoalesceFeatureRows()); -// -// PAssert.that(output.apply(Count.globally())).containsInAnyOrder(2L); -// PAssert.that(output).containsInAnyOrder(rows); -// -// pipeline.run(); -// } -// -// @Test -// public void test_withNoFeaturesSameTimestamp_shouldReturn1() { -// List rows = Lists.newArrayList( -// FeatureRow.newBuilder().setEventTimestamp(Timestamp.getDefaultInstance()).setEntityKey("1") -// .build(), -// FeatureRow.newBuilder().setEventTimestamp(Timestamp.getDefaultInstance()).setEntityKey("1") -// .build()); -// PCollection input = pipeline.apply(Create.of(rows)) -// .setCoder(ProtoCoder.of(FeatureRow.class)); -// -// PCollection output = input.apply(new CoalesceFeatureRows()); -// -// assertThat(CoalesceFeatureRows.combineFeatureRows(rows), equalTo(rows.get(0))); -// 
PAssert.that(output.apply(Count.globally())).containsInAnyOrder(1L);
-//     PAssert.that(output).containsInAnyOrder(rows.get(0));
-//
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testBatch_withNoFeaturesDifferentSeconds_shouldReturnLatest() {
-//     List<FeatureRow> rows1 = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(1)).build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2)).build());
-//
-//     assertThat(CoalesceFeatureRows.combineFeatureRows(rows1), equalTo(rows1.get(1)));
-//     assertThat(CoalesceFeatureRows.combineFeatureRows(Lists.reverse(rows1)),
-//         equalTo(rows1.get(1)));
-//   }
-//
-//
-//   @Test
-//   public void testBatch_withNoFeaturesDifferentNanos_shouldReturnLatest() {
-//     List<FeatureRow> rows1 = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setNanos(1)).build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setNanos(2)).build());
-//
-//     assertThat(CoalesceFeatureRows.combineFeatureRows(rows1), equalTo(rows1.get(1)));
-//     assertThat(CoalesceFeatureRows.combineFeatureRows(Lists.reverse(rows1)),
-//         equalTo(rows1.get(1)));
-//   }
-//
-//   @Test
-//   public void testBatch_shouldMergeFeatures() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build());
-//     PCollection<FeatureRow> input = pipeline.apply(Create.of(rows))
-//         .setCoder(ProtoCoder.of(FeatureRow.class));
-//
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows());
-//
-//     PAssert.that(output.apply(Count.globally())).containsInAnyOrder(1L);
-//     PAssert.that(output.apply(new NormalizeFeatureRows())).containsInAnyOrder(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.getDefaultInstance())
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_shouldMergeFeatures() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build());
-//
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkTo(start)
-//         .addElements(rows.get(0))
-//         .addElements(rows.get(1))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, Duration.ZERO));
-//
-//     PAssert.that(output.apply(Count.globally())).containsInAnyOrder(1L);
-//     PAssert.that(output).containsInAnyOrder(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_shouldIncludeRowAddedOnTimerEdge() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f3").setValue(Values.ofInt32(3)))
-//             .build());
-//
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkTo(start)
-//         .addElements(rows.get(0))
-//         .advanceWatermarkTo(start.plus(delay))
-//         // This row will be included in the same pane because it's exactly
-//         // on the same watermark as the onTimer event
-//         .addElements(rows.get(1))
-//         .advanceWatermarkTo(start.plus(delay).plus(delay).plus(delay))
-//         .addElements(rows.get(2))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input
-//         .apply(new CoalesceFeatureRows(delay, Duration.ZERO));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(2L));
-//     PAssert.that(output.apply(new NormalizeFeatureRows())).containsInAnyOrder(
-//         normalize(FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()),
-//         normalize(FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f3").setValue(Values.ofInt32(3)))
-//             .build())
-//     );
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_shouldMergeFeatures_emittingPanes_overlappingTimers() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkTo(start)
-//         .addElements(rows.get(0))
-//         .advanceWatermarkTo(start.plus(delay.dividedBy(2)))
-//         // second event arrives before the timer triggers
-//         .addElements(rows.get(1))
-//         .advanceWatermarkTo(start.plus(delay))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, Duration.ZERO));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(1L));
-//     PAssert.that(output.apply(new NormalizeFeatureRows())).containsInAnyOrder(
-//         normalize(FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//         )
-//     );
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_shouldNotSetTimerWhilePending() {
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkTo(start)
-//         .addElements(DEFAULT_FEATURE_ROW)
-//         // this should not reset the timer as the first is still pending.
-//         .advanceWatermarkTo(start.plus(delay.dividedBy(2)))
-//         .addElements(DEFAULT_FEATURE_ROW)
-//         // timer should trigger causing the first output row
-//         .advanceWatermarkTo(start.plus(delay).plus(delay.dividedBy(2)))
-//         .addElements(FeatureRow.getDefaultInstance()) // this should cause a second output row.
-//         .advanceWatermarkTo(start.plus(delay).plus(delay))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, Duration.ZERO));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(2L));
-//     PAssert.that(output).containsInAnyOrder(DEFAULT_FEATURE_ROW, DEFAULT_FEATURE_ROW);
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_shouldOnlyEmitNewFeaturesInSecondPane() {
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkTo(start)
-//         .addElements(
-//             FeatureRow.newBuilder()
-//                 .setEventTimestamp(DEFAULT_TIMESTAMP)
-//                 .addFeatures(Features.of("f1", Values.ofString("a")))
-//                 .build())
-//         // this should emit a row
-//         .advanceWatermarkTo(start.plus(delay).plus(delay))
-//         .addElements(
-//             FeatureRow.newBuilder()
-//                 .setEventTimestamp(DEFAULT_TIMESTAMP)
-//                 .addFeatures(Features.of("f2", Values.ofString("b")))
-//                 .build())
-//         // this should emit a row with f2 but without f1 because it hasn't had an update
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, Duration.ZERO));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(2L));
-//     PAssert.that(output).containsInAnyOrder(
-//         FeatureRow.newBuilder()
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .addFeatures(Features.of("f1", Values.ofString("a")))
-//             .build(),
-//         FeatureRow.newBuilder()
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .addFeatures(Features.of("f2", Values.ofString("b")))
-//             .build());
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void test_combineFeatureRows_shouldCountRows() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.getDefaultInstance(),
-//         FeatureRow.getDefaultInstance(),
-//         FeatureRow.getDefaultInstance());
-//     CoalesceAccum accum = CoalesceFeatureRows
-//         .combineFeatureRowsWithSeed(CoalesceAccum.getDefaultInstance(), rows);
-//     assertEquals(3, accum.getCounter());
-//   }
-//
-//   @Test
-//   public void test_combineFeatureRows_shouldOverwriteWhenLaterEventTimestampProcessedSecond() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("a")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(1))
-//             .build(),
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("b")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .build());
-//     CoalesceAccum accum = CoalesceFeatureRows
-//         .combineFeatureRowsWithSeed(CoalesceAccum.getDefaultInstance(), rows);
-//     assertEquals(accum.getFeaturesMap().get("f1"), Features.of("f1", Values.ofString("b")));
-//   }
-//
-//   @Test
-//   public void test_combineFeatureRows_shouldNotOverwriteWhenEarlierEventTimestampProcessedSecond() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("b")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .build(),
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("a")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(1))
-//             .build());
-//     CoalesceAccum accum = CoalesceFeatureRows
-//         .combineFeatureRowsWithSeed(CoalesceAccum.getDefaultInstance(), rows);
-//     assertEquals(accum.getFeaturesMap().get("f1"), Features.of("f1", Values.ofString("b")));
-//   }
-//
-//   @Test
-//   public void test_combineFeatureRows_shouldOverwriteWhenSameEventTimestampProcessedSecond() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("a")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .build(),
-//         FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("b")))
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .build()
-//     );
-//     CoalesceAccum accum = CoalesceFeatureRows
-//         .combineFeatureRowsWithSeed(CoalesceAccum.getDefaultInstance(), rows);
-//     assertEquals(accum.getFeaturesMap().get("f1"), Features.of("f1", Values.ofString("b")));
-//   }
-//
-//   @Test
-//   public void test_shouldPickLatestFeatures() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(
-//                 Timestamp.newBuilder().setSeconds(1)) // old row with a non-unique feature
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(2)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(1)) // old row with a unique feature
-//             .addFeatures(Feature.newBuilder().setId("f3").setValue(Values.ofInt32(3)))
-//             .build());
-//
-//     CoalesceAccum accum = CoalesceFeatureRows
-//         .combineFeatureRowsWithSeed(CoalesceAccum.getDefaultInstance(), rows);
-//     assertEquals(3, accum.getCounter());
-//     assertThat(toFeatureRow(accum, 0), equalTo(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.newBuilder().setSeconds(2))
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(2)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .addFeatures(Feature.newBuilder().setId("f3").setValue(Values.ofInt32(3)))
-//             .build()
-//     ));
-//   }
-//
-//   @Test
-//   public void testStream_withNoInput() {
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows());
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(0L));
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testBatch_withNoInput() {
-//     PCollection<FeatureRow> input = pipeline.apply(Create.empty(ProtoCoder.of(FeatureRow.class)));
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows());
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(0L));
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_withTimeout_shouldRemoveState() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardSeconds(10);
-//     Duration timeout = Duration.standardMinutes(30);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .addElements(rows.get(0))
-//         .advanceWatermarkTo(start.plus(timeout))
-//         // first element should get fired, as the delay watermark is reached before the timeout
-//         // watermark; state should then be cleared when the timeout watermark is reached.
-//         .addElements(rows.get(1))
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, timeout));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(2L));
-//     PAssert.that(output.apply(new NormalizeFeatureRows())).containsInAnyOrder(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .setEventTimestamp(Timestamp.getDefaultInstance())
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .setEventTimestamp(Timestamp.getDefaultInstance())
-//             .build()
-//     );
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void testStream_withDelayAfterTimeout_shouldProcessBagBeforeClear() {
-//     List<FeatureRow> rows = Lists.newArrayList(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .build(),
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//
-//     Instant start = new Instant();
-//     Duration delay = Duration.standardMinutes(40);
-//     Duration timeout = Duration.standardMinutes(30);
-//     TestStream<FeatureRow> testStream = TestStream.create(ProtoCoder.of(FeatureRow.class))
-//         .addElements(rows.get(0))
-//         .addElements(rows.get(1))
-//         // first element should get fired, as the delay watermark is reached before the timeout
-//         // watermark; state should then be cleared when the timeout watermark is reached.
-//         // If it didn't process the bag before clearing it, we'd get no output events at all.
-//         .advanceWatermarkToInfinity();
-//
-//     PCollection<FeatureRow> input = pipeline.apply(testStream);
-//     PCollection<FeatureRow> output = input.apply(new CoalesceFeatureRows(delay, timeout));
-//
-//     PAssert.that(output).satisfies(FeastMatchers.hasCount(1L));
-//     PAssert.that(output.apply(new NormalizeFeatureRows())).containsInAnyOrder(
-//         FeatureRow.newBuilder().setEntityKey("1")
-//             .setEventTimestamp(Timestamp.getDefaultInstance())
-//             .addFeatures(Feature.newBuilder().setId("f1").setValue(Values.ofInt32(1)))
-//             .addFeatures(Feature.newBuilder().setId("f2").setValue(Values.ofInt32(2)))
-//             .build()
-//     );
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void test_toFeatureRow_shouldBeNewMarkedFeaturesOnly() {
-//     CoalesceAccum accum = CoalesceAccum.newBuilder()
-//         .putFeatures("f1", Features.of("f1", Values.ofString("a")))
-//         .putFeatures("f2", Features.of("f2", Values.ofString("b")))
-//         .putFeatures("f3", Features.of("f3", Values.ofString("c")))
-//         .putFeatures("f4", Features.of("f4", Values.ofString("d")))
-//         .putFeatureMarks("f1", 1)
-//         .putFeatureMarks("f2", 1)
-//         .putFeatureMarks("f3", 2)
-//         .putFeatureMarks("f4", 3)
-//         .setCounter(3)
-//         .build();
-//
-//     FeatureRow output = normalize(toFeatureRow(accum, 0));
-//     assertThat(output,
-//         equalTo(FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f1", Values.ofString("a")))
-//             .addFeatures(Features.of("f2", Values.ofString("b")))
-//             .addFeatures(Features.of("f3", Values.ofString("c")))
-//             .addFeatures(Features.of("f4", Values.ofString("d")))
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .build()));
-//     output = normalize(toFeatureRow(accum, 1));
-//     assertThat(output,
-//         equalTo(FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f3", Values.ofString("c")))
-//             .addFeatures(Features.of("f4", Values.ofString("d")))
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .build()));
-//     output = normalize(toFeatureRow(accum, 2));
-//     assertThat(output,
-//         equalTo(FeatureRow.newBuilder()
-//             .addFeatures(Features.of("f4", Values.ofString("d")))
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .build()));
-//     output = normalize(toFeatureRow(accum, 3));
-//     assertThat(output,
-//         equalTo(FeatureRow.newBuilder()
-//             .setEventTimestamp(DEFAULT_TIMESTAMP)
-//             .build()));
-//   }
-//
-//   @Test(expected = IllegalArgumentException.class)
-//   public void test_toFeatureRow_markTooHigh_shouldThrow() {
-//     CoalesceAccum accum = CoalesceAccum.newBuilder()
-//         .putFeatures("f1", Features.of("f1", Values.ofString("a")))
-//         .putFeatures("f2", Features.of("f2", Values.ofString("b")))
-//         .putFeatures("f3", Features.of("f3", Values.ofString("c")))
-//         .putFeatures("f4", Features.of("f4", Values.ofString("d")))
-//         .putFeatureMarks("f1", 1)
-//         .putFeatureMarks("f2", 1)
-//         .putFeatureMarks("f3", 2)
-//         .putFeatureMarks("f4", 3)
-//         .setCounter(3)
-//         .build();
-//     normalize(toFeatureRow(accum, 4));
-//     // an exception is thrown because the caller should check that there are new features
-//     // before trying to emit them.
-//   }
-// }
\ No newline at end of file
diff --git a/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java b/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java
deleted file mode 100644
index 3e3daf8eec..0000000000
--- a/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-// /*
-//  * Copyright 2018 The Feast Authors
-//  *
-//  * Licensed under the Apache License, Version 2.0 (the "License");
-//  * you may not use this file except in compliance with the License.
-//  * You may obtain a copy of the License at
-//  *
-//  *     https://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  *
-//  */
-//
-// package feast.ingestion.transform;
-//
-// import static feast.ingestion.model.Errors.toError;
-// import static feast.store.MockFeatureErrorsFactory.MOCK_ERRORS_STORE_TYPE;
-// import static feast.store.errors.logging.StderrFeatureErrorsFactory.TYPE_STDERR;
-// import static feast.store.errors.logging.StdoutFeatureErrorsFactory.TYPE_STDOUT;
-// import static org.junit.Assert.assertEquals;
-//
-// import feast.ingestion.model.Specs;
-// import feast.ingestion.options.ImportJobPipelineOptions;
-// import feast.specs.ImportJobSpecsProto.ImportJobSpecs;
-// import feast.specs.StorageSpecProto.StorageSpec;
-// import feast.store.MockFeatureErrorsFactory;
-// import feast.store.errors.FeatureErrorsFactoryService;
-// import feast.types.FeatureRowExtendedProto.Attempt;
-// import feast.types.FeatureRowExtendedProto.Error;
-// import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
-// import java.io.File;
-// import java.io.IOException;
-// import java.nio.file.Files;
-// import java.nio.file.Path;
-// import java.nio.file.Paths;
-// import java.util.Arrays;
-// import java.util.List;
-// import java.util.stream.Collectors;
-// import java.util.stream.Stream;
-// import lombok.extern.slf4j.Slf4j;
-// import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
-// import org.apache.beam.sdk.options.PipelineOptionsFactory;
-// import org.apache.beam.sdk.testing.PAssert;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.apache.beam.sdk.transforms.Flatten;
-// import org.apache.beam.sdk.values.PCollection;
-// import org.apache.beam.sdk.values.PCollectionList;
-// import org.junit.Before;
-// import org.junit.Rule;
-// import org.junit.Test;
-// import org.junit.rules.TemporaryFolder;
-//
-// @Slf4j
-// public class ErrorsStoreTransformTest {
-//
-//   @Rule
-//   public TemporaryFolder tempFolder = new TemporaryFolder();
-//
-//   @Rule
-//   public TestPipeline pipeline = TestPipeline.create();
-//
-//   private ImportJobPipelineOptions options;
-//   private PCollection<FeatureRowExtended> inputs;
-//   private List<FeatureRowExtended> errors;
-//
-//   public Specs getSpecs(String errorsStorageType) {
-//     return new Specs("test", ImportJobSpecs.newBuilder()
-//         .setErrorsStorageSpec(StorageSpec.newBuilder()
-//             .setId("ERRORS")
-//             .setType(errorsStorageType)).build());
-//   }
-//
-//   @Before
-//   public void setUp() {
-//     options = PipelineOptionsFactory.create().as(ImportJobPipelineOptions.class);
-//     options.setJobName("test");
-//
-//     errors =
-//         Arrays.asList(
-//             errorOf("test", new Exception("err")), errorOf("test", new Exception("err2")));
-//     inputs = pipeline.apply(Create.of(errors)).setCoder(ProtoCoder.of(FeatureRowExtended.class));
-//   }
-//
-//   private FeatureRowExtended errorOf(String transform, Throwable cause) {
-//     Error error = toError(transform, cause);
-//     return FeatureRowExtended.newBuilder()
-//         .setLastAttempt(Attempt.newBuilder().setError(error).build())
-//         .build();
-//   }
-//
-//   @Test
-//   public void shouldWriteToGivenErrorsStore() {
-//     ErrorsStoreTransform transform = new ErrorsStoreTransform(
-//         FeatureErrorsFactoryService.getAll(),
-//         getSpecs(MOCK_ERRORS_STORE_TYPE), options);
-//     transform.expand(inputs);
-//
-//     MockFeatureErrorsFactory factory = FeatureErrorsFactoryService
-//         .get(MockFeatureErrorsFactory.class);
-//
-//     PCollection<FeatureRowExtended> writtenToErrors =
-//         PCollectionList.of(factory.getWrite().getInputs())
-//             .apply("flatten errors input", Flatten.pCollections());
-//
-//     PAssert.that(writtenToErrors).containsInAnyOrder(errors);
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void logErrorsToStdErr() {
-//     ErrorsStoreTransform transform = new ErrorsStoreTransform(
-//         FeatureErrorsFactoryService.getAll(),
-//         getSpecs(TYPE_STDERR), options);
-//     inputs.apply(transform);
-//     pipeline.run();
-//   }
-//
-//
-//   @Test
-//   public void logErrorsToStdOut() {
-//     ErrorsStoreTransform transform = new ErrorsStoreTransform(
-//         FeatureErrorsFactoryService.getAll(),
-//         getSpecs(TYPE_STDOUT), options);
-//     inputs.apply(transform);
-//     pipeline.run();
-//   }
-//
-//   @Test
-//   public void logErrorsToWorkspace() throws IOException, InterruptedException {
-//     String tempWorkspace = tempFolder.newFolder().toString();
-//     options.setWorkspace(tempWorkspace);
-//     ErrorsStoreTransform transform = new ErrorsStoreTransform(
-//         FeatureErrorsFactoryService.getAll(),
-//         getSpecs(""), options);
-//     inputs.apply(transform);
-//     pipeline.run().waitUntilFinish();
-//
-//     File dir = new File(tempWorkspace);
-//
-//     File[] errorsDirs = dir.listFiles((d, name) -> name.startsWith("errors-"));
-//     int lineCount = Files.list(Paths.get(tempWorkspace)
-//         .resolve(errorsDirs[0].getName()) // errors workspace dir
-//         .resolve("test") // test entity dir
-//     ).flatMap(path -> {
-//       try {
-//         return Files.readAllLines(path).stream();
-//       } catch (IOException e) {
-//         throw new RuntimeException(e);
-//       }
-//     }).collect(Collectors.toList()).size();
-//     assertEquals(2, lineCount);
-//   }
-// }
-//
diff --git a/ingestion/src/test/java/feast/ingestion/transform/fn/ConvertTypesDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/fn/ConvertTypesDoFnTest.java
deleted file mode 100644
index acef92f47f..0000000000
--- a/ingestion/src/test/java/feast/ingestion/transform/fn/ConvertTypesDoFnTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-// /*
-//  * Copyright 2019 The Feast Authors
-//  *
-//  * Licensed under the Apache License, Version 2.0 (the "License");
-//  * you may not use this file except in compliance with the License.
-//  * You may obtain a copy of the License at
-//  *
-//  *     https://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  *
-//  */
-//
-// package feast.ingestion.transform.fn;
-//
-//
-// import com.google.common.collect.Lists;
-// import feast.ingestion.model.Features;
-// import feast.ingestion.model.Specs;
-// import feast.ingestion.model.Values;
-// import feast.ingestion.util.DateUtil;
-// import feast.ingestion.values.PFeatureRows;
-// import feast.specs.FeatureSpecProto.FeatureSpec;
-// import feast.specs.ImportJobSpecsProto;
-// import feast.types.FeatureRowExtendedProto.Attempt;
-// import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
-// import feast.types.FeatureRowProto.FeatureRow;
-// import feast.types.ValueProto.ValueType.Enum;
-// import java.util.List;
-// import lombok.extern.slf4j.Slf4j;
-// import org.apache.beam.sdk.testing.PAssert;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.junit.Rule;
-// import org.junit.Test;
-//
-// @Slf4j
-// public class ConvertTypesDoFnTest {
-//
-//   @Rule
-//   public TestPipeline pipeline = TestPipeline.create();
-//
-//   @Test
-//   public void testStringTo() {
-//     FeatureRowExtended row = FeatureRowExtended.newBuilder().setRow(
-//         FeatureRow.newBuilder().addAllFeatures(Lists.newArrayList(
-//             Features.of("STRING_TO_INT32", Values.ofString("123")),
-//             Features.of("STRING_TO_INT64", Values.ofString("123")),
-//             Features.of("STRING_TO_FLOAT", Values.ofString("123")),
-//             Features.of("STRING_TO_DOUBLE", Values.ofString("123")),
-//             Features.of("STRING_TO_STRING", Values.ofString("123")),
-//             Features.of("STRING_TO_BOOL", Values.ofString("true")),
-//             Features.of("STRING_TO_TIMESTAMP", Values.ofString("2019-01-31T19:19:19.123Z"))
-//         ))).build();
-//
-//     List<FeatureSpec> featureSpecs = Lists.newArrayList(
-//         FeatureSpec.newBuilder().setId("STRING_TO_INT32").setValueType(Enum.INT32).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_INT64").setValueType(Enum.INT64).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_FLOAT").setValueType(Enum.FLOAT).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_DOUBLE").setValueType(Enum.DOUBLE).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_BOOL").setValueType(Enum.BOOL).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_STRING").setValueType(Enum.STRING).build(),
-//         FeatureSpec.newBuilder().setId("STRING_TO_TIMESTAMP").setValueType(Enum.TIMESTAMP).build()
-//     );
-//     PFeatureRows output = PFeatureRows.of(pipeline.apply(Create.of(row)))
-//         .applyDoFn("name",
-//             new ConvertTypesDoFn(
-//                 new Specs("", ImportJobSpecsProto.ImportJobSpecs.newBuilder()
-//                     .addAllFeatureSpecs(featureSpecs).build())));
-//
-//     PAssert.that(output.getErrors()).satisfies(rows -> {
-//       if (rows.iterator().hasNext()) {
-//         log.error(rows.iterator().next().getLastAttempt().getError().toString());
-//       }
-//       return null;
-//     });
-//
-//     PAssert.that(output.getMain()).containsInAnyOrder(FeatureRowExtended.newBuilder().setRow(
-//         FeatureRow.newBuilder().addAllFeatures(Lists.newArrayList(
-//             Features.of("STRING_TO_INT32", Values.ofInt32(123)),
-//             Features.of("STRING_TO_INT64", Values.ofInt64(123L)),
-//             Features.of("STRING_TO_FLOAT", Values.ofFloat(123F)),
-//             Features.of("STRING_TO_DOUBLE", Values.ofDouble(123.0)),
-//             Features.of("STRING_TO_STRING", Values.ofString("123")),
-//             Features.of("STRING_TO_BOOL", Values.ofBool(true)),
-//             Features.of("STRING_TO_TIMESTAMP",
-//                 Values.ofTimestamp(DateUtil.toTimestamp("2019-01-31T19:19:19.123Z")))
-//         ))).setLastAttempt(Attempt.getDefaultInstance()).build());
-//     pipeline.run();
-//   }
-// }
\ No newline at end of file
diff --git a/ingestion/src/test/java/feast/ingestion/transform/fn/FilterFeatureRowDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/fn/FilterFeatureRowDoFnTest.java
deleted file mode 100644
index 55a6f9b302..0000000000
--- a/ingestion/src/test/java/feast/ingestion/transform/fn/FilterFeatureRowDoFnTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// /*
-//  * Copyright 2018 The Feast Authors
-//  *
-//  * Licensed under the Apache License, Version 2.0 (the "License");
-//  * you may not use this file except in compliance with the License.
-//  * You may obtain a copy of the License at
-//  *
-//  *     https://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  *
-//  */
-// package feast.ingestion.transform.fn;
-//
-// import feast.types.FeatureProto.Feature;
-// import feast.types.FeatureRowProto.FeatureRow;
-// import feast.types.ValueProto.Value;
-// import java.util.Arrays;
-// import java.util.List;
-// import org.apache.beam.sdk.testing.PAssert;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.apache.beam.sdk.transforms.ParDo;
-// import org.apache.beam.sdk.values.PCollection;
-// import org.junit.Rule;
-// import org.junit.Test;
-//
-// public class FilterFeatureRowDoFnTest {
-//   @Rule public TestPipeline testPipeline = TestPipeline.create();
-//
-//   @Test
-//   public void shouldIgnoreUnspecifiedFeatureID() {
-//     String featureId1 = "testentity.feature1";
-//     String featureId2 = "testentity.feature2";
-//     String featureId3 = "testentity.feature3";
-//
-//     List<String> specifiedFeatureIds = Arrays.asList(featureId1, featureId2, featureId3);
-//     FilterFeatureRowDoFn doFn = new FilterFeatureRowDoFn(specifiedFeatureIds);
-//
-//     FeatureRow row =
-//         FeatureRow.newBuilder()
-//             .setEntityKey("1234")
-//             .setEntityName("testentity")
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId1).setValue(Value.newBuilder().setInt64Val(10)))
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId2).setValue(Value.newBuilder().setInt64Val(11)))
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId3).setValue(Value.newBuilder().setInt64Val(12)))
-//             // this feature should be ignored
-//             .addFeatures(Feature.newBuilder().setId("testEntity.unknown_feature"))
-//             .build();
-//
-//     PCollection<FeatureRow> output = testPipeline.apply(Create.of(row))
-//         .apply(ParDo.of(doFn));
-//
-//     FeatureRow expRow =
-//         FeatureRow.newBuilder()
-//             .setEntityKey("1234")
-//             .setEntityName("testentity")
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId1).setValue(Value.newBuilder().setInt64Val(10)))
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId2).setValue(Value.newBuilder().setInt64Val(11)))
-//             .addFeatures(
-//                 Feature.newBuilder().setId(featureId3).setValue(Value.newBuilder().setInt64Val(12)))
-//             .build();
-//     PAssert.that(output).containsInAnyOrder(expRow);
-//
-//     testPipeline.run();
-//   }
-// }
diff --git a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java
deleted file mode 100644
index cc3044bef6..0000000000
--- a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowRedisIOWriteTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-// /*
-//  * Copyright 2019 The Feast Authors
-//  *
-//  * Licensed under the Apache License, Version 2.0 (the "License");
-//  * you may not use this file except in compliance with the License.
-//  * You may obtain a copy of the License at
-//  *
-//  *     http://www.apache.org/licenses/LICENSE-2.0
-//  *
-//  * Unless required by applicable law or agreed to in writing, software
-//  * distributed under the License is distributed on an "AS IS" BASIS,
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  * See the License for the specific language governing permissions and
-//  * limitations under the License.
-//  *
-//  */
-//
-// package feast.store.serving.redis;
-//
-// import static feast.store.serving.redis.FeatureRowToRedisMutationDoFn.getFeatureIdSha1Prefix;
-// import static feast.store.serving.redis.FeatureRowToRedisMutationDoFn.getRedisBucketKey;
-// import static org.junit.Assert.assertEquals;
-//
-// import com.google.common.io.Resources;
-// import com.google.protobuf.Timestamp;
-// import feast.ingestion.config.ImportJobSpecsSupplier;
-// import feast.ingestion.model.Features;
-// import feast.ingestion.model.Specs;
-// import feast.ingestion.model.Values;
-// import feast.ingestion.util.DateUtil;
-// import feast.specs.EntitySpecProto.EntitySpec;
-// import feast.specs.FeatureSpecProto.FeatureSpec;
-// import feast.specs.ImportJobSpecsProto.ImportJobSpecs;
-// import feast.specs.ImportJobSpecsProto.SourceSpec;
-// import feast.specs.ImportSpecProto.Field;
-// import feast.specs.ImportSpecProto.ImportSpec;
-// import feast.specs.ImportSpecProto.Schema;
-// import feast.specs.StorageSpecProto.StorageSpec;
-// import feast.storage.RedisProto.RedisBucketKey;
-// import feast.storage.RedisProto.RedisBucketValue;
-// import feast.store.FeatureStoreWrite;
-// import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
-// import feast.types.FeatureRowProto.FeatureRow;
-// import java.io.IOException;
-// import java.nio.file.Path;
-// import java.nio.file.Paths;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.apache.beam.sdk.values.PCollection;
-// import org.joda.time.DateTime;
-// import org.junit.AfterClass;
-// import org.junit.BeforeClass;
-// import org.junit.Rule;
-// import org.junit.Test;
-// import redis.clients.jedis.Jedis;
-// import redis.embedded.Redis;
-// import redis.embedded.RedisServer;
-//
-// public class FeatureRowRedisIOWriteTest {
-//
-//   // private static final String featureInt32 = "testEntity.testInt32";
-//   // private static final String featureString = "testEntity.testString";
-//   //
-//   // private static int REDIS_PORT = 51234;
-//   // private static Redis redis;
-//   // private static Jedis jedis;
-//   // private static ImportJobSpecs importJobSpecs;
-//   //
-//   // @Rule
-//   // public TestPipeline testPipeline = TestPipeline.create();
-//   //
-//   // @BeforeClass
-//   // public static void classSetup() throws IOException {
-//   //   redis = new RedisServer(REDIS_PORT);
-//   //   redis.start();
-//   //   Path path = Paths.get(Resources.getResource("specs/").getPath());
-//   //   importJobSpecs = new ImportJobSpecsSupplier(path.toString()).get();
-//   //   jedis = new Jedis("localhost", REDIS_PORT);
-//   // }
-//   //
-//   // @AfterClass
-//   // public static void teardown() {
-//   //   redis.stop();
-//   // }
-//   //
-//   // Specs getSpecs() {
-//   //   Specs specs = Specs.of(
"test job", -// // importJobSpecs.toBuilder() -// // .setSourceSpec(SourceSpec.newBuilder()) -// // .setSinkStorageSpec(StorageSpec.newBuilder() -// // .setId("REDIS1").setType("REDIS") -// // .putOptions("port", String.valueOf(REDIS_PORT)) -// // .putOptions("host", "localhost") -// // .putOptions("batchSize", "1") -// // .putOptions("timeout", "2000") -// // .build()) -// // .build()); -// // return specs; -// // } -// // -// // @Test -// // public void testWrite() throws IOException { -// // -// // Specs specs = getSpecs(); -// // new RedisServingFactory().create(specs.getSinkStorageSpec(), specs); -// // FeatureRowRedisIO.Write write = -// // new FeatureRowRedisIO.Write( -// // RedisStoreOptions.builder().host("localhost").port(REDIS_PORT).build(), specs); -// // -// // Timestamp now = DateUtil.toTimestamp(DateTime.now()); -// // -// // FeatureRowExtended rowExtended = -// // FeatureRowExtended.newBuilder() -// // .setRow( -// // FeatureRow.newBuilder() -// // .setEntityName("testEntity") -// // .setEntityKey("1") -// // .setEventTimestamp(now) -// // .addFeatures(Features.of(featureInt32, Values.ofInt32(1))) -// // .addFeatures(Features.of(featureString, Values.ofString("a")))) -// // .build(); -// // -// // PCollection input = testPipeline.apply(Create.of(rowExtended)); -// // -// // input.apply("write to embedded redis", write); -// // -// // testPipeline.run(); -// // -// // RedisBucketKey featureInt32Key = -// // getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), 0L); -// // RedisBucketKey featureStringKey = -// // getRedisBucketKey("1", getFeatureIdSha1Prefix(featureString), 0L); -// // -// // RedisBucketValue featureInt32Value = -// // RedisBucketValue.parseFrom(jedis.get(featureInt32Key.toByteArray())); -// // RedisBucketValue featureStringValue = -// // RedisBucketValue.parseFrom(jedis.get(featureStringKey.toByteArray())); -// // -// // assertEquals(Values.ofInt32(1), featureInt32Value.getValue()); -// // assertEquals(now, featureInt32Value.getEventTimestamp()); -// // assertEquals(Values.ofString("a"), featureStringValue.getValue()); -// // assertEquals(now, featureStringValue.getEventTimestamp()); -// // } -// // -// // @Test -// // public void testWriteFromOptions() throws IOException { -// // Specs specs = getSpecs(); -// // FeatureStoreWrite write = new RedisServingFactory() -// // .create(specs.getSinkStorageSpec(), specs); -// // -// // Timestamp now = DateUtil.toTimestamp(DateTime.now()); -// // FeatureRowExtended rowExtended = -// // FeatureRowExtended.newBuilder() -// // .setRow( -// // FeatureRow.newBuilder() -// // .setEntityName("testEntity") -// // .setEntityKey("1") -// // .setEventTimestamp(now) -// // .addFeatures(Features.of(featureInt32, Values.ofInt32(1))) -// // .addFeatures(Features.of(featureString, Values.ofString("a")))) -// // .build(); -// // -// // PCollection input = testPipeline.apply(Create.of(rowExtended)); -// // -// // input.apply("write to embedded redis", write); -// // -// // testPipeline.run(); -// // -// // RedisBucketKey featureInt32Key = -// // getRedisBucketKey("1", getFeatureIdSha1Prefix(featureInt32), 0L); -// // RedisBucketKey featureStringKey = -// // getRedisBucketKey("1", getFeatureIdSha1Prefix(featureString), 0L); -// // -// // RedisBucketValue featureInt32Value = -// // RedisBucketValue.parseFrom(jedis.get(featureInt32Key.toByteArray())); -// // RedisBucketValue featureStringValue = -// // RedisBucketValue.parseFrom(jedis.get(featureStringKey.toByteArray())); -// // -// // assertEquals(Values.ofInt32(1), 
featureInt32Value.getValue()); -// // assertEquals(now, featureInt32Value.getEventTimestamp()); -// // assertEquals(Values.ofString("a"), featureStringValue.getValue()); -// // assertEquals(now, featureStringValue.getEventTimestamp()); -// // } -// }
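
Note on the deleted streaming tests: they all drive Beam's TestStream to position elements against the event-time watermark and then assert on the panes a stateful transform emits. A minimal, self-contained sketch of that pattern, for reference only (hypothetical class name, plain String elements standing in for the FeatureRow protos above; not part of the applied patch):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class TestStreamPatternSketch {

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void elementsOnBothSidesOfAWatermarkAdvanceAreDelivered() {
    Instant start = new Instant(0);
    Duration delay = Duration.standardSeconds(10);

    // Inject elements at controlled points in event time, then advance the
    // watermark to infinity so any pending event-time timers fire.
    TestStream<String> stream =
        TestStream.create(StringUtf8Coder.of())
            .advanceWatermarkTo(start)
            .addElements("a") // seen before the watermark passes `delay`
            .advanceWatermarkTo(start.plus(delay))
            .addElements("b") // seen after the watermark passes `delay`
            .advanceWatermarkToInfinity();

    PCollection<String> input = pipeline.apply(stream);

    // A stateful transform under test would be applied to `input` here;
    // without one, both elements simply pass through.
    PAssert.that(input).containsInAnyOrder("a", "b");
    pipeline.run();
  }
}

Advancing the watermark to infinity is what forces outstanding event-time timers to fire, which is why each deleted test closed its TestStream with advanceWatermarkToInfinity().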